nautilus_binance/spot/http/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! SBE decode functions for Binance Spot HTTP responses.
17//!
18//! Each function decodes raw SBE bytes into domain types, validating the
19//! message header (schema ID, version, template ID) before extracting fields.
20
21use super::{
22    error::SbeDecodeError,
23    models::{BinanceDepth, BinancePriceLevel, BinanceTrade, BinanceTrades},
24};
25use crate::common::sbe::spot::{
26    SBE_SCHEMA_ID, SBE_SCHEMA_VERSION, bool_enum::BoolEnum,
27    depth_response_codec::SBE_TEMPLATE_ID as DEPTH_TEMPLATE_ID,
28    message_header_codec::ENCODED_LENGTH as HEADER_LENGTH,
29    ping_response_codec::SBE_TEMPLATE_ID as PING_TEMPLATE_ID,
30    server_time_response_codec::SBE_TEMPLATE_ID as SERVER_TIME_TEMPLATE_ID,
31    trades_response_codec::SBE_TEMPLATE_ID as TRADES_TEMPLATE_ID,
32};
33
34/// Group size encoding length (u16 block_length + u32 num_in_group).
35const GROUP_SIZE_LENGTH: usize = 6;
36
37/// Maximum allowed group size to prevent OOM from malicious payloads.
38const MAX_GROUP_SIZE: u32 = 10_000;
39
40/// SBE message header.
41#[derive(Debug, Clone, Copy)]
42struct MessageHeader {
43    #[allow(dead_code)]
44    block_length: u16,
45    template_id: u16,
46    schema_id: u16,
47    version: u16,
48}
49
50impl MessageHeader {
51    /// Decode message header from buffer.
52    fn decode(buf: &[u8]) -> Result<Self, SbeDecodeError> {
53        if buf.len() < HEADER_LENGTH {
54            return Err(SbeDecodeError::BufferTooShort {
55                expected: HEADER_LENGTH,
56                actual: buf.len(),
57            });
58        }
59        Ok(Self {
60            block_length: u16::from_le_bytes([buf[0], buf[1]]),
61            template_id: u16::from_le_bytes([buf[2], buf[3]]),
62            schema_id: u16::from_le_bytes([buf[4], buf[5]]),
63            version: u16::from_le_bytes([buf[6], buf[7]]),
64        })
65    }
66
67    /// Validate schema ID and version.
68    fn validate(&self) -> Result<(), SbeDecodeError> {
69        if self.schema_id != SBE_SCHEMA_ID {
70            return Err(SbeDecodeError::SchemaMismatch {
71                expected: SBE_SCHEMA_ID,
72                actual: self.schema_id,
73            });
74        }
75        if self.version != SBE_SCHEMA_VERSION {
76            return Err(SbeDecodeError::VersionMismatch {
77                expected: SBE_SCHEMA_VERSION,
78                actual: self.version,
79            });
80        }
81        Ok(())
82    }
83}
84
85/// Decode a ping response.
86///
87/// Ping response has no body (block_length = 0), just validates the header.
88///
89/// # Errors
90///
91/// Returns error if buffer is too short or schema mismatch.
92pub fn decode_ping(buf: &[u8]) -> Result<(), SbeDecodeError> {
93    let header = MessageHeader::decode(buf)?;
94    header.validate()?;
95
96    if header.template_id != PING_TEMPLATE_ID {
97        return Err(SbeDecodeError::UnknownTemplateId(header.template_id));
98    }
99
100    Ok(())
101}
102
103/// Decode a server time response.
104///
105/// Returns the server time as **microseconds** since epoch (SBE provides
106/// microsecond precision vs JSON's milliseconds).
107///
108/// # Errors
109///
110/// Returns error if buffer is too short or schema mismatch.
111///
112/// # Panics
113///
114/// This function will not panic as buffer lengths are validated before slicing.
115pub fn decode_server_time(buf: &[u8]) -> Result<i64, SbeDecodeError> {
116    let header = MessageHeader::decode(buf)?;
117    header.validate()?;
118
119    if header.template_id != SERVER_TIME_TEMPLATE_ID {
120        return Err(SbeDecodeError::UnknownTemplateId(header.template_id));
121    }
122
123    let body_start = HEADER_LENGTH;
124    let body_end = body_start + 8;
125
126    if buf.len() < body_end {
127        return Err(SbeDecodeError::BufferTooShort {
128            expected: body_end,
129            actual: buf.len(),
130        });
131    }
132
133    // SAFETY: Length validated above
134    let server_time = i64::from_le_bytes(buf[body_start..body_end].try_into().expect("slice len"));
135    Ok(server_time)
136}
137
138/// Decode a depth response.
139///
140/// Returns the order book depth with bids and asks.
141///
142/// # Errors
143///
144/// Returns error if buffer is too short, schema mismatch, or group size exceeded.
145///
146/// # Panics
147///
148/// This function will not panic as buffer lengths are validated before slicing.
149pub fn decode_depth(buf: &[u8]) -> Result<BinanceDepth, SbeDecodeError> {
150    let header = MessageHeader::decode(buf)?;
151    header.validate()?;
152
153    if header.template_id != DEPTH_TEMPLATE_ID {
154        return Err(SbeDecodeError::UnknownTemplateId(header.template_id));
155    }
156
157    // Depth block: last_update_id (8) + price_exponent (1) + qty_exponent (1) = 10 bytes
158    let block_start = HEADER_LENGTH;
159    let block_end = block_start + 10;
160
161    if buf.len() < block_end {
162        return Err(SbeDecodeError::BufferTooShort {
163            expected: block_end,
164            actual: buf.len(),
165        });
166    }
167
168    // SAFETY: Length validated above
169    let last_update_id = i64::from_le_bytes(
170        buf[block_start..block_start + 8]
171            .try_into()
172            .expect("slice len"),
173    );
174    let price_exponent = buf[block_start + 8] as i8;
175    let qty_exponent = buf[block_start + 9] as i8;
176
177    let (bids, bids_end) = decode_price_levels(&buf[block_end..])?;
178    let (asks, _asks_end) = decode_price_levels(&buf[block_end + bids_end..])?;
179
180    Ok(BinanceDepth {
181        last_update_id,
182        price_exponent,
183        qty_exponent,
184        bids,
185        asks,
186    })
187}
188
189/// Decode a trades response.
190///
191/// Returns the list of trades.
192///
193/// # Errors
194///
195/// Returns error if buffer is too short, schema mismatch, or group size exceeded.
196pub fn decode_trades(buf: &[u8]) -> Result<BinanceTrades, SbeDecodeError> {
197    let header = MessageHeader::decode(buf)?;
198    header.validate()?;
199
200    if header.template_id != TRADES_TEMPLATE_ID {
201        return Err(SbeDecodeError::UnknownTemplateId(header.template_id));
202    }
203
204    // Trades block: price_exponent (1) + qty_exponent (1) = 2 bytes
205    let block_start = HEADER_LENGTH;
206    let block_end = block_start + 2;
207
208    if buf.len() < block_end {
209        return Err(SbeDecodeError::BufferTooShort {
210            expected: block_end,
211            actual: buf.len(),
212        });
213    }
214
215    let price_exponent = buf[block_start] as i8;
216    let qty_exponent = buf[block_start + 1] as i8;
217
218    let trades = decode_trades_group(&buf[block_end..])?;
219
220    Ok(BinanceTrades {
221        price_exponent,
222        qty_exponent,
223        trades,
224    })
225}
226
227/// Decode a group of price levels (bids or asks).
228///
229/// Returns (levels, bytes_consumed).
230fn decode_price_levels(buf: &[u8]) -> Result<(Vec<BinancePriceLevel>, usize), SbeDecodeError> {
231    if buf.len() < GROUP_SIZE_LENGTH {
232        return Err(SbeDecodeError::BufferTooShort {
233            expected: GROUP_SIZE_LENGTH,
234            actual: buf.len(),
235        });
236    }
237
238    let _block_length = u16::from_le_bytes([buf[0], buf[1]]);
239    let count = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
240
241    if count > MAX_GROUP_SIZE {
242        return Err(SbeDecodeError::GroupSizeTooLarge {
243            count,
244            max: MAX_GROUP_SIZE,
245        });
246    }
247
248    let level_size = 16; // price (8) + qty (8)
249    let total_size = GROUP_SIZE_LENGTH + (count as usize * level_size);
250
251    if buf.len() < total_size {
252        return Err(SbeDecodeError::BufferTooShort {
253            expected: total_size,
254            actual: buf.len(),
255        });
256    }
257
258    let mut levels = Vec::with_capacity(count as usize);
259    let mut offset = GROUP_SIZE_LENGTH;
260
261    for _ in 0..count {
262        let price_mantissa = i64::from_le_bytes(buf[offset..offset + 8].try_into().unwrap());
263        let qty_mantissa = i64::from_le_bytes(buf[offset + 8..offset + 16].try_into().unwrap());
264
265        levels.push(BinancePriceLevel {
266            price_mantissa,
267            qty_mantissa,
268        });
269
270        offset += level_size;
271    }
272
273    Ok((levels, total_size))
274}
275
276/// Decode a group of trades.
277fn decode_trades_group(buf: &[u8]) -> Result<Vec<BinanceTrade>, SbeDecodeError> {
278    if buf.len() < GROUP_SIZE_LENGTH {
279        return Err(SbeDecodeError::BufferTooShort {
280            expected: GROUP_SIZE_LENGTH,
281            actual: buf.len(),
282        });
283    }
284
285    let _block_length = u16::from_le_bytes([buf[0], buf[1]]);
286    let count = u32::from_le_bytes([buf[2], buf[3], buf[4], buf[5]]);
287
288    if count > MAX_GROUP_SIZE {
289        return Err(SbeDecodeError::GroupSizeTooLarge {
290            count,
291            max: MAX_GROUP_SIZE,
292        });
293    }
294
295    // Trade: id(8) + price(8) + qty(8) + quoteQty(8) + time(8) + isBuyerMaker(1) + isBestMatch(1) = 42
296    let trade_size = 42;
297    let total_size = GROUP_SIZE_LENGTH + (count as usize * trade_size);
298
299    if buf.len() < total_size {
300        return Err(SbeDecodeError::BufferTooShort {
301            expected: total_size,
302            actual: buf.len(),
303        });
304    }
305
306    let mut trades = Vec::with_capacity(count as usize);
307    let mut offset = GROUP_SIZE_LENGTH;
308
309    for _ in 0..count {
310        let id = i64::from_le_bytes(buf[offset..offset + 8].try_into().unwrap());
311        let price_mantissa = i64::from_le_bytes(buf[offset + 8..offset + 16].try_into().unwrap());
312        let qty_mantissa = i64::from_le_bytes(buf[offset + 16..offset + 24].try_into().unwrap());
313        let quote_qty_mantissa =
314            i64::from_le_bytes(buf[offset + 24..offset + 32].try_into().unwrap());
315        let time = i64::from_le_bytes(buf[offset + 32..offset + 40].try_into().unwrap());
316        let is_buyer_maker = BoolEnum::from(buf[offset + 40]) == BoolEnum::True;
317        let is_best_match = BoolEnum::from(buf[offset + 41]) == BoolEnum::True;
318
319        trades.push(BinanceTrade {
320            id,
321            price_mantissa,
322            qty_mantissa,
323            quote_qty_mantissa,
324            time,
325            is_buyer_maker,
326            is_best_match,
327        });
328
329        offset += trade_size;
330    }
331
332    Ok(trades)
333}
334
335#[cfg(test)]
336mod tests {
337    use rstest::rstest;
338
339    use super::*;
340
341    fn create_header(block_length: u16, template_id: u16, schema_id: u16, version: u16) -> [u8; 8] {
342        let mut buf = [0u8; 8];
343        buf[0..2].copy_from_slice(&block_length.to_le_bytes());
344        buf[2..4].copy_from_slice(&template_id.to_le_bytes());
345        buf[4..6].copy_from_slice(&schema_id.to_le_bytes());
346        buf[6..8].copy_from_slice(&version.to_le_bytes());
347        buf
348    }
349
350    #[rstest]
351    fn test_decode_ping_valid() {
352        // Ping: block_length=0, template_id=101, schema_id=3, version=1
353        let buf = create_header(0, PING_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
354        assert!(decode_ping(&buf).is_ok());
355    }
356
357    #[rstest]
358    fn test_decode_ping_buffer_too_short() {
359        let buf = [0u8; 4];
360        let err = decode_ping(&buf).unwrap_err();
361        assert!(matches!(err, SbeDecodeError::BufferTooShort { .. }));
362    }
363
364    #[rstest]
365    fn test_decode_ping_schema_mismatch() {
366        let buf = create_header(0, PING_TEMPLATE_ID, 99, SBE_SCHEMA_VERSION);
367        let err = decode_ping(&buf).unwrap_err();
368        assert!(matches!(err, SbeDecodeError::SchemaMismatch { .. }));
369    }
370
371    #[rstest]
372    fn test_decode_ping_wrong_template() {
373        let buf = create_header(0, 999, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
374        let err = decode_ping(&buf).unwrap_err();
375        assert!(matches!(err, SbeDecodeError::UnknownTemplateId(999)));
376    }
377
378    #[rstest]
379    fn test_decode_server_time_valid() {
380        // ServerTime: block_length=8, template_id=102, schema_id=3, version=1
381        let header = create_header(
382            8,
383            SERVER_TIME_TEMPLATE_ID,
384            SBE_SCHEMA_ID,
385            SBE_SCHEMA_VERSION,
386        );
387        let timestamp: i64 = 1734300000000; // Example timestamp
388
389        let mut buf = Vec::with_capacity(16);
390        buf.extend_from_slice(&header);
391        buf.extend_from_slice(&timestamp.to_le_bytes());
392
393        let result = decode_server_time(&buf).unwrap();
394        assert_eq!(result, timestamp);
395    }
396
397    #[rstest]
398    fn test_decode_server_time_buffer_too_short() {
399        // Header only, missing body
400        let buf = create_header(
401            8,
402            SERVER_TIME_TEMPLATE_ID,
403            SBE_SCHEMA_ID,
404            SBE_SCHEMA_VERSION,
405        );
406        let err = decode_server_time(&buf).unwrap_err();
407        assert!(matches!(err, SbeDecodeError::BufferTooShort { .. }));
408    }
409
410    #[rstest]
411    fn test_decode_server_time_wrong_template() {
412        let header = create_header(8, PING_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
413        let mut buf = Vec::with_capacity(16);
414        buf.extend_from_slice(&header);
415        buf.extend_from_slice(&0i64.to_le_bytes());
416
417        let err = decode_server_time(&buf).unwrap_err();
418        assert!(matches!(err, SbeDecodeError::UnknownTemplateId(101)));
419    }
420
421    #[rstest]
422    fn test_decode_server_time_version_mismatch() {
423        let header = create_header(8, SERVER_TIME_TEMPLATE_ID, SBE_SCHEMA_ID, 99);
424        let mut buf = Vec::with_capacity(16);
425        buf.extend_from_slice(&header);
426        buf.extend_from_slice(&0i64.to_le_bytes());
427
428        let err = decode_server_time(&buf).unwrap_err();
429        assert!(matches!(err, SbeDecodeError::VersionMismatch { .. }));
430    }
431
432    fn create_group_header(block_length: u16, count: u32) -> [u8; 6] {
433        let mut buf = [0u8; 6];
434        buf[0..2].copy_from_slice(&block_length.to_le_bytes());
435        buf[2..6].copy_from_slice(&count.to_le_bytes());
436        buf
437    }
438
439    #[rstest]
440    fn test_decode_depth_valid() {
441        // Depth: block_length=10, template_id=200
442        let header = create_header(10, DEPTH_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
443
444        let mut buf = Vec::new();
445        buf.extend_from_slice(&header);
446
447        // Block: last_update_id (8) + price_exponent (1) + qty_exponent (1)
448        let last_update_id: i64 = 123456789;
449        let price_exponent: i8 = -8;
450        let qty_exponent: i8 = -8;
451        buf.extend_from_slice(&last_update_id.to_le_bytes());
452        buf.push(price_exponent as u8);
453        buf.push(qty_exponent as u8);
454
455        // Bids group: 2 levels
456        buf.extend_from_slice(&create_group_header(16, 2));
457        // Bid 1: price=100000000000, qty=50000000
458        buf.extend_from_slice(&100_000_000_000i64.to_le_bytes());
459        buf.extend_from_slice(&50_000_000i64.to_le_bytes());
460        // Bid 2: price=99900000000, qty=30000000
461        buf.extend_from_slice(&99_900_000_000i64.to_le_bytes());
462        buf.extend_from_slice(&30_000_000i64.to_le_bytes());
463
464        // Asks group: 1 level
465        buf.extend_from_slice(&create_group_header(16, 1));
466        // Ask 1: price=100100000000, qty=25000000
467        buf.extend_from_slice(&100_100_000_000i64.to_le_bytes());
468        buf.extend_from_slice(&25_000_000i64.to_le_bytes());
469
470        let depth = decode_depth(&buf).unwrap();
471
472        assert_eq!(depth.last_update_id, 123456789);
473        assert_eq!(depth.price_exponent, -8);
474        assert_eq!(depth.qty_exponent, -8);
475        assert_eq!(depth.bids.len(), 2);
476        assert_eq!(depth.asks.len(), 1);
477        assert_eq!(depth.bids[0].price_mantissa, 100_000_000_000);
478        assert_eq!(depth.bids[0].qty_mantissa, 50_000_000);
479        assert_eq!(depth.asks[0].price_mantissa, 100_100_000_000);
480    }
481
482    #[rstest]
483    fn test_decode_depth_empty_book() {
484        let header = create_header(10, DEPTH_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
485
486        let mut buf = Vec::new();
487        buf.extend_from_slice(&header);
488        buf.extend_from_slice(&0i64.to_le_bytes()); // last_update_id
489        buf.push(0); // price_exponent
490        buf.push(0); // qty_exponent
491
492        // Empty bids
493        buf.extend_from_slice(&create_group_header(16, 0));
494        // Empty asks
495        buf.extend_from_slice(&create_group_header(16, 0));
496
497        let depth = decode_depth(&buf).unwrap();
498
499        assert!(depth.bids.is_empty());
500        assert!(depth.asks.is_empty());
501    }
502
503    #[rstest]
504    fn test_decode_trades_valid() {
505        // Trades: block_length=2, template_id=201
506        let header = create_header(2, TRADES_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
507
508        let mut buf = Vec::new();
509        buf.extend_from_slice(&header);
510
511        // Block: price_exponent (1) + qty_exponent (1)
512        let price_exponent: i8 = -8;
513        let qty_exponent: i8 = -8;
514        buf.push(price_exponent as u8);
515        buf.push(qty_exponent as u8);
516
517        // Trades group: 1 trade (42 bytes each)
518        buf.extend_from_slice(&create_group_header(42, 1));
519
520        // Trade: id(8) + price(8) + qty(8) + quoteQty(8) + time(8) + isBuyerMaker(1) + isBestMatch(1)
521        let trade_id: i64 = 999;
522        let price: i64 = 100_000_000_000;
523        let qty: i64 = 10_000_000;
524        let quote_qty: i64 = 1_000_000_000_000;
525        let time: i64 = 1734300000000;
526        let is_buyer_maker: u8 = 1; // true
527        let is_best_match: u8 = 1; // true
528
529        buf.extend_from_slice(&trade_id.to_le_bytes());
530        buf.extend_from_slice(&price.to_le_bytes());
531        buf.extend_from_slice(&qty.to_le_bytes());
532        buf.extend_from_slice(&quote_qty.to_le_bytes());
533        buf.extend_from_slice(&time.to_le_bytes());
534        buf.push(is_buyer_maker);
535        buf.push(is_best_match);
536
537        let trades = decode_trades(&buf).unwrap();
538
539        assert_eq!(trades.price_exponent, -8);
540        assert_eq!(trades.qty_exponent, -8);
541        assert_eq!(trades.trades.len(), 1);
542        assert_eq!(trades.trades[0].id, 999);
543        assert_eq!(trades.trades[0].price_mantissa, 100_000_000_000);
544        assert!(trades.trades[0].is_buyer_maker);
545        assert!(trades.trades[0].is_best_match);
546    }
547
548    #[rstest]
549    fn test_decode_trades_empty() {
550        let header = create_header(2, TRADES_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
551
552        let mut buf = Vec::new();
553        buf.extend_from_slice(&header);
554        buf.push(0); // price_exponent
555        buf.push(0); // qty_exponent
556
557        // Empty trades group
558        buf.extend_from_slice(&create_group_header(42, 0));
559
560        let trades = decode_trades(&buf).unwrap();
561
562        assert!(trades.trades.is_empty());
563    }
564
565    #[rstest]
566    fn test_decode_depth_wrong_template() {
567        let header = create_header(10, PING_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
568
569        let mut buf = Vec::new();
570        buf.extend_from_slice(&header);
571        buf.extend_from_slice(&[0u8; 10]); // dummy block
572
573        let err = decode_depth(&buf).unwrap_err();
574        assert!(matches!(err, SbeDecodeError::UnknownTemplateId(101)));
575    }
576
577    #[rstest]
578    fn test_decode_trades_wrong_template() {
579        let header = create_header(2, PING_TEMPLATE_ID, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION);
580
581        let mut buf = Vec::new();
582        buf.extend_from_slice(&header);
583        buf.extend_from_slice(&[0u8; 2]); // dummy block
584
585        let err = decode_trades(&buf).unwrap_err();
586        assert!(matches!(err, SbeDecodeError::UnknownTemplateId(101)));
587    }
588}