nautilus_tardis/machine/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22    data::{
23        Bar, BarType, BookOrder, DEPTH10_LEN, Data, FundingRateUpdate, NULL_ORDER, OrderBookDelta,
24        OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10, QuoteTick, TradeTick,
25    },
26    enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
27    identifiers::{InstrumentId, TradeId},
28    types::{Price, Quantity},
29};
30use uuid::Uuid;
31
32use super::{
33    message::{
34        BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, DerivativeTickerMsg, TradeMsg, WsMessage,
35    },
36    types::TardisInstrumentMiniInfo,
37};
38use crate::{
39    config::BookSnapshotOutput,
40    parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action},
41};
42
43#[must_use]
44pub fn parse_tardis_ws_message(
45    msg: WsMessage,
46    info: Arc<TardisInstrumentMiniInfo>,
47    book_snapshot_output: &BookSnapshotOutput,
48) -> Option<Data> {
49    match msg {
50        WsMessage::BookChange(msg) => {
51            if msg.bids.is_empty() && msg.asks.is_empty() {
52                let exchange = msg.exchange;
53                let symbol = &msg.symbol;
54                log::error!("Invalid book change for {exchange} {symbol} (empty bids and asks)");
55                return None;
56            }
57
58            match parse_book_change_msg_as_deltas(
59                msg,
60                info.price_precision,
61                info.size_precision,
62                info.instrument_id,
63            ) {
64                Ok(deltas) => Some(Data::Deltas(deltas)),
65                Err(e) => {
66                    log::error!("Failed to parse book change message: {e}");
67                    None
68                }
69            }
70        }
71        WsMessage::BookSnapshot(msg) => match msg.bids.len() {
72            1 => {
73                match parse_book_snapshot_msg_as_quote(
74                    msg,
75                    info.price_precision,
76                    info.size_precision,
77                    info.instrument_id,
78                ) {
79                    Ok(quote) => Some(Data::Quote(quote)),
80                    Err(e) => {
81                        log::error!("Failed to parse book snapshot quote message: {e}");
82                        None
83                    }
84                }
85            }
86            _ => match book_snapshot_output {
87                BookSnapshotOutput::Depth10 => {
88                    match parse_book_snapshot_msg_as_depth10(
89                        msg,
90                        info.price_precision,
91                        info.size_precision,
92                        info.instrument_id,
93                    ) {
94                        Ok(depth10) => Some(Data::Depth10(Box::new(depth10))),
95                        Err(e) => {
96                            log::error!("Failed to parse book snapshot as depth10: {e}");
97                            None
98                        }
99                    }
100                }
101                BookSnapshotOutput::Deltas => {
102                    match parse_book_snapshot_msg_as_deltas(
103                        msg,
104                        info.price_precision,
105                        info.size_precision,
106                        info.instrument_id,
107                    ) {
108                        Ok(deltas) => Some(Data::Deltas(deltas)),
109                        Err(e) => {
110                            log::error!("Failed to parse book snapshot as deltas: {e}");
111                            None
112                        }
113                    }
114                }
115            },
116        },
117        WsMessage::Trade(msg) => {
118            match parse_trade_msg(
119                msg,
120                info.price_precision,
121                info.size_precision,
122                info.instrument_id,
123            ) {
124                Ok(trade) => Some(Data::Trade(trade)),
125                Err(e) => {
126                    log::error!("Failed to parse trade message: {e}");
127                    None
128                }
129            }
130        }
131        WsMessage::TradeBar(msg) => {
132            match parse_bar_msg(
133                msg,
134                info.price_precision,
135                info.size_precision,
136                info.instrument_id,
137            ) {
138                Ok(bar) => Some(Data::Bar(bar)),
139                Err(e) => {
140                    log::error!("Failed to parse bar message: {e}");
141                    None
142                }
143            }
144        }
145        // Derivative ticker messages are handled through a separate callback path
146        // for FundingRateUpdate since they're not part of the Data enum.
147        WsMessage::DerivativeTicker(_) => None,
148        WsMessage::Disconnect(_) => None,
149    }
150}
151
152/// Parse a Tardis WebSocket message specifically for funding rate updates.
153/// Returns `Some(FundingRateUpdate)` if the message contains funding rate data, `None` otherwise.
154#[must_use]
155pub fn parse_tardis_ws_message_funding_rate(
156    msg: WsMessage,
157    info: Arc<TardisInstrumentMiniInfo>,
158) -> Option<FundingRateUpdate> {
159    match msg {
160        WsMessage::DerivativeTicker(msg) => {
161            match parse_derivative_ticker_msg(msg, info.instrument_id) {
162                Ok(funding_rate) => funding_rate,
163                Err(e) => {
164                    log::error!("Failed to parse derivative ticker message for funding rate: {e}");
165                    None
166                }
167            }
168        }
169        _ => None, // Only derivative ticker messages can contain funding rates
170    }
171}
172
173/// Parse a book change message into order book deltas, returning an error if timestamps invalid.
174/// Parse a book change message into order book deltas.
175///
176/// # Errors
177///
178/// Returns an error if timestamp fields cannot be converted to nanoseconds.
179pub fn parse_book_change_msg_as_deltas(
180    msg: BookChangeMsg,
181    price_precision: u8,
182    size_precision: u8,
183    instrument_id: InstrumentId,
184) -> anyhow::Result<OrderBookDeltas_API> {
185    parse_book_msg_as_deltas(
186        msg.bids,
187        msg.asks,
188        msg.is_snapshot,
189        price_precision,
190        size_precision,
191        instrument_id,
192        msg.timestamp,
193        msg.local_timestamp,
194    )
195}
196
197/// Parse a book snapshot message into order book deltas, returning an error if timestamps invalid.
198/// Parse a book snapshot message into order book deltas.
199///
200/// # Errors
201///
202/// Returns an error if timestamp fields cannot be converted to nanoseconds.
203pub fn parse_book_snapshot_msg_as_deltas(
204    msg: BookSnapshotMsg,
205    price_precision: u8,
206    size_precision: u8,
207    instrument_id: InstrumentId,
208) -> anyhow::Result<OrderBookDeltas_API> {
209    parse_book_msg_as_deltas(
210        msg.bids,
211        msg.asks,
212        true,
213        price_precision,
214        size_precision,
215        instrument_id,
216        msg.timestamp,
217        msg.local_timestamp,
218    )
219}
220
221/// Parse a book snapshot message into an [`OrderBookDepth10`].
222///
223/// # Errors
224///
225/// Returns an error if timestamp fields cannot be converted to nanoseconds.
226pub fn parse_book_snapshot_msg_as_depth10(
227    msg: BookSnapshotMsg,
228    price_precision: u8,
229    size_precision: u8,
230    instrument_id: InstrumentId,
231) -> anyhow::Result<OrderBookDepth10> {
232    let ts_event_nanos = msg
233        .timestamp
234        .timestamp_nanos_opt()
235        .context("invalid timestamp: cannot extract event nanoseconds")?;
236    anyhow::ensure!(
237        ts_event_nanos >= 0,
238        "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
239    );
240    let ts_event = UnixNanos::from(ts_event_nanos as u64);
241
242    let ts_init_nanos = msg
243        .local_timestamp
244        .timestamp_nanos_opt()
245        .context("invalid timestamp: cannot extract init nanoseconds")?;
246    anyhow::ensure!(
247        ts_init_nanos >= 0,
248        "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
249    );
250    let ts_init = UnixNanos::from(ts_init_nanos as u64);
251
252    let mut bids = [NULL_ORDER; DEPTH10_LEN];
253    let mut asks = [NULL_ORDER; DEPTH10_LEN];
254    let mut bid_counts = [0u32; DEPTH10_LEN];
255    let mut ask_counts = [0u32; DEPTH10_LEN];
256
257    for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
258        bids[i] = BookOrder::new(
259            OrderSide::Buy,
260            Price::new(level.price, price_precision),
261            Quantity::new(level.amount, size_precision),
262            0,
263        );
264        bid_counts[i] = 1;
265    }
266
267    for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
268        asks[i] = BookOrder::new(
269            OrderSide::Sell,
270            Price::new(level.price, price_precision),
271            Quantity::new(level.amount, size_precision),
272            0,
273        );
274        ask_counts[i] = 1;
275    }
276
277    Ok(OrderBookDepth10::new(
278        instrument_id,
279        bids,
280        asks,
281        bid_counts,
282        ask_counts,
283        RecordFlag::F_SNAPSHOT.value(),
284        0, // Sequence not available from Tardis
285        ts_event,
286        ts_init,
287    ))
288}
289
290/// Parse raw book levels into order book deltas, returning error for invalid timestamps.
291#[allow(clippy::too_many_arguments)]
292/// Parse raw book levels into order book deltas.
293///
294/// # Errors
295///
296/// Returns an error if timestamp fields cannot be converted to nanoseconds.
297pub fn parse_book_msg_as_deltas(
298    bids: Vec<BookLevel>,
299    asks: Vec<BookLevel>,
300    is_snapshot: bool,
301    price_precision: u8,
302    size_precision: u8,
303    instrument_id: InstrumentId,
304    timestamp: DateTime<Utc>,
305    local_timestamp: DateTime<Utc>,
306) -> anyhow::Result<OrderBookDeltas_API> {
307    let event_nanos = timestamp
308        .timestamp_nanos_opt()
309        .context("invalid timestamp: cannot extract event nanoseconds")?;
310    anyhow::ensure!(
311        event_nanos >= 0,
312        "invalid timestamp: event nanoseconds {event_nanos} is before UNIX epoch"
313    );
314    let ts_event = UnixNanos::from(event_nanos as u64);
315    let init_nanos = local_timestamp
316        .timestamp_nanos_opt()
317        .context("invalid timestamp: cannot extract init nanoseconds")?;
318    anyhow::ensure!(
319        init_nanos >= 0,
320        "invalid timestamp: init nanoseconds {init_nanos} is before UNIX epoch"
321    );
322    let ts_init = UnixNanos::from(init_nanos as u64);
323
324    let capacity = if is_snapshot {
325        bids.len() + asks.len() + 1
326    } else {
327        bids.len() + asks.len()
328    };
329    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(capacity);
330
331    if is_snapshot {
332        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_init));
333    }
334
335    for level in bids {
336        match parse_book_level(
337            instrument_id,
338            price_precision,
339            size_precision,
340            OrderSide::Buy,
341            level,
342            is_snapshot,
343            ts_event,
344            ts_init,
345        ) {
346            Ok(delta) => deltas.push(delta),
347            Err(e) => log::warn!("Skipping invalid bid level for {instrument_id}: {e}"),
348        }
349    }
350
351    for level in asks {
352        match parse_book_level(
353            instrument_id,
354            price_precision,
355            size_precision,
356            OrderSide::Sell,
357            level,
358            is_snapshot,
359            ts_event,
360            ts_init,
361        ) {
362            Ok(delta) => deltas.push(delta),
363            Err(e) => log::warn!("Skipping invalid ask level for {instrument_id}: {e}"),
364        }
365    }
366
367    if let Some(last_delta) = deltas.last_mut() {
368        last_delta.flags |= RecordFlag::F_LAST.value();
369    }
370
371    // TODO: Opaque pointer wrapper necessary for Cython (remove once Cython gone)
372    Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
373        instrument_id,
374        deltas,
375    )))
376}
377
378/// Parse a single book level into an order book delta.
379///
380/// # Errors
381///
382/// Returns an error if a non-delete action has a zero size after normalization.
383#[allow(clippy::too_many_arguments)]
384pub fn parse_book_level(
385    instrument_id: InstrumentId,
386    price_precision: u8,
387    size_precision: u8,
388    side: OrderSide,
389    level: BookLevel,
390    is_snapshot: bool,
391    ts_event: UnixNanos,
392    ts_init: UnixNanos,
393) -> anyhow::Result<OrderBookDelta> {
394    let amount = normalize_amount(level.amount, size_precision);
395    let action = parse_book_action(is_snapshot, amount);
396    let price = Price::new(level.price, price_precision);
397    let size = Quantity::new(amount, size_precision);
398    let order_id = 0; // Not applicable for L2 data
399    let order = BookOrder::new(side, price, size, order_id);
400    let flags = if is_snapshot {
401        RecordFlag::F_SNAPSHOT.value()
402    } else {
403        0
404    };
405    let sequence = 0; // Not available
406
407    anyhow::ensure!(
408        !(action != BookAction::Delete && size.is_zero()),
409        "Invalid zero size for {action}"
410    );
411
412    Ok(OrderBookDelta::new(
413        instrument_id,
414        action,
415        order,
416        flags,
417        sequence,
418        ts_event,
419        ts_init,
420    ))
421}
422
423/// Parse a book snapshot message into a quote tick, returning an error on invalid data.
424/// Parse a book snapshot message into a quote tick.
425///
426/// # Errors
427///
428/// Returns an error if missing bid/ask levels or invalid sizes.
429pub fn parse_book_snapshot_msg_as_quote(
430    msg: BookSnapshotMsg,
431    price_precision: u8,
432    size_precision: u8,
433    instrument_id: InstrumentId,
434) -> anyhow::Result<QuoteTick> {
435    let ts_event = UnixNanos::from(msg.timestamp);
436    let ts_init = UnixNanos::from(msg.local_timestamp);
437
438    let best_bid = msg
439        .bids
440        .first()
441        .context("missing best bid level for quote message")?;
442    let bid_price = Price::new(best_bid.price, price_precision);
443    let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
444        .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
445
446    let best_ask = msg
447        .asks
448        .first()
449        .context("missing best ask level for quote message")?;
450    let ask_price = Price::new(best_ask.price, price_precision);
451    let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
452        .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
453
454    Ok(QuoteTick::new(
455        instrument_id,
456        bid_price,
457        ask_price,
458        bid_size,
459        ask_size,
460        ts_event,
461        ts_init,
462    ))
463}
464
465/// Parse a trade message into a trade tick, returning an error on invalid data.
466/// Parse a trade message into a trade tick.
467///
468/// # Errors
469///
470/// Returns an error if invalid trade size is encountered.
471pub fn parse_trade_msg(
472    msg: TradeMsg,
473    price_precision: u8,
474    size_precision: u8,
475    instrument_id: InstrumentId,
476) -> anyhow::Result<TradeTick> {
477    let price = Price::new(msg.price, price_precision);
478    let size = Quantity::non_zero_checked(msg.amount, size_precision)
479        .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
480    let aggressor_side = parse_aggressor_side(&msg.side);
481    let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
482    let ts_event = UnixNanos::from(msg.timestamp);
483    let ts_init = UnixNanos::from(msg.local_timestamp);
484
485    Ok(TradeTick::new(
486        instrument_id,
487        price,
488        size,
489        aggressor_side,
490        trade_id,
491        ts_event,
492        ts_init,
493    ))
494}
495
496/// Parse a bar message into a Bar.
497///
498/// # Errors
499///
500/// Returns an error if the bar specification cannot be parsed.
501pub fn parse_bar_msg(
502    msg: BarMsg,
503    price_precision: u8,
504    size_precision: u8,
505    instrument_id: InstrumentId,
506) -> anyhow::Result<Bar> {
507    let spec = parse_bar_spec(&msg.name)?;
508    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
509
510    let open = Price::new(msg.open, price_precision);
511    let high = Price::new(msg.high, price_precision);
512    let low = Price::new(msg.low, price_precision);
513    let close = Price::new(msg.close, price_precision);
514    let volume = Quantity::non_zero(msg.volume, size_precision);
515    let ts_event = UnixNanos::from(msg.timestamp);
516    let ts_init = UnixNanos::from(msg.local_timestamp);
517
518    Ok(Bar::new(
519        bar_type, open, high, low, close, volume, ts_event, ts_init,
520    ))
521}
522
523/// Parse a derivative ticker message into a funding rate update.
524///
525/// # Errors
526///
527/// Returns an error if timestamp fields cannot be converted to nanoseconds or decimal conversion fails.
528pub fn parse_derivative_ticker_msg(
529    msg: DerivativeTickerMsg,
530    instrument_id: InstrumentId,
531) -> anyhow::Result<Option<FundingRateUpdate>> {
532    // Only process if we have funding rate data
533    let funding_rate = match msg.funding_rate {
534        Some(rate) => rate,
535        None => return Ok(None), // No funding rate data
536    };
537
538    let ts_event_nanos = msg
539        .timestamp
540        .timestamp_nanos_opt()
541        .context("invalid timestamp: cannot extract event nanoseconds")?;
542    anyhow::ensure!(
543        ts_event_nanos >= 0,
544        "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
545    );
546    let ts_event = UnixNanos::from(ts_event_nanos as u64);
547
548    let ts_init_nanos = msg
549        .local_timestamp
550        .timestamp_nanos_opt()
551        .context("invalid timestamp: cannot extract init nanoseconds")?;
552    anyhow::ensure!(
553        ts_init_nanos >= 0,
554        "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
555    );
556    let ts_init = UnixNanos::from(ts_init_nanos as u64);
557
558    let rate = rust_decimal::Decimal::try_from(funding_rate)
559        .with_context(|| format!("Failed to convert funding rate {funding_rate} to Decimal"))?
560        .normalize();
561
562    // For live data, we don't typically have funding timestamp info from derivative ticker
563    let next_funding_ns = None;
564
565    Ok(Some(FundingRateUpdate::new(
566        instrument_id,
567        rate,
568        next_funding_ns,
569        ts_event,
570        ts_init,
571    )))
572}
573
#[cfg(test)]
mod tests {
    use nautilus_model::enums::AggressorSide;
    use rstest::rstest;

