1use super::{
22 error::SbeDecodeError,
23 models::{BinanceDepth, BinancePriceLevel, BinanceTrade, BinanceTrades},
24};
25use crate::common::sbe::spot::{
26 SBE_SCHEMA_ID, SBE_SCHEMA_VERSION, bool_enum::BoolEnum,
27 depth_response_codec::SBE_TEMPLATE_ID as DEPTH_TEMPLATE_ID,
28 message_header_codec::ENCODED_LENGTH as HEADER_LENGTH,
29 ping_response_codec::SBE_TEMPLATE_ID as PING_TEMPLATE_ID,
30 server_time_response_codec::SBE_TEMPLATE_ID as SERVER_TIME_TEMPLATE_ID,
31 trades_response_codec::SBE_TEMPLATE_ID as TRADES_TEMPLATE_ID,
32};
33
/// Size in bytes of the repeating-group header consumed by the group decoders
/// in this file: a little-endian `u16` block length followed by a
/// little-endian `u32` entry count.
const GROUP_SIZE_LENGTH: usize = std::mem::size_of::<u16>() + std::mem::size_of::<u32>();

/// Largest entry count accepted from a group header. Anything above this is
/// rejected with `SbeDecodeError::GroupSizeTooLarge` before the entries are
/// sized or allocated.
const MAX_GROUP_SIZE: u32 = 10_000;
39
/// Message header found at the start of every response buffer.
///
/// Holds four `u16` fields which `MessageHeader::decode` reads as
/// little-endian values from the first eight bytes of the buffer.
#[derive(Clone, Copy, Debug)]
struct MessageHeader {
    /// Root block length as sent on the wire. It is parsed but none of the
    /// decoders visible here read it back, hence the lint allowance.
    #[allow(dead_code)]
    block_length: u16,
    /// Template identifier; each `decode_*` entry point compares it against
    /// the template it expects.
    template_id: u16,
    /// Schema identifier, checked against `SBE_SCHEMA_ID` by `validate`.
    schema_id: u16,
    /// Schema version, checked against `SBE_SCHEMA_VERSION` by `validate`.
    version: u16,
}
49
50impl MessageHeader {
51 fn decode(buf: &[u8]) -> Result<Self, SbeDecodeError> {
53 if buf.len() < HEADER_LENGTH {
54 return Err(SbeDecodeError::BufferTooShort {
55 expected: HEADER_LENGTH,
56 actual: buf.len(),
57 });
58 }
59 Ok(Self {
60 block_length: u16::from_le_bytes([buf[0], buf[1]]),
61 template_id: u16::from_le_bytes([buf[2], buf[3]]),
62 schema_id: u16::from_le_bytes([buf[4], buf[5]]),
63 version: u16::from_le_bytes([buf[6], buf[7]]),
64 })
65 }
66
67 fn validate(&self) -> Result<(), SbeDecodeError> {
69 if self.schema_id != SBE_SCHEMA_ID {
70 return Err(SbeDecodeError::SchemaMismatch {
71 expected: SBE_SCHEMA_ID,
72 actual: self.schema_id,
73 });
74 }
75 if self.version != SBE_SCHEMA_VERSION {
76 return Err(SbeDecodeError::VersionMismatch {
77 expected: SBE_SCHEMA_VERSION,
78 actual: self.version,
79 });
80 }
81 Ok(())
82 }
83}
84
85pub fn decode_ping(buf: &[u8]) -> Result<(), SbeDecodeError> {
93 let header = MessageHeader::decode(buf)?;
94 header.validate()?;
95
96 if header.template_id != PING_TEMPLATE_ID {
97 return Err(SbeDecodeError::UnknownTemplateId(header.template_id));
98 }
99
100 Ok(())
101}
102
103pub fn decode_server_time(buf: &[u8]) -> Result<i64, SbeDecodeError> {
116 let header = MessageHeader::decode(buf)?;
117 header.validate()?;
118
119 if header.template_id != SERVER_TIME_TEMPLATE_ID {
120 return Err(SbeDecodeError::UnknownTemplateId(header.template_id));
121 }
122
123 let body_start = HEADER_LENGTH;
124 let body_end = body_start + 8;
125
126 if buf.len() < body_end {
127 return Err(SbeDecodeError::BufferTooShort {
128 expected: body_end,
129 actual: buf.len(),
130 });
131 }
132
133 let server_time = i64::from_le_bytes(buf[body_start..body_end].try_into().expect("slice len"));
135 Ok(server_time)
136}
137
138pub fn decode_depth(buf: &[u8]) -> Result<BinanceDepth, SbeDecodeError> {
150 let header = MessageHeader::decode(buf)?;
151 header.validate()?;
152
153 if header.template_id != DEPTH_TEMPLATE_ID {
154 return Err(SbeDecodeError::UnknownTemplateId(header.template_id));
155 }
156
157 let block_start = HEADER_LENGTH;
159 let block_end = block_start + 10;
160
161 if buf.len() < block_end {
162 return Err(SbeDecodeError::BufferTooShort {
163 expected: block_end,
164 actual: buf.len(),
165 });
166 }
167
168 let last_update_id = i64::from_le_bytes(
170 buf[block_start..block_start + 8]
171 .try_into()
172 .expect("slice len"),
173 );
174 let price_exponent = buf[block_start + 8] as i8;
175 let qty_exponent = buf[block_start + 9] as i8;
176
177 let (bids, bids_end) = decode_price_levels(&buf[block_end..])?;
178 let (asks, _asks_end) = decode_price_levels(&buf[block_end + bids_end..])?;
179
180 Ok(BinanceDepth {
181 last_update_id,
182 price_exponent,
183 qty_exponent,
184 bids,
185 asks,
186 })
187}
188
189pub fn decode_trades(buf: &[u8]) -> Result<BinanceTrades, SbeDecodeError> {
197 let header = MessageHeader::decode(buf)?;
198 header.validate()?;
199
200 if header.template_id != TRADES_TEMPLATE_ID {
201 return Err(SbeDecodeError::UnknownTemplateId(header.template_id));
202 }
203
204 let block_start = HEADER_LENGTH;
206 let block_end = block_start + 2;
207
208 if buf.len() < block_end {
209 return Err(SbeDecodeError::BufferTooShort {
210 expected: block_end,
211 actual: buf.len(),
212 });
213 }
214
215 let price_exponent = buf[block_start] as i8;
216 let qty_exponent = buf[block_start + 1] as i8;
217
218 let trades = decode_trades_group(&buf[block_end..])?;
219
220 Ok(BinanceTrades {
221 price_exponent,
222 qty_exponent,
223 trades,
224 })
225}
226
227fn decode_price_levels(buf: &[u8]) -> Result<(Vec<BinancePriceLevel>, usize), SbeDecodeError> {
231 if buf.len() < GROUP_SIZE_LENGTH {
232 return Err(SbeDecodeError::BufferTooShort {
233 expected: GROUP_SIZE_LENGTH,
234 actual: buf.len(),
235 });
236 }
237
238 let _block_length = u16::from_le_bytes([buf[0], buf[1]]);
239 let count = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
240
241 if count > MAX_GROUP_SIZE {
242 return Err(SbeDecodeError::GroupSizeTooLarge {
243 count,
244 max: MAX_GROUP_SIZE,
245 });
246 }
247
248 let level_size = 16; let total_size = GROUP_SIZE_LENGTH + (count as usize * level_size);
250
251 if buf.len() < total_size {
252 return Err(SbeDecodeError::BufferTooShort {
253 expected: total_size,
254 actual: buf.len(),
255 });
256 }
257
258 let mut levels = Vec::with_capacity(count as usize);
259 let mut offset = GROUP_SIZE_LENGTH;
260
261 for _ in 0..count {
262 let price_mantissa = i64::from_le_bytes(buf[offset..offset + 8].try_into().unwrap());
263 let qty_mantissa = i64::from_le_bytes(buf[offset + 8..offset + 16].try_into().unwrap());
264
265 levels.push(BinancePriceLevel {
266 price_mantissa,
267 qty_mantissa,
268 });
269
270 offset += level_size;
271 }
272
273 Ok((levels, total_size))
274}
275
276fn decode_trades_group(buf: &[u8]) -> Result<Vec<BinanceTrade>, SbeDecodeError> {
278 if buf.len() < GROUP_SIZE_LENGTH {
279 return Err(SbeDecodeError::BufferTooShort {
280 expected: GROUP_SIZE_LENGTH,
281 actual: buf.len(),
282 });
283 }
284
285 let _block_length = u16::from_le_bytes([buf[0], buf[1]]);
286 let count = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
287
288 if count > MAX_GROUP_SIZE {
289 return Err(SbeDecodeError::GroupSizeTooLarge {
290 count,
291 max: MAX_GROUP_SIZE,
292 });
293 }
294
295 let trade_size = 42;
297 let total_size = GROUP_SIZE_LENGTH + (count as usize * trade_size);
298
299 if buf.len() < total_size {
300 return Err(SbeDecodeError::BufferTooShort {
301 expected: total_size,
302 actual: buf.len(),
303 });
304 }
305
306 let mut trades = Vec::with_capacity(count as usize);
307 let mut offset = GROUP_SIZE_LENGTH;
308
309 for _ in 0..count {
310 let id = i64::from_le_bytes(buf[offset..offset + 8].try_into().unwrap());
311 let price_mantissa = i64::from_le_bytes(buf[offset + 8..offset + 16].try_into().unwrap());
312 let qty_mantissa = i64::from_le_bytes(buf[offset + 16..offset + 24].try_into().unwrap());
313 let quote_qty_mantissa =
314 i64::from_le_bytes(buf[offset + 24..offset + 32].try_into().unwrap());
315 let time = i64::from_le_bytes(buf[offset + 32..offset + 40].try_into().unwrap());
316 let is_buyer_maker = BoolEnum::from(buf[offset + 40]) == BoolEnum::True;
317 let is_best_match = BoolEnum::from(buf[offset + 41]) == BoolEnum::True;
318
319 trades.push(BinanceTrade {
320 id,
321 price_mantissa,
322 qty_mantissa,
323 quote_qty_mantissa,
324 time,
325 is_buyer_maker,
326 is_best_match,
327 });
328
329 offset += trade_size;
330 }
331
332 Ok(trades)
333}
334
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Builds an 8-byte message header with all fields little-endian.
    fn create_header(block_length: u16, template_id: u16, schema_id: u16, version: u16) -> [u8; 8] {
        let mut buf = [0u8; 8];
        buf[0..2].copy_from_slice(&block_length.to_le_bytes());
        buf[2..4].copy_from_slice(&template_id.to_le_bytes());
        buf[4..6].copy_from_slice(&schema_id.to_le_bytes());
        buf[6..8].copy_from_slice(&version.to_le_bytes());
        buf
    }

    /// Builds a 6-byte group header: `u16` block length then `u32` entry count.
    fn create_group_header(block_length: u16, count: u32) -> [u8; 6] {
        let mut buf = [0u8; 6];
        buf[0..2].copy_from_slice(&block_length.to_le_bytes());
        buf[2..6].copy_from_slice(&count.to_le_bytes());
        buf
    }

    #[rstest]
    fn test_decode_ping_valid() {
        let buf = create_header(0, PING_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
        assert!(decode_ping(&buf).is_ok());
    }

    #[rstest]
    fn test_decode_ping_buffer_too_short() {
        let buf = [0u8; 4];
        let err = decode_ping(&buf).unwrap_err();
        assert!(matches!(err, SbeDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_ping_schema_mismatch() {
        let buf = create_header(0, PING_TEMPLATE_ID, 99, SBE_SCHEMA_VERSION);
        let err = decode_ping(&buf).unwrap_err();
        assert!(matches!(err, SbeDecodeError::SchemaMismatch { .. }));
    }

    #[rstest]
    fn test_decode_ping_wrong_template() {
        let buf = create_header(0, 999, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
        let err = decode_ping(&buf).unwrap_err();
        assert!(matches!(err, SbeDecodeError::UnknownTemplateId(999)));
    }

    #[rstest]
    fn test_decode_server_time_valid() {
        let header = create_header(
            8,
            SERVER_TIME_TEMPLATE_ID,
            SBE_SCHEMA_ID,
            SBE_SCHEMA_VERSION,
        );
        let timestamp: i64 = 1734300000000;

        let mut buf = Vec::with_capacity(16);
        buf.extend_from_slice(&header);
        buf.extend_from_slice(&timestamp.to_le_bytes());

        let result = decode_server_time(&buf).unwrap();
        assert_eq!(result, timestamp);
    }

    #[rstest]
    fn test_decode_server_time_buffer_too_short() {
        // Header only: the 8-byte body is missing.
        let buf = create_header(
            8,
            SERVER_TIME_TEMPLATE_ID,
            SBE_SCHEMA_ID,
            SBE_SCHEMA_VERSION,
        );
        let err = decode_server_time(&buf).unwrap_err();
        assert!(matches!(err, SbeDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_server_time_wrong_template() {
        let header = create_header(8, PING_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
        let mut buf = Vec::with_capacity(16);
        buf.extend_from_slice(&header);
        buf.extend_from_slice(&0i64.to_le_bytes());

        let err = decode_server_time(&buf).unwrap_err();
        // Compare against the constant rather than a hard-coded template id.
        assert!(matches!(
            err,
            SbeDecodeError::UnknownTemplateId(id) if id == PING_TEMPLATE_ID
        ));
    }

    #[rstest]
    fn test_decode_server_time_version_mismatch() {
        let header = create_header(8, SERVER_TIME_TEMPLATE_ID, SBE_SCHEMA_ID, 99);
        let mut buf = Vec::with_capacity(16);
        buf.extend_from_slice(&header);
        buf.extend_from_slice(&0i64.to_le_bytes());

        let err = decode_server_time(&buf).unwrap_err();
        assert!(matches!(err, SbeDecodeError::VersionMismatch { .. }));
    }

    #[rstest]
    fn test_decode_depth_valid() {
        let header = create_header(10, DEPTH_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);

        let mut buf = Vec::new();
        buf.extend_from_slice(&header);

        // Root block: update id + two exponent bytes.
        let last_update_id: i64 = 123456789;
        let price_exponent: i8 = -8;
        let qty_exponent: i8 = -8;
        buf.extend_from_slice(&last_update_id.to_le_bytes());
        buf.push(price_exponent as u8);
        buf.push(qty_exponent as u8);

        // Bids group: two levels.
        buf.extend_from_slice(&create_group_header(16, 2));
        buf.extend_from_slice(&100_000_000_000i64.to_le_bytes());
        buf.extend_from_slice(&50_000_000i64.to_le_bytes());
        buf.extend_from_slice(&99_900_000_000i64.to_le_bytes());
        buf.extend_from_slice(&30_000_000i64.to_le_bytes());

        // Asks group: one level.
        buf.extend_from_slice(&create_group_header(16, 1));
        buf.extend_from_slice(&100_100_000_000i64.to_le_bytes());
        buf.extend_from_slice(&25_000_000i64.to_le_bytes());

        let depth = decode_depth(&buf).unwrap();

        assert_eq!(depth.last_update_id, 123456789);
        assert_eq!(depth.price_exponent, -8);
        assert_eq!(depth.qty_exponent, -8);
        assert_eq!(depth.bids.len(), 2);
        assert_eq!(depth.asks.len(), 1);
        assert_eq!(depth.bids[0].price_mantissa, 100_000_000_000);
        assert_eq!(depth.bids[0].qty_mantissa, 50_000_000);
        assert_eq!(depth.bids[1].price_mantissa, 99_900_000_000);
        assert_eq!(depth.bids[1].qty_mantissa, 30_000_000);
        assert_eq!(depth.asks[0].price_mantissa, 100_100_000_000);
        assert_eq!(depth.asks[0].qty_mantissa, 25_000_000);
    }

    #[rstest]
    fn test_decode_depth_empty_book() {
        let header = create_header(10, DEPTH_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);

        let mut buf = Vec::new();
        buf.extend_from_slice(&header);
        // Root block: zero update id and zero exponents.
        buf.extend_from_slice(&0i64.to_le_bytes());
        buf.push(0);
        buf.push(0);
        // Empty bids group, then empty asks group.
        buf.extend_from_slice(&create_group_header(16, 0));
        buf.extend_from_slice(&create_group_header(16, 0));

        let depth = decode_depth(&buf).unwrap();

        assert!(depth.bids.is_empty());
        assert!(depth.asks.is_empty());
    }

    #[rstest]
    fn test_decode_trades_valid() {
        let header = create_header(2, TRADES_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);

        let mut buf = Vec::new();
        buf.extend_from_slice(&header);

        // Root block: two exponent bytes.
        let price_exponent: i8 = -8;
        let qty_exponent: i8 = -8;
        buf.push(price_exponent as u8);
        buf.push(qty_exponent as u8);

        // Trades group: one 42-byte entry.
        buf.extend_from_slice(&create_group_header(42, 1));

        let trade_id: i64 = 999;
        let price: i64 = 100_000_000_000;
        let qty: i64 = 10_000_000;
        let quote_qty: i64 = 1_000_000_000_000;
        let time: i64 = 1734300000000;
        let is_buyer_maker: u8 = 1;
        let is_best_match: u8 = 1;

        buf.extend_from_slice(&trade_id.to_le_bytes());
        buf.extend_from_slice(&price.to_le_bytes());
        buf.extend_from_slice(&qty.to_le_bytes());
        buf.extend_from_slice(&quote_qty.to_le_bytes());
        buf.extend_from_slice(&time.to_le_bytes());
        buf.push(is_buyer_maker);
        buf.push(is_best_match);

        let trades = decode_trades(&buf).unwrap();

        assert_eq!(trades.price_exponent, -8);
        assert_eq!(trades.qty_exponent, -8);
        assert_eq!(trades.trades.len(), 1);
        assert_eq!(trades.trades[0].id, 999);
        assert_eq!(trades.trades[0].price_mantissa, 100_000_000_000);
        assert_eq!(trades.trades[0].qty_mantissa, 10_000_000);
        assert_eq!(trades.trades[0].quote_qty_mantissa, 1_000_000_000_000);
        assert_eq!(trades.trades[0].time, 1734300000000);
        assert!(trades.trades[0].is_buyer_maker);
        assert!(trades.trades[0].is_best_match);
    }

    #[rstest]
    fn test_decode_trades_empty() {
        let header = create_header(2, TRADES_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);

        let mut buf = Vec::new();
        buf.extend_from_slice(&header);
        // Zero exponents followed by an empty trades group.
        buf.push(0);
        buf.push(0);
        buf.extend_from_slice(&create_group_header(42, 0));

        let trades = decode_trades(&buf).unwrap();

        assert!(trades.trades.is_empty());
    }

    #[rstest]
    fn test_decode_depth_wrong_template() {
        let header = create_header(10, PING_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);

        let mut buf = Vec::new();
        buf.extend_from_slice(&header);
        buf.extend_from_slice(&[0u8; 10]);

        let err = decode_depth(&buf).unwrap_err();
        // Compare against the constant rather than a hard-coded template id.
        assert!(matches!(
            err,
            SbeDecodeError::UnknownTemplateId(id) if id == PING_TEMPLATE_ID
        ));
    }

    #[rstest]
    fn test_decode_trades_wrong_template() {
        let header = create_header(2, PING_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);

        let mut buf = Vec::new();
        buf.extend_from_slice(&header);
        buf.extend_from_slice(&[0u8; 2]);

        let err = decode_trades(&buf).unwrap_err();
        // Compare against the constant rather than a hard-coded template id.
        assert!(matches!(
            err,
            SbeDecodeError::UnknownTemplateId(id) if id == PING_TEMPLATE_ID
        ));
    }
}