nautilus_binance/common/sbe/stream/
depth_diff.rs1use ustr::Ustr;
33
34use super::{MessageHeader, PriceLevel, StreamDecodeError};
35use crate::common::sbe::cursor::SbeCursor;
36
37#[derive(Debug, Clone)]
39pub struct DepthDiffStreamEvent {
40 pub event_time_us: i64,
42 pub first_book_update_id: i64,
44 pub last_book_update_id: i64,
46 pub price_exponent: i8,
48 pub qty_exponent: i8,
50 pub bids: Vec<PriceLevel>,
52 pub asks: Vec<PriceLevel>,
54 pub symbol: Ustr,
56}
57
58impl DepthDiffStreamEvent {
59 pub const BLOCK_LENGTH: usize = 26;
61
62 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
69 let header = MessageHeader::decode(buf)?;
70 header.validate_schema()?;
71
72 let mut cursor = SbeCursor::new_at(buf, MessageHeader::ENCODED_LENGTH);
73
74 let event_time_us = cursor.read_i64_le()?;
75 let first_book_update_id = cursor.read_i64_le()?;
76 let last_book_update_id = cursor.read_i64_le()?;
77 let price_exponent = cursor.read_i8()?;
78 let qty_exponent = cursor.read_i8()?;
79
80 let (bid_block_length, num_bids) = cursor.read_group_header_16()?;
81 let bids = cursor.read_group(bid_block_length, u32::from(num_bids), PriceLevel::decode)?;
82
83 let (ask_block_length, num_asks) = cursor.read_group_header_16()?;
84 let asks = cursor.read_group(ask_block_length, u32::from(num_asks), PriceLevel::decode)?;
85
86 let symbol_str = cursor.read_var_string8()?;
87
88 Ok(Self {
89 event_time_us,
90 first_book_update_id,
91 last_book_update_id,
92 price_exponent,
93 qty_exponent,
94 bids,
95 asks,
96 symbol: Ustr::from(&symbol_str),
97 })
98 }
99
100 #[inline]
102 #[must_use]
103 pub fn level_price(&self, level: &PriceLevel) -> f64 {
104 super::mantissa_to_f64(level.price_mantissa, self.price_exponent)
105 }
106
107 #[inline]
109 #[must_use]
110 pub fn level_qty(&self, level: &PriceLevel) -> f64 {
111 super::mantissa_to_f64(level.qty_mantissa, self.qty_exponent)
112 }
113}
114
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::sbe::stream::{STREAM_SCHEMA_ID, template_id};

    /// Encoded size of one level entry in the fixtures: an 8-byte price
    /// mantissa followed by an 8-byte quantity mantissa.
    const LEVEL_BLOCK_LEN: u16 = 16;

    /// Appends one repeating group to `buf`: a 4-byte group header
    /// (block length, entry count) followed by `count` levels whose price
    /// starts at `first_price` and moves by `price_step` per level, all with
    /// the same `qty`.
    fn append_levels(
        buf: &mut Vec<u8>,
        count: usize,
        first_price: i64,
        price_step: i64,
        qty: i64,
    ) {
        buf.extend_from_slice(&LEVEL_BLOCK_LEN.to_le_bytes());
        buf.extend_from_slice(&(count as u16).to_le_bytes());

        let mut price = first_price;
        for _ in 0..count {
            buf.extend_from_slice(&price.to_le_bytes());
            buf.extend_from_slice(&qty.to_le_bytes());
            price += price_step;
        }
    }

    /// Builds a complete, well-formed message (header + body) carrying
    /// `num_bids` bid levels, `num_asks` ask levels and the symbol `BTCUSDT`.
    fn make_valid_buffer(num_bids: usize, num_asks: usize) -> Vec<u8> {
        let symbol = b"BTCUSDT";
        let level_len = usize::from(LEVEL_BLOCK_LEN);
        let total_len = 8 // message header
            + 26 // fixed root block
            + 4 + num_bids * level_len
            + 4 + num_asks * level_len
            + 1 + symbol.len();
        let mut buf = Vec::with_capacity(total_len);

        // Message header: block length, template id, schema id, version.
        buf.extend_from_slice(&26u16.to_le_bytes());
        buf.extend_from_slice(&template_id::DEPTH_DIFF_STREAM_EVENT.to_le_bytes());
        buf.extend_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf.extend_from_slice(&0u16.to_le_bytes());

        // Fixed root block: event time, first/last update ids, exponents.
        buf.extend_from_slice(&1_000_000i64.to_le_bytes());
        buf.extend_from_slice(&12_345i64.to_le_bytes());
        buf.extend_from_slice(&12_350i64.to_le_bytes());
        buf.extend_from_slice(&(-2i8).to_le_bytes());
        buf.extend_from_slice(&(-8i8).to_le_bytes());

        // Bids step down from 4_200_000, asks step up from 4_200_100.
        append_levels(&mut buf, num_bids, 4_200_000, -100, 100_000_000);
        append_levels(&mut buf, num_asks, 4_200_100, 100, 200_000_000);

        // Symbol as a one-byte length prefix followed by the raw bytes.
        buf.push(symbol.len() as u8);
        buf.extend_from_slice(symbol);

        buf
    }

    #[rstest]
    fn test_decode_valid() {
        let bytes = make_valid_buffer(3, 2);
        let decoded = DepthDiffStreamEvent::decode(&bytes).unwrap();

        assert_eq!(decoded.event_time_us, 1_000_000);
        assert_eq!(decoded.first_book_update_id, 12_345);
        assert_eq!(decoded.last_book_update_id, 12_350);
        assert_eq!(decoded.bids.len(), 3);
        assert_eq!(decoded.asks.len(), 2);
        assert_eq!(decoded.symbol, "BTCUSDT");
    }

    #[rstest]
    fn test_decode_empty_updates() {
        let bytes = make_valid_buffer(0, 0);
        let decoded = DepthDiffStreamEvent::decode(&bytes).unwrap();

        assert!(decoded.bids.is_empty());
        assert!(decoded.asks.is_empty());
    }

    #[rstest]
    fn test_decode_truncated() {
        // Cutting the message at 60 bytes leaves the bid group incomplete.
        let mut bytes = make_valid_buffer(5, 5);
        bytes.truncate(60);

        let result = DepthDiffStreamEvent::decode(&bytes);
        let err = result.unwrap_err();
        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_wrong_schema() {
        // Overwrite the schema id in the header with an unexpected value.
        let mut bytes = make_valid_buffer(3, 2);
        bytes[4..6].copy_from_slice(&99u16.to_le_bytes());

        let result = DepthDiffStreamEvent::decode(&bytes);
        let err = result.unwrap_err();
        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
    }
}