nautilus_binance/common/sbe/stream/
mod.rs1use crate::common::sbe::{cursor::SbeCursor, error::SbeDecodeError};
28
29mod best_bid_ask;
30mod depth_diff;
31mod depth_snapshot;
32mod trades;
33
34pub use best_bid_ask::BestBidAskStreamEvent;
35pub use depth_diff::DepthDiffStreamEvent;
36pub use depth_snapshot::DepthSnapshotStreamEvent;
37pub use trades::{Trade, TradesStreamEvent};
38
/// Schema identifier expected in the header of every stream message.
///
/// [`MessageHeader::validate_schema`] rejects headers whose `schema_id` differs from this.
pub const STREAM_SCHEMA_ID: u16 = 1;

/// Schema version associated with [`STREAM_SCHEMA_ID`].
pub const STREAM_SCHEMA_VERSION: u16 = 0;

/// Largest `num_in_group` value accepted when decoding a repeating-group header.
///
/// Group headers announcing more entries than this fail with
/// [`StreamDecodeError::GroupSizeTooLarge`].
pub const MAX_GROUP_SIZE: usize = 10_000;
48
/// Template identifiers carried in the `template_id` field of a stream message header.
pub mod template_id {
    /// Template ID of the trades stream event.
    pub const TRADES_STREAM_EVENT: u16 = 10_000;
    /// Template ID of the best bid/ask stream event.
    pub const BEST_BID_ASK_STREAM_EVENT: u16 = 10_001;
    /// Template ID of the depth snapshot stream event.
    pub const DEPTH_SNAPSHOT_STREAM_EVENT: u16 = 10_002;
    /// Template ID of the depth diff stream event.
    pub const DEPTH_DIFF_STREAM_EVENT: u16 = 10_003;
}
56
/// Errors raised while decoding SBE stream messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamDecodeError {
    /// The input held fewer bytes than the decoder required.
    BufferTooShort {
        /// Number of bytes the decoder needed.
        expected: usize,
        /// Number of bytes that were available.
        actual: usize,
    },
    /// A repeating-group header announced more entries than the allowed maximum.
    GroupSizeTooLarge {
        /// Entry count read from the group header.
        count: usize,
        /// Largest entry count the decoder accepts.
        max: usize,
    },
    /// A variable-length string did not contain valid UTF-8.
    InvalidUtf8,
    /// The header carried a schema value other than the expected one.
    SchemaMismatch {
        /// Value the decoder expected.
        expected: u16,
        /// Value found in the message.
        actual: u16,
    },
    /// The header carried a template ID this decoder does not know.
    UnknownTemplateId(u16),
}

impl std::fmt::Display for StreamDecodeError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every payload field is `Copy`, so matching on `*self` binds plain values.
        match *self {
            Self::InvalidUtf8 => f.write_str("Invalid UTF-8 in symbol"),
            Self::UnknownTemplateId(id) => write!(f, "Unknown template ID: {id}"),
            Self::SchemaMismatch { expected, actual } => {
                write!(f, "Schema mismatch: expected {expected}, got {actual}")
            }
            Self::GroupSizeTooLarge { count, max } => {
                write!(f, "Group size {count} exceeds maximum {max}")
            }
            Self::BufferTooShort { expected, actual } => write!(
                f,
                "Buffer too short: expected {expected} bytes, got {actual}"
            ),
        }
    }
}

