nautilus_tardis/machine/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22    data::{
23        Bar, BarType, BookOrder, DEPTH10_LEN, Data, FundingRateUpdate, NULL_ORDER, OrderBookDelta,
24        OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10, QuoteTick, TradeTick,
25    },
26    enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
27    identifiers::{InstrumentId, TradeId},
28    types::{Price, Quantity},
29};
30use uuid::Uuid;
31
32use super::{
33    message::{
34        BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, DerivativeTickerMsg, TradeMsg, WsMessage,
35    },
36    types::TardisInstrumentMiniInfo,
37};
38use crate::{
39    config::BookSnapshotOutput,
40    parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action},
41};
42
43#[must_use]
44pub fn parse_tardis_ws_message(
45    msg: WsMessage,
46    info: Arc<TardisInstrumentMiniInfo>,
47    book_snapshot_output: &BookSnapshotOutput,
48) -> Option<Data> {
49    match msg {
50        WsMessage::BookChange(msg) => {
51            if msg.bids.is_empty() && msg.asks.is_empty() {
52                let exchange = msg.exchange;
53                let symbol = &msg.symbol;
54                tracing::error!(
55                    "Invalid book change for {exchange} {symbol} (empty bids and asks)"
56                );
57                return None;
58            }
59
60            match parse_book_change_msg_as_deltas(
61                msg,
62                info.price_precision,
63                info.size_precision,
64                info.instrument_id,
65            ) {
66                Ok(deltas) => Some(Data::Deltas(deltas)),
67                Err(e) => {
68                    tracing::error!("Failed to parse book change message: {e}");
69                    None
70                }
71            }
72        }
73        WsMessage::BookSnapshot(msg) => match msg.bids.len() {
74            1 => {
75                match parse_book_snapshot_msg_as_quote(
76                    msg,
77                    info.price_precision,
78                    info.size_precision,
79                    info.instrument_id,
80                ) {
81                    Ok(quote) => Some(Data::Quote(quote)),
82                    Err(e) => {
83                        tracing::error!("Failed to parse book snapshot quote message: {e}");
84                        None
85                    }
86                }
87            }
88            _ => match book_snapshot_output {
89                BookSnapshotOutput::Depth10 => {
90                    match parse_book_snapshot_msg_as_depth10(
91                        msg,
92                        info.price_precision,
93                        info.size_precision,
94                        info.instrument_id,
95                    ) {
96                        Ok(depth10) => Some(Data::Depth10(Box::new(depth10))),
97                        Err(e) => {
98                            tracing::error!("Failed to parse book snapshot as depth10: {e}");
99                            None
100                        }
101                    }
102                }
103                BookSnapshotOutput::Deltas => {
104                    match parse_book_snapshot_msg_as_deltas(
105                        msg,
106                        info.price_precision,
107                        info.size_precision,
108                        info.instrument_id,
109                    ) {
110                        Ok(deltas) => Some(Data::Deltas(deltas)),
111                        Err(e) => {
112                            tracing::error!("Failed to parse book snapshot as deltas: {e}");
113                            None
114                        }
115                    }
116                }
117            },
118        },
119        WsMessage::Trade(msg) => {
120            match parse_trade_msg(
121                msg,
122                info.price_precision,
123                info.size_precision,
124                info.instrument_id,
125            ) {
126                Ok(trade) => Some(Data::Trade(trade)),
127                Err(e) => {
128                    tracing::error!("Failed to parse trade message: {e}");
129                    None
130                }
131            }
132        }
133        WsMessage::TradeBar(msg) => {
134            match parse_bar_msg(
135                msg,
136                info.price_precision,
137                info.size_precision,
138                info.instrument_id,
139            ) {
140                Ok(bar) => Some(Data::Bar(bar)),
141                Err(e) => {
142                    tracing::error!("Failed to parse bar message: {e}");
143                    None
144                }
145            }
146        }
147        // Derivative ticker messages are handled through a separate callback path
148        // for FundingRateUpdate since they're not part of the Data enum.
149        WsMessage::DerivativeTicker(_) => None,
150        WsMessage::Disconnect(_) => None,
151    }
152}
153
154/// Parse a Tardis WebSocket message specifically for funding rate updates.
155/// Returns `Some(FundingRateUpdate)` if the message contains funding rate data, `None` otherwise.
156#[must_use]
157pub fn parse_tardis_ws_message_funding_rate(
158    msg: WsMessage,
159    info: Arc<TardisInstrumentMiniInfo>,
160) -> Option<FundingRateUpdate> {
161    match msg {
162        WsMessage::DerivativeTicker(msg) => {
163            match parse_derivative_ticker_msg(msg, info.instrument_id) {
164                Ok(funding_rate) => funding_rate,
165                Err(e) => {
166                    tracing::error!(
167                        "Failed to parse derivative ticker message for funding rate: {e}"
168                    );
169                    None
170                }
171            }
172        }
173        _ => None, // Only derivative ticker messages can contain funding rates
174    }
175}
176
177/// Parse a book change message into order book deltas, returning an error if timestamps invalid.
178/// Parse a book change message into order book deltas.
179///
180/// # Errors
181///
182/// Returns an error if timestamp fields cannot be converted to nanoseconds.
183pub fn parse_book_change_msg_as_deltas(
184    msg: BookChangeMsg,
185    price_precision: u8,
186    size_precision: u8,
187    instrument_id: InstrumentId,
188) -> anyhow::Result<OrderBookDeltas_API> {
189    parse_book_msg_as_deltas(
190        msg.bids,
191        msg.asks,
192        msg.is_snapshot,
193        price_precision,
194        size_precision,
195        instrument_id,
196        msg.timestamp,
197        msg.local_timestamp,
198    )
199}
200
201/// Parse a book snapshot message into order book deltas, returning an error if timestamps invalid.
202/// Parse a book snapshot message into order book deltas.
203///
204/// # Errors
205///
206/// Returns an error if timestamp fields cannot be converted to nanoseconds.
207pub fn parse_book_snapshot_msg_as_deltas(
208    msg: BookSnapshotMsg,
209    price_precision: u8,
210    size_precision: u8,
211    instrument_id: InstrumentId,
212) -> anyhow::Result<OrderBookDeltas_API> {
213    parse_book_msg_as_deltas(
214        msg.bids,
215        msg.asks,
216        true,
217        price_precision,
218        size_precision,
219        instrument_id,
220        msg.timestamp,
221        msg.local_timestamp,
222    )
223}
224
225/// Parse a book snapshot message into an [`OrderBookDepth10`].
226///
227/// # Errors
228///
229/// Returns an error if timestamp fields cannot be converted to nanoseconds.
230pub fn parse_book_snapshot_msg_as_depth10(
231    msg: BookSnapshotMsg,
232    price_precision: u8,
233    size_precision: u8,
234    instrument_id: InstrumentId,
235) -> anyhow::Result<OrderBookDepth10> {
236    let ts_event_nanos = msg
237        .timestamp
238        .timestamp_nanos_opt()
239        .context("invalid timestamp: cannot extract event nanoseconds")?;
240    anyhow::ensure!(
241        ts_event_nanos >= 0,
242        "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
243    );
244    let ts_event = UnixNanos::from(ts_event_nanos as u64);
245
246    let ts_init_nanos = msg
247        .local_timestamp
248        .timestamp_nanos_opt()
249        .context("invalid timestamp: cannot extract init nanoseconds")?;
250    anyhow::ensure!(
251        ts_init_nanos >= 0,
252        "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
253    );
254    let ts_init = UnixNanos::from(ts_init_nanos as u64);
255
256    let mut bids = [NULL_ORDER; DEPTH10_LEN];
257    let mut asks = [NULL_ORDER; DEPTH10_LEN];
258    let mut bid_counts = [0u32; DEPTH10_LEN];
259    let mut ask_counts = [0u32; DEPTH10_LEN];
260
261    for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
262        bids[i] = BookOrder::new(
263            OrderSide::Buy,
264            Price::new(level.price, price_precision),
265            Quantity::new(level.amount, size_precision),
266            0,
267        );
268        bid_counts[i] = 1;
269    }
270
271    for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
272        asks[i] = BookOrder::new(
273            OrderSide::Sell,
274            Price::new(level.price, price_precision),
275            Quantity::new(level.amount, size_precision),
276            0,
277        );
278        ask_counts[i] = 1;
279    }
280
281    Ok(OrderBookDepth10::new(
282        instrument_id,
283        bids,
284        asks,
285        bid_counts,
286        ask_counts,
287        RecordFlag::F_SNAPSHOT.value(),
288        0, // Sequence not available from Tardis
289        ts_event,
290        ts_init,
291    ))
292}
293
294/// Parse raw book levels into order book deltas, returning error for invalid timestamps.
295#[allow(clippy::too_many_arguments)]
296/// Parse raw book levels into order book deltas.
297///
298/// # Errors
299///
300/// Returns an error if timestamp fields cannot be converted to nanoseconds.
301pub fn parse_book_msg_as_deltas(
302    bids: Vec<BookLevel>,
303    asks: Vec<BookLevel>,
304    is_snapshot: bool,
305    price_precision: u8,
306    size_precision: u8,
307    instrument_id: InstrumentId,
308    timestamp: DateTime<Utc>,
309    local_timestamp: DateTime<Utc>,
310) -> anyhow::Result<OrderBookDeltas_API> {
311    let event_nanos = timestamp
312        .timestamp_nanos_opt()
313        .context("invalid timestamp: cannot extract event nanoseconds")?;
314    anyhow::ensure!(
315        event_nanos >= 0,
316        "invalid timestamp: event nanoseconds {event_nanos} is before UNIX epoch"
317    );
318    let ts_event = UnixNanos::from(event_nanos as u64);
319    let init_nanos = local_timestamp
320        .timestamp_nanos_opt()
321        .context("invalid timestamp: cannot extract init nanoseconds")?;
322    anyhow::ensure!(
323        init_nanos >= 0,
324        "invalid timestamp: init nanoseconds {init_nanos} is before UNIX epoch"
325    );
326    let ts_init = UnixNanos::from(init_nanos as u64);
327
328    let capacity = if is_snapshot {
329        bids.len() + asks.len() + 1
330    } else {
331        bids.len() + asks.len()
332    };
333    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(capacity);
334
335    if is_snapshot {
336        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_init));
337    }
338
339    for level in bids {
340        match parse_book_level(
341            instrument_id,
342            price_precision,
343            size_precision,
344            OrderSide::Buy,
345            level,
346            is_snapshot,
347            ts_event,
348            ts_init,
349        ) {
350            Ok(delta) => deltas.push(delta),
351            Err(e) => tracing::warn!("Skipping invalid bid level for {instrument_id}: {e}"),
352        }
353    }
354
355    for level in asks {
356        match parse_book_level(
357            instrument_id,
358            price_precision,
359            size_precision,
360            OrderSide::Sell,
361            level,
362            is_snapshot,
363            ts_event,
364            ts_init,
365        ) {
366            Ok(delta) => deltas.push(delta),
367            Err(e) => tracing::warn!("Skipping invalid ask level for {instrument_id}: {e}"),
368        }
369    }
370
371    if let Some(last_delta) = deltas.last_mut() {
372        last_delta.flags |= RecordFlag::F_LAST.value();
373    }
374
375    // TODO: Opaque pointer wrapper necessary for Cython (remove once Cython gone)
376    Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
377        instrument_id,
378        deltas,
379    )))
380}
381
382/// Parse a single book level into an order book delta.
383///
384/// # Errors
385///
386/// Returns an error if a non-delete action has a zero size after normalization.
387#[allow(clippy::too_many_arguments)]
388pub fn parse_book_level(
389    instrument_id: InstrumentId,
390    price_precision: u8,
391    size_precision: u8,
392    side: OrderSide,
393    level: BookLevel,
394    is_snapshot: bool,
395    ts_event: UnixNanos,
396    ts_init: UnixNanos,
397) -> anyhow::Result<OrderBookDelta> {
398    let amount = normalize_amount(level.amount, size_precision);
399    let action = parse_book_action(is_snapshot, amount);
400    let price = Price::new(level.price, price_precision);
401    let size = Quantity::new(amount, size_precision);
402    let order_id = 0; // Not applicable for L2 data
403    let order = BookOrder::new(side, price, size, order_id);
404    let flags = if is_snapshot {
405        RecordFlag::F_SNAPSHOT.value()
406    } else {
407        0
408    };
409    let sequence = 0; // Not available
410
411    anyhow::ensure!(
412        !(action != BookAction::Delete && size.is_zero()),
413        "Invalid zero size for {action}"
414    );
415
416    Ok(OrderBookDelta::new(
417        instrument_id,
418        action,
419        order,
420        flags,
421        sequence,
422        ts_event,
423        ts_init,
424    ))
425}
426
427/// Parse a book snapshot message into a quote tick, returning an error on invalid data.
428/// Parse a book snapshot message into a quote tick.
429///
430/// # Errors
431///
432/// Returns an error if missing bid/ask levels or invalid sizes.
433pub fn parse_book_snapshot_msg_as_quote(
434    msg: BookSnapshotMsg,
435    price_precision: u8,
436    size_precision: u8,
437    instrument_id: InstrumentId,
438) -> anyhow::Result<QuoteTick> {
439    let ts_event = UnixNanos::from(msg.timestamp);
440    let ts_init = UnixNanos::from(msg.local_timestamp);
441
442    let best_bid = msg
443        .bids
444        .first()
445        .context("missing best bid level for quote message")?;
446    let bid_price = Price::new(best_bid.price, price_precision);
447    let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
448        .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
449
450    let best_ask = msg
451        .asks
452        .first()
453        .context("missing best ask level for quote message")?;
454    let ask_price = Price::new(best_ask.price, price_precision);
455    let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
456        .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
457
458    Ok(QuoteTick::new(
459        instrument_id,
460        bid_price,
461        ask_price,
462        bid_size,
463        ask_size,
464        ts_event,
465        ts_init,
466    ))
467}
468
469/// Parse a trade message into a trade tick, returning an error on invalid data.
470/// Parse a trade message into a trade tick.
471///
472/// # Errors
473///
474/// Returns an error if invalid trade size is encountered.
475pub fn parse_trade_msg(
476    msg: TradeMsg,
477    price_precision: u8,
478    size_precision: u8,
479    instrument_id: InstrumentId,
480) -> anyhow::Result<TradeTick> {
481    let price = Price::new(msg.price, price_precision);
482    let size = Quantity::non_zero_checked(msg.amount, size_precision)
483        .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
484    let aggressor_side = parse_aggressor_side(&msg.side);
485    let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
486    let ts_event = UnixNanos::from(msg.timestamp);
487    let ts_init = UnixNanos::from(msg.local_timestamp);
488
489    Ok(TradeTick::new(
490        instrument_id,
491        price,
492        size,
493        aggressor_side,
494        trade_id,
495        ts_event,
496        ts_init,
497    ))
498}
499
500/// Parse a bar message into a Bar.
501///
502/// # Errors
503///
504/// Returns an error if the bar specification cannot be parsed.
505pub fn parse_bar_msg(
506    msg: BarMsg,
507    price_precision: u8,
508    size_precision: u8,
509    instrument_id: InstrumentId,
510) -> anyhow::Result<Bar> {
511    let spec = parse_bar_spec(&msg.name)?;
512    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
513
514    let open = Price::new(msg.open, price_precision);
515    let high = Price::new(msg.high, price_precision);
516    let low = Price::new(msg.low, price_precision);
517    let close = Price::new(msg.close, price_precision);
518    let volume = Quantity::non_zero(msg.volume, size_precision);
519    let ts_event = UnixNanos::from(msg.timestamp);
520    let ts_init = UnixNanos::from(msg.local_timestamp);
521
522    Ok(Bar::new(
523        bar_type, open, high, low, close, volume, ts_event, ts_init,
524    ))
525}
526
527/// Parse a derivative ticker message into a funding rate update.
528///
529/// # Errors
530///
531/// Returns an error if timestamp fields cannot be converted to nanoseconds or decimal conversion fails.
532pub fn parse_derivative_ticker_msg(
533    msg: DerivativeTickerMsg,
534    instrument_id: InstrumentId,
535) -> anyhow::Result<Option<FundingRateUpdate>> {
536    // Only process if we have funding rate data
537    let funding_rate = match msg.funding_rate {
538        Some(rate) => rate,
539        None => return Ok(None), // No funding rate data
540    };
541
542    let ts_event_nanos = msg
543        .timestamp
544        .timestamp_nanos_opt()
545        .context("invalid timestamp: cannot extract event nanoseconds")?;
546    anyhow::ensure!(
547        ts_event_nanos >= 0,
548        "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
549    );
550    let ts_event = UnixNanos::from(ts_event_nanos as u64);
551
552    let ts_init_nanos = msg
553        .local_timestamp
554        .timestamp_nanos_opt()
555        .context("invalid timestamp: cannot extract init nanoseconds")?;
556    anyhow::ensure!(
557        ts_init_nanos >= 0,
558        "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
559    );
560    let ts_init = UnixNanos::from(ts_init_nanos as u64);
561
562    let rate = rust_decimal::Decimal::try_from(funding_rate)
563        .with_context(|| format!("Failed to convert funding rate {funding_rate} to Decimal"))?
564        .normalize();
565
566    // For live data, we don't typically have funding timestamp info from derivative ticker
567    let next_funding_ns = None;
568
569    Ok(Some(FundingRateUpdate::new(
570        instrument_id,
571        rate,
572        next_funding_ns,
573        ts_event,
574        ts_init,
575    )))
576}
577
#[cfg(test)]
mod tests {
    use nautilus_model::enums::AggressorSide;
    use rstest::rstest;

