nautilus_binance/common/sbe/stream/
trades.rs1use ustr::Ustr;
31
32use super::{MessageHeader, StreamDecodeError};
33use crate::common::sbe::{cursor::SbeCursor, error::SbeDecodeError};
34
35#[derive(Debug, Clone, Copy)]
37pub struct Trade {
38 pub id: i64,
40 pub price_mantissa: i64,
42 pub qty_mantissa: i64,
44 pub is_buyer_maker: bool,
46}
47
48impl Trade {
49 pub const ENCODED_LENGTH: usize = 25;
51
52 fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
58 Ok(Self {
59 id: cursor.read_i64_le()?,
60 price_mantissa: cursor.read_i64_le()?,
61 qty_mantissa: cursor.read_i64_le()?,
62 is_buyer_maker: cursor.read_u8()? != 0,
63 })
64 }
65}
66
67#[derive(Debug, Clone)]
69pub struct TradesStreamEvent {
70 pub event_time_us: i64,
72 pub transact_time_us: i64,
74 pub price_exponent: i8,
76 pub qty_exponent: i8,
78 pub trades: Vec<Trade>,
80 pub symbol: Ustr,
82}
83
84impl TradesStreamEvent {
85 pub const BLOCK_LENGTH: usize = 18;
87
88 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
95 let header = MessageHeader::decode(buf)?;
96 header.validate_schema()?;
97
98 let mut cursor = SbeCursor::new_at(buf, MessageHeader::ENCODED_LENGTH);
99
100 let event_time_us = cursor.read_i64_le()?;
101 let transact_time_us = cursor.read_i64_le()?;
102 let price_exponent = cursor.read_i8()?;
103 let qty_exponent = cursor.read_i8()?;
104
105 let (block_length, num_in_group) = cursor.read_group_header()?;
106 let trades = cursor.read_group(block_length, num_in_group, Trade::decode)?;
107
108 let symbol_str = cursor.read_var_string8()?;
109
110 Ok(Self {
111 event_time_us,
112 transact_time_us,
113 price_exponent,
114 qty_exponent,
115 trades,
116 symbol: Ustr::from(&symbol_str),
117 })
118 }
119
120 #[inline]
122 #[must_use]
123 pub fn trade_price(&self, trade: &Trade) -> f64 {
124 super::mantissa_to_f64(trade.price_mantissa, self.price_exponent)
125 }
126
127 #[inline]
129 #[must_use]
130 pub fn trade_qty(&self, trade: &Trade) -> f64 {
131 super::mantissa_to_f64(trade.qty_mantissa, self.qty_exponent)
132 }
133}
134
135#[cfg(test)]
136mod tests {
137 use rstest::rstest;
138
139 use super::*;
140 use crate::common::sbe::stream::{STREAM_SCHEMA_ID, template_id};
141
142 fn make_valid_buffer(num_trades: usize) -> Vec<u8> {
143 let trade_block_len = 25u16;
144 let body_size = 18 + 6 + (num_trades * trade_block_len as usize) + 8; let mut buf = vec![0u8; 8 + body_size];
146
147 buf[0..2].copy_from_slice(&18u16.to_le_bytes()); buf[2..4].copy_from_slice(&template_id::TRADES_STREAM_EVENT.to_le_bytes());
150 buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
151 buf[6..8].copy_from_slice(&0u16.to_le_bytes()); let body = &mut buf[8..];
155 body[0..8].copy_from_slice(&1000000i64.to_le_bytes()); body[8..16].copy_from_slice(&1000001i64.to_le_bytes()); body[16] = (-2i8) as u8; body[17] = (-8i8) as u8; body[18..20].copy_from_slice(&trade_block_len.to_le_bytes());
162 body[20..24].copy_from_slice(&(num_trades as u32).to_le_bytes());
163
164 let mut offset = 24;
166 for i in 0..num_trades {
167 body[offset..offset + 8].copy_from_slice(&(i as i64 + 1).to_le_bytes()); body[offset + 8..offset + 16].copy_from_slice(&4200000i64.to_le_bytes()); body[offset + 16..offset + 24].copy_from_slice(&100000000i64.to_le_bytes()); body[offset + 24] = u8::from(i % 2 == 0); offset += trade_block_len as usize;
172 }
173
174 body[offset] = 7;
176 body[offset + 1..offset + 8].copy_from_slice(b"BTCUSDT");
177
178 buf
179 }
180
181 #[rstest]
182 fn test_decode_valid_single_trade() {
183 let buf = make_valid_buffer(1);
184 let event = TradesStreamEvent::decode(&buf).unwrap();
185
186 assert_eq!(event.event_time_us, 1000000);
187 assert_eq!(event.transact_time_us, 1000001);
188 assert_eq!(event.trades.len(), 1);
189 assert_eq!(event.trades[0].id, 1);
190 assert!(event.trades[0].is_buyer_maker);
191 assert_eq!(event.symbol, "BTCUSDT");
192 }
193
194 #[rstest]
195 fn test_decode_valid_multiple_trades() {
196 let buf = make_valid_buffer(5);
197 let event = TradesStreamEvent::decode(&buf).unwrap();
198
199 assert_eq!(event.trades.len(), 5);
200 for (i, trade) in event.trades.iter().enumerate() {
201 assert_eq!(trade.id, i as i64 + 1);
202 }
203 }
204
205 #[rstest]
206 fn test_decode_truncated_trades() {
207 let mut buf = make_valid_buffer(3);
208 buf.truncate(50); let err = TradesStreamEvent::decode(&buf).unwrap_err();
210 assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
211 }
212
213 #[rstest]
214 fn test_decode_wrong_schema() {
215 let mut buf = make_valid_buffer(1);
216 buf[4..6].copy_from_slice(&99u16.to_le_bytes());
217 let err = TradesStreamEvent::decode(&buf).unwrap_err();
218 assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
219 }
220}