Skip to main content

nautilus_binance/common/sbe/stream/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// SBE stream decoders - all methods return StreamDecodeError on decode failure
17#![allow(clippy::missing_errors_doc)]
18
19//! Binance SBE market data stream decoders (schema 1:0).
20//!
21//! These decoders are hand-written for the 4 market data stream message types:
22//! - [`TradesStreamEvent`] - Real-time trade data
23//! - [`BestBidAskStreamEvent`] - Best bid/ask (BBO) updates
24//! - [`DepthSnapshotStreamEvent`] - Order book snapshots (top N levels)
25//! - [`DepthDiffStreamEvent`] - Incremental order book updates
26//!
27//! All decoders return `Result<T, StreamDecodeError>` to safely handle malformed
28//! or truncated network data without panicking.
29
30use std::fmt::Display;
31
32use crate::common::sbe::{cursor::SbeCursor, error::SbeDecodeError};
33
34mod best_bid_ask;
35mod depth_diff;
36mod depth_snapshot;
37mod trades;
38
39pub use best_bid_ask::BestBidAskStreamEvent;
40pub use depth_diff::DepthDiffStreamEvent;
41pub use depth_snapshot::DepthSnapshotStreamEvent;
42pub use trades::{Trade, TradesStreamEvent};
43
/// Schema identifier carried in the header of every stream message
/// (taken from `stream_1_0.xml`).
pub const STREAM_SCHEMA_ID: u16 = 1;

/// Version number of the stream schema these decoders target.
pub const STREAM_SCHEMA_VERSION: u16 = 0;