    use super::*;
    use crate::{enums::TardisExchange, tests::load_test_json};

    /// Instrument used by every fixture in this module.
    const INSTRUMENT: &str = "XBTUSD.BITMEX";
    /// Event timestamp of the `book_snapshot.json` fixture (nanoseconds).
    const SNAPSHOT_TS_EVENT: u64 = 1_572_010_786_950_000_000;
    /// Local timestamp of the `book_snapshot.json` fixture (nanoseconds).
    const SNAPSHOT_TS_INIT: u64 = 1_572_010_786_961_000_000;

    /// Loads and deserializes the shared book snapshot fixture.
    fn snapshot_fixture() -> BookSnapshotMsg {
        let json = load_test_json("book_snapshot.json");
        serde_json::from_str(&json).unwrap()
    }

    /// Builds instrument info with price precision 1 and size precision 0.
    fn bitmex_info(instrument_id: InstrumentId) -> Arc<TardisInstrumentMiniInfo> {
        Arc::new(TardisInstrumentMiniInfo::new(
            instrument_id,
            None,
            TardisExchange::Bitmex,
            1,
            0,
        ))
    }

    #[rstest]
    fn test_parse_book_change_message() {
        let json = load_test_json("book_change.json");
        let msg: BookChangeMsg = serde_json::from_str(&json).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);
        let expected_ts = UnixNanos::from(1_571_830_193_469_000_000_u64);

