nautilus_binance/common/sbe/stream/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance SBE market data stream decoders (schema 1:0).
17//!
18//! These decoders are hand-written for the 4 market data stream message types:
19//! - [`TradesStreamEvent`] - Real-time trade data
20//! - [`BestBidAskStreamEvent`] - Best bid/ask (BBO) updates
21//! - [`DepthSnapshotStreamEvent`] - Order book snapshots (top N levels)
22//! - [`DepthDiffStreamEvent`] - Incremental order book updates
23//!
24//! All decoders return `Result<T, StreamDecodeError>` to safely handle malformed
25//! or truncated network data without panicking.
26
27mod best_bid_ask;
28mod depth_diff;
29mod depth_snapshot;
30mod trades;
31
32pub use best_bid_ask::BestBidAskStreamEvent;
33pub use depth_diff::DepthDiffStreamEvent;
34pub use depth_snapshot::DepthSnapshotStreamEvent;
35pub use trades::{Trade, TradesStreamEvent};
36
/// SBE schema identifier expected in the header of every stream message.
pub const STREAM_SCHEMA_ID: u16 = 1;

/// SBE schema version these decoders were written against.
pub const STREAM_SCHEMA_VERSION: u16 = 0;

/// Upper bound on the number of entries a repeating group may declare.
///
/// Group headers announcing more entries than this are rejected so that a
/// malicious or corrupt payload cannot cause an out-of-memory condition.
/// Binance depth streams typically have at most 5000 levels, so the limit
/// leaves ample headroom.
pub const MAX_GROUP_SIZE: usize = 10_000;
46
/// SBE template identifiers, one per supported stream event type.
pub mod template_id {
    /// Template ID of the trades stream event.
    pub const TRADES_STREAM_EVENT: u16 = 10_000;
    /// Template ID of the best bid/ask stream event.
    pub const BEST_BID_ASK_STREAM_EVENT: u16 = 10_001;
    /// Template ID of the depth snapshot stream event.
    pub const DEPTH_SNAPSHOT_STREAM_EVENT: u16 = 10_002;
    /// Template ID of the depth diff stream event.
    pub const DEPTH_DIFF_STREAM_EVENT: u16 = 10_003;
}
54
/// Failure modes encountered while decoding an SBE stream message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamDecodeError {
    /// The buffer holds fewer bytes (`actual`) than the decoder needs (`expected`).
    BufferTooShort { expected: usize, actual: usize },
    /// A repeating group declares `count` entries, above the safety limit `max`.
    GroupSizeTooLarge { count: usize, max: usize },
    /// The symbol string is not valid UTF-8.
    InvalidUtf8,
    /// The header carries schema ID `actual` instead of `expected`.
    SchemaMismatch { expected: u16, actual: u16 },
    /// The header carries a template ID that is not recognised.
    UnknownTemplateId(u16),
}

impl std::fmt::Display for StreamDecodeError {
    /// Renders a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every payload field is `Copy`, so matching on the dereferenced value
        // binds plain integers rather than references.
        match *self {
            Self::InvalidUtf8 => f.write_str("Invalid UTF-8 in symbol"),
            Self::UnknownTemplateId(id) => write!(f, "Unknown template ID: {id}"),
            Self::BufferTooShort { expected, actual } => write!(
                f,
                "Buffer too short: expected {expected} bytes, got {actual}"
            ),
            Self::GroupSizeTooLarge { count, max } => {
                write!(f, "Group size {count} exceeds maximum {max}")
            }
            Self::SchemaMismatch { expected, actual } => {
                write!(f, "Schema mismatch: expected {expected}, got {actual}")
            }
        }
    }
}

