nautilus_binance/common/sbe/stream/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance SBE market data stream decoders (schema 1:0).
17//!
18//! These decoders are hand-written for the 4 market data stream message types:
19//! - [`TradesStreamEvent`] - Real-time trade data
20//! - [`BestBidAskStreamEvent`] - Best bid/ask (BBO) updates
21//! - [`DepthSnapshotStreamEvent`] - Order book snapshots (top N levels)
22//! - [`DepthDiffStreamEvent`] - Incremental order book updates
23//!
24//! All decoders return `Result<T, StreamDecodeError>` to safely handle malformed
25//! or truncated network data without panicking.
26
27use crate::common::sbe::{cursor::SbeCursor, error::SbeDecodeError};
28
29mod best_bid_ask;
30mod depth_diff;
31mod depth_snapshot;
32mod trades;
33
34pub use best_bid_ask::BestBidAskStreamEvent;
35pub use depth_diff::DepthDiffStreamEvent;
36pub use depth_snapshot::DepthSnapshotStreamEvent;
37pub use trades::{Trade, TradesStreamEvent};
38
/// Stream schema ID (from stream_1_0.xml).
///
/// [`MessageHeader::validate_schema`] rejects any header whose `schema_id`
/// differs from this value.
pub const STREAM_SCHEMA_ID: u16 = 1;
41
/// Stream schema version.
///
/// Used as the `expected` value when a version mismatch reported by the
/// shared SBE decoder is converted into [`StreamDecodeError::SchemaMismatch`].
pub const STREAM_SCHEMA_VERSION: u16 = 0;
44
/// Maximum allowed group size to prevent OOM from malicious payloads.
/// Binance depth streams typically have at most 5000 levels.
///
/// Both [`GroupSizeEncoding::decode`] and [`GroupSize16Encoding::decode`]
/// return [`StreamDecodeError::GroupSizeTooLarge`] when the decoded entry
/// count is strictly greater than this limit.
pub const MAX_GROUP_SIZE: usize = 10_000;
48
/// Message template IDs for stream events.
///
/// Each constant is the `template_id` value carried in the SBE message header
/// for the corresponding stream event type.
pub mod template_id {
    /// Template ID of the trades stream event.
    pub const TRADES_STREAM_EVENT: u16 = 10000;
    /// Template ID of the best bid/ask stream event.
    pub const BEST_BID_ASK_STREAM_EVENT: u16 = 10001;
    /// Template ID of the depth snapshot stream event.
    pub const DEPTH_SNAPSHOT_STREAM_EVENT: u16 = 10002;
    /// Template ID of the depth diff stream event.
    pub const DEPTH_DIFF_STREAM_EVENT: u16 = 10003;
}
56
/// Errors that can occur while decoding a stream message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamDecodeError {
    /// The buffer holds fewer bytes than the decoder needs.
    BufferTooShort { expected: usize, actual: usize },
    /// A repeating group declares more entries than the safety limit allows.
    GroupSizeTooLarge { count: usize, max: usize },
    /// The symbol string is not valid UTF-8.
    InvalidUtf8,
    /// The schema ID does not match the expected one.
    SchemaMismatch { expected: u16, actual: u16 },
    /// The template ID is not one of the known stream messages.
    UnknownTemplateId(u16),
}

impl std::fmt::Display for StreamDecodeError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every payload field is `Copy`, so matching on the dereferenced value
        // binds plain integers rather than references.
        match *self {
            Self::InvalidUtf8 => f.write_str("Invalid UTF-8 in symbol"),
            Self::UnknownTemplateId(id) => write!(f, "Unknown template ID: {id}"),
            Self::SchemaMismatch { expected, actual } => {
                write!(f, "Schema mismatch: expected {expected}, got {actual}")
            }
            Self::GroupSizeTooLarge { count, max } => {
                write!(f, "Group size {count} exceeds maximum {max}")
            }
            Self::BufferTooShort { expected, actual } => write!(
                f,
                "Buffer too short: expected {expected} bytes, got {actual}"
            ),
        }
    }
}