    use super::*;
    use crate::{enums::TardisExchange, tests::load_test_json};

    /// Builds the BitMEX instrument info shared by the message-routing tests.
    fn bitmex_info(instrument_id: InstrumentId) -> Arc<TardisInstrumentMiniInfo> {
        Arc::new(TardisInstrumentMiniInfo::new(
            instrument_id,
            None,
            TardisExchange::Bitmex,
            1,
            0,
        ))
    }

    #[rstest]
    fn test_parse_book_change_message() {
        let json_data = load_test_json("book_change.json");
        let msg: BookChangeMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 0;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let deltas =
            parse_book_change_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
                .unwrap();

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, UnixNanos::from(1571830193469000000));
        assert_eq!(deltas.ts_init, UnixNanos::from(1571830193469000000));

        // The single update delta
        let delta = deltas.deltas[0];
        assert_eq!(delta.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(delta.action, BookAction::Update);
        assert_eq!(delta.order.price, Price::from("7985"));
        assert_eq!(delta.order.size, Quantity::from(283318));
        assert_eq!(delta.order.order_id, 0);
        assert_eq!(delta.flags, RecordFlag::F_LAST.value());
        assert_eq!(delta.sequence, 0);
        assert_eq!(delta.ts_event, UnixNanos::from(1571830193469000000));
        assert_eq!(delta.ts_init, UnixNanos::from(1571830193469000000));
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let deltas =
            parse_book_snapshot_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
                .unwrap();

        // Check the length before indexing so a short batch fails with a clear message
        assert_eq!(deltas.deltas.len(), 5);

        let clear_delta = deltas.deltas[0];
        let bid_delta = deltas.deltas[1];
        let ask_delta = deltas.deltas[3];

        let ts_event = UnixNanos::from(1572010786950000000);
        let ts_init = UnixNanos::from(1572010786961000000);

        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(
            deltas.flags,
            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
        );
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, ts_event);
        assert_eq!(deltas.ts_init, ts_init);

