nautilus_binance/spot/websocket/streams/
parse.rs1use nautilus_core::nanos::UnixNanos;
19use nautilus_model::{
20 data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
21 enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
22 identifiers::TradeId,
23 instruments::{Instrument, InstrumentAny},
24};
25
26use crate::common::{
27 fixed::{mantissa_to_price, mantissa_to_quantity},
28 sbe::stream::{
29 BestBidAskStreamEvent, DepthDiffStreamEvent, DepthSnapshotStreamEvent, MessageHeader,
30 StreamDecodeError, TradesStreamEvent, template_id,
31 },
32};
33
34#[derive(Debug)]
36pub enum MarketDataMessage {
37 Trades(TradesStreamEvent),
39 BestBidAsk(BestBidAskStreamEvent),
41 DepthSnapshot(DepthSnapshotStreamEvent),
43 DepthDiff(DepthDiffStreamEvent),
45}
46
47pub fn decode_market_data(buf: &[u8]) -> Result<MarketDataMessage, StreamDecodeError> {
57 let header = MessageHeader::decode(buf)?;
58 header.validate_schema()?;
59
60 match header.template_id {
61 template_id::TRADES_STREAM_EVENT => {
62 Ok(MarketDataMessage::Trades(TradesStreamEvent::decode(buf)?))
63 }
64 template_id::BEST_BID_ASK_STREAM_EVENT => Ok(MarketDataMessage::BestBidAsk(
65 BestBidAskStreamEvent::decode(buf)?,
66 )),
67 template_id::DEPTH_SNAPSHOT_STREAM_EVENT => Ok(MarketDataMessage::DepthSnapshot(
68 DepthSnapshotStreamEvent::decode(buf)?,
69 )),
70 template_id::DEPTH_DIFF_STREAM_EVENT => Ok(MarketDataMessage::DepthDiff(
71 DepthDiffStreamEvent::decode(buf)?,
72 )),
73 _ => Err(StreamDecodeError::UnknownTemplateId(header.template_id)),
74 }
75}
76
77pub fn parse_trades_event(event: &TradesStreamEvent, instrument: &InstrumentAny) -> Vec<Data> {
79 let instrument_id = instrument.id();
80 let price_precision = instrument.price_precision();
81 let size_precision = instrument.size_precision();
82
83 event
84 .trades
85 .iter()
86 .map(|t| {
87 let price = mantissa_to_price(t.price_mantissa, event.price_exponent, price_precision);
88 let size = mantissa_to_quantity(t.qty_mantissa, event.qty_exponent, size_precision);
89 let ts_event = UnixNanos::from(event.transact_time_us as u64 * 1000); let trade = TradeTick::new(
92 instrument_id,
93 price,
94 size,
95 if t.is_buyer_maker {
96 AggressorSide::Seller
97 } else {
98 AggressorSide::Buyer
99 },
100 TradeId::new(t.id.to_string()),
101 ts_event,
102 ts_event,
103 );
104 Data::from(trade)
105 })
106 .collect()
107}
108
109pub fn parse_bbo_event(event: &BestBidAskStreamEvent, instrument: &InstrumentAny) -> QuoteTick {
111 let instrument_id = instrument.id();
112 let price_precision = instrument.price_precision();
113 let size_precision = instrument.size_precision();
114
115 let bid_price = mantissa_to_price(
116 event.bid_price_mantissa,
117 event.price_exponent,
118 price_precision,
119 );
120 let bid_size = mantissa_to_quantity(event.bid_qty_mantissa, event.qty_exponent, size_precision);
121 let ask_price = mantissa_to_price(
122 event.ask_price_mantissa,
123 event.price_exponent,
124 price_precision,
125 );
126 let ask_size = mantissa_to_quantity(event.ask_qty_mantissa, event.qty_exponent, size_precision);
127 let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000); QuoteTick::new(
130 instrument_id,
131 bid_price,
132 ask_price,
133 bid_size,
134 ask_size,
135 ts_event,
136 ts_event,
137 )
138}
139
140pub fn parse_depth_snapshot(
144 event: &DepthSnapshotStreamEvent,
145 instrument: &InstrumentAny,
146) -> Option<OrderBookDeltas> {
147 let instrument_id = instrument.id();
148 let price_precision = instrument.price_precision();
149 let size_precision = instrument.size_precision();
150 let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000);
151
152 let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len() + 1);
153
154 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_event));
156
157 for (i, level) in event.bids.iter().enumerate() {
159 let price = mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
160 let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
161 let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
162 RecordFlag::F_LAST as u8
163 } else {
164 0
165 };
166
167 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
168
169 deltas.push(OrderBookDelta::new(
170 instrument_id,
171 BookAction::Add,
172 order,
173 flags,
174 0,
175 ts_event,
176 ts_event,
177 ));
178 }
179
180 for (i, level) in event.asks.iter().enumerate() {
182 let price = mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
183 let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
184 let flags = if i == event.asks.len() - 1 {
185 RecordFlag::F_LAST as u8
186 } else {
187 0
188 };
189
190 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
191
192 deltas.push(OrderBookDelta::new(
193 instrument_id,
194 BookAction::Add,
195 order,
196 flags,
197 0,
198 ts_event,
199 ts_event,
200 ));
201 }
202
203 if deltas.len() <= 1 {
204 return None;
205 }
206
207 Some(OrderBookDeltas::new(instrument_id, deltas))
208}
209
210pub fn parse_depth_diff(
214 event: &DepthDiffStreamEvent,
215 instrument: &InstrumentAny,
216) -> Option<OrderBookDeltas> {
217 let instrument_id = instrument.id();
218 let price_precision = instrument.price_precision();
219 let size_precision = instrument.size_precision();
220 let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000);
221
222 let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len());
223
224 for (i, level) in event.bids.iter().enumerate() {
226 let price = mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
227 let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
228
229 let action = if level.qty_mantissa == 0 {
231 BookAction::Delete
232 } else {
233 BookAction::Update
234 };
235
236 let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
237 RecordFlag::F_LAST as u8
238 } else {
239 0
240 };
241
242 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
243
244 deltas.push(OrderBookDelta::new(
245 instrument_id,
246 action,
247 order,
248 flags,
249 0,
250 ts_event,
251 ts_event,
252 ));
253 }
254
255 for (i, level) in event.asks.iter().enumerate() {
257 let price = mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
258 let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
259
260 let action = if level.qty_mantissa == 0 {
261 BookAction::Delete
262 } else {
263 BookAction::Update
264 };
265
266 let flags = if i == event.asks.len() - 1 {
267 RecordFlag::F_LAST as u8
268 } else {
269 0
270 };
271
272 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
273
274 deltas.push(OrderBookDelta::new(
275 instrument_id,
276 action,
277 order,
278 flags,
279 0,
280 ts_event,
281 ts_event,
282 ));
283 }
284
285 if deltas.is_empty() {
286 return None;
287 }
288
289 Some(OrderBookDeltas::new(instrument_id, deltas))
290}
291
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::sbe::stream::STREAM_SCHEMA_ID;

    /// Builds a zeroed 100-byte frame whose first eight bytes hold four
    /// little-endian 16-bit fields: `50`, the template ID, the schema ID, `0`.
    ///
    /// NOTE(review): the first and last fields are presumably the SBE block
    /// length and schema version - confirm against `MessageHeader`.
    fn frame_with_header(template: [u8; 2], schema: [u8; 2]) -> [u8; 100] {
        let mut frame = [0u8; 100];
        frame[0..2].copy_from_slice(&50u16.to_le_bytes());
        frame[2..4].copy_from_slice(&template);
        frame[4..6].copy_from_slice(&schema);
        frame[6..8].copy_from_slice(&0u16.to_le_bytes());
        frame
    }

    /// An empty buffer is rejected as too short.
    #[rstest]
    fn test_decode_empty_buffer() {
        let result = decode_market_data(&[]);
        assert!(matches!(
            result,
            Err(StreamDecodeError::BufferTooShort { .. })
        ));
    }

    /// A five-byte buffer is rejected as too short.
    #[rstest]
    fn test_decode_short_buffer() {
        let frame = [0u8; 5];
        let result = decode_market_data(&frame);
        assert!(matches!(
            result,
            Err(StreamDecodeError::BufferTooShort { .. })
        ));
    }

    /// A known template paired with schema ID 99 is rejected as a schema mismatch.
    #[rstest]
    fn test_decode_wrong_schema() {
        let frame = frame_with_header(
            template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes(),
            99u16.to_le_bytes(),
        );
        let result = decode_market_data(&frame);
        assert!(matches!(
            result,
            Err(StreamDecodeError::SchemaMismatch { .. })
        ));
    }

    /// The stream schema ID paired with template ID 9999 is reported as unknown.
    #[rstest]
    fn test_decode_unknown_template() {
        let frame = frame_with_header(9999u16.to_le_bytes(), STREAM_SCHEMA_ID.to_le_bytes());
        let result = decode_market_data(&frame);
        assert!(matches!(
            result,
            Err(StreamDecodeError::UnknownTemplateId(9999))
        ));
    }
}