impl std::error::Error for StreamDecodeError {}
94
95impl From<SbeDecodeError> for StreamDecodeError {
96 fn from(err: SbeDecodeError) -> Self {
97 match err {
98 SbeDecodeError::BufferTooShort { expected, actual } => {
99 Self::BufferTooShort { expected, actual }
100 }
101 SbeDecodeError::SchemaMismatch { expected, actual } => {
102 Self::SchemaMismatch { expected, actual }
103 }
104 SbeDecodeError::VersionMismatch { .. } => Self::SchemaMismatch {
105 expected: STREAM_SCHEMA_VERSION,
106 actual: 0,
107 },
108 SbeDecodeError::UnknownTemplateId(id) => Self::UnknownTemplateId(id),
109 SbeDecodeError::GroupSizeTooLarge { count, max } => Self::GroupSizeTooLarge {
110 count: count as usize,
111 max: max as usize,
112 },
113 SbeDecodeError::InvalidBlockLength { .. } => Self::BufferTooShort {
114 expected: 0,
115 actual: 0,
116 },
117 SbeDecodeError::InvalidUtf8 => Self::InvalidUtf8,
118 }
119 }
120}
121
122#[derive(Debug, Clone, Copy)]
124pub struct MessageHeader {
125 pub block_length: u16,
126 pub template_id: u16,
127 pub schema_id: u16,
128 pub version: u16,
129}
130
131impl MessageHeader {
132 pub const ENCODED_LENGTH: usize = 8;
133
134 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
140 if buf.len() < Self::ENCODED_LENGTH {
141 return Err(StreamDecodeError::BufferTooShort {
142 expected: Self::ENCODED_LENGTH,
143 actual: buf.len(),
144 });
145 }
146 Ok(Self {
147 block_length: u16::from_le_bytes([buf[0], buf[1]]),
148 template_id: u16::from_le_bytes([buf[2], buf[3]]),
149 schema_id: u16::from_le_bytes([buf[4], buf[5]]),
150 version: u16::from_le_bytes([buf[6], buf[7]]),
151 })
152 }
153
154 pub fn validate_schema(&self) -> Result<(), StreamDecodeError> {
156 if self.schema_id != STREAM_SCHEMA_ID {
157 return Err(StreamDecodeError::SchemaMismatch {
158 expected: STREAM_SCHEMA_ID,
159 actual: self.schema_id,
160 });
161 }
162 Ok(())
163 }
164}
165
166#[derive(Debug, Clone, Copy)]
168pub struct PriceLevel {
169 pub price_mantissa: i64,
171 pub qty_mantissa: i64,
173}
174
175impl PriceLevel {
176 pub const ENCODED_LENGTH: usize = 16;
177
178 pub fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
184 Ok(Self {
185 price_mantissa: cursor.read_i64_le()?,
186 qty_mantissa: cursor.read_i64_le()?,
187 })
188 }
189}
190
/// Converts a decimal mantissa/exponent pair into an `f64`, i.e. `mantissa * 10^exponent`.
///
/// For negative exponents the mantissa is divided by `10^|exponent|` rather than
/// multiplied by `10^exponent`. Negative powers of ten such as `0.1` or `0.01` are not
/// exactly representable in binary floating point, so multiplying by them rounds twice
/// and can yield values like `0.30000000000000004` for `(3, -1)`. Small positive powers
/// of ten are exact, so the division rounds only once.
///
/// Mantissas whose magnitude exceeds 2^53 lose precision in the `i64` to `f64` conversion.
#[inline]
#[must_use]
pub fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
    let value = mantissa as f64;
    // Widen before negating so that `i8::MIN` cannot overflow.
    let exp = i32::from(exponent);

    if exp < 0 {
        value / 10_f64.powi(-exp)
    } else {
        value * 10_f64.powi(exp)
    }
}
197
198pub fn decode_var_string8(buf: &[u8]) -> Result<(String, usize), StreamDecodeError> {
206 if buf.is_empty() {
207 return Err(StreamDecodeError::BufferTooShort {
208 expected: 1,
209 actual: 0,
210 });
211 }
212
213 let len = buf[0] as usize;
214 let total_len = 1 + len;
215
216 if buf.len() < total_len {
217 return Err(StreamDecodeError::BufferTooShort {
218 expected: total_len,
219 actual: buf.len(),
220 });
221 }
222
223 let s = std::str::from_utf8(&buf[1..total_len]).map_err(|_| StreamDecodeError::InvalidUtf8)?;
224
225 Ok((s.to_string(), total_len))
226}
227
228#[derive(Debug, Clone, Copy)]
230pub struct GroupSizeEncoding {
231 pub block_length: u16,
232 pub num_in_group: u32,
233}
234
235impl GroupSizeEncoding {
236 pub const ENCODED_LENGTH: usize = 6;
237
238 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
244 if buf.len() < Self::ENCODED_LENGTH {
245 return Err(StreamDecodeError::BufferTooShort {
246 expected: Self::ENCODED_LENGTH,
247 actual: buf.len(),
248 });
249 }
250
251 let num_in_group = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
252
253 if num_in_group as usize > MAX_GROUP_SIZE {
254 return Err(StreamDecodeError::GroupSizeTooLarge {
255 count: num_in_group as usize,
256 max: MAX_GROUP_SIZE,
257 });
258 }
259
260 Ok(Self {
261 block_length: u16::from_le_bytes([buf[0], buf[1]]),
262 num_in_group,
263 })
264 }
265}
266
267#[derive(Debug, Clone, Copy)]
269pub struct GroupSize16Encoding {
270 pub block_length: u16,
271 pub num_in_group: u16,
272}
273
274impl GroupSize16Encoding {
275 pub const ENCODED_LENGTH: usize = 4;
276
277 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
283 if buf.len() < Self::ENCODED_LENGTH {
284 return Err(StreamDecodeError::BufferTooShort {
285 expected: Self::ENCODED_LENGTH,
286 actual: buf.len(),
287 });
288 }
289
290 let num_in_group = u16::from_le_bytes([buf[2], buf[3]]);
291
292 if num_in_group as usize > MAX_GROUP_SIZE {
293 return Err(StreamDecodeError::GroupSizeTooLarge {
294 count: num_in_group as usize,
295 max: MAX_GROUP_SIZE,
296 });
297 }
298
299 Ok(Self {
300 block_length: u16::from_le_bytes([buf[0], buf[1]]),
301 num_in_group,
302 })
303 }
304}
305
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Mantissa scaling for a negative, zero and positive exponent.
    #[rstest]
    fn test_mantissa_to_f64() {
        let scaled_down = mantissa_to_f64(12345, -2);
        let unscaled = mantissa_to_f64(100, 0);
        let scaled_up = mantissa_to_f64(5, 3);

        assert!((scaled_down - 123.45).abs() < 1e-10);
        assert!((unscaled - 100.0).abs() < 1e-10);
        assert!((scaled_up - 5000.0).abs() < 1e-10);
    }

    /// A buffer one byte shorter than the header must be rejected.
    #[rstest]
    fn test_message_header_too_short() {
        let short = [0u8; 7];
        let expected = StreamDecodeError::BufferTooShort {
            expected: 8,
            actual: 7,
        };

        let err = MessageHeader::decode(&short).unwrap_err();
        assert_eq!(err, expected);
    }

    /// A group count just above the limit must be rejected.
    #[rstest]
    fn test_group_size_too_large() {
        let oversized = (MAX_GROUP_SIZE + 1) as u32;
        let mut buf = [0u8; 6];
        // The count occupies bytes 2..6; the block length stays zero.
        buf[2..6].copy_from_slice(&oversized.to_le_bytes());

        let err = GroupSizeEncoding::decode(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::GroupSizeTooLarge { .. }));
    }

    /// An empty buffer has no length byte to read.
    #[rstest]
    fn test_decode_var_string8_empty_buffer() {
        let empty: [u8; 0] = [];
        let err = decode_var_string8(&empty).unwrap_err();
        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    /// The length byte announces 10 bytes but only 4 follow.
    #[rstest]
    fn test_decode_var_string8_truncated() {
        let truncated = [10u8, b'H', b'E', b'L', b'L'];
        let err = decode_var_string8(&truncated).unwrap_err();
        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    /// A well-formed string decodes and reports length byte plus payload as consumed.
    #[rstest]
    fn test_decode_var_string8_valid() {
        let encoded = [5u8, b'H', b'E', b'L', b'L', b'O'];
        let (text, consumed) = decode_var_string8(&encoded).unwrap();
        assert_eq!(text, "HELLO");
        assert_eq!(consumed, 6);
    }

    /// A header carrying a foreign schema ID fails validation.
    #[rstest]
    fn test_schema_validation() {
        let header = MessageHeader {
            block_length: 50,
            template_id: 10001,
            // Deliberately different from `STREAM_SCHEMA_ID`.
            schema_id: 99,
            version: 0,
        };

        let err = header.validate_schema().unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::SchemaMismatch {
                expected: STREAM_SCHEMA_ID,
                actual: 99,
            }
        );
    }
}