nautilus_binance/common/sbe/stream/
trades.rs1use super::{
31 GroupSizeEncoding, MessageHeader, StreamDecodeError, decode_var_string8, read_i8, read_i64_le,
32 read_u8,
33};
34
/// One entry of the repeating trades group inside a trades stream event.
///
/// Price and quantity are kept as raw integer mantissas; the exponents needed
/// to scale them live on the enclosing [`TradesStreamEvent`].
#[derive(Debug, Clone, Copy)]
pub struct Trade {
    /// Trade identifier as read from the wire.
    pub id: i64,
    /// Unscaled price; combine with `TradesStreamEvent::price_exponent`.
    pub price_mantissa: i64,
    /// Unscaled quantity; combine with `TradesStreamEvent::qty_exponent`.
    pub qty_mantissa: i64,
    /// `true` when the encoded flag byte is non-zero.
    pub is_buyer_maker: bool,
}
47
48impl Trade {
49 pub const ENCODED_LENGTH: usize = 25;
51
52 fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
58 if buf.len() < Self::ENCODED_LENGTH {
59 return Err(StreamDecodeError::BufferTooShort {
60 expected: Self::ENCODED_LENGTH,
61 actual: buf.len(),
62 });
63 }
64
65 Ok(Self {
66 id: read_i64_le(buf, 0)?,
67 price_mantissa: read_i64_le(buf, 8)?,
68 qty_mantissa: read_i64_le(buf, 16)?,
69 is_buyer_maker: read_u8(buf, 24)? != 0,
70 })
71 }
72}
73
74#[derive(Debug, Clone)]
76pub struct TradesStreamEvent {
77 pub event_time_us: i64,
79 pub transact_time_us: i64,
81 pub price_exponent: i8,
83 pub qty_exponent: i8,
85 pub trades: Vec<Trade>,
87 pub symbol: String,
89}
90
91impl TradesStreamEvent {
92 pub const BLOCK_LENGTH: usize = 18;
94
95 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
102 let header = MessageHeader::decode(buf)?;
103 header.validate_schema()?;
104
105 let body = &buf[MessageHeader::ENCODED_LENGTH..];
106
107 let min_body_size = Self::BLOCK_LENGTH + GroupSizeEncoding::ENCODED_LENGTH;
108 if body.len() < min_body_size {
109 return Err(StreamDecodeError::BufferTooShort {
110 expected: MessageHeader::ENCODED_LENGTH + min_body_size,
111 actual: buf.len(),
112 });
113 }
114
115 let event_time_us = read_i64_le(body, 0)?;
116 let transact_time_us = read_i64_le(body, 8)?;
117 let price_exponent = read_i8(body, 16)?;
118 let qty_exponent = read_i8(body, 17)?;
119
120 let group_start = Self::BLOCK_LENGTH;
122 let group_size = GroupSizeEncoding::decode(&body[group_start..])?;
123 let num_trades = group_size.num_in_group as usize;
124 let trade_block_length = group_size.block_length as usize;
125
126 let trades_data_start = group_start + GroupSizeEncoding::ENCODED_LENGTH;
127 let trades_data_size = num_trades * trade_block_length;
128 let required_size = trades_data_start + trades_data_size + 1; if body.len() < required_size {
131 return Err(StreamDecodeError::BufferTooShort {
132 expected: MessageHeader::ENCODED_LENGTH + required_size,
133 actual: buf.len(),
134 });
135 }
136
137 let mut trades = Vec::with_capacity(num_trades);
138 let mut offset = trades_data_start;
139
140 for _ in 0..num_trades {
141 trades.push(Trade::decode(&body[offset..])?);
142 offset += trade_block_length;
143 }
144
145 let (symbol, _) = decode_var_string8(&body[offset..])?;
146
147 Ok(Self {
148 event_time_us,
149 transact_time_us,
150 price_exponent,
151 qty_exponent,
152 trades,
153 symbol,
154 })
155 }
156
157 #[inline]
159 #[must_use]
160 pub fn trade_price(&self, trade: &Trade) -> f64 {
161 super::mantissa_to_f64(trade.price_mantissa, self.price_exponent)
162 }
163
164 #[inline]
166 #[must_use]
167 pub fn trade_qty(&self, trade: &Trade) -> f64 {
168 super::mantissa_to_f64(trade.qty_mantissa, self.qty_exponent)
169 }
170}
171
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::sbe::stream::{STREAM_SCHEMA_ID, template_id};

    /// Assembles a well-formed trades stream message holding `num_trades`
    /// trades for symbol `BTCUSDT`.
    ///
    /// Trade `i` (zero-based) gets id `i + 1`, and the buyer-maker flag is set
    /// on even indices.
    fn make_valid_buffer(num_trades: usize) -> Vec<u8> {
        let trade_block_len = 25u16;
        let total_len = 8 + 18 + 6 + (num_trades * trade_block_len as usize) + 8;
        let mut buf = Vec::with_capacity(total_len);

        // Message header: block length, template id, schema id, version.
        buf.extend_from_slice(&18u16.to_le_bytes());
        buf.extend_from_slice(&template_id::TRADES_STREAM_EVENT.to_le_bytes());
        buf.extend_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf.extend_from_slice(&0u16.to_le_bytes());

        // Root block: event time, transact time, price exponent, qty exponent.
        buf.extend_from_slice(&1000000i64.to_le_bytes());
        buf.extend_from_slice(&1000001i64.to_le_bytes());
        buf.push((-2i8) as u8);
        buf.push((-8i8) as u8);

        // Group size encoding: per-entry block length, then entry count.
        buf.extend_from_slice(&trade_block_len.to_le_bytes());
        buf.extend_from_slice(&(num_trades as u32).to_le_bytes());

        // Trade entries: id, price mantissa, qty mantissa, buyer-maker flag.
        for i in 0..num_trades {
            buf.extend_from_slice(&(i as i64 + 1).to_le_bytes());
            buf.extend_from_slice(&4200000i64.to_le_bytes());
            buf.extend_from_slice(&100000000i64.to_le_bytes());
            buf.push(u8::from(i % 2 == 0));
        }

        // Symbol: one length byte followed by the bytes themselves.
        buf.push(7);
        buf.extend_from_slice(b"BTCUSDT");

        buf
    }

    #[rstest]
    fn test_decode_valid_single_trade() {
        let event = TradesStreamEvent::decode(&make_valid_buffer(1)).unwrap();

        assert_eq!(event.event_time_us, 1000000);
        assert_eq!(event.transact_time_us, 1000001);
        assert_eq!(event.trades.len(), 1);

        let only_trade = &event.trades[0];
        assert_eq!(only_trade.id, 1);
        assert!(only_trade.is_buyer_maker);
        assert_eq!(event.symbol, "BTCUSDT");
    }

    #[rstest]
    fn test_decode_valid_multiple_trades() {
        let event = TradesStreamEvent::decode(&make_valid_buffer(5)).unwrap();

        assert_eq!(event.trades.len(), 5);
        let ids: Vec<i64> = event.trades.iter().map(|trade| trade.id).collect();
        assert_eq!(ids, vec![1, 2, 3, 4, 5]);
    }

    #[rstest]
    fn test_decode_truncated_trades() {
        // Cut the message off in the middle of the trades group.
        let mut buf = make_valid_buffer(3);
        buf.truncate(50);

        let result = TradesStreamEvent::decode(&buf);
        assert!(matches!(
            result.unwrap_err(),
            StreamDecodeError::BufferTooShort { .. }
        ));
    }

    #[rstest]
    fn test_decode_wrong_schema() {
        // Overwrite the schema id field in the header with an unexpected value.
        let mut buf = make_valid_buffer(1);
        buf[4..6].copy_from_slice(&99u16.to_le_bytes());

        let result = TradesStreamEvent::decode(&buf);
        assert!(matches!(
            result.unwrap_err(),
            StreamDecodeError::SchemaMismatch { .. }
        ));
    }
}