nautilus_binance/common/sbe/stream/
depth_diff.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Depth diff stream event decoder.
17//!
18//! Message layout (after 8-byte header):
19//! - eventTime: i64 (microseconds)
20//! - firstBookUpdateId: i64
21//! - lastBookUpdateId: i64
22//! - priceExponent: i8
23//! - qtyExponent: i8
24//! - bids group (groupSize16Encoding: u16 blockLength + u16 numInGroup):
25//!   - price: i64 (mantissa)
26//!   - qty: i64 (mantissa)
27//! - asks group (groupSize16Encoding: u16 blockLength + u16 numInGroup):
28//!   - price: i64 (mantissa)
29//!   - qty: i64 (mantissa)
30//! - symbol: varString8
31
32use super::{
33    GroupSize16Encoding, MessageHeader, PriceLevel, StreamDecodeError, decode_var_string8, read_i8,
34    read_i64_le,
35};
36
37/// Depth diff stream event (incremental order book updates).
38#[derive(Debug, Clone)]
39pub struct DepthDiffStreamEvent {
40    /// Event timestamp in microseconds.
41    pub event_time_us: i64,
42    /// First book update ID in this diff.
43    pub first_book_update_id: i64,
44    /// Last book update ID in this diff.
45    pub last_book_update_id: i64,
46    /// Price exponent (prices = mantissa * 10^exponent).
47    pub price_exponent: i8,
48    /// Quantity exponent (quantities = mantissa * 10^exponent).
49    pub qty_exponent: i8,
50    /// Bid level updates (qty=0 means remove level).
51    pub bids: Vec<PriceLevel>,
52    /// Ask level updates (qty=0 means remove level).
53    pub asks: Vec<PriceLevel>,
54    /// Trading symbol.
55    pub symbol: String,
56}
57
58impl DepthDiffStreamEvent {
59    /// Fixed block length (excluding header, groups, and variable-length data).
60    pub const BLOCK_LENGTH: usize = 26;
61
62    /// Decode from SBE buffer (including 8-byte header).
63    ///
64    /// # Errors
65    ///
66    /// Returns error if buffer is too short, group size exceeds limits,
67    /// or data is otherwise invalid.
68    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
69        let header = MessageHeader::decode(buf)?;
70        header.validate_schema()?;
71
72        let body = &buf[MessageHeader::ENCODED_LENGTH..];
73
74        let min_body_size = Self::BLOCK_LENGTH + GroupSize16Encoding::ENCODED_LENGTH;
75        if body.len() < min_body_size {
76            return Err(StreamDecodeError::BufferTooShort {
77                expected: MessageHeader::ENCODED_LENGTH + min_body_size,
78                actual: buf.len(),
79            });
80        }
81
82        let event_time_us = read_i64_le(body, 0)?;
83        let first_book_update_id = read_i64_le(body, 8)?;
84        let last_book_update_id = read_i64_le(body, 16)?;
85        let price_exponent = read_i8(body, 24)?;
86        let qty_exponent = read_i8(body, 25)?;
87
88        let mut offset = Self::BLOCK_LENGTH;
89
90        // Group size limit enforced inside GroupSize16Encoding::decode
91        let bids_group = GroupSize16Encoding::decode(&body[offset..])?;
92        let num_bids = bids_group.num_in_group as usize;
93        let bid_block_length = bids_group.block_length as usize;
94        offset += GroupSize16Encoding::ENCODED_LENGTH;
95
96        let bids_data_size = num_bids * bid_block_length;
97        if body.len() < offset + bids_data_size + GroupSize16Encoding::ENCODED_LENGTH {
98            return Err(StreamDecodeError::BufferTooShort {
99                expected: MessageHeader::ENCODED_LENGTH
100                    + offset
101                    + bids_data_size
102                    + GroupSize16Encoding::ENCODED_LENGTH,
103                actual: buf.len(),
104            });
105        }
106
107        let mut bids = Vec::with_capacity(num_bids);
108        for _ in 0..num_bids {
109            bids.push(PriceLevel::decode(&body[offset..])?);
110            offset += bid_block_length;
111        }
112
113        let asks_group = GroupSize16Encoding::decode(&body[offset..])?;
114        let num_asks = asks_group.num_in_group as usize;
115        let ask_block_length = asks_group.block_length as usize;
116        offset += GroupSize16Encoding::ENCODED_LENGTH;
117
118        let asks_data_size = num_asks * ask_block_length;
119        if body.len() < offset + asks_data_size + 1 {
120            return Err(StreamDecodeError::BufferTooShort {
121                expected: MessageHeader::ENCODED_LENGTH + offset + asks_data_size + 1,
122                actual: buf.len(),
123            });
124        }
125
126        let mut asks = Vec::with_capacity(num_asks);
127        for _ in 0..num_asks {
128            asks.push(PriceLevel::decode(&body[offset..])?);
129            offset += ask_block_length;
130        }
131
132        let (symbol, _) = decode_var_string8(&body[offset..])?;
133
134        Ok(Self {
135            event_time_us,
136            first_book_update_id,
137            last_book_update_id,
138            price_exponent,
139            qty_exponent,
140            bids,
141            asks,
142            symbol,
143        })
144    }
145
146    /// Get price as f64 for a level.
147    #[inline]
148    #[must_use]
149    pub fn level_price(&self, level: &PriceLevel) -> f64 {
150        super::mantissa_to_f64(level.price_mantissa, self.price_exponent)
151    }
152
153    /// Get quantity as f64 for a level.
154    #[inline]
155    #[must_use]
156    pub fn level_qty(&self, level: &PriceLevel) -> f64 {
157        super::mantissa_to_f64(level.qty_mantissa, self.qty_exponent)
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use rstest::rstest;
164
165    use super::*;
166    use crate::common::sbe::stream::{STREAM_SCHEMA_ID, template_id};
167
168    fn make_valid_buffer(num_bids: usize, num_asks: usize) -> Vec<u8> {
169        let level_block_len = 16u16;
170        let body_size = 26
171            + 4
172            + (num_bids * level_block_len as usize)
173            + 4
174            + (num_asks * level_block_len as usize)
175            + 8;
176        let mut buf = vec![0u8; 8 + body_size];
177
178        // Header
179        buf[0..2].copy_from_slice(&26u16.to_le_bytes()); // block_length
180        buf[2..4].copy_from_slice(&template_id::DEPTH_DIFF_STREAM_EVENT.to_le_bytes());
181        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
182        buf[6..8].copy_from_slice(&0u16.to_le_bytes()); // version
183
184        // Body
185        let body = &mut buf[8..];
186        body[0..8].copy_from_slice(&1000000i64.to_le_bytes()); // event_time_us
187        body[8..16].copy_from_slice(&12345i64.to_le_bytes()); // first_book_update_id
188        body[16..24].copy_from_slice(&12350i64.to_le_bytes()); // last_book_update_id
189        body[24] = (-2i8) as u8; // price_exponent
190        body[25] = (-8i8) as u8; // qty_exponent
191
192        let mut offset = 26;
193
194        // Bids group header
195        body[offset..offset + 2].copy_from_slice(&level_block_len.to_le_bytes());
196        body[offset + 2..offset + 4].copy_from_slice(&(num_bids as u16).to_le_bytes());
197        offset += 4;
198
199        // Bids
200        for i in 0..num_bids {
201            body[offset..offset + 8].copy_from_slice(&(4200000i64 - i as i64 * 100).to_le_bytes());
202            body[offset + 8..offset + 16].copy_from_slice(&100000000i64.to_le_bytes());
203            offset += level_block_len as usize;
204        }
205
206        // Asks group header
207        body[offset..offset + 2].copy_from_slice(&level_block_len.to_le_bytes());
208        body[offset + 2..offset + 4].copy_from_slice(&(num_asks as u16).to_le_bytes());
209        offset += 4;
210
211        // Asks
212        for i in 0..num_asks {
213            body[offset..offset + 8].copy_from_slice(&(4200100i64 + i as i64 * 100).to_le_bytes());
214            body[offset + 8..offset + 16].copy_from_slice(&200000000i64.to_le_bytes());
215            offset += level_block_len as usize;
216        }
217
218        // Symbol: "BTCUSDT"
219        body[offset] = 7;
220        body[offset + 1..offset + 8].copy_from_slice(b"BTCUSDT");
221
222        buf
223    }
224
225    #[rstest]
226    fn test_decode_valid() {
227        let buf = make_valid_buffer(3, 2);
228        let event = DepthDiffStreamEvent::decode(&buf).unwrap();
229
230        assert_eq!(event.event_time_us, 1000000);
231        assert_eq!(event.first_book_update_id, 12345);
232        assert_eq!(event.last_book_update_id, 12350);
233        assert_eq!(event.bids.len(), 3);
234        assert_eq!(event.asks.len(), 2);
235        assert_eq!(event.symbol, "BTCUSDT");
236    }
237
238    #[rstest]
239    fn test_decode_empty_updates() {
240        let buf = make_valid_buffer(0, 0);
241        let event = DepthDiffStreamEvent::decode(&buf).unwrap();
242
243        assert!(event.bids.is_empty());
244        assert!(event.asks.is_empty());
245    }
246
247    #[rstest]
248    fn test_decode_truncated() {
249        let mut buf = make_valid_buffer(5, 5);
250        buf.truncate(60); // Truncate in the middle
251        let err = DepthDiffStreamEvent::decode(&buf).unwrap_err();
252        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
253    }
254
255    #[rstest]
256    fn test_decode_wrong_schema() {
257        let mut buf = make_valid_buffer(3, 2);
258        buf[4..6].copy_from_slice(&99u16.to_le_bytes());
259        let err = DepthDiffStreamEvent::decode(&buf).unwrap_err();
260        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
261    }
262}