        // CLEAR delta
        assert_eq!(clear_delta.instrument_id, instrument_id);
        assert_eq!(clear_delta.action, BookAction::Clear);
        assert_eq!(clear_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(clear_delta.sequence, 0);
        assert_eq!(clear_delta.ts_event, ts_event);
        assert_eq!(clear_delta.ts_init, ts_init);

        // First bid delta
        assert_eq!(bid_delta.instrument_id, instrument_id);
        assert_eq!(bid_delta.action, BookAction::Add);
        assert_eq!(bid_delta.order.side, OrderSide::Buy);
        assert_eq!(bid_delta.order.price, Price::from("7633.5"));
        assert_eq!(bid_delta.order.size, Quantity::from(1906067));
        assert_eq!(bid_delta.order.order_id, 0);
        assert_eq!(bid_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(bid_delta.sequence, 0);
        assert_eq!(bid_delta.ts_event, ts_event);
        assert_eq!(bid_delta.ts_init, ts_init);

        // First ask delta
        assert_eq!(ask_delta.instrument_id, instrument_id);
        assert_eq!(ask_delta.action, BookAction::Add);
        assert_eq!(ask_delta.order.side, OrderSide::Sell);
        assert_eq!(ask_delta.order.price, Price::from("7634.0"));
        assert_eq!(ask_delta.order.size, Quantity::from(1467849));
        assert_eq!(ask_delta.order.order_id, 0);
        assert_eq!(ask_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(ask_delta.sequence, 0);
        assert_eq!(ask_delta.ts_event, ts_event);
        assert_eq!(ask_delta.ts_init, ts_init);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_depth10() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");

        let depth10 =
            parse_book_snapshot_msg_as_depth10(msg, price_precision, size_precision, instrument_id)
                .unwrap();

        assert_eq!(depth10.instrument_id, instrument_id);
        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(depth10.sequence, 0);
        assert_eq!(depth10.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(depth10.ts_init, UnixNanos::from(1572010786961000000));

        // Check first bid level
        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
        assert_eq!(depth10.bids[0].price, Price::from("7633.5"));
        assert_eq!(depth10.bids[0].size, Quantity::from(1906067));
        assert_eq!(depth10.bids[0].order_id, 0);
        assert_eq!(depth10.bid_counts[0], 1);

        // Check second bid level
        assert_eq!(depth10.bids[1].side, OrderSide::Buy);
        assert_eq!(depth10.bids[1].price, Price::from("7633.0"));
        assert_eq!(depth10.bids[1].size, Quantity::from(65319));
        assert_eq!(depth10.bid_counts[1], 1);

        // Check first ask level
        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
        assert_eq!(depth10.asks[0].price, Price::from("7634.0"));
        assert_eq!(depth10.asks[0].size, Quantity::from(1467849));
        assert_eq!(depth10.asks[0].order_id, 0);
        assert_eq!(depth10.ask_counts[0], 1);

        // Check second ask level
        assert_eq!(depth10.asks[1].side, OrderSide::Sell);
        assert_eq!(depth10.asks[1].price, Price::from("7634.5"));
        assert_eq!(depth10.asks[1].size, Quantity::from(67939));
        assert_eq!(depth10.ask_counts[1], 1);

        // Check empty levels are NULL_ORDER
        assert_eq!(depth10.bids[2], NULL_ORDER);
        assert_eq!(depth10.bid_counts[2], 0);
        assert_eq!(depth10.asks[2], NULL_ORDER);
        assert_eq!(depth10.ask_counts[2], 0);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let quote =
            parse_book_snapshot_msg_as_quote(msg, price_precision, size_precision, instrument_id)
                .expect("Failed to parse book snapshot quote message");

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
    }

    #[rstest]
    fn test_parse_trade_message() {
        let json_data = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 0;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let trade = parse_trade_msg(msg, price_precision, size_precision, instrument_id)
            .expect("Failed to parse trade message");

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
    }

    #[rstest]
    fn test_parse_bar_message() {
        let json_data = load_test_json("bar.json");
        let msg: BarMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let bar = parse_bar_msg(msg, price_precision, size_precision, instrument_id).unwrap();

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_book_snapshot_routes_to_depth10() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
        let ws_msg = WsMessage::BookSnapshot(msg);

        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let info = bitmex_info(instrument_id);

        let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Depth10);

        assert!(result.is_some());
        assert!(matches!(result.unwrap(), Data::Depth10(_)));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_book_snapshot_routes_to_deltas() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
        let ws_msg = WsMessage::BookSnapshot(msg);

        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let info = bitmex_info(instrument_id);

        let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Deltas);

        assert!(result.is_some());
        assert!(matches!(result.unwrap(), Data::Deltas(_)));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_trade_routes_to_trade() {
        let json_data = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&json_data).unwrap();
        let ws_msg = WsMessage::Trade(msg);

        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let info = bitmex_info(instrument_id);

        let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Deltas);

        assert!(matches!(result, Some(Data::Trade(_))));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_funding_rate_ignores_non_ticker() {
        let json_data = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&json_data).unwrap();
        let ws_msg = WsMessage::Trade(msg);

        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let info = bitmex_info(instrument_id);

        // Only derivative ticker messages can yield a funding rate update
        assert!(parse_tardis_ws_message_funding_rate(ws_msg, info).is_none());
    }
}