// nautilus_binance/common/sbe/stream/depth_diff.rs

use super::{
    GroupSize16Encoding, MessageHeader, PriceLevel, StreamDecodeError, decode_var_string8, read_i8,
    read_i64_le,
};
36
37#[derive(Debug, Clone)]
39pub struct DepthDiffStreamEvent {
40 pub event_time_us: i64,
42 pub first_book_update_id: i64,
44 pub last_book_update_id: i64,
46 pub price_exponent: i8,
48 pub qty_exponent: i8,
50 pub bids: Vec<PriceLevel>,
52 pub asks: Vec<PriceLevel>,
54 pub symbol: String,
56}
57
58impl DepthDiffStreamEvent {
59 pub const BLOCK_LENGTH: usize = 26;
61
62 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
69 let header = MessageHeader::decode(buf)?;
70 header.validate_schema()?;
71
72 let body = &buf[MessageHeader::ENCODED_LENGTH..];
73
74 let min_body_size = Self::BLOCK_LENGTH + GroupSize16Encoding::ENCODED_LENGTH;
75 if body.len() < min_body_size {
76 return Err(StreamDecodeError::BufferTooShort {
77 expected: MessageHeader::ENCODED_LENGTH + min_body_size,
78 actual: buf.len(),
79 });
80 }
81
82 let event_time_us = read_i64_le(body, 0)?;
83 let first_book_update_id = read_i64_le(body, 8)?;
84 let last_book_update_id = read_i64_le(body, 16)?;
85 let price_exponent = read_i8(body, 24)?;
86 let qty_exponent = read_i8(body, 25)?;
87
88 let mut offset = Self::BLOCK_LENGTH;
89
90 let bids_group = GroupSize16Encoding::decode(&body[offset..])?;
92 let num_bids = bids_group.num_in_group as usize;
93 let bid_block_length = bids_group.block_length as usize;
94 offset += GroupSize16Encoding::ENCODED_LENGTH;
95
96 let bids_data_size = num_bids * bid_block_length;
97 if body.len() < offset + bids_data_size + GroupSize16Encoding::ENCODED_LENGTH {
98 return Err(StreamDecodeError::BufferTooShort {
99 expected: MessageHeader::ENCODED_LENGTH
100 + offset
101 + bids_data_size
102 + GroupSize16Encoding::ENCODED_LENGTH,
103 actual: buf.len(),
104 });
105 }
106
107 let mut bids = Vec::with_capacity(num_bids);
108 for _ in 0..num_bids {
109 bids.push(PriceLevel::decode(&body[offset..])?);
110 offset += bid_block_length;
111 }
112
113 let asks_group = GroupSize16Encoding::decode(&body[offset..])?;
114 let num_asks = asks_group.num_in_group as usize;
115 let ask_block_length = asks_group.block_length as usize;
116 offset += GroupSize16Encoding::ENCODED_LENGTH;
117
118 let asks_data_size = num_asks * ask_block_length;
119 if body.len() < offset + asks_data_size + 1 {
120 return Err(StreamDecodeError::BufferTooShort {
121 expected: MessageHeader::ENCODED_LENGTH + offset + asks_data_size + 1,
122 actual: buf.len(),
123 });
124 }
125
126 let mut asks = Vec::with_capacity(num_asks);
127 for _ in 0..num_asks {
128 asks.push(PriceLevel::decode(&body[offset..])?);
129 offset += ask_block_length;
130 }
131
132 let (symbol, _) = decode_var_string8(&body[offset..])?;
133
134 Ok(Self {
135 event_time_us,
136 first_book_update_id,
137 last_book_update_id,
138 price_exponent,
139 qty_exponent,
140 bids,
141 asks,
142 symbol,
143 })
144 }
145
146 #[inline]
148 #[must_use]
149 pub fn level_price(&self, level: &PriceLevel) -> f64 {
150 super::mantissa_to_f64(level.price_mantissa, self.price_exponent)
151 }
152
153 #[inline]
155 #[must_use]
156 pub fn level_qty(&self, level: &PriceLevel) -> f64 {
157 super::mantissa_to_f64(level.qty_mantissa, self.qty_exponent)
158 }
159}
160
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::sbe::stream::{STREAM_SCHEMA_ID, template_id};

    /// Builds a well-formed depth diff frame with `num_bids` bid levels,
    /// `num_asks` ask levels and the symbol `BTCUSDT`.
    fn make_valid_buffer(num_bids: usize, num_asks: usize) -> Vec<u8> {
        let level_block_len = 16u16;
        // Root block + bids group header + bids + asks group header + asks
        // + symbol (1 length byte + 7 characters).
        let body_size = 26
            + 4
            + (num_bids * level_block_len as usize)
            + 4
            + (num_asks * level_block_len as usize)
            + 8;
        let mut buf = vec![0u8; 8 + body_size];

        // 8-byte message header. Bytes 4..6 hold the schema ID (see
        // `test_decode_wrong_schema`); the other fields are presumably block
        // length, template ID and version, in that order.
        buf[0..2].copy_from_slice(&26u16.to_le_bytes());
        buf[2..4].copy_from_slice(&template_id::DEPTH_DIFF_STREAM_EVENT.to_le_bytes());
        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf[6..8].copy_from_slice(&0u16.to_le_bytes());

        // Root block, at the offsets `decode` reads from.
        let body = &mut buf[8..];
        body[0..8].copy_from_slice(&1000000i64.to_le_bytes());
        body[8..16].copy_from_slice(&12345i64.to_le_bytes());
        body[16..24].copy_from_slice(&12350i64.to_le_bytes());
        body[24] = (-2i8) as u8;
        body[25] = (-8i8) as u8;

        let mut offset = 26;

        // Bids group header: block length, then entry count.
        body[offset..offset + 2].copy_from_slice(&level_block_len.to_le_bytes());
        body[offset + 2..offset + 4].copy_from_slice(&(num_bids as u16).to_le_bytes());
        offset += 4;

        // Each entry is two little-endian i64 values (descending for bids).
        for i in 0..num_bids {
            body[offset..offset + 8].copy_from_slice(&(4200000i64 - i as i64 * 100).to_le_bytes());
            body[offset + 8..offset + 16].copy_from_slice(&100000000i64.to_le_bytes());
            offset += level_block_len as usize;
        }

        // Asks group header: block length, then entry count.
        body[offset..offset + 2].copy_from_slice(&level_block_len.to_le_bytes());
        body[offset + 2..offset + 4].copy_from_slice(&(num_asks as u16).to_le_bytes());
        offset += 4;

        for i in 0..num_asks {
            body[offset..offset + 8].copy_from_slice(&(4200100i64 + i as i64 * 100).to_le_bytes());
            body[offset + 8..offset + 16].copy_from_slice(&200000000i64.to_le_bytes());
            offset += level_block_len as usize;
        }

        // Symbol: one length byte followed by the characters.
        body[offset] = 7;
        body[offset + 1..offset + 8].copy_from_slice(b"BTCUSDT");

        buf
    }

    #[rstest]
    fn test_decode_valid() {
        let buf = make_valid_buffer(3, 2);
        let event = DepthDiffStreamEvent::decode(&buf).unwrap();

        assert_eq!(event.event_time_us, 1000000);
        assert_eq!(event.first_book_update_id, 12345);
        assert_eq!(event.last_book_update_id, 12350);
        assert_eq!(event.price_exponent, -2);
        assert_eq!(event.qty_exponent, -8);
        assert_eq!(event.bids.len(), 3);
        assert_eq!(event.asks.len(), 2);
        assert_eq!(event.symbol, "BTCUSDT");
    }

    #[rstest]
    fn test_decode_empty_updates() {
        let buf = make_valid_buffer(0, 0);
        let event = DepthDiffStreamEvent::decode(&buf).unwrap();

        assert!(event.bids.is_empty());
        assert!(event.asks.is_empty());
    }

    #[rstest]
    fn test_decode_truncated() {
        let mut buf = make_valid_buffer(5, 5);
        // Cut the frame in the middle of the bid entries: the bids group
        // header still declares five levels that no longer fit.
        buf.truncate(60);

        let err = DepthDiffStreamEvent::decode(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_wrong_schema() {
        let mut buf = make_valid_buffer(3, 2);
        // Overwrite the schema ID in the message header.
        buf[4..6].copy_from_slice(&99u16.to_le_bytes());

        let err = DepthDiffStreamEvent::decode(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
    }
}