nautilus_binance/common/sbe/stream/
mod.rs1use std::fmt::Display;
28
29use crate::common::sbe::{cursor::SbeCursor, error::SbeDecodeError};
30
31mod best_bid_ask;
32mod depth_diff;
33mod depth_snapshot;
34mod trades;
35
36pub use best_bid_ask::BestBidAskStreamEvent;
37pub use depth_diff::DepthDiffStreamEvent;
38pub use depth_snapshot::DepthSnapshotStreamEvent;
39pub use trades::{Trade, TradesStreamEvent};
40
41pub const STREAM_SCHEMA_ID: u16 = 1;
43
44pub const STREAM_SCHEMA_VERSION: u16 = 0;
46
47pub const MAX_GROUP_SIZE: usize = 10_000;
50
51pub mod template_id {
53 pub const TRADES_STREAM_EVENT: u16 = 10000;
54 pub const BEST_BID_ASK_STREAM_EVENT: u16 = 10001;
55 pub const DEPTH_SNAPSHOT_STREAM_EVENT: u16 = 10002;
56 pub const DEPTH_DIFF_STREAM_EVENT: u16 = 10003;
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
61pub enum StreamDecodeError {
62 BufferTooShort { expected: usize, actual: usize },
64 GroupSizeTooLarge { count: usize, max: usize },
66 InvalidUtf8,
68 SchemaMismatch { expected: u16, actual: u16 },
70 UnknownTemplateId(u16),
72}
73
74impl Display for StreamDecodeError {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 match self {
77 Self::BufferTooShort { expected, actual } => {
78 write!(
79 f,
80 "Buffer too short: expected {expected} bytes, got {actual}"
81 )
82 }
83 Self::GroupSizeTooLarge { count, max } => {
84 write!(f, "Group size {count} exceeds maximum {max}")
85 }
86 Self::InvalidUtf8 => write!(f, "Invalid UTF-8 in symbol"),
87 Self::SchemaMismatch { expected, actual } => {
88 write!(f, "Schema mismatch: expected {expected}, got {actual}")
89 }
90 Self::UnknownTemplateId(id) => write!(f, "Unknown template ID: {id}"),
91 }
92 }
93}
94
95impl std::error::Error for StreamDecodeError {}
96
97impl From<SbeDecodeError> for StreamDecodeError {
98 fn from(err: SbeDecodeError) -> Self {
99 match err {
100 SbeDecodeError::BufferTooShort { expected, actual } => {
101 Self::BufferTooShort { expected, actual }
102 }
103 SbeDecodeError::SchemaMismatch { expected, actual } => {
104 Self::SchemaMismatch { expected, actual }
105 }
106 SbeDecodeError::VersionMismatch { .. } => Self::SchemaMismatch {
107 expected: STREAM_SCHEMA_VERSION,
108 actual: 0,
109 },
110 SbeDecodeError::UnknownTemplateId(id) => Self::UnknownTemplateId(id),
111 SbeDecodeError::GroupSizeTooLarge { count, max } => Self::GroupSizeTooLarge {
112 count: count as usize,
113 max: max as usize,
114 },
115 SbeDecodeError::InvalidBlockLength { .. } => Self::BufferTooShort {
116 expected: 0,
117 actual: 0,
118 },
119 SbeDecodeError::InvalidUtf8 => Self::InvalidUtf8,
120 }
121 }
122}
123
124#[derive(Debug, Clone, Copy)]
126pub struct MessageHeader {
127 pub block_length: u16,
128 pub template_id: u16,
129 pub schema_id: u16,
130 pub version: u16,
131}
132
133impl MessageHeader {
134 pub const ENCODED_LENGTH: usize = 8;
135
136 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
142 if buf.len() < Self::ENCODED_LENGTH {
143 return Err(StreamDecodeError::BufferTooShort {
144 expected: Self::ENCODED_LENGTH,
145 actual: buf.len(),
146 });
147 }
148 Ok(Self {
149 block_length: u16::from_le_bytes([buf[0], buf[1]]),
150 template_id: u16::from_le_bytes([buf[2], buf[3]]),
151 schema_id: u16::from_le_bytes([buf[4], buf[5]]),
152 version: u16::from_le_bytes([buf[6], buf[7]]),
153 })
154 }
155
156 pub fn validate_schema(&self) -> Result<(), StreamDecodeError> {
158 if self.schema_id != STREAM_SCHEMA_ID {
159 return Err(StreamDecodeError::SchemaMismatch {
160 expected: STREAM_SCHEMA_ID,
161 actual: self.schema_id,
162 });
163 }
164 Ok(())
165 }
166}
167
168#[derive(Debug, Clone, Copy)]
170pub struct PriceLevel {
171 pub price_mantissa: i64,
173 pub qty_mantissa: i64,
175}
176
177impl PriceLevel {
178 pub const ENCODED_LENGTH: usize = 16;
179
180 pub fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
186 Ok(Self {
187 price_mantissa: cursor.read_i64_le()?,
188 qty_mantissa: cursor.read_i64_le()?,
189 })
190 }
191}
192
193#[inline]
195#[must_use]
196pub fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
197 mantissa as f64 * 10_f64.powi(exponent as i32)
198}
199
200pub fn decode_var_string8(buf: &[u8]) -> Result<(String, usize), StreamDecodeError> {
208 if buf.is_empty() {
209 return Err(StreamDecodeError::BufferTooShort {
210 expected: 1,
211 actual: 0,
212 });
213 }
214
215 let len = buf[0] as usize;
216 let total_len = 1 + len;
217
218 if buf.len() < total_len {
219 return Err(StreamDecodeError::BufferTooShort {
220 expected: total_len,
221 actual: buf.len(),
222 });
223 }
224
225 let s = std::str::from_utf8(&buf[1..total_len]).map_err(|_| StreamDecodeError::InvalidUtf8)?;
226
227 Ok((s.to_string(), total_len))
228}
229
230#[derive(Debug, Clone, Copy)]
232pub struct GroupSizeEncoding {
233 pub block_length: u16,
234 pub num_in_group: u32,
235}
236
237impl GroupSizeEncoding {
238 pub const ENCODED_LENGTH: usize = 6;
239
240 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
246 if buf.len() < Self::ENCODED_LENGTH {
247 return Err(StreamDecodeError::BufferTooShort {
248 expected: Self::ENCODED_LENGTH,
249 actual: buf.len(),
250 });
251 }
252
253 let num_in_group = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
254
255 if num_in_group as usize > MAX_GROUP_SIZE {
256 return Err(StreamDecodeError::GroupSizeTooLarge {
257 count: num_in_group as usize,
258 max: MAX_GROUP_SIZE,
259 });
260 }
261
262 Ok(Self {
263 block_length: u16::from_le_bytes([buf[0], buf[1]]),
264 num_in_group,
265 })
266 }
267}
268
269#[derive(Debug, Clone, Copy)]
271pub struct GroupSize16Encoding {
272 pub block_length: u16,
273 pub num_in_group: u16,
274}
275
276impl GroupSize16Encoding {
277 pub const ENCODED_LENGTH: usize = 4;
278
279 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
285 if buf.len() < Self::ENCODED_LENGTH {
286 return Err(StreamDecodeError::BufferTooShort {
287 expected: Self::ENCODED_LENGTH,
288 actual: buf.len(),
289 });
290 }
291
292 let num_in_group = u16::from_le_bytes([buf[2], buf[3]]);
293
294 if num_in_group as usize > MAX_GROUP_SIZE {
295 return Err(StreamDecodeError::GroupSizeTooLarge {
296 count: num_in_group as usize,
297 max: MAX_GROUP_SIZE,
298 });
299 }
300
301 Ok(Self {
302 block_length: u16::from_le_bytes([buf[0], buf[1]]),
303 num_in_group,
304 })
305 }
306}
307
308#[cfg(test)]
309mod tests {
310 use rstest::rstest;
311
312 use super::*;
313
314 #[rstest]
315 fn test_mantissa_to_f64() {
316 assert!((mantissa_to_f64(12345, -2) - 123.45).abs() < 1e-10);
317 assert!((mantissa_to_f64(100, 0) - 100.0).abs() < 1e-10);
318 assert!((mantissa_to_f64(5, 3) - 5000.0).abs() < 1e-10);
319 }
320
321 #[rstest]
322 fn test_message_header_too_short() {
323 let buf = [0u8; 7];
324 let err = MessageHeader::decode(&buf).unwrap_err();
325 assert_eq!(
326 err,
327 StreamDecodeError::BufferTooShort {
328 expected: 8,
329 actual: 7
330 }
331 );
332 }
333
334 #[rstest]
335 fn test_group_size_too_large() {
336 let mut buf = [0u8; 6];
338 let count = (MAX_GROUP_SIZE + 1) as u32;
339 buf[2..6].copy_from_slice(&count.to_le_bytes());
340
341 let err = GroupSizeEncoding::decode(&buf).unwrap_err();
342 assert!(matches!(err, StreamDecodeError::GroupSizeTooLarge { .. }));
343 }
344
345 #[rstest]
346 fn test_decode_var_string8_empty_buffer() {
347 let err = decode_var_string8(&[]).unwrap_err();
348 assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
349 }
350
351 #[rstest]
352 fn test_decode_var_string8_truncated() {
353 let buf = [10u8, b'H', b'E', b'L', b'L'];
355 let err = decode_var_string8(&buf).unwrap_err();
356 assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
357 }
358
359 #[rstest]
360 fn test_decode_var_string8_valid() {
361 let buf = [5u8, b'H', b'E', b'L', b'L', b'O'];
362 let (s, consumed) = decode_var_string8(&buf).unwrap();
363 assert_eq!(s, "HELLO");
364 assert_eq!(consumed, 6);
365 }
366
367 #[rstest]
368 fn test_schema_validation() {
369 let header = MessageHeader {
370 block_length: 50,
371 template_id: 10001,
372 schema_id: 99, version: 0,
374 };
375 let err = header.validate_schema().unwrap_err();
376 assert_eq!(
377 err,
378 StreamDecodeError::SchemaMismatch {
379 expected: STREAM_SCHEMA_ID,
380 actual: 99
381 }
382 );
383 }
384}