nautilus_binance/common/sbe/stream/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance SBE market data stream decoders (schema 1:0).
17//!
18//! These decoders are hand-written for the 4 market data stream message types:
19//! - [`TradesStreamEvent`] - Real-time trade data
20//! - [`BestBidAskStreamEvent`] - Best bid/ask (BBO) updates
21//! - [`DepthSnapshotStreamEvent`] - Order book snapshots (top N levels)
22//! - [`DepthDiffStreamEvent`] - Incremental order book updates
23//!
24//! All decoders return `Result<T, StreamDecodeError>` to safely handle malformed
25//! or truncated network data without panicking.
26
27use std::fmt::Display;
28
29use crate::common::sbe::{cursor::SbeCursor, error::SbeDecodeError};
30
31mod best_bid_ask;
32mod depth_diff;
33mod depth_snapshot;
34mod trades;
35
36pub use best_bid_ask::BestBidAskStreamEvent;
37pub use depth_diff::DepthDiffStreamEvent;
38pub use depth_snapshot::DepthSnapshotStreamEvent;
39pub use trades::{Trade, TradesStreamEvent};
40
/// Schema identifier expected in the header of stream messages (from stream_1_0.xml).
pub const STREAM_SCHEMA_ID: u16 = 1;

/// Version of the stream schema these decoders are written against.
pub const STREAM_SCHEMA_VERSION: u16 = 0;

/// Upper bound on the number of entries accepted for a repeating group.
///
/// Group headers announcing more entries than this are rejected, which prevents OOM
/// from malicious payloads. Binance depth streams typically have at most 5000 levels.
pub const MAX_GROUP_SIZE: usize = 10_000;
50
/// Template IDs that identify each stream event message type.
pub mod template_id {
    /// Template ID of the trades stream event.
    pub const TRADES_STREAM_EVENT: u16 = 10_000;
    /// Template ID of the best bid/ask stream event.
    pub const BEST_BID_ASK_STREAM_EVENT: u16 = 10_001;
    /// Template ID of the depth snapshot stream event.
    pub const DEPTH_SNAPSHOT_STREAM_EVENT: u16 = 10_002;
    /// Template ID of the depth diff stream event.
    pub const DEPTH_DIFF_STREAM_EVENT: u16 = 10_003;
}
58
/// Error produced while decoding an SBE stream message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamDecodeError {
    /// The buffer holds fewer bytes (`actual`) than the decoder needs (`expected`).
    BufferTooShort { expected: usize, actual: usize },
    /// A group header announced `count` entries, above the safety limit `max`.
    GroupSizeTooLarge { count: usize, max: usize },
    /// A symbol string was not valid UTF-8.
    InvalidUtf8,
    /// The schema ID in the message (`actual`) differs from the one expected.
    SchemaMismatch { expected: u16, actual: u16 },
    /// The template ID does not correspond to a known message type.
    UnknownTemplateId(u16),
}

impl Display for StreamDecodeError {
    /// Render a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every payload field is `Copy`, so the variants can be destructured by value.
        match *self {
            Self::InvalidUtf8 => f.write_str("Invalid UTF-8 in symbol"),
            Self::UnknownTemplateId(id) => write!(f, "Unknown template ID: {id}"),
            Self::GroupSizeTooLarge { count, max } => {
                write!(f, "Group size {count} exceeds maximum {max}")
            }
            Self::SchemaMismatch { expected, actual } => {
                write!(f, "Schema mismatch: expected {expected}, got {actual}")
            }
            Self::BufferTooShort { expected, actual } => write!(
                f,
                "Buffer too short: expected {expected} bytes, got {actual}"
            ),
        }
    }
}