        let deltas = parse_book_change_msg_as_deltas(msg, 0, 0, instrument_id).unwrap();

        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts);
        assert_eq!(deltas.ts_init, expected_ts);

        let delta = deltas.deltas[0];
        assert_eq!(delta.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(delta.action, BookAction::Update);
        assert_eq!(delta.order.price, Price::from("7985"));
        assert_eq!(delta.order.size, Quantity::from(283318));
        assert_eq!(delta.order.order_id, 0);
        assert_eq!(delta.flags, RecordFlag::F_LAST.value());
        assert_eq!(delta.sequence, 0);
        assert_eq!(delta.ts_event, expected_ts);
        assert_eq!(delta.ts_init, expected_ts);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let instrument_id = InstrumentId::from(INSTRUMENT);
        let ts_event = UnixNanos::from(SNAPSHOT_TS_EVENT);
        let ts_init = UnixNanos::from(SNAPSHOT_TS_INIT);

        let deltas =
            parse_book_snapshot_msg_as_deltas(snapshot_fixture(), 1, 0, instrument_id).unwrap();

        let clear_delta = deltas.deltas[0];
        let bid_delta = deltas.deltas[1];
        let ask_delta = deltas.deltas[3];

        assert_eq!(deltas.deltas.len(), 5);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(
            deltas.flags,
            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
        );
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, ts_event);
        assert_eq!(deltas.ts_init, ts_init);

        // CLEAR delta
        assert_eq!(clear_delta.instrument_id, instrument_id);
        assert_eq!(clear_delta.action, BookAction::Clear);
        assert_eq!(clear_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(clear_delta.sequence, 0);
        assert_eq!(clear_delta.ts_event, ts_event);
        assert_eq!(clear_delta.ts_init, ts_init);

        // First bid delta
        assert_eq!(bid_delta.instrument_id, instrument_id);
        assert_eq!(bid_delta.action, BookAction::Add);
        assert_eq!(bid_delta.order.side, OrderSide::Buy);
        assert_eq!(bid_delta.order.price, Price::from("7633.5"));
        assert_eq!(bid_delta.order.size, Quantity::from(1906067));
        assert_eq!(bid_delta.order.order_id, 0);
        assert_eq!(bid_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(bid_delta.sequence, 0);
        assert_eq!(bid_delta.ts_event, ts_event);
        assert_eq!(bid_delta.ts_init, ts_init);

        // First ask delta
        assert_eq!(ask_delta.instrument_id, instrument_id);
        assert_eq!(ask_delta.action, BookAction::Add);
        assert_eq!(ask_delta.order.side, OrderSide::Sell);
        assert_eq!(ask_delta.order.price, Price::from("7634.0"));
        assert_eq!(ask_delta.order.size, Quantity::from(1467849));
        assert_eq!(ask_delta.order.order_id, 0);
        assert_eq!(ask_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(ask_delta.sequence, 0);
        assert_eq!(ask_delta.ts_event, ts_event);
        assert_eq!(ask_delta.ts_init, ts_init);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_depth10() {
        let instrument_id = InstrumentId::from(INSTRUMENT);

        let depth10 =
            parse_book_snapshot_msg_as_depth10(snapshot_fixture(), 1, 0, instrument_id).unwrap();

        assert_eq!(depth10.instrument_id, instrument_id);
        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(depth10.sequence, 0);
        assert_eq!(depth10.ts_event, UnixNanos::from(SNAPSHOT_TS_EVENT));
        assert_eq!(depth10.ts_init, UnixNanos::from(SNAPSHOT_TS_INIT));

        // Check first bid level
        let first_bid = depth10.bids[0];
        assert_eq!(first_bid.side, OrderSide::Buy);
        assert_eq!(first_bid.price, Price::from("7633.5"));
        assert_eq!(first_bid.size, Quantity::from(1906067));
        assert_eq!(first_bid.order_id, 0);
        assert_eq!(depth10.bid_counts[0], 1);

        // Check second bid level
        let second_bid = depth10.bids[1];
        assert_eq!(second_bid.side, OrderSide::Buy);
        assert_eq!(second_bid.price, Price::from("7633.0"));
        assert_eq!(second_bid.size, Quantity::from(65319));
        assert_eq!(depth10.bid_counts[1], 1);

        // Check first ask level
        let first_ask = depth10.asks[0];
        assert_eq!(first_ask.side, OrderSide::Sell);
        assert_eq!(first_ask.price, Price::from("7634.0"));
        assert_eq!(first_ask.size, Quantity::from(1467849));
        assert_eq!(first_ask.order_id, 0);
        assert_eq!(depth10.ask_counts[0], 1);

        // Check second ask level
        let second_ask = depth10.asks[1];
        assert_eq!(second_ask.side, OrderSide::Sell);
        assert_eq!(second_ask.price, Price::from("7634.5"));
        assert_eq!(second_ask.size, Quantity::from(67939));
        assert_eq!(depth10.ask_counts[1], 1);

        // Check empty levels are NULL_ORDER
        assert_eq!(depth10.bids[2], NULL_ORDER);
        assert_eq!(depth10.bid_counts[2], 0);
        assert_eq!(depth10.asks[2], NULL_ORDER);
        assert_eq!(depth10.ask_counts[2], 0);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let instrument_id = InstrumentId::from(INSTRUMENT);

        let quote = parse_book_snapshot_msg_as_quote(snapshot_fixture(), 1, 0, instrument_id)
            .expect("Failed to parse book snapshot quote message");

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(SNAPSHOT_TS_EVENT));
        assert_eq!(quote.ts_init, UnixNanos::from(SNAPSHOT_TS_INIT));
    }

    #[rstest]
    fn test_parse_trade_message() {
        let json = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&json).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);

        let trade =
            parse_trade_msg(msg, 0, 0, instrument_id).expect("Failed to parse trade message");

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(
            trade.ts_event,
            UnixNanos::from(1_571_826_769_669_000_000_u64)
        );
        assert_eq!(
            trade.ts_init,
            UnixNanos::from(1_571_826_769_740_000_000_u64)
        );
    }

    #[rstest]
    fn test_parse_bar_message() {
        let json = load_test_json("bar.json");
        let msg: BarMsg = serde_json::from_str(&json).unwrap();
        let instrument_id = InstrumentId::from(INSTRUMENT);

        let bar = parse_bar_msg(msg, 1, 0, instrument_id).unwrap();

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1_572_009_100_000_000_000_u64));
        assert_eq!(bar.ts_init, UnixNanos::from(1_572_009_100_369_000_000_u64));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_book_snapshot_routes_to_depth10() {
        let ws_msg = WsMessage::BookSnapshot(snapshot_fixture());
        let info = bitmex_info(InstrumentId::from(INSTRUMENT));

        let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Depth10);

        assert!(result.is_some());
        assert!(matches!(result, Some(Data::Depth10(_))));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_book_snapshot_routes_to_deltas() {
        let ws_msg = WsMessage::BookSnapshot(snapshot_fixture());
        let info = bitmex_info(InstrumentId::from(INSTRUMENT));

        let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Deltas);

        assert!(result.is_some());
        assert!(matches!(result, Some(Data::Deltas(_))));
    }
}
837        assert!(matches!(result.unwrap(), Data::Deltas(_)));
838    }
839}