nautilus_binance/common/sbe/stream/
mod.rs1mod best_bid_ask;
28mod depth_diff;
29mod depth_snapshot;
30mod trades;
31
32pub use best_bid_ask::BestBidAskStreamEvent;
33pub use depth_diff::DepthDiffStreamEvent;
34pub use depth_snapshot::DepthSnapshotStreamEvent;
35pub use trades::{Trade, TradesStreamEvent};
36
37pub const STREAM_SCHEMA_ID: u16 = 1;
39
40pub const STREAM_SCHEMA_VERSION: u16 = 0;
42
43pub const MAX_GROUP_SIZE: usize = 10_000;
46
47pub mod template_id {
49 pub const TRADES_STREAM_EVENT: u16 = 10000;
50 pub const BEST_BID_ASK_STREAM_EVENT: u16 = 10001;
51 pub const DEPTH_SNAPSHOT_STREAM_EVENT: u16 = 10002;
52 pub const DEPTH_DIFF_STREAM_EVENT: u16 = 10003;
53}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
57pub enum StreamDecodeError {
58 BufferTooShort { expected: usize, actual: usize },
60 GroupSizeTooLarge { count: usize, max: usize },
62 InvalidUtf8,
64 SchemaMismatch { expected: u16, actual: u16 },
66 UnknownTemplateId(u16),
68}
69
70impl std::fmt::Display for StreamDecodeError {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 match self {
73 Self::BufferTooShort { expected, actual } => {
74 write!(
75 f,
76 "Buffer too short: expected {expected} bytes, got {actual}"
77 )
78 }
79 Self::GroupSizeTooLarge { count, max } => {
80 write!(f, "Group size {count} exceeds maximum {max}")
81 }
82 Self::InvalidUtf8 => write!(f, "Invalid UTF-8 in symbol"),
83 Self::SchemaMismatch { expected, actual } => {
84 write!(f, "Schema mismatch: expected {expected}, got {actual}")
85 }
86 Self::UnknownTemplateId(id) => write!(f, "Unknown template ID: {id}"),
87 }
88 }
89}
90
91impl std::error::Error for StreamDecodeError {}
92
93#[derive(Debug, Clone, Copy)]
95pub struct MessageHeader {
96 pub block_length: u16,
97 pub template_id: u16,
98 pub schema_id: u16,
99 pub version: u16,
100}
101
102impl MessageHeader {
103 pub const ENCODED_LENGTH: usize = 8;
104
105 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
111 if buf.len() < Self::ENCODED_LENGTH {
112 return Err(StreamDecodeError::BufferTooShort {
113 expected: Self::ENCODED_LENGTH,
114 actual: buf.len(),
115 });
116 }
117 Ok(Self {
118 block_length: u16::from_le_bytes([buf[0], buf[1]]),
119 template_id: u16::from_le_bytes([buf[2], buf[3]]),
120 schema_id: u16::from_le_bytes([buf[4], buf[5]]),
121 version: u16::from_le_bytes([buf[6], buf[7]]),
122 })
123 }
124
125 pub fn validate_schema(&self) -> Result<(), StreamDecodeError> {
127 if self.schema_id != STREAM_SCHEMA_ID {
128 return Err(StreamDecodeError::SchemaMismatch {
129 expected: STREAM_SCHEMA_ID,
130 actual: self.schema_id,
131 });
132 }
133 Ok(())
134 }
135}
136
137#[derive(Debug, Clone, Copy)]
139pub struct PriceLevel {
140 pub price_mantissa: i64,
142 pub qty_mantissa: i64,
144}
145
146impl PriceLevel {
147 pub const ENCODED_LENGTH: usize = 16;
148
149 #[allow(clippy::missing_panics_doc)] pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
156 if buf.len() < Self::ENCODED_LENGTH {
157 return Err(StreamDecodeError::BufferTooShort {
158 expected: Self::ENCODED_LENGTH,
159 actual: buf.len(),
160 });
161 }
162
163 Ok(Self {
165 price_mantissa: i64::from_le_bytes(buf[0..8].try_into().unwrap()),
166 qty_mantissa: i64::from_le_bytes(buf[8..16].try_into().unwrap()),
167 })
168 }
169}
170
171#[inline]
173#[must_use]
174pub fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
175 mantissa as f64 * 10_f64.powi(exponent as i32)
176}
177
178pub fn decode_var_string8(buf: &[u8]) -> Result<(String, usize), StreamDecodeError> {
186 if buf.is_empty() {
187 return Err(StreamDecodeError::BufferTooShort {
188 expected: 1,
189 actual: 0,
190 });
191 }
192
193 let len = buf[0] as usize;
194 let total_len = 1 + len;
195
196 if buf.len() < total_len {
197 return Err(StreamDecodeError::BufferTooShort {
198 expected: total_len,
199 actual: buf.len(),
200 });
201 }
202
203 let s = std::str::from_utf8(&buf[1..total_len]).map_err(|_| StreamDecodeError::InvalidUtf8)?;
204
205 Ok((s.to_string(), total_len))
206}
207
208#[derive(Debug, Clone, Copy)]
210pub struct GroupSizeEncoding {
211 pub block_length: u16,
212 pub num_in_group: u32,
213}
214
215impl GroupSizeEncoding {
216 pub const ENCODED_LENGTH: usize = 6;
217
218 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
224 if buf.len() < Self::ENCODED_LENGTH {
225 return Err(StreamDecodeError::BufferTooShort {
226 expected: Self::ENCODED_LENGTH,
227 actual: buf.len(),
228 });
229 }
230
231 let num_in_group = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
232
233 if num_in_group as usize > MAX_GROUP_SIZE {
234 return Err(StreamDecodeError::GroupSizeTooLarge {
235 count: num_in_group as usize,
236 max: MAX_GROUP_SIZE,
237 });
238 }
239
240 Ok(Self {
241 block_length: u16::from_le_bytes([buf[0], buf[1]]),
242 num_in_group,
243 })
244 }
245}
246
247#[derive(Debug, Clone, Copy)]
249pub struct GroupSize16Encoding {
250 pub block_length: u16,
251 pub num_in_group: u16,
252}
253
254impl GroupSize16Encoding {
255 pub const ENCODED_LENGTH: usize = 4;
256
257 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
263 if buf.len() < Self::ENCODED_LENGTH {
264 return Err(StreamDecodeError::BufferTooShort {
265 expected: Self::ENCODED_LENGTH,
266 actual: buf.len(),
267 });
268 }
269
270 let num_in_group = u16::from_le_bytes([buf[2], buf[3]]);
271
272 if num_in_group as usize > MAX_GROUP_SIZE {
273 return Err(StreamDecodeError::GroupSizeTooLarge {
274 count: num_in_group as usize,
275 max: MAX_GROUP_SIZE,
276 });
277 }
278
279 Ok(Self {
280 block_length: u16::from_le_bytes([buf[0], buf[1]]),
281 num_in_group,
282 })
283 }
284}
285
286#[inline]
288fn read_i64_le(buf: &[u8], offset: usize) -> Result<i64, StreamDecodeError> {
289 let end = offset + 8;
290 if buf.len() < end {
291 return Err(StreamDecodeError::BufferTooShort {
292 expected: end,
293 actual: buf.len(),
294 });
295 }
296 Ok(i64::from_le_bytes(buf[offset..end].try_into().unwrap()))
297}
298
299#[inline]
301fn read_i8(buf: &[u8], offset: usize) -> Result<i8, StreamDecodeError> {
302 if buf.len() <= offset {
303 return Err(StreamDecodeError::BufferTooShort {
304 expected: offset + 1,
305 actual: buf.len(),
306 });
307 }
308 Ok(buf[offset] as i8)
309}
310
311#[inline]
313fn read_u8(buf: &[u8], offset: usize) -> Result<u8, StreamDecodeError> {
314 if buf.len() <= offset {
315 return Err(StreamDecodeError::BufferTooShort {
316 expected: offset + 1,
317 actual: buf.len(),
318 });
319 }
320 Ok(buf[offset])
321}
322
323#[cfg(test)]
324mod tests {
325 use rstest::rstest;
326
327 use super::*;
328
329 #[rstest]
330 fn test_mantissa_to_f64() {
331 assert!((mantissa_to_f64(12345, -2) - 123.45).abs() < 1e-10);
332 assert!((mantissa_to_f64(100, 0) - 100.0).abs() < 1e-10);
333 assert!((mantissa_to_f64(5, 3) - 5000.0).abs() < 1e-10);
334 }
335
336 #[rstest]
337 fn test_message_header_too_short() {
338 let buf = [0u8; 7];
339 let err = MessageHeader::decode(&buf).unwrap_err();
340 assert_eq!(
341 err,
342 StreamDecodeError::BufferTooShort {
343 expected: 8,
344 actual: 7
345 }
346 );
347 }
348
349 #[rstest]
350 fn test_group_size_too_large() {
351 let mut buf = [0u8; 6];
353 let count = (MAX_GROUP_SIZE + 1) as u32;
354 buf[2..6].copy_from_slice(&count.to_le_bytes());
355
356 let err = GroupSizeEncoding::decode(&buf).unwrap_err();
357 assert!(matches!(err, StreamDecodeError::GroupSizeTooLarge { .. }));
358 }
359
360 #[rstest]
361 fn test_decode_var_string8_empty_buffer() {
362 let err = decode_var_string8(&[]).unwrap_err();
363 assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
364 }
365
366 #[rstest]
367 fn test_decode_var_string8_truncated() {
368 let buf = [10u8, b'H', b'E', b'L', b'L'];
370 let err = decode_var_string8(&buf).unwrap_err();
371 assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
372 }
373
374 #[rstest]
375 fn test_decode_var_string8_valid() {
376 let buf = [5u8, b'H', b'E', b'L', b'L', b'O'];
377 let (s, consumed) = decode_var_string8(&buf).unwrap();
378 assert_eq!(s, "HELLO");
379 assert_eq!(consumed, 6);
380 }
381
382 #[rstest]
383 fn test_schema_validation() {
384 let header = MessageHeader {
385 block_length: 50,
386 template_id: 10001,
387 schema_id: 99, version: 0,
389 };
390 let err = header.validate_schema().unwrap_err();
391 assert_eq!(
392 err,
393 StreamDecodeError::SchemaMismatch {
394 expected: STREAM_SCHEMA_ID,
395 actual: 99
396 }
397 );
398 }
399}