nautilus_binance/common/sbe/stream/
trades.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Trades stream event decoder.
17//!
18//! Message layout (after 8-byte header):
19//! - eventTime: i64 (microseconds)
20//! - transactTime: i64 (microseconds)
21//! - priceExponent: i8
22//! - qtyExponent: i8
23//! - trades group (groupSizeEncoding: u16 blockLength + u32 numInGroup):
24//!   - id: i64
25//!   - price: i64 (mantissa)
26//!   - qty: i64 (mantissa)
27//!   - isBuyerMaker: u8
28//! - symbol: varString8
29
30use super::{
31    GroupSizeEncoding, MessageHeader, StreamDecodeError, decode_var_string8, read_i8, read_i64_le,
32    read_u8,
33};
34
35/// Individual trade within a trades stream event.
36#[derive(Debug, Clone, Copy)]
37pub struct Trade {
38    /// Trade ID.
39    pub id: i64,
40    /// Price mantissa.
41    pub price_mantissa: i64,
42    /// Quantity mantissa.
43    pub qty_mantissa: i64,
44    /// True if buyer is the maker (seller initiated the trade).
45    pub is_buyer_maker: bool,
46}
47
48impl Trade {
49    /// Encoded length per trade entry.
50    pub const ENCODED_LENGTH: usize = 25;
51
52    /// Decode a single trade from buffer.
53    ///
54    /// # Errors
55    ///
56    /// Returns error if buffer is too short.
57    fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
58        if buf.len() < Self::ENCODED_LENGTH {
59            return Err(StreamDecodeError::BufferTooShort {
60                expected: Self::ENCODED_LENGTH,
61                actual: buf.len(),
62            });
63        }
64
65        Ok(Self {
66            id: read_i64_le(buf, 0)?,
67            price_mantissa: read_i64_le(buf, 8)?,
68            qty_mantissa: read_i64_le(buf, 16)?,
69            is_buyer_maker: read_u8(buf, 24)? != 0,
70        })
71    }
72}
73
74/// Trades stream event (may contain multiple trades).
75#[derive(Debug, Clone)]
76pub struct TradesStreamEvent {
77    /// Event timestamp in microseconds.
78    pub event_time_us: i64,
79    /// Transaction timestamp in microseconds.
80    pub transact_time_us: i64,
81    /// Price exponent (prices = mantissa * 10^exponent).
82    pub price_exponent: i8,
83    /// Quantity exponent (quantities = mantissa * 10^exponent).
84    pub qty_exponent: i8,
85    /// Trades in this event.
86    pub trades: Vec<Trade>,
87    /// Trading symbol.
88    pub symbol: String,
89}
90
91impl TradesStreamEvent {
92    /// Fixed block length (excluding header, groups, and variable-length data).
93    pub const BLOCK_LENGTH: usize = 18;
94
95    /// Decode from SBE buffer (including 8-byte header).
96    ///
97    /// # Errors
98    ///
99    /// Returns error if buffer is too short, group size exceeds limits,
100    /// or data is otherwise invalid.
101    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
102        let header = MessageHeader::decode(buf)?;
103        header.validate_schema()?;
104
105        let body = &buf[MessageHeader::ENCODED_LENGTH..];
106
107        let min_body_size = Self::BLOCK_LENGTH + GroupSizeEncoding::ENCODED_LENGTH;
108        if body.len() < min_body_size {
109            return Err(StreamDecodeError::BufferTooShort {
110                expected: MessageHeader::ENCODED_LENGTH + min_body_size,
111                actual: buf.len(),
112            });
113        }
114
115        let event_time_us = read_i64_le(body, 0)?;
116        let transact_time_us = read_i64_le(body, 8)?;
117        let price_exponent = read_i8(body, 16)?;
118        let qty_exponent = read_i8(body, 17)?;
119
120        // Group size limit enforced inside GroupSizeEncoding::decode
121        let group_start = Self::BLOCK_LENGTH;
122        let group_size = GroupSizeEncoding::decode(&body[group_start..])?;
123        let num_trades = group_size.num_in_group as usize;
124        let trade_block_length = group_size.block_length as usize;
125
126        let trades_data_start = group_start + GroupSizeEncoding::ENCODED_LENGTH;
127        let trades_data_size = num_trades * trade_block_length;
128        let required_size = trades_data_start + trades_data_size + 1; // +1 for symbol length byte
129
130        if body.len() < required_size {
131            return Err(StreamDecodeError::BufferTooShort {
132                expected: MessageHeader::ENCODED_LENGTH + required_size,
133                actual: buf.len(),
134            });
135        }
136
137        let mut trades = Vec::with_capacity(num_trades);
138        let mut offset = trades_data_start;
139
140        for _ in 0..num_trades {
141            trades.push(Trade::decode(&body[offset..])?);
142            offset += trade_block_length;
143        }
144
145        let (symbol, _) = decode_var_string8(&body[offset..])?;
146
147        Ok(Self {
148            event_time_us,
149            transact_time_us,
150            price_exponent,
151            qty_exponent,
152            trades,
153            symbol,
154        })
155    }
156
157    /// Get price as f64 for a trade.
158    #[inline]
159    #[must_use]
160    pub fn trade_price(&self, trade: &Trade) -> f64 {
161        super::mantissa_to_f64(trade.price_mantissa, self.price_exponent)
162    }
163
164    /// Get quantity as f64 for a trade.
165    #[inline]
166    #[must_use]
167    pub fn trade_qty(&self, trade: &Trade) -> f64 {
168        super::mantissa_to_f64(trade.qty_mantissa, self.qty_exponent)
169    }
170}
171
172#[cfg(test)]
173mod tests {
174    use rstest::rstest;
175
176    use super::*;
177    use crate::common::sbe::stream::{STREAM_SCHEMA_ID, template_id};
178
179    fn make_valid_buffer(num_trades: usize) -> Vec<u8> {
180        let trade_block_len = 25u16;
181        let body_size = 18 + 6 + (num_trades * trade_block_len as usize) + 8; // fixed + group header + trades + symbol
182        let mut buf = vec![0u8; 8 + body_size];
183
184        // Header
185        buf[0..2].copy_from_slice(&18u16.to_le_bytes()); // block_length
186        buf[2..4].copy_from_slice(&template_id::TRADES_STREAM_EVENT.to_le_bytes());
187        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
188        buf[6..8].copy_from_slice(&0u16.to_le_bytes()); // version
189
190        // Body
191        let body = &mut buf[8..];
192        body[0..8].copy_from_slice(&1000000i64.to_le_bytes()); // event_time_us
193        body[8..16].copy_from_slice(&1000001i64.to_le_bytes()); // transact_time_us
194        body[16] = (-2i8) as u8; // price_exponent
195        body[17] = (-8i8) as u8; // qty_exponent
196
197        // Group header
198        body[18..20].copy_from_slice(&trade_block_len.to_le_bytes());
199        body[20..24].copy_from_slice(&(num_trades as u32).to_le_bytes());
200
201        // Trades
202        let mut offset = 24;
203        for i in 0..num_trades {
204            body[offset..offset + 8].copy_from_slice(&(i as i64 + 1).to_le_bytes()); // id
205            body[offset + 8..offset + 16].copy_from_slice(&4200000i64.to_le_bytes()); // price
206            body[offset + 16..offset + 24].copy_from_slice(&100000000i64.to_le_bytes()); // qty
207            body[offset + 24] = u8::from(i % 2 == 0); // is_buyer_maker
208            offset += trade_block_len as usize;
209        }
210
211        // Symbol: "BTCUSDT"
212        body[offset] = 7;
213        body[offset + 1..offset + 8].copy_from_slice(b"BTCUSDT");
214
215        buf
216    }
217
218    #[rstest]
219    fn test_decode_valid_single_trade() {
220        let buf = make_valid_buffer(1);
221        let event = TradesStreamEvent::decode(&buf).unwrap();
222
223        assert_eq!(event.event_time_us, 1000000);
224        assert_eq!(event.transact_time_us, 1000001);
225        assert_eq!(event.trades.len(), 1);
226        assert_eq!(event.trades[0].id, 1);
227        assert!(event.trades[0].is_buyer_maker);
228        assert_eq!(event.symbol, "BTCUSDT");
229    }
230
231    #[rstest]
232    fn test_decode_valid_multiple_trades() {
233        let buf = make_valid_buffer(5);
234        let event = TradesStreamEvent::decode(&buf).unwrap();
235
236        assert_eq!(event.trades.len(), 5);
237        for (i, trade) in event.trades.iter().enumerate() {
238            assert_eq!(trade.id, i as i64 + 1);
239        }
240    }
241
242    #[rstest]
243    fn test_decode_truncated_trades() {
244        let mut buf = make_valid_buffer(3);
245        buf.truncate(50); // Truncate in the middle of trades
246        let err = TradesStreamEvent::decode(&buf).unwrap_err();
247        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
248    }
249
250    #[rstest]
251    fn test_decode_wrong_schema() {
252        let mut buf = make_valid_buffer(1);
253        buf[4..6].copy_from_slice(&99u16.to_le_bytes());
254        let err = TradesStreamEvent::decode(&buf).unwrap_err();
255        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
256    }
257}