nautilus_tardis/machine/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::{
22        Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
23        QuoteTick, TradeTick,
24    },
25    enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
26    identifiers::{InstrumentId, TradeId},
27    types::{Price, Quantity},
28};
29use uuid::Uuid;
30
31use super::{
32    message::{BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, TradeMsg, WsMessage},
33    types::InstrumentMiniInfo,
34};
35use crate::parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action};
36
37#[must_use]
38pub fn parse_tardis_ws_message(msg: WsMessage, info: Arc<InstrumentMiniInfo>) -> Option<Data> {
39    match msg {
40        WsMessage::BookChange(msg) => {
41            if msg.bids.is_empty() && msg.asks.is_empty() {
42                tracing::error!(
43                    "Invalid book change for {} {} (empty bids and asks)",
44                    msg.exchange,
45                    msg.symbol
46                );
47                return None;
48            }
49            Some(Data::Deltas(parse_book_change_msg_as_deltas(
50                msg,
51                info.price_precision,
52                info.size_precision,
53                info.instrument_id,
54            )))
55        }
56        WsMessage::BookSnapshot(msg) => match msg.bids.len() {
57            1 => Some(Data::Quote(parse_book_snapshot_msg_as_quote(
58                msg,
59                info.price_precision,
60                info.size_precision,
61                info.instrument_id,
62            ))),
63            _ => Some(Data::Deltas(parse_book_snapshot_msg_as_deltas(
64                msg,
65                info.price_precision,
66                info.size_precision,
67                info.instrument_id,
68            ))),
69        },
70        WsMessage::Trade(msg) => Some(Data::Trade(parse_trade_msg(
71            msg,
72            info.price_precision,
73            info.size_precision,
74            info.instrument_id,
75        ))),
76        WsMessage::TradeBar(msg) => Some(Data::Bar(parse_bar_msg(
77            msg,
78            info.price_precision,
79            info.size_precision,
80            info.instrument_id,
81        ))),
82        WsMessage::DerivativeTicker(_) => None,
83        WsMessage::Disconnect(_) => None,
84    }
85}
86
87#[must_use]
88pub fn parse_book_change_msg_as_deltas(
89    msg: BookChangeMsg,
90    price_precision: u8,
91    size_precision: u8,
92    instrument_id: InstrumentId,
93) -> OrderBookDeltas_API {
94    parse_book_msg_as_deltas(
95        msg.bids,
96        msg.asks,
97        msg.is_snapshot,
98        price_precision,
99        size_precision,
100        instrument_id,
101        msg.timestamp,
102        msg.local_timestamp,
103    )
104}
105
106#[must_use]
107pub fn parse_book_snapshot_msg_as_deltas(
108    msg: BookSnapshotMsg,
109    price_precision: u8,
110    size_precision: u8,
111    instrument_id: InstrumentId,
112) -> OrderBookDeltas_API {
113    parse_book_msg_as_deltas(
114        msg.bids,
115        msg.asks,
116        true,
117        price_precision,
118        size_precision,
119        instrument_id,
120        msg.timestamp,
121        msg.local_timestamp,
122    )
123}
124
125#[allow(clippy::too_many_arguments)]
126#[must_use]
127pub fn parse_book_msg_as_deltas(
128    bids: Vec<BookLevel>,
129    asks: Vec<BookLevel>,
130    is_snapshot: bool,
131    price_precision: u8,
132    size_precision: u8,
133    instrument_id: InstrumentId,
134    timestamp: DateTime<Utc>,
135    local_timestamp: DateTime<Utc>,
136) -> OrderBookDeltas_API {
137    let ts_event = UnixNanos::from(timestamp.timestamp_nanos_opt().unwrap() as u64);
138    let ts_init = UnixNanos::from(local_timestamp.timestamp_nanos_opt().unwrap() as u64);
139
140    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(bids.len() + asks.len());
141
142    for level in bids {
143        deltas.push(parse_book_level(
144            instrument_id,
145            price_precision,
146            size_precision,
147            OrderSide::Buy,
148            level,
149            is_snapshot,
150            ts_event,
151            ts_init,
152        ));
153    }
154
155    for level in asks {
156        deltas.push(parse_book_level(
157            instrument_id,
158            price_precision,
159            size_precision,
160            OrderSide::Sell,
161            level,
162            is_snapshot,
163            ts_event,
164            ts_init,
165        ));
166    }
167
168    if let Some(last_delta) = deltas.last_mut() {
169        last_delta.flags += RecordFlag::F_LAST.value();
170    }
171
172    // TODO: Opaque pointer wrapper necessary for Cython (remove once Cython gone)
173    OrderBookDeltas_API::new(OrderBookDeltas::new(instrument_id, deltas))
174}
175
176#[allow(clippy::too_many_arguments)]
177#[must_use]
178pub fn parse_book_level(
179    instrument_id: InstrumentId,
180    price_precision: u8,
181    size_precision: u8,
182    side: OrderSide,
183    level: BookLevel,
184    is_snapshot: bool,
185    ts_event: UnixNanos,
186    ts_init: UnixNanos,
187) -> OrderBookDelta {
188    let amount = normalize_amount(level.amount, size_precision);
189    let action = parse_book_action(is_snapshot, amount);
190    let price = Price::new(level.price, price_precision);
191    let size = Quantity::new(amount, size_precision);
192    let order_id = 0; // Not applicable for L2 data
193    let order = BookOrder::new(side, price, size, order_id);
194    let flags = if is_snapshot {
195        RecordFlag::F_SNAPSHOT.value()
196    } else {
197        0
198    };
199    let sequence = 0; // Not available
200
201    assert!(
202        !(action != BookAction::Delete && size.is_zero()),
203        "Invalid zero size for {action}"
204    );
205
206    OrderBookDelta::new(
207        instrument_id,
208        action,
209        order,
210        flags,
211        sequence,
212        ts_event,
213        ts_init,
214    )
215}
216
217#[must_use]
218pub fn parse_book_snapshot_msg_as_quote(
219    msg: BookSnapshotMsg,
220    price_precision: u8,
221    size_precision: u8,
222    instrument_id: InstrumentId,
223) -> QuoteTick {
224    let ts_event = UnixNanos::from(msg.timestamp);
225    let ts_init = UnixNanos::from(msg.local_timestamp);
226
227    let best_bid = &msg.bids[0];
228    let bid_price = Price::new(best_bid.price, price_precision);
229    let bid_size = Quantity::new(best_bid.amount, size_precision);
230
231    let best_ask = &msg.asks[0];
232    let ask_price = Price::new(best_ask.price, price_precision);
233    let ask_size = Quantity::new(best_ask.amount, size_precision);
234
235    QuoteTick::new(
236        instrument_id,
237        bid_price,
238        ask_price,
239        bid_size,
240        ask_size,
241        ts_event,
242        ts_init,
243    )
244}
245
246#[must_use]
247pub fn parse_trade_msg(
248    msg: TradeMsg,
249    price_precision: u8,
250    size_precision: u8,
251    instrument_id: InstrumentId,
252) -> TradeTick {
253    let price = Price::new(msg.price, price_precision);
254    let size = Quantity::new(msg.amount, size_precision);
255    let aggressor_side = parse_aggressor_side(&msg.side);
256    let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
257    let ts_event = UnixNanos::from(msg.timestamp);
258    let ts_init = UnixNanos::from(msg.local_timestamp);
259
260    TradeTick::new(
261        instrument_id,
262        price,
263        size,
264        aggressor_side,
265        trade_id,
266        ts_event,
267        ts_init,
268    )
269}
270
271#[must_use]
272pub fn parse_bar_msg(
273    msg: BarMsg,
274    price_precision: u8,
275    size_precision: u8,
276    instrument_id: InstrumentId,
277) -> Bar {
278    let spec = parse_bar_spec(&msg.name);
279    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
280
281    let open = Price::new(msg.open, price_precision);
282    let high = Price::new(msg.high, price_precision);
283    let low = Price::new(msg.low, price_precision);
284    let close = Price::new(msg.close, price_precision);
285    let volume = Quantity::new(msg.volume, size_precision);
286    let ts_event = UnixNanos::from(msg.timestamp);
287    let ts_init = UnixNanos::from(msg.local_timestamp);
288
289    Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
290}
291
292////////////////////////////////////////////////////////////////////////////////
293// Tests
294////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use nautilus_model::enums::{AggressorSide, BookAction};
    use rstest::rstest;

    use super::*;
    use crate::tests::load_test_json;

    /// Instrument shared by every fixture exercised in this module.
    const INSTRUMENT: &str = "XBTUSD.BITMEX";

    /// A book change fixture parses into a single update delta flagged as last.
    #[rstest]
    fn test_parse_book_change_message() {
        let msg: BookChangeMsg =
            serde_json::from_str(&load_test_json("book_change.json")).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);
        let expected_ts = UnixNanos::from(1571830193469000000);

        let deltas = parse_book_change_msg_as_deltas(msg, 0, 0, instrument_id);

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts);
        assert_eq!(deltas.ts_init, expected_ts);

        // The only delta in the batch
        let delta = &deltas.deltas[0];
        assert_eq!(delta.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(delta.action, BookAction::Update);
        assert_eq!(delta.order.price, Price::from("7985"));
        assert_eq!(delta.order.size, Quantity::from(283318));
        assert_eq!(delta.order.order_id, 0);
        assert_eq!(delta.flags, RecordFlag::F_LAST.value());
        assert_eq!(delta.sequence, 0);
        assert_eq!(delta.ts_event, expected_ts);
        assert_eq!(delta.ts_init, expected_ts);
    }

    /// A book snapshot fixture parses into four snapshot deltas (bids first, then asks).
    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let msg: BookSnapshotMsg =
            serde_json::from_str(&load_test_json("book_snapshot.json")).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);
        let expected_ts_event = UnixNanos::from(1572010786950000000);
        let expected_ts_init = UnixNanos::from(1572010786961000000);
        let snapshot_flag = RecordFlag::F_SNAPSHOT.value();

        let deltas = parse_book_snapshot_msg_as_deltas(msg, 1, 0, instrument_id);

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 4);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value() + snapshot_flag);
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts_event);
        assert_eq!(deltas.ts_init, expected_ts_init);

        // First bid level
        let first_bid = &deltas.deltas[0];
        assert_eq!(first_bid.instrument_id, instrument_id);
        assert_eq!(first_bid.action, BookAction::Add);
        assert_eq!(first_bid.order.side, OrderSide::Buy);
        assert_eq!(first_bid.order.price, Price::from("7633.5"));
        assert_eq!(first_bid.order.size, Quantity::from(1906067));
        assert_eq!(first_bid.order.order_id, 0);
        assert_eq!(first_bid.flags, snapshot_flag);
        assert_eq!(first_bid.sequence, 0);
        assert_eq!(first_bid.ts_event, expected_ts_event);
        assert_eq!(first_bid.ts_init, expected_ts_init);

        // First ask level
        let first_ask = &deltas.deltas[2];
        assert_eq!(first_ask.instrument_id, instrument_id);
        assert_eq!(first_ask.action, BookAction::Add);
        assert_eq!(first_ask.order.side, OrderSide::Sell);
        assert_eq!(first_ask.order.price, Price::from("7634.0"));
        assert_eq!(first_ask.order.size, Quantity::from(1467849));
        assert_eq!(first_ask.order.order_id, 0);
        assert_eq!(first_ask.flags, snapshot_flag);
        assert_eq!(first_ask.sequence, 0);
        assert_eq!(first_ask.ts_event, expected_ts_event);
        assert_eq!(first_ask.ts_init, expected_ts_init);
    }

    /// The same snapshot fixture parses into a quote built from the first bid and ask.
    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let msg: BookSnapshotMsg =
            serde_json::from_str(&load_test_json("book_snapshot.json")).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);

        let quote = parse_book_snapshot_msg_as_quote(msg, 1, 0, instrument_id);

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
    }

    /// A trade fixture parses into a trade tick with a seller aggressor.
    #[rstest]
    fn test_parse_trade_message() {
        let msg: TradeMsg = serde_json::from_str(&load_test_json("trade.json")).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);

        let trade = parse_trade_msg(msg, 0, 0, instrument_id);

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
    }

    /// A trade bar fixture parses into an externally aggregated 10-second bar.
    #[rstest]
    fn test_parse_bar_message() {
        let msg: BarMsg = serde_json::from_str(&load_test_json("bar.json")).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);

        let bar = parse_bar_msg(msg, 1, 0, instrument_id);

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
    }
}