impl std::error::Error for StreamDecodeError {}
92
93/// SBE message header (8 bytes).
94#[derive(Debug, Clone, Copy)]
95pub struct MessageHeader {
96    pub block_length: u16,
97    pub template_id: u16,
98    pub schema_id: u16,
99    pub version: u16,
100}
101
102impl MessageHeader {
103    pub const ENCODED_LENGTH: usize = 8;
104
105    /// Decode message header from buffer.
106    ///
107    /// # Errors
108    ///
109    /// Returns error if buffer is less than 8 bytes.
110    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
111        if buf.len() < Self::ENCODED_LENGTH {
112            return Err(StreamDecodeError::BufferTooShort {
113                expected: Self::ENCODED_LENGTH,
114                actual: buf.len(),
115            });
116        }
117        Ok(Self {
118            block_length: u16::from_le_bytes([buf[0], buf[1]]),
119            template_id: u16::from_le_bytes([buf[2], buf[3]]),
120            schema_id: u16::from_le_bytes([buf[4], buf[5]]),
121            version: u16::from_le_bytes([buf[6], buf[7]]),
122        })
123    }
124
125    /// Validate schema ID matches expected stream schema.
126    pub fn validate_schema(&self) -> Result<(), StreamDecodeError> {
127        if self.schema_id != STREAM_SCHEMA_ID {
128            return Err(StreamDecodeError::SchemaMismatch {
129                expected: STREAM_SCHEMA_ID,
130                actual: self.schema_id,
131            });
132        }
133        Ok(())
134    }
135}
136
137/// Price/quantity level in order book.
138#[derive(Debug, Clone, Copy)]
139pub struct PriceLevel {
140    /// Price mantissa (multiply by 10^exponent to get actual price).
141    pub price_mantissa: i64,
142    /// Quantity mantissa (multiply by 10^exponent to get actual quantity).
143    pub qty_mantissa: i64,
144}
145
146impl PriceLevel {
147    pub const ENCODED_LENGTH: usize = 16;
148
149    /// Decode price level from buffer.
150    ///
151    /// # Errors
152    ///
153    /// Returns error if buffer is less than 16 bytes.
154    #[allow(clippy::missing_panics_doc)] // Panic unreachable after length check
155    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
156        if buf.len() < Self::ENCODED_LENGTH {
157            return Err(StreamDecodeError::BufferTooShort {
158                expected: Self::ENCODED_LENGTH,
159                actual: buf.len(),
160            });
161        }
162
163        // SAFETY: Length checked above
164        Ok(Self {
165            price_mantissa: i64::from_le_bytes(buf[0..8].try_into().unwrap()),
166            qty_mantissa: i64::from_le_bytes(buf[8..16].try_into().unwrap()),
167        })
168    }
169}
170
/// Scales `mantissa` by `10^exponent` and returns the result as an `f64`.
///
/// The mantissa is converted to `f64` before scaling, so the result carries
/// ordinary floating-point rounding.
#[inline]
#[must_use]
pub fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
    let scale = 10_f64.powi(i32::from(exponent));
    (mantissa as f64) * scale
}
177
178/// Decode a varString8 (1-byte length prefix + UTF-8 data).
179///
180/// Returns (string, bytes_consumed) on success.
181///
182/// # Errors
183///
184/// Returns error if buffer is too short or contains invalid UTF-8.
185pub fn decode_var_string8(buf: &[u8]) -> Result<(String, usize), StreamDecodeError> {
186    if buf.is_empty() {
187        return Err(StreamDecodeError::BufferTooShort {
188            expected: 1,
189            actual: 0,
190        });
191    }
192
193    let len = buf[0] as usize;
194    let total_len = 1 + len;
195
196    if buf.len() < total_len {
197        return Err(StreamDecodeError::BufferTooShort {
198            expected: total_len,
199            actual: buf.len(),
200        });
201    }
202
203    let s = std::str::from_utf8(&buf[1..total_len]).map_err(|_| StreamDecodeError::InvalidUtf8)?;
204
205    Ok((s.to_string(), total_len))
206}
207
208/// Group size encoding (6 bytes: u16 block_length + u32 num_in_group).
209#[derive(Debug, Clone, Copy)]
210pub struct GroupSizeEncoding {
211    pub block_length: u16,
212    pub num_in_group: u32,
213}
214
215impl GroupSizeEncoding {
216    pub const ENCODED_LENGTH: usize = 6;
217
218    /// Decode group size encoding from buffer.
219    ///
220    /// # Errors
221    ///
222    /// Returns error if buffer is too short or group count exceeds safety limit.
223    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
224        if buf.len() < Self::ENCODED_LENGTH {
225            return Err(StreamDecodeError::BufferTooShort {
226                expected: Self::ENCODED_LENGTH,
227                actual: buf.len(),
228            });
229        }
230
231        let num_in_group = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
232
233        if num_in_group as usize > MAX_GROUP_SIZE {
234            return Err(StreamDecodeError::GroupSizeTooLarge {
235                count: num_in_group as usize,
236                max: MAX_GROUP_SIZE,
237            });
238        }
239
240        Ok(Self {
241            block_length: u16::from_le_bytes([buf[0], buf[1]]),
242            num_in_group,
243        })
244    }
245}
246
247/// Group size 16 encoding (4 bytes: u16 block_length + u16 num_in_group).
248#[derive(Debug, Clone, Copy)]
249pub struct GroupSize16Encoding {
250    pub block_length: u16,
251    pub num_in_group: u16,
252}
253
254impl GroupSize16Encoding {
255    pub const ENCODED_LENGTH: usize = 4;
256
257    /// Decode group size 16 encoding from buffer.
258    ///
259    /// # Errors
260    ///
261    /// Returns error if buffer is too short or group count exceeds safety limit.
262    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
263        if buf.len() < Self::ENCODED_LENGTH {
264            return Err(StreamDecodeError::BufferTooShort {
265                expected: Self::ENCODED_LENGTH,
266                actual: buf.len(),
267            });
268        }
269
270        let num_in_group = u16::from_le_bytes([buf[2], buf[3]]);
271
272        if num_in_group as usize > MAX_GROUP_SIZE {
273            return Err(StreamDecodeError::GroupSizeTooLarge {
274                count: num_in_group as usize,
275                max: MAX_GROUP_SIZE,
276            });
277        }
278
279        Ok(Self {
280            block_length: u16::from_le_bytes([buf[0], buf[1]]),
281            num_in_group,
282        })
283    }
284}
285
286/// Helper to safely read bytes from buffer with bounds checking.
287#[inline]
288fn read_i64_le(buf: &[u8], offset: usize) -> Result<i64, StreamDecodeError> {
289    let end = offset + 8;
290    if buf.len() < end {
291        return Err(StreamDecodeError::BufferTooShort {
292            expected: end,
293            actual: buf.len(),
294        });
295    }
296    Ok(i64::from_le_bytes(buf[offset..end].try_into().unwrap()))
297}
298
299/// Helper to safely read i8 from buffer.
300#[inline]
301fn read_i8(buf: &[u8], offset: usize) -> Result<i8, StreamDecodeError> {
302    if buf.len() <= offset {
303        return Err(StreamDecodeError::BufferTooShort {
304            expected: offset + 1,
305            actual: buf.len(),
306        });
307    }
308    Ok(buf[offset] as i8)
309}
310
311/// Helper to safely read u8 from buffer.
312#[inline]
313fn read_u8(buf: &[u8], offset: usize) -> Result<u8, StreamDecodeError> {
314    if buf.len() <= offset {
315        return Err(StreamDecodeError::BufferTooShort {
316            expected: offset + 1,
317            actual: buf.len(),
318        });
319    }
320    Ok(buf[offset])
321}
322
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Mantissa/exponent pairs scale to the expected floating-point value.
    #[rstest]
    fn test_mantissa_to_f64() {
        let cases: [(i64, i8, f64); 3] = [(12345, -2, 123.45), (100, 0, 100.0), (5, 3, 5000.0)];
        for (mantissa, exponent, expected) in cases {
            let delta = (mantissa_to_f64(mantissa, exponent) - expected).abs();
            assert!(delta < 1e-10);
        }
    }

    /// A 7-byte buffer is one byte short of a full header.
    #[rstest]
    fn test_message_header_too_short() {
        let short = [0u8; 7];
        let expected = StreamDecodeError::BufferTooShort {
            expected: 8,
            actual: 7,
        };
        assert_eq!(MessageHeader::decode(&short).unwrap_err(), expected);
    }

    /// A group header declaring one entry more than the limit is rejected.
    #[rstest]
    fn test_group_size_too_large() {
        let over_limit = (MAX_GROUP_SIZE + 1) as u32;
        let mut encoded = [0u8; 6];
        encoded[2..6].copy_from_slice(&over_limit.to_le_bytes());

        let outcome = GroupSizeEncoding::decode(&encoded);
        assert!(matches!(
            outcome.unwrap_err(),
            StreamDecodeError::GroupSizeTooLarge { .. }
        ));
    }

    /// An empty buffer has no length byte to read.
    #[rstest]
    fn test_decode_var_string8_empty_buffer() {
        let outcome = decode_var_string8(&[]);
        assert!(matches!(
            outcome.unwrap_err(),
            StreamDecodeError::BufferTooShort { .. }
        ));
    }

    /// The length byte promises 10 bytes but only 4 follow it.
    #[rstest]
    fn test_decode_var_string8_truncated() {
        let truncated = b"\x0AHELL";
        let outcome = decode_var_string8(truncated);
        assert!(matches!(
            outcome.unwrap_err(),
            StreamDecodeError::BufferTooShort { .. }
        ));
    }

    /// A well-formed string decodes and reports length byte plus payload as consumed.
    #[rstest]
    fn test_decode_var_string8_valid() {
        let (text, consumed) = decode_var_string8(b"\x05HELLO").unwrap();
        assert_eq!(text, "HELLO");
        assert_eq!(consumed, 6);
    }

    /// A header with a foreign schema ID fails validation.
    #[rstest]
    fn test_schema_validation() {
        let wrong_schema_id = 99;
        let header = MessageHeader {
            block_length: 50,
            template_id: 10001,
            schema_id: wrong_schema_id,
            version: 0,
        };
        assert_eq!(
            header.validate_schema().unwrap_err(),
            StreamDecodeError::SchemaMismatch {
                expected: STREAM_SCHEMA_ID,
                actual: wrong_schema_id,
            }
        );
    }
}