Skip to main content

nautilus_binance/futures/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing utilities for Binance Futures WebSocket JSON messages.
17
18use nautilus_core::nanos::UnixNanos;
19use nautilus_model::{
20    data::{
21        Bar, BarSpecification, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate,
22        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
23    },
24    enums::{
25        AggregationSource, AggressorSide, BarAggregation, BookAction, OrderSide, PriceType,
26        RecordFlag,
27    },
28    identifiers::TradeId,
29    instruments::{Instrument, InstrumentAny},
30    types::{Price, Quantity},
31};
32use rust_decimal::{Decimal, prelude::FromPrimitive};
33use ustr::Ustr;
34
35use super::{
36    error::{BinanceWsError, BinanceWsResult},
37    messages::{
38        BinanceFuturesAggTradeMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
39        BinanceFuturesKlineMsg, BinanceFuturesMarkPriceMsg, BinanceFuturesTradeMsg,
40    },
41};
42use crate::common::enums::{BinanceKlineInterval, BinanceWsEventType};
43
44/// Parses an aggregate trade message into a `TradeTick`.
45///
46/// # Errors
47///
48/// Returns an error if parsing fails.
49pub fn parse_agg_trade(
50    msg: &BinanceFuturesAggTradeMsg,
51    instrument: &InstrumentAny,
52    ts_init: UnixNanos,
53) -> BinanceWsResult<TradeTick> {
54    let instrument_id = instrument.id();
55    let price_precision = instrument.price_precision();
56    let size_precision = instrument.size_precision();
57
58    let price = msg
59        .price
60        .parse::<f64>()
61        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
62    let size = msg
63        .quantity
64        .parse::<f64>()
65        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
66
67    let aggressor_side = if msg.is_buyer_maker {
68        AggressorSide::Seller
69    } else {
70        AggressorSide::Buyer
71    };
72
73    let ts_event = UnixNanos::from(msg.trade_time as u64 * 1_000_000); // ms to ns
74    let trade_id = TradeId::new(msg.agg_trade_id.to_string());
75
76    Ok(TradeTick::new(
77        instrument_id,
78        Price::new(price, price_precision),
79        Quantity::new(size, size_precision),
80        aggressor_side,
81        trade_id,
82        ts_event,
83        ts_init,
84    ))
85}
86
87/// Parses a trade message into a `TradeTick`.
88///
89/// # Errors
90///
91/// Returns an error if parsing fails.
92pub fn parse_trade(
93    msg: &BinanceFuturesTradeMsg,
94    instrument: &InstrumentAny,
95    ts_init: UnixNanos,
96) -> BinanceWsResult<TradeTick> {
97    let instrument_id = instrument.id();
98    let price_precision = instrument.price_precision();
99    let size_precision = instrument.size_precision();
100
101    let price = msg
102        .price
103        .parse::<f64>()
104        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
105    let size = msg
106        .quantity
107        .parse::<f64>()
108        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
109
110    let aggressor_side = if msg.is_buyer_maker {
111        AggressorSide::Seller
112    } else {
113        AggressorSide::Buyer
114    };
115
116    let ts_event = UnixNanos::from(msg.trade_time as u64 * 1_000_000); // ms to ns
117    let trade_id = TradeId::new(msg.trade_id.to_string());
118
119    Ok(TradeTick::new(
120        instrument_id,
121        Price::new(price, price_precision),
122        Quantity::new(size, size_precision),
123        aggressor_side,
124        trade_id,
125        ts_event,
126        ts_init,
127    ))
128}
129
130/// Parses a book ticker message into a `QuoteTick`.
131///
132/// # Errors
133///
134/// Returns an error if parsing fails.
135pub fn parse_book_ticker(
136    msg: &BinanceFuturesBookTickerMsg,
137    instrument: &InstrumentAny,
138    ts_init: UnixNanos,
139) -> BinanceWsResult<QuoteTick> {
140    let instrument_id = instrument.id();
141    let price_precision = instrument.price_precision();
142    let size_precision = instrument.size_precision();
143
144    let bid_price = msg
145        .best_bid_price
146        .parse::<f64>()
147        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
148    let bid_size = msg
149        .best_bid_qty
150        .parse::<f64>()
151        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
152    let ask_price = msg
153        .best_ask_price
154        .parse::<f64>()
155        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
156    let ask_size = msg
157        .best_ask_qty
158        .parse::<f64>()
159        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
160
161    let ts_event = UnixNanos::from(msg.transaction_time as u64 * 1_000_000); // ms to ns
162
163    Ok(QuoteTick::new(
164        instrument_id,
165        Price::new(bid_price, price_precision),
166        Price::new(ask_price, price_precision),
167        Quantity::new(bid_size, size_precision),
168        Quantity::new(ask_size, size_precision),
169        ts_event,
170        ts_init,
171    ))
172}
173
174/// Parses a depth update message into `OrderBookDeltas`.
175///
176/// # Errors
177///
178/// Returns an error if parsing fails.
179pub fn parse_depth_update(
180    msg: &BinanceFuturesDepthUpdateMsg,
181    instrument: &InstrumentAny,
182    ts_init: UnixNanos,
183) -> BinanceWsResult<OrderBookDeltas> {
184    let instrument_id = instrument.id();
185    let price_precision = instrument.price_precision();
186    let size_precision = instrument.size_precision();
187
188    let ts_event = UnixNanos::from(msg.transaction_time as u64 * 1_000_000); // ms to ns
189
190    let mut deltas = Vec::with_capacity(msg.bids.len() + msg.asks.len());
191
192    // Process bids
193    for (i, bid) in msg.bids.iter().enumerate() {
194        let price = bid[0]
195            .parse::<f64>()
196            .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
197        let size = bid[1]
198            .parse::<f64>()
199            .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
200
201        let action = if size == 0.0 {
202            BookAction::Delete
203        } else {
204            BookAction::Update
205        };
206
207        let is_last = i == msg.bids.len() - 1 && msg.asks.is_empty();
208        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
209
210        let order = BookOrder::new(
211            OrderSide::Buy,
212            Price::new(price, price_precision),
213            Quantity::new(size, size_precision),
214            0,
215        );
216
217        deltas.push(OrderBookDelta::new(
218            instrument_id,
219            action,
220            order,
221            flags,
222            msg.final_update_id,
223            ts_event,
224            ts_init,
225        ));
226    }
227
228    // Process asks
229    for (i, ask) in msg.asks.iter().enumerate() {
230        let price = ask[0]
231            .parse::<f64>()
232            .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
233        let size = ask[1]
234            .parse::<f64>()
235            .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
236
237        let action = if size == 0.0 {
238            BookAction::Delete
239        } else {
240            BookAction::Update
241        };
242
243        let is_last = i == msg.asks.len() - 1;
244        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
245
246        let order = BookOrder::new(
247            OrderSide::Sell,
248            Price::new(price, price_precision),
249            Quantity::new(size, size_precision),
250            0,
251        );
252
253        deltas.push(OrderBookDelta::new(
254            instrument_id,
255            action,
256            order,
257            flags,
258            msg.final_update_id,
259            ts_event,
260            ts_init,
261        ));
262    }
263
264    Ok(OrderBookDeltas::new(instrument_id, deltas))
265}
266
267/// Parses a mark price message into `MarkPriceUpdate`, `IndexPriceUpdate`, and `FundingRateUpdate`.
268///
269/// # Errors
270///
271/// Returns an error if parsing fails.
272pub fn parse_mark_price(
273    msg: &BinanceFuturesMarkPriceMsg,
274    instrument: &InstrumentAny,
275    ts_init: UnixNanos,
276) -> BinanceWsResult<(MarkPriceUpdate, IndexPriceUpdate, FundingRateUpdate)> {
277    let instrument_id = instrument.id();
278    let price_precision = instrument.price_precision();
279
280    let mark_price = msg
281        .mark_price
282        .parse::<f64>()
283        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
284    let index_price = msg
285        .index_price
286        .parse::<f64>()
287        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
288    let funding_rate = msg
289        .funding_rate
290        .parse::<f64>()
291        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
292
293    let ts_event = UnixNanos::from(msg.event_time as u64 * 1_000_000); // ms to ns
294    let next_funding_ns = if msg.next_funding_time > 0 {
295        Some(UnixNanos::from(msg.next_funding_time as u64 * 1_000_000))
296    } else {
297        None
298    };
299
300    let mark_update = MarkPriceUpdate::new(
301        instrument_id,
302        Price::new(mark_price, price_precision),
303        ts_event,
304        ts_init,
305    );
306
307    let index_update = IndexPriceUpdate::new(
308        instrument_id,
309        Price::new(index_price, price_precision),
310        ts_event,
311        ts_init,
312    );
313
314    let funding_update = FundingRateUpdate::new(
315        instrument_id,
316        Decimal::from_f64(funding_rate).unwrap_or_default(),
317        next_funding_ns,
318        ts_event,
319        ts_init,
320    );
321
322    Ok((mark_update, index_update, funding_update))
323}
324
325/// Converts a Binance kline interval to a Nautilus `BarSpecification`.
326fn interval_to_bar_spec(interval: BinanceKlineInterval) -> BarSpecification {
327    match interval {
328        BinanceKlineInterval::Second1 => {
329            BarSpecification::new(1, BarAggregation::Second, PriceType::Last)
330        }
331        BinanceKlineInterval::Minute1 => {
332            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
333        }
334        BinanceKlineInterval::Minute3 => {
335            BarSpecification::new(3, BarAggregation::Minute, PriceType::Last)
336        }
337        BinanceKlineInterval::Minute5 => {
338            BarSpecification::new(5, BarAggregation::Minute, PriceType::Last)
339        }
340        BinanceKlineInterval::Minute15 => {
341            BarSpecification::new(15, BarAggregation::Minute, PriceType::Last)
342        }
343        BinanceKlineInterval::Minute30 => {
344            BarSpecification::new(30, BarAggregation::Minute, PriceType::Last)
345        }
346        BinanceKlineInterval::Hour1 => {
347            BarSpecification::new(1, BarAggregation::Hour, PriceType::Last)
348        }
349        BinanceKlineInterval::Hour2 => {
350            BarSpecification::new(2, BarAggregation::Hour, PriceType::Last)
351        }
352        BinanceKlineInterval::Hour4 => {
353            BarSpecification::new(4, BarAggregation::Hour, PriceType::Last)
354        }
355        BinanceKlineInterval::Hour6 => {
356            BarSpecification::new(6, BarAggregation::Hour, PriceType::Last)
357        }
358        BinanceKlineInterval::Hour8 => {
359            BarSpecification::new(8, BarAggregation::Hour, PriceType::Last)
360        }
361        BinanceKlineInterval::Hour12 => {
362            BarSpecification::new(12, BarAggregation::Hour, PriceType::Last)
363        }
364        BinanceKlineInterval::Day1 => {
365            BarSpecification::new(1, BarAggregation::Day, PriceType::Last)
366        }
367        BinanceKlineInterval::Day3 => {
368            BarSpecification::new(3, BarAggregation::Day, PriceType::Last)
369        }
370        BinanceKlineInterval::Week1 => {
371            BarSpecification::new(1, BarAggregation::Week, PriceType::Last)
372        }
373        BinanceKlineInterval::Month1 => {
374            BarSpecification::new(1, BarAggregation::Month, PriceType::Last)
375        }
376    }
377}
378
379/// Parses a kline message into a `Bar`.
380///
381/// Returns `None` if the kline is not closed yet.
382///
383/// # Errors
384///
385/// Returns an error if parsing fails.
386pub fn parse_kline(
387    msg: &BinanceFuturesKlineMsg,
388    instrument: &InstrumentAny,
389    ts_init: UnixNanos,
390) -> BinanceWsResult<Option<Bar>> {
391    // Only emit bars when the kline is closed
392    if !msg.kline.is_closed {
393        return Ok(None);
394    }
395
396    let instrument_id = instrument.id();
397    let price_precision = instrument.price_precision();
398    let size_precision = instrument.size_precision();
399
400    let spec = interval_to_bar_spec(msg.kline.interval);
401    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
402
403    let open = msg
404        .kline
405        .open
406        .parse::<f64>()
407        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
408    let high = msg
409        .kline
410        .high
411        .parse::<f64>()
412        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
413    let low = msg
414        .kline
415        .low
416        .parse::<f64>()
417        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
418    let close = msg
419        .kline
420        .close
421        .parse::<f64>()
422        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
423    let volume = msg
424        .kline
425        .volume
426        .parse::<f64>()
427        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
428
429    // Use the kline close time as the event timestamp
430    let ts_event = UnixNanos::from(msg.kline.close_time as u64 * 1_000_000); // ms to ns
431
432    let bar = Bar::new(
433        bar_type,
434        Price::new(open, price_precision),
435        Price::new(high, price_precision),
436        Price::new(low, price_precision),
437        Price::new(close, price_precision),
438        Quantity::new(volume, size_precision),
439        ts_event,
440        ts_init,
441    );
442
443    Ok(Some(bar))
444}
445
446/// Extracts the symbol from a raw JSON message.
447pub fn extract_symbol(json: &serde_json::Value) -> Option<Ustr> {
448    json.get("s").and_then(|v| v.as_str()).map(Ustr::from)
449}
450
451/// Extracts the event type from a raw JSON message.
452pub fn extract_event_type(json: &serde_json::Value) -> Option<BinanceWsEventType> {
453    json.get("e")
454        .and_then(|v| serde_json::from_value(v.clone()).ok())
455}