impl std::error::Error for StreamDecodeError {}
94
95impl From<SbeDecodeError> for StreamDecodeError {
96    fn from(err: SbeDecodeError) -> Self {
97        match err {
98            SbeDecodeError::BufferTooShort { expected, actual } => {
99                Self::BufferTooShort { expected, actual }
100            }
101            SbeDecodeError::SchemaMismatch { expected, actual } => {
102                Self::SchemaMismatch { expected, actual }
103            }
104            SbeDecodeError::VersionMismatch { .. } => Self::SchemaMismatch {
105                expected: STREAM_SCHEMA_VERSION,
106                actual: 0,
107            },
108            SbeDecodeError::UnknownTemplateId(id) => Self::UnknownTemplateId(id),
109            SbeDecodeError::GroupSizeTooLarge { count, max } => Self::GroupSizeTooLarge {
110                count: count as usize,
111                max: max as usize,
112            },
113            SbeDecodeError::InvalidBlockLength { .. } => Self::BufferTooShort {
114                expected: 0,
115                actual: 0,
116            },
117            SbeDecodeError::InvalidUtf8 => Self::InvalidUtf8,
118        }
119    }
120}
121
122/// SBE message header (8 bytes).
123#[derive(Debug, Clone, Copy)]
124pub struct MessageHeader {
125    pub block_length: u16,
126    pub template_id: u16,
127    pub schema_id: u16,
128    pub version: u16,
129}
130
131impl MessageHeader {
132    pub const ENCODED_LENGTH: usize = 8;
133
134    /// Decode message header from buffer.
135    ///
136    /// # Errors
137    ///
138    /// Returns error if buffer is less than 8 bytes.
139    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
140        if buf.len() < Self::ENCODED_LENGTH {
141            return Err(StreamDecodeError::BufferTooShort {
142                expected: Self::ENCODED_LENGTH,
143                actual: buf.len(),
144            });
145        }
146        Ok(Self {
147            block_length: u16::from_le_bytes([buf[0], buf[1]]),
148            template_id: u16::from_le_bytes([buf[2], buf[3]]),
149            schema_id: u16::from_le_bytes([buf[4], buf[5]]),
150            version: u16::from_le_bytes([buf[6], buf[7]]),
151        })
152    }
153
154    /// Validate schema ID matches expected stream schema.
155    pub fn validate_schema(&self) -> Result<(), StreamDecodeError> {
156        if self.schema_id != STREAM_SCHEMA_ID {
157            return Err(StreamDecodeError::SchemaMismatch {
158                expected: STREAM_SCHEMA_ID,
159                actual: self.schema_id,
160            });
161        }
162        Ok(())
163    }
164}
165
166/// Price/quantity level in order book.
167#[derive(Debug, Clone, Copy)]
168pub struct PriceLevel {
169    /// Price mantissa (multiply by 10^exponent to get actual price).
170    pub price_mantissa: i64,
171    /// Quantity mantissa (multiply by 10^exponent to get actual quantity).
172    pub qty_mantissa: i64,
173}
174
175impl PriceLevel {
176    pub const ENCODED_LENGTH: usize = 16;
177
178    /// Decode price level from cursor.
179    ///
180    /// # Errors
181    ///
182    /// Returns error if buffer is too short.
183    pub fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
184        Ok(Self {
185            price_mantissa: cursor.read_i64_le()?,
186            qty_mantissa: cursor.read_i64_le()?,
187        })
188    }
189}
190
/// Convert a decimal mantissa/exponent pair to `f64` (`mantissa * 10^exponent`).
///
/// For negative exponents the mantissa is divided by `10^|exponent|` rather
/// than multiplied by `10^exponent`. Negative powers of ten such as `0.1` or
/// `0.01` have no exact binary representation, so multiplying by them adds a
/// second rounding error (`3 * 0.1` yields `0.30000000000000004`). Small
/// positive powers of ten are exactly representable, so the division rounds
/// only once and yields the `f64` closest to the decimal value (`3 / 10`
/// yields `0.3`).
///
/// Mantissas with a magnitude above 2^53 are still rounded when converted to
/// `f64`.
#[inline]
#[must_use]
pub fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
    let value = mantissa as f64;

    if exponent < 0 {
        // `unsigned_abs` avoids the overflow that negating `i8::MIN` would cause.
        value / 10_f64.powi(i32::from(exponent.unsigned_abs()))
    } else {
        value * 10_f64.powi(i32::from(exponent))
    }
}
197
198/// Decode a varString8 (1-byte length prefix + UTF-8 data).
199///
200/// Returns (string, bytes_consumed) on success.
201///
202/// # Errors
203///
204/// Returns error if buffer is too short or contains invalid UTF-8.
205pub fn decode_var_string8(buf: &[u8]) -> Result<(String, usize), StreamDecodeError> {
206    if buf.is_empty() {
207        return Err(StreamDecodeError::BufferTooShort {
208            expected: 1,
209            actual: 0,
210        });
211    }
212
213    let len = buf[0] as usize;
214    let total_len = 1 + len;
215
216    if buf.len() < total_len {
217        return Err(StreamDecodeError::BufferTooShort {
218            expected: total_len,
219            actual: buf.len(),
220        });
221    }
222
223    let s = std::str::from_utf8(&buf[1..total_len]).map_err(|_| StreamDecodeError::InvalidUtf8)?;
224
225    Ok((s.to_string(), total_len))
226}
227
228/// Group size encoding (6 bytes: u16 block_length + u32 num_in_group).
229#[derive(Debug, Clone, Copy)]
230pub struct GroupSizeEncoding {
231    pub block_length: u16,
232    pub num_in_group: u32,
233}
234
235impl GroupSizeEncoding {
236    pub const ENCODED_LENGTH: usize = 6;
237
238    /// Decode group size encoding from buffer.
239    ///
240    /// # Errors
241    ///
242    /// Returns error if buffer is too short or group count exceeds safety limit.
243    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
244        if buf.len() < Self::ENCODED_LENGTH {
245            return Err(StreamDecodeError::BufferTooShort {
246                expected: Self::ENCODED_LENGTH,
247                actual: buf.len(),
248            });
249        }
250
251        let num_in_group = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
252
253        if num_in_group as usize > MAX_GROUP_SIZE {
254            return Err(StreamDecodeError::GroupSizeTooLarge {
255                count: num_in_group as usize,
256                max: MAX_GROUP_SIZE,
257            });
258        }
259
260        Ok(Self {
261            block_length: u16::from_le_bytes([buf[0], buf[1]]),
262            num_in_group,
263        })
264    }
265}
266
267/// Group size 16 encoding (4 bytes: u16 block_length + u16 num_in_group).
268#[derive(Debug, Clone, Copy)]
269pub struct GroupSize16Encoding {
270    pub block_length: u16,
271    pub num_in_group: u16,
272}
273
274impl GroupSize16Encoding {
275    pub const ENCODED_LENGTH: usize = 4;
276
277    /// Decode group size 16 encoding from buffer.
278    ///
279    /// # Errors
280    ///
281    /// Returns error if buffer is too short or group count exceeds safety limit.
282    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
283        if buf.len() < Self::ENCODED_LENGTH {
284            return Err(StreamDecodeError::BufferTooShort {
285                expected: Self::ENCODED_LENGTH,
286                actual: buf.len(),
287            });
288        }
289
290        let num_in_group = u16::from_le_bytes([buf[2], buf[3]]);
291
292        if num_in_group as usize > MAX_GROUP_SIZE {
293            return Err(StreamDecodeError::GroupSizeTooLarge {
294                count: num_in_group as usize,
295                max: MAX_GROUP_SIZE,
296            });
297        }
298
299        Ok(Self {
300            block_length: u16::from_le_bytes([buf[0], buf[1]]),
301            num_in_group,
302        })
303    }
304}
305
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Mantissa/exponent conversion agrees with the expected decimal value
    /// within a small tolerance.
    #[rstest]
    fn test_mantissa_to_f64() {
        let cases: [(i64, i8, f64); 3] = [(12345, -2, 123.45), (100, 0, 100.0), (5, 3, 5000.0)];

        for &(mantissa, exponent, expected) in &cases {
            let delta = (mantissa_to_f64(mantissa, exponent) - expected).abs();
            assert!(delta < 1e-10);
        }
    }

    /// A 7-byte buffer is one byte short of a full header.
    #[rstest]
    fn test_message_header_too_short() {
        let short = [0u8; 7];
        let expected = StreamDecodeError::BufferTooShort {
            expected: 8,
            actual: 7,
        };

        assert_eq!(MessageHeader::decode(&short).unwrap_err(), expected);
    }

    /// An entry count one above the limit is rejected.
    #[rstest]
    fn test_group_size_too_large() {
        let oversized = (MAX_GROUP_SIZE + 1) as u32;
        let mut buf = [0u8; 6];
        // Bytes 2..6 carry num_in_group; block_length stays zero.
        buf[2..6].copy_from_slice(&oversized.to_le_bytes());

        let result = GroupSizeEncoding::decode(&buf);
        assert!(matches!(
            result,
            Err(StreamDecodeError::GroupSizeTooLarge { .. })
        ));
    }

    /// An empty buffer has no length byte to read.
    #[rstest]
    fn test_decode_var_string8_empty_buffer() {
        let result = decode_var_string8(&[]);
        assert!(matches!(
            result,
            Err(StreamDecodeError::BufferTooShort { .. })
        ));
    }

    /// The length byte declares 10 bytes but only 4 follow it.
    #[rstest]
    fn test_decode_var_string8_truncated() {
        let truncated = [10u8, b'H', b'E', b'L', b'L'];

        let result = decode_var_string8(&truncated);
        assert!(matches!(
            result,
            Err(StreamDecodeError::BufferTooShort { .. })
        ));
    }

    /// A well-formed string decodes and reports the bytes consumed.
    #[rstest]
    fn test_decode_var_string8_valid() {
        let encoded = [5u8, b'H', b'E', b'L', b'L', b'O'];

        let (text, consumed) = decode_var_string8(&encoded).unwrap();
        assert_eq!(text, "HELLO");
        assert_eq!(consumed, 6);
    }

    /// A header with a foreign schema ID fails validation.
    #[rstest]
    fn test_schema_validation() {
        let wrong_schema = 99;
        let header = MessageHeader {
            block_length: 50,
            template_id: 10001,
            schema_id: wrong_schema,
            version: 0,
        };

        let expected = StreamDecodeError::SchemaMismatch {
            expected: STREAM_SCHEMA_ID,
            actual: wrong_schema,
        };
        assert_eq!(header.validate_schema().unwrap_err(), expected);
    }
}