nautilus_tardis/machine/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22    data::{
23        Bar, BarType, BookOrder, Data, FundingRateUpdate, OrderBookDelta, OrderBookDeltas,
24        OrderBookDeltas_API, QuoteTick, TradeTick,
25    },
26    enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
27    identifiers::{InstrumentId, TradeId},
28    types::{Price, Quantity},
29};
30use uuid::Uuid;
31
32use super::{
33    message::{
34        BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, DerivativeTickerMsg, TradeMsg, WsMessage,
35    },
36    types::TardisInstrumentMiniInfo,
37};
38use crate::parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action};
39
40#[must_use]
41pub fn parse_tardis_ws_message(
42    msg: WsMessage,
43    info: Arc<TardisInstrumentMiniInfo>,
44) -> Option<Data> {
45    match msg {
46        WsMessage::BookChange(msg) => {
47            if msg.bids.is_empty() && msg.asks.is_empty() {
48                tracing::error!(
49                    "Invalid book change for {} {} (empty bids and asks)",
50                    msg.exchange,
51                    msg.symbol
52                );
53                return None;
54            }
55
56            match parse_book_change_msg_as_deltas(
57                msg,
58                info.price_precision,
59                info.size_precision,
60                info.instrument_id,
61            ) {
62                Ok(deltas) => Some(Data::Deltas(deltas)),
63                Err(e) => {
64                    tracing::error!("Failed to parse book change message: {e}");
65                    None
66                }
67            }
68        }
69        WsMessage::BookSnapshot(msg) => match msg.bids.len() {
70            1 => {
71                match parse_book_snapshot_msg_as_quote(
72                    msg,
73                    info.price_precision,
74                    info.size_precision,
75                    info.instrument_id,
76                ) {
77                    Ok(quote) => Some(Data::Quote(quote)),
78                    Err(e) => {
79                        tracing::error!("Failed to parse book snapshot quote message: {e}");
80                        None
81                    }
82                }
83            }
84            _ => {
85                match parse_book_snapshot_msg_as_deltas(
86                    msg,
87                    info.price_precision,
88                    info.size_precision,
89                    info.instrument_id,
90                ) {
91                    Ok(deltas) => Some(Data::Deltas(deltas)),
92                    Err(e) => {
93                        tracing::error!("Failed to parse book snapshot message: {e}");
94                        None
95                    }
96                }
97            }
98        },
99        WsMessage::Trade(msg) => {
100            match parse_trade_msg(
101                msg,
102                info.price_precision,
103                info.size_precision,
104                info.instrument_id,
105            ) {
106                Ok(trade) => Some(Data::Trade(trade)),
107                Err(e) => {
108                    tracing::error!("Failed to parse trade message: {e}");
109                    None
110                }
111            }
112        }
113        WsMessage::TradeBar(msg) => Some(Data::Bar(parse_bar_msg(
114            msg,
115            info.price_precision,
116            info.size_precision,
117            info.instrument_id,
118        ))),
119        // Derivative ticker messages are handled through a separate callback path
120        // for FundingRateUpdate since they're not part of the Data enum.
121        WsMessage::DerivativeTicker(_) => None,
122        WsMessage::Disconnect(_) => None,
123    }
124}
125
126/// Parse a Tardis WebSocket message specifically for funding rate updates.
127/// Returns `Some(FundingRateUpdate)` if the message contains funding rate data, `None` otherwise.
128#[must_use]
129pub fn parse_tardis_ws_message_funding_rate(
130    msg: WsMessage,
131    info: Arc<TardisInstrumentMiniInfo>,
132) -> Option<FundingRateUpdate> {
133    match msg {
134        WsMessage::DerivativeTicker(msg) => {
135            match parse_derivative_ticker_msg(msg, info.instrument_id) {
136                Ok(funding_rate) => funding_rate,
137                Err(e) => {
138                    tracing::error!(
139                        "Failed to parse derivative ticker message for funding rate: {e}"
140                    );
141                    None
142                }
143            }
144        }
145        _ => None, // Only derivative ticker messages can contain funding rates
146    }
147}
148
149/// Parse a book change message into order book deltas, returning an error if timestamps invalid.
150/// Parse a book change message into order book deltas.
151///
152/// # Errors
153///
154/// Returns an error if timestamp fields cannot be converted to nanoseconds.
155pub fn parse_book_change_msg_as_deltas(
156    msg: BookChangeMsg,
157    price_precision: u8,
158    size_precision: u8,
159    instrument_id: InstrumentId,
160) -> anyhow::Result<OrderBookDeltas_API> {
161    parse_book_msg_as_deltas(
162        msg.bids,
163        msg.asks,
164        msg.is_snapshot,
165        price_precision,
166        size_precision,
167        instrument_id,
168        msg.timestamp,
169        msg.local_timestamp,
170    )
171}
172
173/// Parse a book snapshot message into order book deltas, returning an error if timestamps invalid.
174/// Parse a book snapshot message into order book deltas.
175///
176/// # Errors
177///
178/// Returns an error if timestamp fields cannot be converted to nanoseconds.
179pub fn parse_book_snapshot_msg_as_deltas(
180    msg: BookSnapshotMsg,
181    price_precision: u8,
182    size_precision: u8,
183    instrument_id: InstrumentId,
184) -> anyhow::Result<OrderBookDeltas_API> {
185    parse_book_msg_as_deltas(
186        msg.bids,
187        msg.asks,
188        true,
189        price_precision,
190        size_precision,
191        instrument_id,
192        msg.timestamp,
193        msg.local_timestamp,
194    )
195}
196
197/// Parse raw book levels into order book deltas, returning error for invalid timestamps.
198#[allow(clippy::too_many_arguments)]
199/// Parse raw book levels into order book deltas.
200///
201/// # Errors
202///
203/// Returns an error if timestamp fields cannot be converted to nanoseconds.
204pub fn parse_book_msg_as_deltas(
205    bids: Vec<BookLevel>,
206    asks: Vec<BookLevel>,
207    is_snapshot: bool,
208    price_precision: u8,
209    size_precision: u8,
210    instrument_id: InstrumentId,
211    timestamp: DateTime<Utc>,
212    local_timestamp: DateTime<Utc>,
213) -> anyhow::Result<OrderBookDeltas_API> {
214    let event_nanos = timestamp
215        .timestamp_nanos_opt()
216        .context("invalid timestamp: cannot extract event nanoseconds")?;
217    let ts_event = UnixNanos::from(event_nanos as u64);
218    let init_nanos = local_timestamp
219        .timestamp_nanos_opt()
220        .context("invalid timestamp: cannot extract init nanoseconds")?;
221    let ts_init = UnixNanos::from(init_nanos as u64);
222
223    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(bids.len() + asks.len());
224
225    for level in bids {
226        deltas.push(parse_book_level(
227            instrument_id,
228            price_precision,
229            size_precision,
230            OrderSide::Buy,
231            level,
232            is_snapshot,
233            ts_event,
234            ts_init,
235        ));
236    }
237
238    for level in asks {
239        deltas.push(parse_book_level(
240            instrument_id,
241            price_precision,
242            size_precision,
243            OrderSide::Sell,
244            level,
245            is_snapshot,
246            ts_event,
247            ts_init,
248        ));
249    }
250
251    if let Some(last_delta) = deltas.last_mut() {
252        last_delta.flags += RecordFlag::F_LAST.value();
253    }
254
255    // TODO: Opaque pointer wrapper necessary for Cython (remove once Cython gone)
256    Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
257        instrument_id,
258        deltas,
259    )))
260}
261
262#[must_use]
263/// Parse a single book level into an order book delta.
264///
265/// # Panics
266///
267/// Panics if a non-delete action has a zero size after normalization.
268#[allow(clippy::too_many_arguments)]
269pub fn parse_book_level(
270    instrument_id: InstrumentId,
271    price_precision: u8,
272    size_precision: u8,
273    side: OrderSide,
274    level: BookLevel,
275    is_snapshot: bool,
276    ts_event: UnixNanos,
277    ts_init: UnixNanos,
278) -> OrderBookDelta {
279    let amount = normalize_amount(level.amount, size_precision);
280    let action = parse_book_action(is_snapshot, amount);
281    let price = Price::new(level.price, price_precision);
282    let size = Quantity::new(amount, size_precision);
283    let order_id = 0; // Not applicable for L2 data
284    let order = BookOrder::new(side, price, size, order_id);
285    let flags = if is_snapshot {
286        RecordFlag::F_SNAPSHOT.value()
287    } else {
288        0
289    };
290    let sequence = 0; // Not available
291
292    assert!(
293        !(action != BookAction::Delete && size.is_zero()),
294        "Invalid zero size for {action}"
295    );
296
297    OrderBookDelta::new(
298        instrument_id,
299        action,
300        order,
301        flags,
302        sequence,
303        ts_event,
304        ts_init,
305    )
306}
307
308/// Parse a book snapshot message into a quote tick, returning an error on invalid data.
309/// Parse a book snapshot message into a quote tick.
310///
311/// # Errors
312///
313/// Returns an error if missing bid/ask levels or invalid sizes.
314pub fn parse_book_snapshot_msg_as_quote(
315    msg: BookSnapshotMsg,
316    price_precision: u8,
317    size_precision: u8,
318    instrument_id: InstrumentId,
319) -> anyhow::Result<QuoteTick> {
320    let ts_event = UnixNanos::from(msg.timestamp);
321    let ts_init = UnixNanos::from(msg.local_timestamp);
322
323    let best_bid = msg
324        .bids
325        .first()
326        .context("missing best bid level for quote message")?;
327    let bid_price = Price::new(best_bid.price, price_precision);
328    let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
329        .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
330
331    let best_ask = msg
332        .asks
333        .first()
334        .context("missing best ask level for quote message")?;
335    let ask_price = Price::new(best_ask.price, price_precision);
336    let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
337        .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
338
339    Ok(QuoteTick::new(
340        instrument_id,
341        bid_price,
342        ask_price,
343        bid_size,
344        ask_size,
345        ts_event,
346        ts_init,
347    ))
348}
349
350/// Parse a trade message into a trade tick, returning an error on invalid data.
351/// Parse a trade message into a trade tick.
352///
353/// # Errors
354///
355/// Returns an error if invalid trade size is encountered.
356pub fn parse_trade_msg(
357    msg: TradeMsg,
358    price_precision: u8,
359    size_precision: u8,
360    instrument_id: InstrumentId,
361) -> anyhow::Result<TradeTick> {
362    let price = Price::new(msg.price, price_precision);
363    let size = Quantity::non_zero_checked(msg.amount, size_precision)
364        .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
365    let aggressor_side = parse_aggressor_side(&msg.side);
366    let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
367    let ts_event = UnixNanos::from(msg.timestamp);
368    let ts_init = UnixNanos::from(msg.local_timestamp);
369
370    Ok(TradeTick::new(
371        instrument_id,
372        price,
373        size,
374        aggressor_side,
375        trade_id,
376        ts_event,
377        ts_init,
378    ))
379}
380
381#[must_use]
382pub fn parse_bar_msg(
383    msg: BarMsg,
384    price_precision: u8,
385    size_precision: u8,
386    instrument_id: InstrumentId,
387) -> Bar {
388    let spec = parse_bar_spec(&msg.name);
389    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
390
391    let open = Price::new(msg.open, price_precision);
392    let high = Price::new(msg.high, price_precision);
393    let low = Price::new(msg.low, price_precision);
394    let close = Price::new(msg.close, price_precision);
395    let volume = Quantity::non_zero(msg.volume, size_precision);
396    let ts_event = UnixNanos::from(msg.timestamp);
397    let ts_init = UnixNanos::from(msg.local_timestamp);
398
399    Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
400}
401
402/// Parse a derivative ticker message into a funding rate update.
403///
404/// # Errors
405///
406/// Returns an error if timestamp fields cannot be converted to nanoseconds or decimal conversion fails.
407pub fn parse_derivative_ticker_msg(
408    msg: DerivativeTickerMsg,
409    instrument_id: InstrumentId,
410) -> anyhow::Result<Option<FundingRateUpdate>> {
411    // Only process if we have funding rate data
412    let funding_rate = match msg.funding_rate {
413        Some(rate) => rate,
414        None => return Ok(None), // No funding rate data
415    };
416
417    let ts_event = msg
418        .timestamp
419        .timestamp_nanos_opt()
420        .context("invalid timestamp: cannot extract event nanoseconds")?;
421    let ts_event = UnixNanos::from(ts_event as u64);
422
423    let ts_init = msg
424        .local_timestamp
425        .timestamp_nanos_opt()
426        .context("invalid timestamp: cannot extract init nanoseconds")?;
427    let ts_init = UnixNanos::from(ts_init as u64);
428
429    let rate = rust_decimal::Decimal::try_from(funding_rate)
430        .with_context(|| format!("Failed to convert funding rate {funding_rate} to Decimal"))?
431        .normalize();
432
433    // For live data, we don't typically have funding timestamp info from derivative ticker
434    let next_funding_ns = None;
435
436    Ok(Some(FundingRateUpdate::new(
437        instrument_id,
438        rate,
439        next_funding_ns,
440        ts_event,
441        ts_init,
442    )))
443}
444
445////////////////////////////////////////////////////////////////////////////////
446// Tests
447////////////////////////////////////////////////////////////////////////////////
448
#[cfg(test)]
mod tests {
    use nautilus_model::enums::{AggressorSide, BookAction};
    use rstest::rstest;

