nautilus_binance/common/sbe/stream/
trades.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Trades stream event decoder.
17//!
18//! Message layout (after 8-byte header):
19//! - eventTime: i64 (microseconds)
20//! - transactTime: i64 (microseconds)
21//! - priceExponent: i8
22//! - qtyExponent: i8
23//! - trades group (groupSizeEncoding: u16 blockLength + u32 numInGroup):
24//!   - id: i64
25//!   - price: i64 (mantissa)
26//!   - qty: i64 (mantissa)
27//!   - isBuyerMaker: u8
28//! - symbol: varString8
29
30use ustr::Ustr;
31
32use super::{MessageHeader, StreamDecodeError};
33use crate::common::sbe::{cursor::SbeCursor, error::SbeDecodeError};
34
/// A single trade entry carried inside a trades stream event.
///
/// Price and quantity are stored as raw mantissas; the matching exponents live on the
/// enclosing [`TradesStreamEvent`]. The type is plain data, so it also derives
/// `PartialEq`/`Eq` to allow decoded trades to be compared directly (e.g. in `assert_eq!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Trade {
    /// Trade ID.
    pub id: i64,
    /// Price mantissa (scaled by the event's price exponent).
    pub price_mantissa: i64,
    /// Quantity mantissa (scaled by the event's quantity exponent).
    pub qty_mantissa: i64,
    /// True if buyer is the maker (seller initiated the trade).
    pub is_buyer_maker: bool,
}
47
48impl Trade {
49    /// Encoded length per trade entry.
50    pub const ENCODED_LENGTH: usize = 25;
51
52    /// Decode a single trade from cursor.
53    ///
54    /// # Errors
55    ///
56    /// Returns error if buffer is too short.
57    fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
58        Ok(Self {
59            id: cursor.read_i64_le()?,
60            price_mantissa: cursor.read_i64_le()?,
61            qty_mantissa: cursor.read_i64_le()?,
62            is_buyer_maker: cursor.read_u8()? != 0,
63        })
64    }
65}
66
67/// Trades stream event (may contain multiple trades).
68#[derive(Debug, Clone)]
69pub struct TradesStreamEvent {
70    /// Event timestamp in microseconds.
71    pub event_time_us: i64,
72    /// Transaction timestamp in microseconds.
73    pub transact_time_us: i64,
74    /// Price exponent (prices = mantissa * 10^exponent).
75    pub price_exponent: i8,
76    /// Quantity exponent (quantities = mantissa * 10^exponent).
77    pub qty_exponent: i8,
78    /// Trades in this event.
79    pub trades: Vec<Trade>,
80    /// Trading symbol.
81    pub symbol: Ustr,
82}
83
84impl TradesStreamEvent {
85    /// Fixed block length (excluding header, groups, and variable-length data).
86    pub const BLOCK_LENGTH: usize = 18;
87
88    /// Decode from SBE buffer (including 8-byte header).
89    ///
90    /// # Errors
91    ///
92    /// Returns error if buffer is too short, group size exceeds limits,
93    /// or data is otherwise invalid.
94    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
95        let header = MessageHeader::decode(buf)?;
96        header.validate_schema()?;
97
98        let mut cursor = SbeCursor::new_at(buf, MessageHeader::ENCODED_LENGTH);
99
100        let event_time_us = cursor.read_i64_le()?;
101        let transact_time_us = cursor.read_i64_le()?;
102        let price_exponent = cursor.read_i8()?;
103        let qty_exponent = cursor.read_i8()?;
104
105        let (block_length, num_in_group) = cursor.read_group_header()?;
106        let trades = cursor.read_group(block_length, num_in_group, Trade::decode)?;
107
108        let symbol_str = cursor.read_var_string8()?;
109
110        Ok(Self {
111            event_time_us,
112            transact_time_us,
113            price_exponent,
114            qty_exponent,
115            trades,
116            symbol: Ustr::from(&symbol_str),
117        })
118    }
119
120    /// Get price as f64 for a trade.
121    #[inline]
122    #[must_use]
123    pub fn trade_price(&self, trade: &Trade) -> f64 {
124        super::mantissa_to_f64(trade.price_mantissa, self.price_exponent)
125    }
126
127    /// Get quantity as f64 for a trade.
128    #[inline]
129    #[must_use]
130    pub fn trade_qty(&self, trade: &Trade) -> f64 {
131        super::mantissa_to_f64(trade.qty_mantissa, self.qty_exponent)
132    }
133}
134
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::sbe::stream::{STREAM_SCHEMA_ID, template_id};

    /// Builds a well-formed trades stream message (header included) that carries
    /// `num_trades` entries for the symbol "BTCUSDT".
    ///
    /// Trade ids count up from 1 and the buyer-maker flag alternates, starting with `true`.
    fn make_valid_buffer(num_trades: usize) -> Vec<u8> {
        const HEADER_LEN: usize = 8;
        const FIXED_BLOCK_LEN: u16 = 18;
        const GROUP_HEADER_LEN: usize = 6;
        const TRADE_BLOCK_LEN: u16 = 25;
        const SYMBOL: &[u8] = b"BTCUSDT";

        let total_len = HEADER_LEN
            + usize::from(FIXED_BLOCK_LEN)
            + GROUP_HEADER_LEN
            + num_trades * usize::from(TRADE_BLOCK_LEN)
            + 1 // symbol length prefix
            + SYMBOL.len();
        let mut buf = Vec::with_capacity(total_len);

        // Message header
        buf.extend_from_slice(&FIXED_BLOCK_LEN.to_le_bytes()); // block_length
        buf.extend_from_slice(&template_id::TRADES_STREAM_EVENT.to_le_bytes());
        buf.extend_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf.extend_from_slice(&0u16.to_le_bytes()); // version

        // Fixed block
        buf.extend_from_slice(&1000000i64.to_le_bytes()); // event_time_us
        buf.extend_from_slice(&1000001i64.to_le_bytes()); // transact_time_us
        buf.extend_from_slice(&(-2i8).to_le_bytes()); // price_exponent
        buf.extend_from_slice(&(-8i8).to_le_bytes()); // qty_exponent

        // Group header
        buf.extend_from_slice(&TRADE_BLOCK_LEN.to_le_bytes());
        buf.extend_from_slice(&(num_trades as u32).to_le_bytes());

        // Trade entries
        for idx in 0..num_trades {
            let trade_id = idx as i64 + 1;
            buf.extend_from_slice(&trade_id.to_le_bytes()); // id
            buf.extend_from_slice(&4200000i64.to_le_bytes()); // price
            buf.extend_from_slice(&100000000i64.to_le_bytes()); // qty
            buf.push(u8::from(idx % 2 == 0)); // is_buyer_maker
        }

        // Symbol: one-byte length prefix followed by the bytes
        buf.push(SYMBOL.len() as u8);
        buf.extend_from_slice(SYMBOL);

        debug_assert_eq!(buf.len(), total_len);
        buf
    }

    #[rstest]
    fn test_decode_valid_single_trade() {
        let event = TradesStreamEvent::decode(&make_valid_buffer(1)).unwrap();

        assert_eq!(event.event_time_us, 1000000);
        assert_eq!(event.transact_time_us, 1000001);
        assert_eq!(event.trades.len(), 1);

        let first = &event.trades[0];
        assert_eq!(first.id, 1);
        assert!(first.is_buyer_maker);

        assert_eq!(event.symbol, "BTCUSDT");
    }

    #[rstest]
    fn test_decode_valid_multiple_trades() {
        let event = TradesStreamEvent::decode(&make_valid_buffer(5)).unwrap();

        // Ids are expected to count up from 1 in decode order.
        let ids: Vec<i64> = event.trades.iter().map(|trade| trade.id).collect();
        assert_eq!(ids, vec![1, 2, 3, 4, 5]);
    }

    #[rstest]
    fn test_decode_truncated_trades() {
        let full = make_valid_buffer(3);
        // 50 bytes ends partway through the first trade entry.
        let cut = &full[..50];

        let err = TradesStreamEvent::decode(cut).unwrap_err();
        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_wrong_schema() {
        let mut buf = make_valid_buffer(1);
        // Overwrite the schema id field of the header with an unexpected value.
        buf[4..6].copy_from_slice(&99u16.to_le_bytes());

        let err = TradesStreamEvent::decode(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
    }
}