Skip to main content

nautilus_binance/spot/websocket/streams/
parse.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Parsing utilities for Binance Spot WebSocket SBE messages.
17
18use nautilus_core::nanos::UnixNanos;
19use nautilus_model::{
20    data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
21    enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
22    identifiers::TradeId,
23    instruments::{Instrument, InstrumentAny},
24};
25
26use crate::common::{
27    fixed::{mantissa_to_price, mantissa_to_quantity},
28    sbe::stream::{
29        BestBidAskStreamEvent, DepthDiffStreamEvent, DepthSnapshotStreamEvent, MessageHeader,
30        StreamDecodeError, TradesStreamEvent, template_id,
31    },
32};
33
34/// Decoded market data message.
35#[derive(Debug)]
36pub enum MarketDataMessage {
37    /// Trade event.
38    Trades(TradesStreamEvent),
39    /// Best bid/ask update.
40    BestBidAsk(BestBidAskStreamEvent),
41    /// Order book snapshot.
42    DepthSnapshot(DepthSnapshotStreamEvent),
43    /// Order book diff update.
44    DepthDiff(DepthDiffStreamEvent),
45}
46
47/// Decode an SBE binary frame into a market data message.
48///
49/// Validates the message header (including schema ID) and routes to the
50/// appropriate decoder based on template ID.
51///
52/// # Errors
53///
54/// Returns an error if the buffer is too short, schema validation fails,
55/// or the template ID is unknown.
56pub fn decode_market_data(buf: &[u8]) -> Result<MarketDataMessage, StreamDecodeError> {
57    let header = MessageHeader::decode(buf)?;
58    header.validate_schema()?;
59
60    match header.template_id {
61        template_id::TRADES_STREAM_EVENT => {
62            Ok(MarketDataMessage::Trades(TradesStreamEvent::decode(buf)?))
63        }
64        template_id::BEST_BID_ASK_STREAM_EVENT => Ok(MarketDataMessage::BestBidAsk(
65            BestBidAskStreamEvent::decode(buf)?,
66        )),
67        template_id::DEPTH_SNAPSHOT_STREAM_EVENT => Ok(MarketDataMessage::DepthSnapshot(
68            DepthSnapshotStreamEvent::decode(buf)?,
69        )),
70        template_id::DEPTH_DIFF_STREAM_EVENT => Ok(MarketDataMessage::DepthDiff(
71            DepthDiffStreamEvent::decode(buf)?,
72        )),
73        _ => Err(StreamDecodeError::UnknownTemplateId(header.template_id)),
74    }
75}
76
77/// Parses a trades stream event into a vector of `TradeTick`.
78pub fn parse_trades_event(event: &TradesStreamEvent, instrument: &InstrumentAny) -> Vec<Data> {
79    let instrument_id = instrument.id();
80    let price_precision = instrument.price_precision();
81    let size_precision = instrument.size_precision();
82
83    event
84        .trades
85        .iter()
86        .map(|t| {
87            let price = mantissa_to_price(t.price_mantissa, event.price_exponent, price_precision);
88            let size = mantissa_to_quantity(t.qty_mantissa, event.qty_exponent, size_precision);
89            let ts_event = UnixNanos::from(event.transact_time_us as u64 * 1000); // us to ns
90
91            let trade = TradeTick::new(
92                instrument_id,
93                price,
94                size,
95                if t.is_buyer_maker {
96                    AggressorSide::Seller
97                } else {
98                    AggressorSide::Buyer
99                },
100                TradeId::new(t.id.to_string()),
101                ts_event,
102                ts_event,
103            );
104            Data::from(trade)
105        })
106        .collect()
107}
108
109/// Parses a best bid/ask event into a `QuoteTick`.
110pub fn parse_bbo_event(event: &BestBidAskStreamEvent, instrument: &InstrumentAny) -> QuoteTick {
111    let instrument_id = instrument.id();
112    let price_precision = instrument.price_precision();
113    let size_precision = instrument.size_precision();
114
115    let bid_price = mantissa_to_price(
116        event.bid_price_mantissa,
117        event.price_exponent,
118        price_precision,
119    );
120    let bid_size = mantissa_to_quantity(event.bid_qty_mantissa, event.qty_exponent, size_precision);
121    let ask_price = mantissa_to_price(
122        event.ask_price_mantissa,
123        event.price_exponent,
124        price_precision,
125    );
126    let ask_size = mantissa_to_quantity(event.ask_qty_mantissa, event.qty_exponent, size_precision);
127    let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000); // us to ns
128
129    QuoteTick::new(
130        instrument_id,
131        bid_price,
132        ask_price,
133        bid_size,
134        ask_size,
135        ts_event,
136        ts_event,
137    )
138}
139
140/// Parses a depth snapshot event into `OrderBookDeltas`.
141///
142/// Returns `None` if the snapshot contains no levels.
143pub fn parse_depth_snapshot(
144    event: &DepthSnapshotStreamEvent,
145    instrument: &InstrumentAny,
146) -> Option<OrderBookDeltas> {
147    let instrument_id = instrument.id();
148    let price_precision = instrument.price_precision();
149    let size_precision = instrument.size_precision();
150    let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000);
151
152    let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len() + 1);
153
154    // Add clear delta first
155    deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_event));
156
157    // Add bid levels
158    for (i, level) in event.bids.iter().enumerate() {
159        let price = mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
160        let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
161        let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
162            RecordFlag::F_LAST as u8
163        } else {
164            0
165        };
166
167        let order = BookOrder::new(OrderSide::Buy, price, size, 0);
168
169        deltas.push(OrderBookDelta::new(
170            instrument_id,
171            BookAction::Add,
172            order,
173            flags,
174            0,
175            ts_event,
176            ts_event,
177        ));
178    }
179
180    // Add ask levels
181    for (i, level) in event.asks.iter().enumerate() {
182        let price = mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
183        let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
184        let flags = if i == event.asks.len() - 1 {
185            RecordFlag::F_LAST as u8
186        } else {
187            0
188        };
189
190        let order = BookOrder::new(OrderSide::Sell, price, size, 0);
191
192        deltas.push(OrderBookDelta::new(
193            instrument_id,
194            BookAction::Add,
195            order,
196            flags,
197            0,
198            ts_event,
199            ts_event,
200        ));
201    }
202
203    if deltas.len() <= 1 {
204        return None;
205    }
206
207    Some(OrderBookDeltas::new(instrument_id, deltas))
208}
209
210/// Parses a depth diff event into `OrderBookDeltas`.
211///
212/// Returns `None` if the diff contains no updates.
213pub fn parse_depth_diff(
214    event: &DepthDiffStreamEvent,
215    instrument: &InstrumentAny,
216) -> Option<OrderBookDeltas> {
217    let instrument_id = instrument.id();
218    let price_precision = instrument.price_precision();
219    let size_precision = instrument.size_precision();
220    let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000);
221
222    let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len());
223
224    // Add bid updates
225    for (i, level) in event.bids.iter().enumerate() {
226        let price = mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
227        let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
228
229        // Zero size means delete, otherwise update
230        let action = if level.qty_mantissa == 0 {
231            BookAction::Delete
232        } else {
233            BookAction::Update
234        };
235
236        let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
237            RecordFlag::F_LAST as u8
238        } else {
239            0
240        };
241
242        let order = BookOrder::new(OrderSide::Buy, price, size, 0);
243
244        deltas.push(OrderBookDelta::new(
245            instrument_id,
246            action,
247            order,
248            flags,
249            0,
250            ts_event,
251            ts_event,
252        ));
253    }
254
255    // Add ask updates
256    for (i, level) in event.asks.iter().enumerate() {
257        let price = mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
258        let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
259
260        let action = if level.qty_mantissa == 0 {
261            BookAction::Delete
262        } else {
263            BookAction::Update
264        };
265
266        let flags = if i == event.asks.len() - 1 {
267            RecordFlag::F_LAST as u8
268        } else {
269            0
270        };
271
272        let order = BookOrder::new(OrderSide::Sell, price, size, 0);
273
274        deltas.push(OrderBookDelta::new(
275            instrument_id,
276            action,
277            order,
278            flags,
279            0,
280            ts_event,
281            ts_event,
282        ));
283    }
284
285    if deltas.is_empty() {
286        return None;
287    }
288
289    Some(OrderBookDeltas::new(instrument_id, deltas))
290}
291
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::sbe::stream::STREAM_SCHEMA_ID;

    /// Returns a zero-filled 100-byte frame whose first eight bytes hold a header
    /// with block length 50, the given little-endian template and schema ID bytes,
    /// and version 0.
    fn frame_with_header(template: [u8; 2], schema: [u8; 2]) -> [u8; 100] {
        let mut frame = [0u8; 100];
        frame[0..2].copy_from_slice(&50u16.to_le_bytes()); // block_length
        frame[2..4].copy_from_slice(&template);
        frame[4..6].copy_from_slice(&schema);
        frame[6..8].copy_from_slice(&0u16.to_le_bytes()); // version
        frame
    }

    #[rstest]
    fn test_decode_empty_buffer() {
        let result = decode_market_data(&[]);
        assert!(matches!(
            result,
            Err(StreamDecodeError::BufferTooShort { .. })
        ));
    }

    #[rstest]
    fn test_decode_short_buffer() {
        let frame = [0u8; 5];
        let result = decode_market_data(&frame);
        assert!(matches!(
            result,
            Err(StreamDecodeError::BufferTooShort { .. })
        ));
    }

    #[rstest]
    fn test_decode_wrong_schema() {
        // Known template, but a schema ID of 99.
        let frame = frame_with_header(
            template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes(),
            99u16.to_le_bytes(),
        );

        let result = decode_market_data(&frame);
        assert!(matches!(
            result,
            Err(StreamDecodeError::SchemaMismatch { .. })
        ));
    }

    #[rstest]
    fn test_decode_unknown_template() {
        // Expected schema, but a template ID of 9999.
        let frame = frame_with_header(9999u16.to_le_bytes(), STREAM_SCHEMA_ID.to_le_bytes());

        let result = decode_market_data(&frame);
        assert!(matches!(
            result,
            Err(StreamDecodeError::UnknownTemplateId(9999))
        ));
    }
}
335}