nautilus_binance/common/sbe/stream/
mod.rs1#![allow(clippy::missing_errors_doc)]
18
19use std::fmt::Display;
31
32use crate::common::sbe::{cursor::SbeCursor, error::SbeDecodeError};
33
34mod best_bid_ask;
35mod depth_diff;
36mod depth_snapshot;
37mod trades;
38
39pub use best_bid_ask::BestBidAskStreamEvent;
40pub use depth_diff::DepthDiffStreamEvent;
41pub use depth_snapshot::DepthSnapshotStreamEvent;
42pub use trades::{Trade, TradesStreamEvent};
43
44pub const STREAM_SCHEMA_ID: u16 = 1;
46
47pub const STREAM_SCHEMA_VERSION: u16 = 0;
49
50pub const MAX_GROUP_SIZE: usize = 10_000;
53
54pub mod template_id {
56 pub const TRADES_STREAM_EVENT: u16 = 10000;
57 pub const BEST_BID_ASK_STREAM_EVENT: u16 = 10001;
58 pub const DEPTH_SNAPSHOT_STREAM_EVENT: u16 = 10002;
59 pub const DEPTH_DIFF_STREAM_EVENT: u16 = 10003;
60}
61
62#[derive(Debug, Clone, PartialEq, Eq)]
64pub enum StreamDecodeError {
65 BufferTooShort { expected: usize, actual: usize },
67 GroupSizeTooLarge { count: usize, max: usize },
69 InvalidUtf8,
71 SchemaMismatch { expected: u16, actual: u16 },
73 UnknownTemplateId(u16),
75}
76
77impl Display for StreamDecodeError {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 match self {
80 Self::BufferTooShort { expected, actual } => {
81 write!(
82 f,
83 "Buffer too short: expected {expected} bytes, was {actual}"
84 )
85 }
86 Self::GroupSizeTooLarge { count, max } => {
87 write!(f, "Group size {count} exceeds maximum {max}")
88 }
89 Self::InvalidUtf8 => write!(f, "Invalid UTF-8 in symbol"),
90 Self::SchemaMismatch { expected, actual } => {
91 write!(f, "Schema mismatch: expected {expected}, was {actual}")
92 }
93 Self::UnknownTemplateId(id) => write!(f, "Unknown template ID: {id}"),
94 }
95 }
96}
97
98impl std::error::Error for StreamDecodeError {}
99
100impl From<SbeDecodeError> for StreamDecodeError {
101 fn from(err: SbeDecodeError) -> Self {
102 match err {
103 SbeDecodeError::BufferTooShort { expected, actual } => {
104 Self::BufferTooShort { expected, actual }
105 }
106 SbeDecodeError::SchemaMismatch { expected, actual } => {
107 Self::SchemaMismatch { expected, actual }
108 }
109 SbeDecodeError::VersionMismatch { .. } => Self::SchemaMismatch {
110 expected: STREAM_SCHEMA_VERSION,
111 actual: 0,
112 },
113 SbeDecodeError::UnknownTemplateId(id) => Self::UnknownTemplateId(id),
114 SbeDecodeError::GroupSizeTooLarge { count, max } => Self::GroupSizeTooLarge {
115 count: count as usize,
116 max: max as usize,
117 },
118 SbeDecodeError::InvalidBlockLength { .. } => Self::BufferTooShort {
119 expected: 0,
120 actual: 0,
121 },
122 SbeDecodeError::InvalidUtf8 => Self::InvalidUtf8,
123 }
124 }
125}
126
127#[derive(Debug, Clone, Copy)]
129pub struct MessageHeader {
130 pub block_length: u16,
131 pub template_id: u16,
132 pub schema_id: u16,
133 pub version: u16,
134}
135
136impl MessageHeader {
137 pub const ENCODED_LENGTH: usize = 8;
138
139 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
145 if buf.len() < Self::ENCODED_LENGTH {
146 return Err(StreamDecodeError::BufferTooShort {
147 expected: Self::ENCODED_LENGTH,
148 actual: buf.len(),
149 });
150 }
151 Ok(Self {
152 block_length: u16::from_le_bytes([buf[0], buf[1]]),
153 template_id: u16::from_le_bytes([buf[2], buf[3]]),
154 schema_id: u16::from_le_bytes([buf[4], buf[5]]),
155 version: u16::from_le_bytes([buf[6], buf[7]]),
156 })
157 }
158
159 pub fn validate_schema(&self) -> Result<(), StreamDecodeError> {
161 if self.schema_id != STREAM_SCHEMA_ID {
162 return Err(StreamDecodeError::SchemaMismatch {
163 expected: STREAM_SCHEMA_ID,
164 actual: self.schema_id,
165 });
166 }
167 Ok(())
168 }
169}
170
171#[derive(Debug, Clone, Copy)]
173pub struct PriceLevel {
174 pub price_mantissa: i64,
176 pub qty_mantissa: i64,
178}
179
180impl PriceLevel {
181 pub const ENCODED_LENGTH: usize = 16;
182
183 pub fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
189 Ok(Self {
190 price_mantissa: cursor.read_i64_le()?,
191 qty_mantissa: cursor.read_i64_le()?,
192 })
193 }
194}
195
196#[inline]
198#[must_use]
199pub fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
200 mantissa as f64 * 10_f64.powi(exponent as i32)
201}
202
203pub fn decode_var_string8(buf: &[u8]) -> Result<(String, usize), StreamDecodeError> {
211 if buf.is_empty() {
212 return Err(StreamDecodeError::BufferTooShort {
213 expected: 1,
214 actual: 0,
215 });
216 }
217
218 let len = buf[0] as usize;
219 let total_len = 1 + len;
220
221 if buf.len() < total_len {
222 return Err(StreamDecodeError::BufferTooShort {
223 expected: total_len,
224 actual: buf.len(),
225 });
226 }
227
228 let s = std::str::from_utf8(&buf[1..total_len]).map_err(|_| StreamDecodeError::InvalidUtf8)?;
229
230 Ok((s.to_string(), total_len))
231}
232
233#[derive(Debug, Clone, Copy)]
235pub struct GroupSizeEncoding {
236 pub block_length: u16,
237 pub num_in_group: u32,
238}
239
240impl GroupSizeEncoding {
241 pub const ENCODED_LENGTH: usize = 6;
242
243 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
249 if buf.len() < Self::ENCODED_LENGTH {
250 return Err(StreamDecodeError::BufferTooShort {
251 expected: Self::ENCODED_LENGTH,
252 actual: buf.len(),
253 });
254 }
255
256 let num_in_group = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
257
258 if num_in_group as usize > MAX_GROUP_SIZE {
259 return Err(StreamDecodeError::GroupSizeTooLarge {
260 count: num_in_group as usize,
261 max: MAX_GROUP_SIZE,
262 });
263 }
264
265 Ok(Self {
266 block_length: u16::from_le_bytes([buf[0], buf[1]]),
267 num_in_group,
268 })
269 }
270}
271
272#[derive(Debug, Clone, Copy)]
274pub struct GroupSize16Encoding {
275 pub block_length: u16,
276 pub num_in_group: u16,
277}
278
279impl GroupSize16Encoding {
280 pub const ENCODED_LENGTH: usize = 4;
281
282 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
288 if buf.len() < Self::ENCODED_LENGTH {
289 return Err(StreamDecodeError::BufferTooShort {
290 expected: Self::ENCODED_LENGTH,
291 actual: buf.len(),
292 });
293 }
294
295 let num_in_group = u16::from_le_bytes([buf[2], buf[3]]);
296
297 if num_in_group as usize > MAX_GROUP_SIZE {
298 return Err(StreamDecodeError::GroupSizeTooLarge {
299 count: num_in_group as usize,
300 max: MAX_GROUP_SIZE,
301 });
302 }
303
304 Ok(Self {
305 block_length: u16::from_le_bytes([buf[0], buf[1]]),
306 num_in_group,
307 })
308 }
309}
310
311#[cfg(test)]
312mod tests {
313 use rstest::rstest;
314
315 use super::*;
316
317 #[rstest]
318 fn test_mantissa_to_f64() {
319 assert!((mantissa_to_f64(12345, -2) - 123.45).abs() < 1e-10);
320 assert!((mantissa_to_f64(100, 0) - 100.0).abs() < 1e-10);
321 assert!((mantissa_to_f64(5, 3) - 5000.0).abs() < 1e-10);
322 }
323
324 #[rstest]
325 fn test_message_header_too_short() {
326 let buf = [0u8; 7];
327 let err = MessageHeader::decode(&buf).unwrap_err();
328 assert_eq!(
329 err,
330 StreamDecodeError::BufferTooShort {
331 expected: 8,
332 actual: 7
333 }
334 );
335 }
336
337 #[rstest]
338 fn test_group_size_too_large() {
339 let mut buf = [0u8; 6];
341 let count = (MAX_GROUP_SIZE + 1) as u32;
342 buf[2..6].copy_from_slice(&count.to_le_bytes());
343
344 let err = GroupSizeEncoding::decode(&buf).unwrap_err();
345 assert!(matches!(err, StreamDecodeError::GroupSizeTooLarge { .. }));
346 }
347
348 #[rstest]
349 fn test_decode_var_string8_empty_buffer() {
350 let err = decode_var_string8(&[]).unwrap_err();
351 assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
352 }
353
354 #[rstest]
355 fn test_decode_var_string8_truncated() {
356 let buf = [10u8, b'H', b'E', b'L', b'L'];
358 let err = decode_var_string8(&buf).unwrap_err();
359 assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
360 }
361
362 #[rstest]
363 fn test_decode_var_string8_valid() {
364 let buf = [5u8, b'H', b'E', b'L', b'L', b'O'];
365 let (s, consumed) = decode_var_string8(&buf).unwrap();
366 assert_eq!(s, "HELLO");
367 assert_eq!(consumed, 6);
368 }
369
370 #[rstest]
371 fn test_schema_validation() {
372 let header = MessageHeader {
373 block_length: 50,
374 template_id: 10001,
375 schema_id: 99, version: 0,
377 };
378 let err = header.validate_schema().unwrap_err();
379 assert_eq!(
380 err,
381 StreamDecodeError::SchemaMismatch {
382 expected: STREAM_SCHEMA_ID,
383 actual: 99
384 }
385 );
386 }
387}