nautilus_tardis/machine/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::{
22        Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
23        QuoteTick, TradeTick,
24    },
25    enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
26    identifiers::{InstrumentId, TradeId},
27    types::{Price, Quantity},
28};
29use uuid::Uuid;
30
31use super::{
32    message::{BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, TradeMsg, WsMessage},
33    types::InstrumentMiniInfo,
34};
35use crate::parse::{parse_aggressor_side, parse_bar_spec, parse_book_action};
36
37#[must_use]
38pub fn parse_tardis_ws_message(msg: WsMessage, info: Arc<InstrumentMiniInfo>) -> Option<Data> {
39    match msg {
40        WsMessage::BookChange(msg) => {
41            if msg.bids.is_empty() && msg.asks.is_empty() {
42                tracing::error!(
43                    "Invalid book change for {} {} (empty bids and asks)",
44                    msg.exchange,
45                    msg.symbol
46                );
47                return None;
48            }
49            Some(Data::Deltas(parse_book_change_msg_as_deltas(
50                msg,
51                info.price_precision,
52                info.size_precision,
53                info.instrument_id,
54            )))
55        }
56        WsMessage::BookSnapshot(msg) => match msg.bids.len() {
57            1 => Some(Data::Quote(parse_book_snapshot_msg_as_quote(
58                msg,
59                info.price_precision,
60                info.size_precision,
61                info.instrument_id,
62            ))),
63            _ => Some(Data::Deltas(parse_book_snapshot_msg_as_deltas(
64                msg,
65                info.price_precision,
66                info.size_precision,
67                info.instrument_id,
68            ))),
69        },
70        WsMessage::Trade(msg) => Some(Data::Trade(parse_trade_msg(
71            msg,
72            info.price_precision,
73            info.size_precision,
74            info.instrument_id,
75        ))),
76        WsMessage::TradeBar(msg) => Some(Data::Bar(parse_bar_msg(
77            msg,
78            info.price_precision,
79            info.size_precision,
80            info.instrument_id,
81        ))),
82        WsMessage::DerivativeTicker(_) => None,
83        WsMessage::Disconnect(_) => None,
84    }
85}
86
87#[must_use]
88pub fn parse_book_change_msg_as_deltas(
89    msg: BookChangeMsg,
90    price_precision: u8,
91    size_precision: u8,
92    instrument_id: InstrumentId,
93) -> OrderBookDeltas_API {
94    parse_book_msg_as_deltas(
95        msg.bids,
96        msg.asks,
97        msg.is_snapshot,
98        price_precision,
99        size_precision,
100        instrument_id,
101        msg.timestamp,
102        msg.local_timestamp,
103    )
104}
105
106#[must_use]
107pub fn parse_book_snapshot_msg_as_deltas(
108    msg: BookSnapshotMsg,
109    price_precision: u8,
110    size_precision: u8,
111    instrument_id: InstrumentId,
112) -> OrderBookDeltas_API {
113    parse_book_msg_as_deltas(
114        msg.bids,
115        msg.asks,
116        true,
117        price_precision,
118        size_precision,
119        instrument_id,
120        msg.timestamp,
121        msg.local_timestamp,
122    )
123}
124
125#[allow(clippy::too_many_arguments)]
126#[must_use]
127pub fn parse_book_msg_as_deltas(
128    bids: Vec<BookLevel>,
129    asks: Vec<BookLevel>,
130    is_snapshot: bool,
131    price_precision: u8,
132    size_precision: u8,
133    instrument_id: InstrumentId,
134    timestamp: DateTime<Utc>,
135    local_timestamp: DateTime<Utc>,
136) -> OrderBookDeltas_API {
137    let ts_event = UnixNanos::from(timestamp.timestamp_nanos_opt().unwrap() as u64);
138    let ts_init = UnixNanos::from(local_timestamp.timestamp_nanos_opt().unwrap() as u64);
139
140    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(bids.len() + asks.len());
141
142    for level in bids {
143        deltas.push(parse_book_level(
144            instrument_id,
145            price_precision,
146            size_precision,
147            OrderSide::Buy,
148            level,
149            is_snapshot,
150            ts_event,
151            ts_init,
152        ));
153    }
154
155    for level in asks {
156        deltas.push(parse_book_level(
157            instrument_id,
158            price_precision,
159            size_precision,
160            OrderSide::Sell,
161            level,
162            is_snapshot,
163            ts_event,
164            ts_init,
165        ));
166    }
167
168    if let Some(last_delta) = deltas.last_mut() {
169        last_delta.flags += RecordFlag::F_LAST.value();
170    }
171
172    // TODO: Opaque pointer wrapper necessary for Cython (remove once Cython gone)
173    OrderBookDeltas_API::new(OrderBookDeltas::new(instrument_id, deltas))
174}
175
176#[allow(clippy::too_many_arguments)]
177#[must_use]
178pub fn parse_book_level(
179    instrument_id: InstrumentId,
180    price_precision: u8,
181    size_precision: u8,
182    side: OrderSide,
183    level: BookLevel,
184    is_snapshot: bool,
185    ts_event: UnixNanos,
186    ts_init: UnixNanos,
187) -> OrderBookDelta {
188    let action = parse_book_action(is_snapshot, level.amount);
189    let price = Price::new(level.price, price_precision);
190    let size = Quantity::new(level.amount, size_precision);
191    let order_id = 0; // Not applicable for L2 data
192    let order = BookOrder::new(side, price, size, order_id);
193    let flags = if is_snapshot {
194        RecordFlag::F_SNAPSHOT.value()
195    } else {
196        0
197    };
198    let sequence = 0; // Not available
199
200    assert!(
201        !(action != BookAction::Delete && size.is_zero()),
202        "Invalid zero size for {action}"
203    );
204
205    OrderBookDelta::new(
206        instrument_id,
207        action,
208        order,
209        flags,
210        sequence,
211        ts_event,
212        ts_init,
213    )
214}
215
216#[must_use]
217pub fn parse_book_snapshot_msg_as_quote(
218    msg: BookSnapshotMsg,
219    price_precision: u8,
220    size_precision: u8,
221    instrument_id: InstrumentId,
222) -> QuoteTick {
223    let ts_event = UnixNanos::from(msg.timestamp);
224    let ts_init = UnixNanos::from(msg.local_timestamp);
225
226    let best_bid = &msg.bids[0];
227    let bid_price = Price::new(best_bid.price, price_precision);
228    let bid_size = Quantity::new(best_bid.amount, size_precision);
229
230    let best_ask = &msg.asks[0];
231    let ask_price = Price::new(best_ask.price, price_precision);
232    let ask_size = Quantity::new(best_ask.amount, size_precision);
233
234    QuoteTick::new(
235        instrument_id,
236        bid_price,
237        ask_price,
238        bid_size,
239        ask_size,
240        ts_event,
241        ts_init,
242    )
243}
244
245#[must_use]
246pub fn parse_trade_msg(
247    msg: TradeMsg,
248    price_precision: u8,
249    size_precision: u8,
250    instrument_id: InstrumentId,
251) -> TradeTick {
252    let price = Price::new(msg.price, price_precision);
253    let size = Quantity::new(msg.amount, size_precision);
254    let aggressor_side = parse_aggressor_side(&msg.side);
255    let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
256    let ts_event = UnixNanos::from(msg.timestamp);
257    let ts_init = UnixNanos::from(msg.local_timestamp);
258
259    TradeTick::new(
260        instrument_id,
261        price,
262        size,
263        aggressor_side,
264        trade_id,
265        ts_event,
266        ts_init,
267    )
268}
269
270#[must_use]
271pub fn parse_bar_msg(
272    msg: BarMsg,
273    price_precision: u8,
274    size_precision: u8,
275    instrument_id: InstrumentId,
276) -> Bar {
277    let spec = parse_bar_spec(&msg.name);
278    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
279
280    let open = Price::new(msg.open, price_precision);
281    let high = Price::new(msg.high, price_precision);
282    let low = Price::new(msg.low, price_precision);
283    let close = Price::new(msg.close, price_precision);
284    let volume = Quantity::new(msg.volume, size_precision);
285    let ts_event = UnixNanos::from(msg.timestamp);
286    let ts_init = UnixNanos::from(msg.local_timestamp);
287
288    Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
289}
290
291////////////////////////////////////////////////////////////////////////////////
292// Tests
293////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use nautilus_model::enums::{AggressorSide, BookAction};
    use rstest::rstest;

    use super::*;
    use crate::tests::load_test_json;

    // A single-level book change parses into one `Update` delta flagged `F_LAST`.
    #[rstest]
    fn test_parse_book_change_message() {
        let json_data = load_test_json("book_change.json");
        let msg: BookChangeMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 0;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let deltas =
            parse_book_change_msg_as_deltas(msg, price_precision, size_precision, instrument_id);

        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, UnixNanos::from(1571830193469000000));
        assert_eq!(deltas.ts_init, UnixNanos::from(1571830193469000000));
        assert_eq!(
            deltas.deltas[0].instrument_id,
            InstrumentId::from("XBTUSD.BITMEX")
        );
        assert_eq!(deltas.deltas[0].action, BookAction::Update);
        assert_eq!(deltas.deltas[0].order.price, Price::from("7985"));
        assert_eq!(deltas.deltas[0].order.size, Quantity::from(283318));
        assert_eq!(deltas.deltas[0].order.order_id, 0);
        assert_eq!(deltas.deltas[0].flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.deltas[0].sequence, 0);
        assert_eq!(
            deltas.deltas[0].ts_event,
            UnixNanos::from(1571830193469000000)
        );
        assert_eq!(
            deltas.deltas[0].ts_init,
            UnixNanos::from(1571830193469000000)
        );
    }

    // A snapshot parses into `Add` deltas flagged `F_SNAPSHOT`, bids before asks.
    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let deltas =
            parse_book_snapshot_msg_as_deltas(msg, price_precision, size_precision, instrument_id);

        // Check the length before indexing so a short result fails with a clear message
        assert_eq!(deltas.deltas.len(), 4);

        let delta_0 = deltas.deltas[0];
        let delta_2 = deltas.deltas[2];

        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(
            deltas.flags,
            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
        );
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(deltas.ts_init, UnixNanos::from(1572010786961000000));
        assert_eq!(delta_0.instrument_id, instrument_id);
        assert_eq!(delta_0.action, BookAction::Add);
        assert_eq!(delta_0.order.side, OrderSide::Buy);
        assert_eq!(delta_0.order.price, Price::from("7633.5"));
        assert_eq!(delta_0.order.size, Quantity::from(1906067));
        assert_eq!(delta_0.order.order_id, 0);
        assert_eq!(delta_0.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(delta_0.sequence, 0);
        assert_eq!(delta_0.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(delta_0.ts_init, UnixNanos::from(1572010786961000000));
        assert_eq!(delta_2.instrument_id, instrument_id);
        assert_eq!(delta_2.action, BookAction::Add);
        assert_eq!(delta_2.order.side, OrderSide::Sell);
        assert_eq!(delta_2.order.price, Price::from("7634.0"));
        assert_eq!(delta_2.order.size, Quantity::from(1467849));
        assert_eq!(delta_2.order.order_id, 0);
        assert_eq!(delta_2.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(delta_2.sequence, 0);
        assert_eq!(delta_2.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(delta_2.ts_init, UnixNanos::from(1572010786961000000));
    }

    // The same snapshot parsed as a quote uses the first bid and first ask levels.
    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let quote =
            parse_book_snapshot_msg_as_quote(msg, price_precision, size_precision, instrument_id);

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
    }

    // A trade message parses into a trade tick with the expected side and timestamps.
    #[rstest]
    fn test_parse_trade_message() {
        let json_data = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 0;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let trade = parse_trade_msg(msg, price_precision, size_precision, instrument_id);

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
    }

    // A trade bar message parses into an externally aggregated bar.
    #[rstest]
    fn test_parse_bar_message() {
        let json_data = load_test_json("bar.json");
        let msg: BarMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let bar = parse_bar_msg(msg, price_precision, size_precision, instrument_id);

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
    }
}