nautilus_binance/common/sbe/stream/
depth_snapshot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Depth snapshot stream event decoder.
17//!
18//! Message layout (after 8-byte header):
19//! - eventTime: i64 (microseconds)
20//! - bookUpdateId: i64
21//! - priceExponent: i8
22//! - qtyExponent: i8
23//! - bids group (groupSize16Encoding: u16 blockLength + u16 numInGroup):
24//!   - price: i64 (mantissa)
25//!   - qty: i64 (mantissa)
26//! - asks group (groupSize16Encoding: u16 blockLength + u16 numInGroup):
27//!   - price: i64 (mantissa)
28//!   - qty: i64 (mantissa)
29//! - symbol: varString8
30
31use super::{
32    GroupSize16Encoding, MessageHeader, PriceLevel, StreamDecodeError, decode_var_string8, read_i8,
33    read_i64_le,
34};
35
36/// Depth snapshot stream event (top N levels of order book).
37#[derive(Debug, Clone)]
38pub struct DepthSnapshotStreamEvent {
39    /// Event timestamp in microseconds.
40    pub event_time_us: i64,
41    /// Book update ID for sequencing.
42    pub book_update_id: i64,
43    /// Price exponent (prices = mantissa * 10^exponent).
44    pub price_exponent: i8,
45    /// Quantity exponent (quantities = mantissa * 10^exponent).
46    pub qty_exponent: i8,
47    /// Bid levels (best bid first).
48    pub bids: Vec<PriceLevel>,
49    /// Ask levels (best ask first).
50    pub asks: Vec<PriceLevel>,
51    /// Trading symbol.
52    pub symbol: String,
53}
54
55impl DepthSnapshotStreamEvent {
56    /// Fixed block length (excluding header, groups, and variable-length data).
57    pub const BLOCK_LENGTH: usize = 18;
58
59    /// Decode from SBE buffer (including 8-byte header).
60    ///
61    /// # Errors
62    ///
63    /// Returns error if buffer is too short, group size exceeds limits,
64    /// or data is otherwise invalid.
65    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
66        let header = MessageHeader::decode(buf)?;
67        header.validate_schema()?;
68
69        let body = &buf[MessageHeader::ENCODED_LENGTH..];
70
71        let min_body_size = Self::BLOCK_LENGTH + GroupSize16Encoding::ENCODED_LENGTH;
72        if body.len() < min_body_size {
73            return Err(StreamDecodeError::BufferTooShort {
74                expected: MessageHeader::ENCODED_LENGTH + min_body_size,
75                actual: buf.len(),
76            });
77        }
78
79        let event_time_us = read_i64_le(body, 0)?;
80        let book_update_id = read_i64_le(body, 8)?;
81        let price_exponent = read_i8(body, 16)?;
82        let qty_exponent = read_i8(body, 17)?;
83
84        let mut offset = Self::BLOCK_LENGTH;
85
86        // Group size limit enforced inside GroupSize16Encoding::decode
87        let bids_group = GroupSize16Encoding::decode(&body[offset..])?;
88        let num_bids = bids_group.num_in_group as usize;
89        let bid_block_length = bids_group.block_length as usize;
90        offset += GroupSize16Encoding::ENCODED_LENGTH;
91
92        let bids_data_size = num_bids * bid_block_length;
93        if body.len() < offset + bids_data_size + GroupSize16Encoding::ENCODED_LENGTH {
94            return Err(StreamDecodeError::BufferTooShort {
95                expected: MessageHeader::ENCODED_LENGTH
96                    + offset
97                    + bids_data_size
98                    + GroupSize16Encoding::ENCODED_LENGTH,
99                actual: buf.len(),
100            });
101        }
102
103        let mut bids = Vec::with_capacity(num_bids);
104        for _ in 0..num_bids {
105            bids.push(PriceLevel::decode(&body[offset..])?);
106            offset += bid_block_length;
107        }
108
109        let asks_group = GroupSize16Encoding::decode(&body[offset..])?;
110        let num_asks = asks_group.num_in_group as usize;
111        let ask_block_length = asks_group.block_length as usize;
112        offset += GroupSize16Encoding::ENCODED_LENGTH;
113
114        let asks_data_size = num_asks * ask_block_length;
115        if body.len() < offset + asks_data_size + 1 {
116            return Err(StreamDecodeError::BufferTooShort {
117                expected: MessageHeader::ENCODED_LENGTH + offset + asks_data_size + 1,
118                actual: buf.len(),
119            });
120        }
121
122        let mut asks = Vec::with_capacity(num_asks);
123        for _ in 0..num_asks {
124            asks.push(PriceLevel::decode(&body[offset..])?);
125            offset += ask_block_length;
126        }
127
128        let (symbol, _) = decode_var_string8(&body[offset..])?;
129
130        Ok(Self {
131            event_time_us,
132            book_update_id,
133            price_exponent,
134            qty_exponent,
135            bids,
136            asks,
137            symbol,
138        })
139    }
140
141    /// Get price as f64 for a level.
142    #[inline]
143    #[must_use]
144    pub fn level_price(&self, level: &PriceLevel) -> f64 {
145        super::mantissa_to_f64(level.price_mantissa, self.price_exponent)
146    }
147
148    /// Get quantity as f64 for a level.
149    #[inline]
150    #[must_use]
151    pub fn level_qty(&self, level: &PriceLevel) -> f64 {
152        super::mantissa_to_f64(level.qty_mantissa, self.qty_exponent)
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use rstest::rstest;
159
160    use super::*;
161    use crate::common::sbe::stream::{STREAM_SCHEMA_ID, template_id};
162
163    fn make_valid_buffer(num_bids: usize, num_asks: usize) -> Vec<u8> {
164        let level_block_len = 16u16;
165        let body_size = 18
166            + 4
167            + (num_bids * level_block_len as usize)
168            + 4
169            + (num_asks * level_block_len as usize)
170            + 8;
171        let mut buf = vec![0u8; 8 + body_size];
172
173        // Header
174        buf[0..2].copy_from_slice(&18u16.to_le_bytes()); // block_length
175        buf[2..4].copy_from_slice(&template_id::DEPTH_SNAPSHOT_STREAM_EVENT.to_le_bytes());
176        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
177        buf[6..8].copy_from_slice(&0u16.to_le_bytes()); // version
178
179        // Body
180        let body = &mut buf[8..];
181        body[0..8].copy_from_slice(&1000000i64.to_le_bytes()); // event_time_us
182        body[8..16].copy_from_slice(&12345i64.to_le_bytes()); // book_update_id
183        body[16] = (-2i8) as u8; // price_exponent
184        body[17] = (-8i8) as u8; // qty_exponent
185
186        let mut offset = 18;
187
188        // Bids group header
189        body[offset..offset + 2].copy_from_slice(&level_block_len.to_le_bytes());
190        body[offset + 2..offset + 4].copy_from_slice(&(num_bids as u16).to_le_bytes());
191        offset += 4;
192
193        // Bids
194        for i in 0..num_bids {
195            body[offset..offset + 8].copy_from_slice(&(4200000i64 - i as i64 * 100).to_le_bytes());
196            body[offset + 8..offset + 16].copy_from_slice(&100000000i64.to_le_bytes());
197            offset += level_block_len as usize;
198        }
199
200        // Asks group header
201        body[offset..offset + 2].copy_from_slice(&level_block_len.to_le_bytes());
202        body[offset + 2..offset + 4].copy_from_slice(&(num_asks as u16).to_le_bytes());
203        offset += 4;
204
205        // Asks
206        for i in 0..num_asks {
207            body[offset..offset + 8].copy_from_slice(&(4200100i64 + i as i64 * 100).to_le_bytes());
208            body[offset + 8..offset + 16].copy_from_slice(&200000000i64.to_le_bytes());
209            offset += level_block_len as usize;
210        }
211
212        // Symbol: "BTCUSDT"
213        body[offset] = 7;
214        body[offset + 1..offset + 8].copy_from_slice(b"BTCUSDT");
215
216        buf
217    }
218
219    #[rstest]
220    fn test_decode_valid() {
221        let buf = make_valid_buffer(5, 5);
222        let event = DepthSnapshotStreamEvent::decode(&buf).unwrap();
223
224        assert_eq!(event.event_time_us, 1000000);
225        assert_eq!(event.book_update_id, 12345);
226        assert_eq!(event.bids.len(), 5);
227        assert_eq!(event.asks.len(), 5);
228        assert_eq!(event.symbol, "BTCUSDT");
229    }
230
231    #[rstest]
232    fn test_decode_empty_books() {
233        let buf = make_valid_buffer(0, 0);
234        let event = DepthSnapshotStreamEvent::decode(&buf).unwrap();
235
236        assert!(event.bids.is_empty());
237        assert!(event.asks.is_empty());
238    }
239
240    #[rstest]
241    fn test_decode_truncated() {
242        let mut buf = make_valid_buffer(10, 10);
243        buf.truncate(100); // Truncate in the middle
244        let err = DepthSnapshotStreamEvent::decode(&buf).unwrap_err();
245        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
246    }
247
248    #[rstest]
249    fn test_decode_wrong_schema() {
250        let mut buf = make_valid_buffer(5, 5);
251        buf[4..6].copy_from_slice(&99u16.to_le_bytes());
252        let err = DepthSnapshotStreamEvent::decode(&buf).unwrap_err();
253        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
254    }
255}