/// Upper bound on the entry count accepted in a repeating group.
///
/// Acts as a guard against out-of-memory conditions caused by malicious
/// payloads. Binance depth streams typically have at most 5000 levels.
pub const MAX_GROUP_SIZE: usize = 10_000;
53
/// Template identifiers, one per stream event message type.
pub mod template_id {
    /// Template ID of the trades stream event.
    pub const TRADES_STREAM_EVENT: u16 = 10_000;
    /// Template ID of the best bid/ask stream event.
    pub const BEST_BID_ASK_STREAM_EVENT: u16 = 10_001;
    /// Template ID of the depth snapshot stream event.
    pub const DEPTH_SNAPSHOT_STREAM_EVENT: u16 = 10_002;
    /// Template ID of the depth diff stream event.
    pub const DEPTH_DIFF_STREAM_EVENT: u16 = 10_003;
}
61
/// Failure modes reported by the stream decoders.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamDecodeError {
    /// The input held fewer bytes than the decoder needed.
    BufferTooShort {
        /// Number of bytes the decoder required.
        expected: usize,
        /// Number of bytes that were available.
        actual: usize,
    },
    /// A repeating group declared more entries than the safety limit allows.
    GroupSizeTooLarge {
        /// Entry count declared by the payload.
        count: usize,
        /// Largest entry count that is accepted.
        max: usize,
    },
    /// The symbol bytes were not valid UTF-8.
    InvalidUtf8,
    /// The message header carried an unexpected schema ID.
    SchemaMismatch {
        /// Value the decoder expected.
        expected: u16,
        /// Value found in the message.
        actual: u16,
    },
    /// The message header carried a template ID that is not recognized.
    UnknownTemplateId(u16),
}
76
77impl Display for StreamDecodeError {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        match self {
80            Self::BufferTooShort { expected, actual } => {
81                write!(
82                    f,
83                    "Buffer too short: expected {expected} bytes, was {actual}"
84                )
85            }
86            Self::GroupSizeTooLarge { count, max } => {
87                write!(f, "Group size {count} exceeds maximum {max}")
88            }
89            Self::InvalidUtf8 => write!(f, "Invalid UTF-8 in symbol"),
90            Self::SchemaMismatch { expected, actual } => {
91                write!(f, "Schema mismatch: expected {expected}, was {actual}")
92            }
93            Self::UnknownTemplateId(id) => write!(f, "Unknown template ID: {id}"),
94        }
95    }
96}
97
98impl std::error::Error for StreamDecodeError {}
99
100impl From<SbeDecodeError> for StreamDecodeError {
101    fn from(err: SbeDecodeError) -> Self {
102        match err {
103            SbeDecodeError::BufferTooShort { expected, actual } => {
104                Self::BufferTooShort { expected, actual }
105            }
106            SbeDecodeError::SchemaMismatch { expected, actual } => {
107                Self::SchemaMismatch { expected, actual }
108            }
109            SbeDecodeError::VersionMismatch { .. } => Self::SchemaMismatch {
110                expected: STREAM_SCHEMA_VERSION,
111                actual: 0,
112            },
113            SbeDecodeError::UnknownTemplateId(id) => Self::UnknownTemplateId(id),
114            SbeDecodeError::GroupSizeTooLarge { count, max } => Self::GroupSizeTooLarge {
115                count: count as usize,
116                max: max as usize,
117            },
118            SbeDecodeError::InvalidBlockLength { .. } => Self::BufferTooShort {
119                expected: 0,
120                actual: 0,
121            },
122            SbeDecodeError::InvalidUtf8 => Self::InvalidUtf8,
123        }
124    }
125}
126
127/// SBE message header (8 bytes).
128#[derive(Debug, Clone, Copy)]
129pub struct MessageHeader {
130    pub block_length: u16,
131    pub template_id: u16,
132    pub schema_id: u16,
133    pub version: u16,
134}
135
136impl MessageHeader {
137    pub const ENCODED_LENGTH: usize = 8;
138
139    /// Decode message header from buffer.
140    ///
141    /// # Errors
142    ///
143    /// Returns error if buffer is less than 8 bytes.
144    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
145        if buf.len() < Self::ENCODED_LENGTH {
146            return Err(StreamDecodeError::BufferTooShort {
147                expected: Self::ENCODED_LENGTH,
148                actual: buf.len(),
149            });
150        }
151        Ok(Self {
152            block_length: u16::from_le_bytes([buf[0], buf[1]]),
153            template_id: u16::from_le_bytes([buf[2], buf[3]]),
154            schema_id: u16::from_le_bytes([buf[4], buf[5]]),
155            version: u16::from_le_bytes([buf[6], buf[7]]),
156        })
157    }
158
159    /// Validate schema ID matches expected stream schema.
160    pub fn validate_schema(&self) -> Result<(), StreamDecodeError> {
161        if self.schema_id != STREAM_SCHEMA_ID {
162            return Err(StreamDecodeError::SchemaMismatch {
163                expected: STREAM_SCHEMA_ID,
164                actual: self.schema_id,
165            });
166        }
167        Ok(())
168    }
169}
170
171/// Price/quantity level in order book.
172#[derive(Debug, Clone, Copy)]
173pub struct PriceLevel {
174    /// Price mantissa (multiply by 10^exponent to get actual price).
175    pub price_mantissa: i64,
176    /// Quantity mantissa (multiply by 10^exponent to get actual quantity).
177    pub qty_mantissa: i64,
178}
179
180impl PriceLevel {
181    pub const ENCODED_LENGTH: usize = 16;
182
183    /// Decode price level from cursor.
184    ///
185    /// # Errors
186    ///
187    /// Returns error if buffer is too short.
188    pub fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
189        Ok(Self {
190            price_mantissa: cursor.read_i64_le()?,
191            qty_mantissa: cursor.read_i64_le()?,
192        })
193    }
194}
195
/// Convert a decimal mantissa/exponent pair to `f64`, i.e. `mantissa * 10^exponent`.
///
/// For negative exponents the mantissa is divided by the positive power of ten
/// instead of being multiplied by its reciprocal. Reciprocals such as `0.1` or
/// `0.01` are not exactly representable in binary floating point, so the
/// multiplication form adds a second rounding step (`3 * 0.1` yields
/// `0.30000000000000004`, while `3 / 10` yields `0.3`).
///
/// The `i64` to `f64` conversion itself is lossy for magnitudes above 2^53.
#[inline]
#[must_use]
pub fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
    let value = mantissa as f64;
    // `unsigned_abs` avoids overflow for `i8::MIN`; widening to `i32` is lossless.
    let scale = 10_f64.powi(i32::from(exponent.unsigned_abs()));

    if exponent < 0 {
        value / scale
    } else {
        value * scale
    }
}
202
203/// Decode a varString8 (1-byte length prefix + UTF-8 data).
204///
205/// Returns (string, bytes_consumed) on success.
206///
207/// # Errors
208///
209/// Returns error if buffer is too short or contains invalid UTF-8.
210pub fn decode_var_string8(buf: &[u8]) -> Result<(String, usize), StreamDecodeError> {
211    if buf.is_empty() {
212        return Err(StreamDecodeError::BufferTooShort {
213            expected: 1,
214            actual: 0,
215        });
216    }
217
218    let len = buf[0] as usize;
219    let total_len = 1 + len;
220
221    if buf.len() < total_len {
222        return Err(StreamDecodeError::BufferTooShort {
223            expected: total_len,
224            actual: buf.len(),
225        });
226    }
227
228    let s = std::str::from_utf8(&buf[1..total_len]).map_err(|_| StreamDecodeError::InvalidUtf8)?;
229
230    Ok((s.to_string(), total_len))
231}
232
233/// Group size encoding (6 bytes: u16 block_length + u32 num_in_group).
234#[derive(Debug, Clone, Copy)]
235pub struct GroupSizeEncoding {
236    pub block_length: u16,
237    pub num_in_group: u32,
238}
239
240impl GroupSizeEncoding {
241    pub const ENCODED_LENGTH: usize = 6;
242
243    /// Decode group size encoding from buffer.
244    ///
245    /// # Errors
246    ///
247    /// Returns error if buffer is too short or group count exceeds safety limit.
248    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
249        if buf.len() < Self::ENCODED_LENGTH {
250            return Err(StreamDecodeError::BufferTooShort {
251                expected: Self::ENCODED_LENGTH,
252                actual: buf.len(),
253            });
254        }
255
256        let num_in_group = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
257
258        if num_in_group as usize > MAX_GROUP_SIZE {
259            return Err(StreamDecodeError::GroupSizeTooLarge {
260                count: num_in_group as usize,
261                max: MAX_GROUP_SIZE,
262            });
263        }
264
265        Ok(Self {
266            block_length: u16::from_le_bytes([buf[0], buf[1]]),
267            num_in_group,
268        })
269    }
270}
271
272/// Group size 16 encoding (4 bytes: u16 block_length + u16 num_in_group).
273#[derive(Debug, Clone, Copy)]
274pub struct GroupSize16Encoding {
275    pub block_length: u16,
276    pub num_in_group: u16,
277}
278
279impl GroupSize16Encoding {
280    pub const ENCODED_LENGTH: usize = 4;
281
282    /// Decode group size 16 encoding from buffer.
283    ///
284    /// # Errors
285    ///
286    /// Returns error if buffer is too short or group count exceeds safety limit.
287    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
288        if buf.len() < Self::ENCODED_LENGTH {
289            return Err(StreamDecodeError::BufferTooShort {
290                expected: Self::ENCODED_LENGTH,
291                actual: buf.len(),
292            });
293        }
294
295        let num_in_group = u16::from_le_bytes([buf[2], buf[3]]);
296
297        if num_in_group as usize > MAX_GROUP_SIZE {
298            return Err(StreamDecodeError::GroupSizeTooLarge {
299                count: num_in_group as usize,
300                max: MAX_GROUP_SIZE,
301            });
302        }
303
304        Ok(Self {
305            block_length: u16::from_le_bytes([buf[0], buf[1]]),
306            num_in_group,
307        })
308    }
309}
310
#[cfg(test)]
mod tests {
    //! Unit tests for the shared stream decoding primitives.

    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_mantissa_to_f64() {
        assert!((mantissa_to_f64(12345, -2) - 123.45).abs() < 1e-10);
        assert!((mantissa_to_f64(100, 0) - 100.0).abs() < 1e-10);
        assert!((mantissa_to_f64(5, 3) - 5000.0).abs() < 1e-10);
    }

