nautilus_binance/common/sbe/stream/
depth_snapshot.rs1use super::{
32 GroupSize16Encoding, MessageHeader, PriceLevel, StreamDecodeError, decode_var_string8, read_i8,
33 read_i64_le,
34};
35
36#[derive(Debug, Clone)]
38pub struct DepthSnapshotStreamEvent {
39 pub event_time_us: i64,
41 pub book_update_id: i64,
43 pub price_exponent: i8,
45 pub qty_exponent: i8,
47 pub bids: Vec<PriceLevel>,
49 pub asks: Vec<PriceLevel>,
51 pub symbol: String,
53}
54
55impl DepthSnapshotStreamEvent {
56 pub const BLOCK_LENGTH: usize = 18;
58
59 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
66 let header = MessageHeader::decode(buf)?;
67 header.validate_schema()?;
68
69 let body = &buf[MessageHeader::ENCODED_LENGTH..];
70
71 let min_body_size = Self::BLOCK_LENGTH + GroupSize16Encoding::ENCODED_LENGTH;
72 if body.len() < min_body_size {
73 return Err(StreamDecodeError::BufferTooShort {
74 expected: MessageHeader::ENCODED_LENGTH + min_body_size,
75 actual: buf.len(),
76 });
77 }
78
79 let event_time_us = read_i64_le(body, 0)?;
80 let book_update_id = read_i64_le(body, 8)?;
81 let price_exponent = read_i8(body, 16)?;
82 let qty_exponent = read_i8(body, 17)?;
83
84 let mut offset = Self::BLOCK_LENGTH;
85
86 let bids_group = GroupSize16Encoding::decode(&body[offset..])?;
88 let num_bids = bids_group.num_in_group as usize;
89 let bid_block_length = bids_group.block_length as usize;
90 offset += GroupSize16Encoding::ENCODED_LENGTH;
91
92 let bids_data_size = num_bids * bid_block_length;
93 if body.len() < offset + bids_data_size + GroupSize16Encoding::ENCODED_LENGTH {
94 return Err(StreamDecodeError::BufferTooShort {
95 expected: MessageHeader::ENCODED_LENGTH
96 + offset
97 + bids_data_size
98 + GroupSize16Encoding::ENCODED_LENGTH,
99 actual: buf.len(),
100 });
101 }
102
103 let mut bids = Vec::with_capacity(num_bids);
104 for _ in 0..num_bids {
105 bids.push(PriceLevel::decode(&body[offset..])?);
106 offset += bid_block_length;
107 }
108
109 let asks_group = GroupSize16Encoding::decode(&body[offset..])?;
110 let num_asks = asks_group.num_in_group as usize;
111 let ask_block_length = asks_group.block_length as usize;
112 offset += GroupSize16Encoding::ENCODED_LENGTH;
113
114 let asks_data_size = num_asks * ask_block_length;
115 if body.len() < offset + asks_data_size + 1 {
116 return Err(StreamDecodeError::BufferTooShort {
117 expected: MessageHeader::ENCODED_LENGTH + offset + asks_data_size + 1,
118 actual: buf.len(),
119 });
120 }
121
122 let mut asks = Vec::with_capacity(num_asks);
123 for _ in 0..num_asks {
124 asks.push(PriceLevel::decode(&body[offset..])?);
125 offset += ask_block_length;
126 }
127
128 let (symbol, _) = decode_var_string8(&body[offset..])?;
129
130 Ok(Self {
131 event_time_us,
132 book_update_id,
133 price_exponent,
134 qty_exponent,
135 bids,
136 asks,
137 symbol,
138 })
139 }
140
141 #[inline]
143 #[must_use]
144 pub fn level_price(&self, level: &PriceLevel) -> f64 {
145 super::mantissa_to_f64(level.price_mantissa, self.price_exponent)
146 }
147
148 #[inline]
150 #[must_use]
151 pub fn level_qty(&self, level: &PriceLevel) -> f64 {
152 super::mantissa_to_f64(level.qty_mantissa, self.qty_exponent)
153 }
154}
155
156#[cfg(test)]
157mod tests {
158 use rstest::rstest;
159
160 use super::*;
161 use crate::common::sbe::stream::{STREAM_SCHEMA_ID, template_id};
162
163 fn make_valid_buffer(num_bids: usize, num_asks: usize) -> Vec<u8> {
164 let level_block_len = 16u16;
165 let body_size = 18
166 + 4
167 + (num_bids * level_block_len as usize)
168 + 4
169 + (num_asks * level_block_len as usize)
170 + 8;
171 let mut buf = vec![0u8; 8 + body_size];
172
173 buf[0..2].copy_from_slice(&18u16.to_le_bytes()); buf[2..4].copy_from_slice(&template_id::DEPTH_SNAPSHOT_STREAM_EVENT.to_le_bytes());
176 buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
177 buf[6..8].copy_from_slice(&0u16.to_le_bytes()); let body = &mut buf[8..];
181 body[0..8].copy_from_slice(&1000000i64.to_le_bytes()); body[8..16].copy_from_slice(&12345i64.to_le_bytes()); body[16] = (-2i8) as u8; body[17] = (-8i8) as u8; let mut offset = 18;
187
188 body[offset..offset + 2].copy_from_slice(&level_block_len.to_le_bytes());
190 body[offset + 2..offset + 4].copy_from_slice(&(num_bids as u16).to_le_bytes());
191 offset += 4;
192
193 for i in 0..num_bids {
195 body[offset..offset + 8].copy_from_slice(&(4200000i64 - i as i64 * 100).to_le_bytes());
196 body[offset + 8..offset + 16].copy_from_slice(&100000000i64.to_le_bytes());
197 offset += level_block_len as usize;
198 }
199
200 body[offset..offset + 2].copy_from_slice(&level_block_len.to_le_bytes());
202 body[offset + 2..offset + 4].copy_from_slice(&(num_asks as u16).to_le_bytes());
203 offset += 4;
204
205 for i in 0..num_asks {
207 body[offset..offset + 8].copy_from_slice(&(4200100i64 + i as i64 * 100).to_le_bytes());
208 body[offset + 8..offset + 16].copy_from_slice(&200000000i64.to_le_bytes());
209 offset += level_block_len as usize;
210 }
211
212 body[offset] = 7;
214 body[offset + 1..offset + 8].copy_from_slice(b"BTCUSDT");
215
216 buf
217 }
218
219 #[rstest]
220 fn test_decode_valid() {
221 let buf = make_valid_buffer(5, 5);
222 let event = DepthSnapshotStreamEvent::decode(&buf).unwrap();
223
224 assert_eq!(event.event_time_us, 1000000);
225 assert_eq!(event.book_update_id, 12345);
226 assert_eq!(event.bids.len(), 5);
227 assert_eq!(event.asks.len(), 5);
228 assert_eq!(event.symbol, "BTCUSDT");
229 }
230
231 #[rstest]
232 fn test_decode_empty_books() {
233 let buf = make_valid_buffer(0, 0);
234 let event = DepthSnapshotStreamEvent::decode(&buf).unwrap();
235
236 assert!(event.bids.is_empty());
237 assert!(event.asks.is_empty());
238 }
239
240 #[rstest]
241 fn test_decode_truncated() {
242 let mut buf = make_valid_buffer(10, 10);
243 buf.truncate(100); let err = DepthSnapshotStreamEvent::decode(&buf).unwrap_err();
245 assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
246 }
247
248 #[rstest]
249 fn test_decode_wrong_schema() {
250 let mut buf = make_valid_buffer(5, 5);
251 buf[4..6].copy_from_slice(&99u16.to_le_bytes());
252 let err = DepthSnapshotStreamEvent::decode(&buf).unwrap_err();
253 assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
254 }
255}