    use super::*;
    use crate::tests::load_test_json;

    /// Instrument ID string used by every test in this module.
    const INSTRUMENT: &str = "XBTUSD.BITMEX";

    #[rstest]
    fn test_parse_book_change_message() {
        let msg: BookChangeMsg =
            serde_json::from_str(&load_test_json("book_change.json")).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);
        // Event and init timestamps are asserted to be the same value here
        let expected_ts = UnixNanos::from(1_571_830_193_469_000_000);

        let deltas = parse_book_change_msg_as_deltas(msg, 0, 0, instrument_id).unwrap();

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts);
        assert_eq!(deltas.ts_init, expected_ts);

        // The single delta
        let delta = deltas.deltas[0];
        assert_eq!(delta.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(delta.action, BookAction::Update);
        assert_eq!(delta.order.price, Price::from("7985"));
        assert_eq!(delta.order.size, Quantity::from(283318));
        assert_eq!(delta.order.order_id, 0);
        assert_eq!(delta.flags, RecordFlag::F_LAST.value());
        assert_eq!(delta.sequence, 0);
        assert_eq!(delta.ts_event, expected_ts);
        assert_eq!(delta.ts_init, expected_ts);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let msg: BookSnapshotMsg =
            serde_json::from_str(&load_test_json("book_snapshot.json")).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);
        let expected_ts_event = UnixNanos::from(1_572_010_786_950_000_000);
        let expected_ts_init = UnixNanos::from(1_572_010_786_961_000_000);
        let snapshot_flag = RecordFlag::F_SNAPSHOT.value();

        let deltas = parse_book_snapshot_msg_as_deltas(msg, 1, 0, instrument_id).unwrap();
        let first_bid = deltas.deltas[0];
        let first_ask = deltas.deltas[2];

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 4);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value() + snapshot_flag);
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts_event);
        assert_eq!(deltas.ts_init, expected_ts_init);

        // First delta (bid side)
        assert_eq!(first_bid.instrument_id, instrument_id);
        assert_eq!(first_bid.action, BookAction::Add);
        assert_eq!(first_bid.order.side, OrderSide::Buy);
        assert_eq!(first_bid.order.price, Price::from("7633.5"));
        assert_eq!(first_bid.order.size, Quantity::from(1906067));
        assert_eq!(first_bid.order.order_id, 0);
        assert_eq!(first_bid.flags, snapshot_flag);
        assert_eq!(first_bid.sequence, 0);
        assert_eq!(first_bid.ts_event, expected_ts_event);
        assert_eq!(first_bid.ts_init, expected_ts_init);

        // Third delta (ask side)
        assert_eq!(first_ask.instrument_id, instrument_id);
        assert_eq!(first_ask.action, BookAction::Add);
        assert_eq!(first_ask.order.side, OrderSide::Sell);
        assert_eq!(first_ask.order.price, Price::from("7634.0"));
        assert_eq!(first_ask.order.size, Quantity::from(1467849));
        assert_eq!(first_ask.order.order_id, 0);
        assert_eq!(first_ask.flags, snapshot_flag);
        assert_eq!(first_ask.sequence, 0);
        assert_eq!(first_ask.ts_event, expected_ts_event);
        assert_eq!(first_ask.ts_init, expected_ts_init);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let msg: BookSnapshotMsg =
            serde_json::from_str(&load_test_json("book_snapshot.json")).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);

        let quote = parse_book_snapshot_msg_as_quote(msg, 1, 0, instrument_id)
            .expect("Failed to parse book snapshot quote message");

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1_572_010_786_950_000_000));
        assert_eq!(quote.ts_init, UnixNanos::from(1_572_010_786_961_000_000));
    }

    #[rstest]
    fn test_parse_trade_message() {
        let msg: TradeMsg = serde_json::from_str(&load_test_json("trade.json")).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);

        let trade =
            parse_trade_msg(msg, 0, 0, instrument_id).expect("Failed to parse trade message");

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1_571_826_769_669_000_000));
        assert_eq!(trade.ts_init, UnixNanos::from(1_571_826_769_740_000_000));
    }

    #[rstest]
    fn test_parse_bar_message() {
        let msg: BarMsg = serde_json::from_str(&load_test_json("bar.json")).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);
        let expected_bar_type = BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL");

        let bar = parse_bar_msg(msg, 1, 0, instrument_id);

        assert_eq!(bar.bar_type, expected_bar_type);
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1_572_009_100_000_000_000));
        assert_eq!(bar.ts_init, UnixNanos::from(1_572_009_100_369_000_000));
    }
}