    #[rstest]
    fn test_message_header_too_short() {
        let buf = [0u8; 7];
        let err = MessageHeader::decode(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 8,
                actual: 7
            }
        );
    }

    #[rstest]
    fn test_message_header_decode_valid() {
        // Build a well-formed header field by field in little-endian order
        let mut buf = [0u8; MessageHeader::ENCODED_LENGTH];
        buf[0..2].copy_from_slice(&50u16.to_le_bytes());
        buf[2..4].copy_from_slice(&template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes());
        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf[6..8].copy_from_slice(&STREAM_SCHEMA_VERSION.to_le_bytes());

        let header = MessageHeader::decode(&buf).unwrap();
        assert_eq!(header.block_length, 50);
        assert_eq!(header.template_id, template_id::BEST_BID_ASK_STREAM_EVENT);
        assert_eq!(header.schema_id, STREAM_SCHEMA_ID);
        assert_eq!(header.version, STREAM_SCHEMA_VERSION);
        assert!(header.validate_schema().is_ok());
    }

    #[rstest]
    fn test_group_size_too_large() {
        // Craft a buffer with num_in_group = MAX_GROUP_SIZE + 1
        let mut buf = [0u8; 6];
        let count = (MAX_GROUP_SIZE + 1) as u32;
        buf[2..6].copy_from_slice(&count.to_le_bytes());

        let err = GroupSizeEncoding::decode(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::GroupSizeTooLarge {
                count: MAX_GROUP_SIZE + 1,
                max: MAX_GROUP_SIZE
            }
        );
    }

    #[rstest]
    fn test_group_size_at_limit() {
        // A count of exactly MAX_GROUP_SIZE is still accepted
        let mut buf = [0u8; 6];
        buf[0..2].copy_from_slice(&16u16.to_le_bytes());
        buf[2..6].copy_from_slice(&(MAX_GROUP_SIZE as u32).to_le_bytes());

        let group = GroupSizeEncoding::decode(&buf).unwrap();
        assert_eq!(group.block_length, 16);
        assert_eq!(group.num_in_group as usize, MAX_GROUP_SIZE);
    }

    #[rstest]
    fn test_group_size_too_short() {
        let err = GroupSizeEncoding::decode(&[0u8; 5]).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 6,
                actual: 5
            }
        );
    }

    #[rstest]
    fn test_group_size16_too_large() {
        // Craft a buffer with num_in_group = MAX_GROUP_SIZE + 1
        let mut buf = [0u8; 4];
        let count = (MAX_GROUP_SIZE + 1) as u16;
        buf[2..4].copy_from_slice(&count.to_le_bytes());

        let err = GroupSize16Encoding::decode(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::GroupSizeTooLarge {
                count: MAX_GROUP_SIZE + 1,
                max: MAX_GROUP_SIZE
            }
        );
    }

    #[rstest]
    fn test_group_size16_valid() {
        let mut buf = [0u8; 4];
        buf[0..2].copy_from_slice(&16u16.to_le_bytes());
        buf[2..4].copy_from_slice(&3u16.to_le_bytes());

        let group = GroupSize16Encoding::decode(&buf).unwrap();
        assert_eq!(group.block_length, 16);
        assert_eq!(group.num_in_group, 3);
    }

    #[rstest]
    fn test_decode_var_string8_empty_buffer() {
        let err = decode_var_string8(&[]).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 1,
                actual: 0
            }
        );
    }

    #[rstest]
    fn test_decode_var_string8_truncated() {
        // Length says 10 bytes, but only 4 follow the prefix
        let buf = [10u8, b'H', b'E', b'L', b'L'];
        let err = decode_var_string8(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 11,
                actual: 5
            }
        );
    }

    #[rstest]
    fn test_decode_var_string8_invalid_utf8() {
        // 0xFF can never appear in well-formed UTF-8
        let buf = [2u8, 0xFF, 0xFE];
        let err = decode_var_string8(&buf).unwrap_err();
        assert_eq!(err, StreamDecodeError::InvalidUtf8);
    }

    #[rstest]
    fn test_decode_var_string8_valid() {
        let buf = [5u8, b'H', b'E', b'L', b'L', b'O'];
        let (s, consumed) = decode_var_string8(&buf).unwrap();
        assert_eq!(s, "HELLO");
        assert_eq!(consumed, 6);
    }

    #[rstest]
    fn test_schema_validation() {
        let header = MessageHeader {
            block_length: 50,
            template_id: 10001,
            schema_id: 99, // Wrong schema
            version: 0,
        };
        let err = header.validate_schema().unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::SchemaMismatch {
                expected: STREAM_SCHEMA_ID,
                actual: 99
            }
        );
    }
}