impl std::error::Error for StreamDecodeError {}
96
97impl From<SbeDecodeError> for StreamDecodeError {
98    fn from(err: SbeDecodeError) -> Self {
99        match err {
100            SbeDecodeError::BufferTooShort { expected, actual } => {
101                Self::BufferTooShort { expected, actual }
102            }
103            SbeDecodeError::SchemaMismatch { expected, actual } => {
104                Self::SchemaMismatch { expected, actual }
105            }
106            SbeDecodeError::VersionMismatch { .. } => Self::SchemaMismatch {
107                expected: STREAM_SCHEMA_VERSION,
108                actual: 0,
109            },
110            SbeDecodeError::UnknownTemplateId(id) => Self::UnknownTemplateId(id),
111            SbeDecodeError::GroupSizeTooLarge { count, max } => Self::GroupSizeTooLarge {
112                count: count as usize,
113                max: max as usize,
114            },
115            SbeDecodeError::InvalidBlockLength { .. } => Self::BufferTooShort {
116                expected: 0,
117                actual: 0,
118            },
119            SbeDecodeError::InvalidUtf8 => Self::InvalidUtf8,
120        }
121    }
122}
123
124/// SBE message header (8 bytes).
125#[derive(Debug, Clone, Copy)]
126pub struct MessageHeader {
127    pub block_length: u16,
128    pub template_id: u16,
129    pub schema_id: u16,
130    pub version: u16,
131}
132
133impl MessageHeader {
134    pub const ENCODED_LENGTH: usize = 8;
135
136    /// Decode message header from buffer.
137    ///
138    /// # Errors
139    ///
140    /// Returns error if buffer is less than 8 bytes.
141    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
142        if buf.len() < Self::ENCODED_LENGTH {
143            return Err(StreamDecodeError::BufferTooShort {
144                expected: Self::ENCODED_LENGTH,
145                actual: buf.len(),
146            });
147        }
148        Ok(Self {
149            block_length: u16::from_le_bytes([buf[0], buf[1]]),
150            template_id: u16::from_le_bytes([buf[2], buf[3]]),
151            schema_id: u16::from_le_bytes([buf[4], buf[5]]),
152            version: u16::from_le_bytes([buf[6], buf[7]]),
153        })
154    }
155
156    /// Validate schema ID matches expected stream schema.
157    pub fn validate_schema(&self) -> Result<(), StreamDecodeError> {
158        if self.schema_id != STREAM_SCHEMA_ID {
159            return Err(StreamDecodeError::SchemaMismatch {
160                expected: STREAM_SCHEMA_ID,
161                actual: self.schema_id,
162            });
163        }
164        Ok(())
165    }
166}
167
168/// Price/quantity level in order book.
169#[derive(Debug, Clone, Copy)]
170pub struct PriceLevel {
171    /// Price mantissa (multiply by 10^exponent to get actual price).
172    pub price_mantissa: i64,
173    /// Quantity mantissa (multiply by 10^exponent to get actual quantity).
174    pub qty_mantissa: i64,
175}
176
177impl PriceLevel {
178    pub const ENCODED_LENGTH: usize = 16;
179
180    /// Decode price level from cursor.
181    ///
182    /// # Errors
183    ///
184    /// Returns error if buffer is too short.
185    pub fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
186        Ok(Self {
187            price_mantissa: cursor.read_i64_le()?,
188            qty_mantissa: cursor.read_i64_le()?,
189        })
190    }
191}
192
/// Convert a decimal mantissa and base-10 exponent to `f64` (`mantissa * 10^exponent`).
///
/// Negative exponents are applied by dividing by the positive power of ten instead of
/// multiplying by its reciprocal. `10^-k` is not exactly representable in binary
/// floating point, so multiplying by it adds a second rounding step
/// (e.g. `3 * 10^-1` would yield `0.30000000000000004`). Dividing by `10^k` rounds
/// only once, so typical prices and quantities decode to the nearest `f64`.
#[inline]
#[must_use]
pub fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
    // There is no lossless `i64 -> f64` conversion; precision is lost only above 2^53.
    let value = mantissa as f64;
    // `unsigned_abs` avoids overflow for `i8::MIN`; 10^128 is still finite in `f64`.
    let scale = 10_f64.powi(i32::from(exponent.unsigned_abs()));

    if exponent < 0 {
        value / scale
    } else {
        value * scale
    }
}
199
200/// Decode a varString8 (1-byte length prefix + UTF-8 data).
201///
202/// Returns (string, bytes_consumed) on success.
203///
204/// # Errors
205///
206/// Returns error if buffer is too short or contains invalid UTF-8.
207pub fn decode_var_string8(buf: &[u8]) -> Result<(String, usize), StreamDecodeError> {
208    if buf.is_empty() {
209        return Err(StreamDecodeError::BufferTooShort {
210            expected: 1,
211            actual: 0,
212        });
213    }
214
215    let len = buf[0] as usize;
216    let total_len = 1 + len;
217
218    if buf.len() < total_len {
219        return Err(StreamDecodeError::BufferTooShort {
220            expected: total_len,
221            actual: buf.len(),
222        });
223    }
224
225    let s = std::str::from_utf8(&buf[1..total_len]).map_err(|_| StreamDecodeError::InvalidUtf8)?;
226
227    Ok((s.to_string(), total_len))
228}
229
230/// Group size encoding (6 bytes: u16 block_length + u32 num_in_group).
231#[derive(Debug, Clone, Copy)]
232pub struct GroupSizeEncoding {
233    pub block_length: u16,
234    pub num_in_group: u32,
235}
236
237impl GroupSizeEncoding {
238    pub const ENCODED_LENGTH: usize = 6;
239
240    /// Decode group size encoding from buffer.
241    ///
242    /// # Errors
243    ///
244    /// Returns error if buffer is too short or group count exceeds safety limit.
245    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
246        if buf.len() < Self::ENCODED_LENGTH {
247            return Err(StreamDecodeError::BufferTooShort {
248                expected: Self::ENCODED_LENGTH,
249                actual: buf.len(),
250            });
251        }
252
253        let num_in_group = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
254
255        if num_in_group as usize > MAX_GROUP_SIZE {
256            return Err(StreamDecodeError::GroupSizeTooLarge {
257                count: num_in_group as usize,
258                max: MAX_GROUP_SIZE,
259            });
260        }
261
262        Ok(Self {
263            block_length: u16::from_le_bytes([buf[0], buf[1]]),
264            num_in_group,
265        })
266    }
267}
268
269/// Group size 16 encoding (4 bytes: u16 block_length + u16 num_in_group).
270#[derive(Debug, Clone, Copy)]
271pub struct GroupSize16Encoding {
272    pub block_length: u16,
273    pub num_in_group: u16,
274}
275
276impl GroupSize16Encoding {
277    pub const ENCODED_LENGTH: usize = 4;
278
279    /// Decode group size 16 encoding from buffer.
280    ///
281    /// # Errors
282    ///
283    /// Returns error if buffer is too short or group count exceeds safety limit.
284    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
285        if buf.len() < Self::ENCODED_LENGTH {
286            return Err(StreamDecodeError::BufferTooShort {
287                expected: Self::ENCODED_LENGTH,
288                actual: buf.len(),
289            });
290        }
291
292        let num_in_group = u16::from_le_bytes([buf[2], buf[3]]);
293
294        if num_in_group as usize > MAX_GROUP_SIZE {
295            return Err(StreamDecodeError::GroupSizeTooLarge {
296                count: num_in_group as usize,
297                max: MAX_GROUP_SIZE,
298            });
299        }
300
301        Ok(Self {
302            block_length: u16::from_le_bytes([buf[0], buf[1]]),
303            num_in_group,
304        })
305    }
306}
307
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    // Tolerance-based on purpose: results depend on floating-point rounding.
    #[rstest]
    fn test_mantissa_to_f64() {
        assert!((mantissa_to_f64(12345, -2) - 123.45).abs() < 1e-10);
        assert!((mantissa_to_f64(100, 0) - 100.0).abs() < 1e-10);
        assert!((mantissa_to_f64(5, 3) - 5000.0).abs() < 1e-10);
    }

    #[rstest]
    fn test_message_header_too_short() {
        let buf = [0u8; 7];
        let err = MessageHeader::decode(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 8,
                actual: 7
            }
        );
    }

    #[rstest]
    fn test_message_header_decode_valid() {
        // block_length = 50, template_id = 10001 (0x2711), schema_id = 1, version = 0,
        // followed by one trailing byte that must be ignored.
        let buf = [50u8, 0, 0x11, 0x27, 1, 0, 0, 0, 0xFF];
        let header = MessageHeader::decode(&buf).unwrap();
        assert_eq!(header.block_length, 50);
        assert_eq!(header.template_id, template_id::BEST_BID_ASK_STREAM_EVENT);
        assert_eq!(header.schema_id, STREAM_SCHEMA_ID);
        assert_eq!(header.version, STREAM_SCHEMA_VERSION);
        assert!(header.validate_schema().is_ok());
    }

    #[rstest]
    fn test_group_size_too_short() {
        let buf = [0u8; 5];
        let err = GroupSizeEncoding::decode(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 6,
                actual: 5
            }
        );
    }

    #[rstest]
    fn test_group_size_too_large() {
        // Craft a buffer with num_in_group = MAX_GROUP_SIZE + 1
        let mut buf = [0u8; 6];
        let count = (MAX_GROUP_SIZE + 1) as u32;
        buf[2..6].copy_from_slice(&count.to_le_bytes());

        let err = GroupSizeEncoding::decode(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::GroupSizeTooLarge {
                count: MAX_GROUP_SIZE + 1,
                max: MAX_GROUP_SIZE
            }
        );
    }

    #[rstest]
    fn test_group_size_at_limit_is_accepted() {
        // The limit is inclusive: exactly MAX_GROUP_SIZE entries must decode.
        let mut buf = [0u8; 6];
        buf[..2].copy_from_slice(&16u16.to_le_bytes());
        buf[2..6].copy_from_slice(&(MAX_GROUP_SIZE as u32).to_le_bytes());

        let group = GroupSizeEncoding::decode(&buf).unwrap();
        assert_eq!(group.block_length, 16);
        assert_eq!(group.num_in_group as usize, MAX_GROUP_SIZE);
    }

    #[rstest]
    fn test_group_size16_too_short() {
        let buf = [0u8; 3];
        let err = GroupSize16Encoding::decode(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 4,
                actual: 3
            }
        );
    }

    #[rstest]
    fn test_group_size16_too_large() {
        let mut buf = [0u8; 4];
        let count = (MAX_GROUP_SIZE + 1) as u16;
        buf[2..4].copy_from_slice(&count.to_le_bytes());

        let err = GroupSize16Encoding::decode(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::GroupSizeTooLarge {
                count: MAX_GROUP_SIZE + 1,
                max: MAX_GROUP_SIZE
            }
        );
    }

    #[rstest]
    fn test_group_size16_valid() {
        let mut buf = [0u8; 4];
        buf[..2].copy_from_slice(&16u16.to_le_bytes());
        buf[2..4].copy_from_slice(&3u16.to_le_bytes());

        let group = GroupSize16Encoding::decode(&buf).unwrap();
        assert_eq!(group.block_length, 16);
        assert_eq!(group.num_in_group, 3);
    }

    #[rstest]
    fn test_decode_var_string8_empty_buffer() {
        let err = decode_var_string8(&[]).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 1,
                actual: 0
            }
        );
    }

    #[rstest]
    fn test_decode_var_string8_truncated() {
        // Length says 10 bytes, but only 4 payload bytes follow the prefix
        let buf = [10u8, b'H', b'E', b'L', b'L'];
        let err = decode_var_string8(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 11,
                actual: 5
            }
        );
    }

    #[rstest]
    fn test_decode_var_string8_invalid_utf8() {
        // 0xC3 opens a two-byte sequence, but 0x28 is not a continuation byte
        let buf = [2u8, 0xC3, 0x28];
        let err = decode_var_string8(&buf).unwrap_err();
        assert_eq!(err, StreamDecodeError::InvalidUtf8);
    }

    #[rstest]
    fn test_decode_var_string8_zero_length() {
        let (s, consumed) = decode_var_string8(&[0u8]).unwrap();
        assert_eq!(s, "");
        assert_eq!(consumed, 1);
    }

    #[rstest]
    fn test_decode_var_string8_valid() {
        let buf = [5u8, b'H', b'E', b'L', b'L', b'O'];
        let (s, consumed) = decode_var_string8(&buf).unwrap();
        assert_eq!(s, "HELLO");
        assert_eq!(consumed, 6);
    }

    #[rstest]
    fn test_schema_validation() {
        let header = MessageHeader {
            block_length: 50,
            template_id: 10001,
            schema_id: 99, // Wrong schema
            version: 0,
        };
        let err = header.validate_schema().unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::SchemaMismatch {
                expected: STREAM_SCHEMA_ID,
                actual: 99
            }
        );
    }
}