1use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::{
23 Bar, BarType, BookOrder, DEPTH10_LEN, Data, FundingRateUpdate, NULL_ORDER, OrderBookDelta,
24 OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10, QuoteTick, TradeTick,
25 },
26 enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
27 identifiers::{InstrumentId, TradeId},
28 types::{Price, Quantity},
29};
30use uuid::Uuid;
31
32use super::{
33 message::{
34 BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, DerivativeTickerMsg, TradeMsg, WsMessage,
35 },
36 types::TardisInstrumentMiniInfo,
37};
38use crate::{
39 config::BookSnapshotOutput,
40 parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action},
41};
42
43#[must_use]
44pub fn parse_tardis_ws_message(
45 msg: WsMessage,
46 info: Arc<TardisInstrumentMiniInfo>,
47 book_snapshot_output: &BookSnapshotOutput,
48) -> Option<Data> {
49 match msg {
50 WsMessage::BookChange(msg) => {
51 if msg.bids.is_empty() && msg.asks.is_empty() {
52 let exchange = msg.exchange;
53 let symbol = &msg.symbol;
54 tracing::error!(
55 "Invalid book change for {exchange} {symbol} (empty bids and asks)"
56 );
57 return None;
58 }
59
60 match parse_book_change_msg_as_deltas(
61 msg,
62 info.price_precision,
63 info.size_precision,
64 info.instrument_id,
65 ) {
66 Ok(deltas) => Some(Data::Deltas(deltas)),
67 Err(e) => {
68 tracing::error!("Failed to parse book change message: {e}");
69 None
70 }
71 }
72 }
73 WsMessage::BookSnapshot(msg) => match msg.bids.len() {
74 1 => {
75 match parse_book_snapshot_msg_as_quote(
76 msg,
77 info.price_precision,
78 info.size_precision,
79 info.instrument_id,
80 ) {
81 Ok(quote) => Some(Data::Quote(quote)),
82 Err(e) => {
83 tracing::error!("Failed to parse book snapshot quote message: {e}");
84 None
85 }
86 }
87 }
88 _ => match book_snapshot_output {
89 BookSnapshotOutput::Depth10 => {
90 match parse_book_snapshot_msg_as_depth10(
91 msg,
92 info.price_precision,
93 info.size_precision,
94 info.instrument_id,
95 ) {
96 Ok(depth10) => Some(Data::Depth10(Box::new(depth10))),
97 Err(e) => {
98 tracing::error!("Failed to parse book snapshot as depth10: {e}");
99 None
100 }
101 }
102 }
103 BookSnapshotOutput::Deltas => {
104 match parse_book_snapshot_msg_as_deltas(
105 msg,
106 info.price_precision,
107 info.size_precision,
108 info.instrument_id,
109 ) {
110 Ok(deltas) => Some(Data::Deltas(deltas)),
111 Err(e) => {
112 tracing::error!("Failed to parse book snapshot as deltas: {e}");
113 None
114 }
115 }
116 }
117 },
118 },
119 WsMessage::Trade(msg) => {
120 match parse_trade_msg(
121 msg,
122 info.price_precision,
123 info.size_precision,
124 info.instrument_id,
125 ) {
126 Ok(trade) => Some(Data::Trade(trade)),
127 Err(e) => {
128 tracing::error!("Failed to parse trade message: {e}");
129 None
130 }
131 }
132 }
133 WsMessage::TradeBar(msg) => {
134 match parse_bar_msg(
135 msg,
136 info.price_precision,
137 info.size_precision,
138 info.instrument_id,
139 ) {
140 Ok(bar) => Some(Data::Bar(bar)),
141 Err(e) => {
142 tracing::error!("Failed to parse bar message: {e}");
143 None
144 }
145 }
146 }
147 WsMessage::DerivativeTicker(_) => None,
150 WsMessage::Disconnect(_) => None,
151 }
152}
153
154#[must_use]
157pub fn parse_tardis_ws_message_funding_rate(
158 msg: WsMessage,
159 info: Arc<TardisInstrumentMiniInfo>,
160) -> Option<FundingRateUpdate> {
161 match msg {
162 WsMessage::DerivativeTicker(msg) => {
163 match parse_derivative_ticker_msg(msg, info.instrument_id) {
164 Ok(funding_rate) => funding_rate,
165 Err(e) => {
166 tracing::error!(
167 "Failed to parse derivative ticker message for funding rate: {e}"
168 );
169 None
170 }
171 }
172 }
173 _ => None, }
175}
176
177pub fn parse_book_change_msg_as_deltas(
184 msg: BookChangeMsg,
185 price_precision: u8,
186 size_precision: u8,
187 instrument_id: InstrumentId,
188) -> anyhow::Result<OrderBookDeltas_API> {
189 parse_book_msg_as_deltas(
190 msg.bids,
191 msg.asks,
192 msg.is_snapshot,
193 price_precision,
194 size_precision,
195 instrument_id,
196 msg.timestamp,
197 msg.local_timestamp,
198 )
199}
200
201pub fn parse_book_snapshot_msg_as_deltas(
208 msg: BookSnapshotMsg,
209 price_precision: u8,
210 size_precision: u8,
211 instrument_id: InstrumentId,
212) -> anyhow::Result<OrderBookDeltas_API> {
213 parse_book_msg_as_deltas(
214 msg.bids,
215 msg.asks,
216 true,
217 price_precision,
218 size_precision,
219 instrument_id,
220 msg.timestamp,
221 msg.local_timestamp,
222 )
223}
224
225pub fn parse_book_snapshot_msg_as_depth10(
231 msg: BookSnapshotMsg,
232 price_precision: u8,
233 size_precision: u8,
234 instrument_id: InstrumentId,
235) -> anyhow::Result<OrderBookDepth10> {
236 let ts_event_nanos = msg
237 .timestamp
238 .timestamp_nanos_opt()
239 .context("invalid timestamp: cannot extract event nanoseconds")?;
240 anyhow::ensure!(
241 ts_event_nanos >= 0,
242 "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
243 );
244 let ts_event = UnixNanos::from(ts_event_nanos as u64);
245
246 let ts_init_nanos = msg
247 .local_timestamp
248 .timestamp_nanos_opt()
249 .context("invalid timestamp: cannot extract init nanoseconds")?;
250 anyhow::ensure!(
251 ts_init_nanos >= 0,
252 "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
253 );
254 let ts_init = UnixNanos::from(ts_init_nanos as u64);
255
256 let mut bids = [NULL_ORDER; DEPTH10_LEN];
257 let mut asks = [NULL_ORDER; DEPTH10_LEN];
258 let mut bid_counts = [0u32; DEPTH10_LEN];
259 let mut ask_counts = [0u32; DEPTH10_LEN];
260
261 for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
262 bids[i] = BookOrder::new(
263 OrderSide::Buy,
264 Price::new(level.price, price_precision),
265 Quantity::new(level.amount, size_precision),
266 0,
267 );
268 bid_counts[i] = 1;
269 }
270
271 for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
272 asks[i] = BookOrder::new(
273 OrderSide::Sell,
274 Price::new(level.price, price_precision),
275 Quantity::new(level.amount, size_precision),
276 0,
277 );
278 ask_counts[i] = 1;
279 }
280
281 Ok(OrderBookDepth10::new(
282 instrument_id,
283 bids,
284 asks,
285 bid_counts,
286 ask_counts,
287 RecordFlag::F_SNAPSHOT.value(),
288 0, ts_event,
290 ts_init,
291 ))
292}
293
294#[allow(clippy::too_many_arguments)]
296pub fn parse_book_msg_as_deltas(
302 bids: Vec<BookLevel>,
303 asks: Vec<BookLevel>,
304 is_snapshot: bool,
305 price_precision: u8,
306 size_precision: u8,
307 instrument_id: InstrumentId,
308 timestamp: DateTime<Utc>,
309 local_timestamp: DateTime<Utc>,
310) -> anyhow::Result<OrderBookDeltas_API> {
311 let event_nanos = timestamp
312 .timestamp_nanos_opt()
313 .context("invalid timestamp: cannot extract event nanoseconds")?;
314 anyhow::ensure!(
315 event_nanos >= 0,
316 "invalid timestamp: event nanoseconds {event_nanos} is before UNIX epoch"
317 );
318 let ts_event = UnixNanos::from(event_nanos as u64);
319 let init_nanos = local_timestamp
320 .timestamp_nanos_opt()
321 .context("invalid timestamp: cannot extract init nanoseconds")?;
322 anyhow::ensure!(
323 init_nanos >= 0,
324 "invalid timestamp: init nanoseconds {init_nanos} is before UNIX epoch"
325 );
326 let ts_init = UnixNanos::from(init_nanos as u64);
327
328 let capacity = if is_snapshot {
329 bids.len() + asks.len() + 1
330 } else {
331 bids.len() + asks.len()
332 };
333 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(capacity);
334
335 if is_snapshot {
336 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_init));
337 }
338
339 for level in bids {
340 match parse_book_level(
341 instrument_id,
342 price_precision,
343 size_precision,
344 OrderSide::Buy,
345 level,
346 is_snapshot,
347 ts_event,
348 ts_init,
349 ) {
350 Ok(delta) => deltas.push(delta),
351 Err(e) => tracing::warn!("Skipping invalid bid level for {instrument_id}: {e}"),
352 }
353 }
354
355 for level in asks {
356 match parse_book_level(
357 instrument_id,
358 price_precision,
359 size_precision,
360 OrderSide::Sell,
361 level,
362 is_snapshot,
363 ts_event,
364 ts_init,
365 ) {
366 Ok(delta) => deltas.push(delta),
367 Err(e) => tracing::warn!("Skipping invalid ask level for {instrument_id}: {e}"),
368 }
369 }
370
371 if let Some(last_delta) = deltas.last_mut() {
372 last_delta.flags |= RecordFlag::F_LAST.value();
373 }
374
375 Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
377 instrument_id,
378 deltas,
379 )))
380}
381
382#[allow(clippy::too_many_arguments)]
388pub fn parse_book_level(
389 instrument_id: InstrumentId,
390 price_precision: u8,
391 size_precision: u8,
392 side: OrderSide,
393 level: BookLevel,
394 is_snapshot: bool,
395 ts_event: UnixNanos,
396 ts_init: UnixNanos,
397) -> anyhow::Result<OrderBookDelta> {
398 let amount = normalize_amount(level.amount, size_precision);
399 let action = parse_book_action(is_snapshot, amount);
400 let price = Price::new(level.price, price_precision);
401 let size = Quantity::new(amount, size_precision);
402 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
404 let flags = if is_snapshot {
405 RecordFlag::F_SNAPSHOT.value()
406 } else {
407 0
408 };
409 let sequence = 0; anyhow::ensure!(
412 !(action != BookAction::Delete && size.is_zero()),
413 "Invalid zero size for {action}"
414 );
415
416 Ok(OrderBookDelta::new(
417 instrument_id,
418 action,
419 order,
420 flags,
421 sequence,
422 ts_event,
423 ts_init,
424 ))
425}
426
427pub fn parse_book_snapshot_msg_as_quote(
434 msg: BookSnapshotMsg,
435 price_precision: u8,
436 size_precision: u8,
437 instrument_id: InstrumentId,
438) -> anyhow::Result<QuoteTick> {
439 let ts_event = UnixNanos::from(msg.timestamp);
440 let ts_init = UnixNanos::from(msg.local_timestamp);
441
442 let best_bid = msg
443 .bids
444 .first()
445 .context("missing best bid level for quote message")?;
446 let bid_price = Price::new(best_bid.price, price_precision);
447 let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
448 .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
449
450 let best_ask = msg
451 .asks
452 .first()
453 .context("missing best ask level for quote message")?;
454 let ask_price = Price::new(best_ask.price, price_precision);
455 let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
456 .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
457
458 Ok(QuoteTick::new(
459 instrument_id,
460 bid_price,
461 ask_price,
462 bid_size,
463 ask_size,
464 ts_event,
465 ts_init,
466 ))
467}
468
469pub fn parse_trade_msg(
476 msg: TradeMsg,
477 price_precision: u8,
478 size_precision: u8,
479 instrument_id: InstrumentId,
480) -> anyhow::Result<TradeTick> {
481 let price = Price::new(msg.price, price_precision);
482 let size = Quantity::non_zero_checked(msg.amount, size_precision)
483 .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
484 let aggressor_side = parse_aggressor_side(&msg.side);
485 let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
486 let ts_event = UnixNanos::from(msg.timestamp);
487 let ts_init = UnixNanos::from(msg.local_timestamp);
488
489 Ok(TradeTick::new(
490 instrument_id,
491 price,
492 size,
493 aggressor_side,
494 trade_id,
495 ts_event,
496 ts_init,
497 ))
498}
499
500pub fn parse_bar_msg(
506 msg: BarMsg,
507 price_precision: u8,
508 size_precision: u8,
509 instrument_id: InstrumentId,
510) -> anyhow::Result<Bar> {
511 let spec = parse_bar_spec(&msg.name)?;
512 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
513
514 let open = Price::new(msg.open, price_precision);
515 let high = Price::new(msg.high, price_precision);
516 let low = Price::new(msg.low, price_precision);
517 let close = Price::new(msg.close, price_precision);
518 let volume = Quantity::non_zero(msg.volume, size_precision);
519 let ts_event = UnixNanos::from(msg.timestamp);
520 let ts_init = UnixNanos::from(msg.local_timestamp);
521
522 Ok(Bar::new(
523 bar_type, open, high, low, close, volume, ts_event, ts_init,
524 ))
525}
526
527pub fn parse_derivative_ticker_msg(
533 msg: DerivativeTickerMsg,
534 instrument_id: InstrumentId,
535) -> anyhow::Result<Option<FundingRateUpdate>> {
536 let funding_rate = match msg.funding_rate {
538 Some(rate) => rate,
539 None => return Ok(None), };
541
542 let ts_event_nanos = msg
543 .timestamp
544 .timestamp_nanos_opt()
545 .context("invalid timestamp: cannot extract event nanoseconds")?;
546 anyhow::ensure!(
547 ts_event_nanos >= 0,
548 "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
549 );
550 let ts_event = UnixNanos::from(ts_event_nanos as u64);
551
552 let ts_init_nanos = msg
553 .local_timestamp
554 .timestamp_nanos_opt()
555 .context("invalid timestamp: cannot extract init nanoseconds")?;
556 anyhow::ensure!(
557 ts_init_nanos >= 0,
558 "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
559 );
560 let ts_init = UnixNanos::from(ts_init_nanos as u64);
561
562 let rate = rust_decimal::Decimal::try_from(funding_rate)
563 .with_context(|| format!("Failed to convert funding rate {funding_rate} to Decimal"))?
564 .normalize();
565
566 let next_funding_ns = None;
568
569 Ok(Some(FundingRateUpdate::new(
570 instrument_id,
571 rate,
572 next_funding_ns,
573 ts_event,
574 ts_init,
575 )))
576}
577
#[cfg(test)]
mod tests {
    use nautilus_model::enums::AggressorSide;
    use rstest::rstest;

    use super::*;
    use crate::{enums::TardisExchange, tests::load_test_json};

    /// Builds the instrument info shared by the routing tests: Bitmex exchange, price
    /// precision 1, size precision 0.
    fn bitmex_info(instrument_id: InstrumentId) -> Arc<TardisInstrumentMiniInfo> {
        Arc::new(TardisInstrumentMiniInfo::new(
            instrument_id,
            None,
            TardisExchange::Bitmex,
            1,
            0,
        ))
    }

    #[rstest]
    fn test_parse_book_change_message() {
        let json_data = load_test_json("book_change.json");
        let msg: BookChangeMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 0;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let deltas =
            parse_book_change_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
                .unwrap();

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, UnixNanos::from(1571830193469000000));
        assert_eq!(deltas.ts_init, UnixNanos::from(1571830193469000000));

        // The single delta in the batch
        let delta = deltas.deltas[0];
        assert_eq!(delta.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(delta.action, BookAction::Update);
        assert_eq!(delta.order.price, Price::from("7985"));
        assert_eq!(delta.order.size, Quantity::from(283318));
        assert_eq!(delta.order.order_id, 0);
        assert_eq!(delta.flags, RecordFlag::F_LAST.value());
        assert_eq!(delta.sequence, 0);
        assert_eq!(delta.ts_event, UnixNanos::from(1571830193469000000));
        assert_eq!(delta.ts_init, UnixNanos::from(1571830193469000000));
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let deltas =
            parse_book_snapshot_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
                .unwrap();

        // Check the length before indexing so a wrong-sized batch fails with a clear message
        assert_eq!(deltas.deltas.len(), 5);

        let clear_delta = deltas.deltas[0];
        let bid_delta = deltas.deltas[1];
        let ask_delta = deltas.deltas[3];

        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(
            deltas.flags,
            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
        );
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(deltas.ts_init, UnixNanos::from(1572010786961000000));

        // Leading clear delta
        assert_eq!(clear_delta.instrument_id, instrument_id);
        assert_eq!(clear_delta.action, BookAction::Clear);
        assert_eq!(clear_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(clear_delta.sequence, 0);
        assert_eq!(clear_delta.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(clear_delta.ts_init, UnixNanos::from(1572010786961000000));

        // First bid delta
        assert_eq!(bid_delta.instrument_id, instrument_id);
        assert_eq!(bid_delta.action, BookAction::Add);
        assert_eq!(bid_delta.order.side, OrderSide::Buy);
        assert_eq!(bid_delta.order.price, Price::from("7633.5"));
        assert_eq!(bid_delta.order.size, Quantity::from(1906067));
        assert_eq!(bid_delta.order.order_id, 0);
        assert_eq!(bid_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(bid_delta.sequence, 0);
        assert_eq!(bid_delta.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(bid_delta.ts_init, UnixNanos::from(1572010786961000000));

        // First ask delta
        assert_eq!(ask_delta.instrument_id, instrument_id);
        assert_eq!(ask_delta.action, BookAction::Add);
        assert_eq!(ask_delta.order.side, OrderSide::Sell);
        assert_eq!(ask_delta.order.price, Price::from("7634.0"));
        assert_eq!(ask_delta.order.size, Quantity::from(1467849));
        assert_eq!(ask_delta.order.order_id, 0);
        assert_eq!(ask_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(ask_delta.sequence, 0);
        assert_eq!(ask_delta.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(ask_delta.ts_init, UnixNanos::from(1572010786961000000));
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_depth10() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");

        let depth10 =
            parse_book_snapshot_msg_as_depth10(msg, price_precision, size_precision, instrument_id)
                .unwrap();

        assert_eq!(depth10.instrument_id, instrument_id);
        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(depth10.sequence, 0);
        assert_eq!(depth10.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(depth10.ts_init, UnixNanos::from(1572010786961000000));

        // First bid level
        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
        assert_eq!(depth10.bids[0].price, Price::from("7633.5"));
        assert_eq!(depth10.bids[0].size, Quantity::from(1906067));
        assert_eq!(depth10.bids[0].order_id, 0);
        assert_eq!(depth10.bid_counts[0], 1);

        // Second bid level
        assert_eq!(depth10.bids[1].side, OrderSide::Buy);
        assert_eq!(depth10.bids[1].price, Price::from("7633.0"));
        assert_eq!(depth10.bids[1].size, Quantity::from(65319));
        assert_eq!(depth10.bid_counts[1], 1);

        // First ask level
        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
        assert_eq!(depth10.asks[0].price, Price::from("7634.0"));
        assert_eq!(depth10.asks[0].size, Quantity::from(1467849));
        assert_eq!(depth10.asks[0].order_id, 0);
        assert_eq!(depth10.ask_counts[0], 1);

        // Second ask level
        assert_eq!(depth10.asks[1].side, OrderSide::Sell);
        assert_eq!(depth10.asks[1].price, Price::from("7634.5"));
        assert_eq!(depth10.asks[1].size, Quantity::from(67939));
        assert_eq!(depth10.ask_counts[1], 1);

        // Slots past the levels present in the message stay null
        assert_eq!(depth10.bids[2], NULL_ORDER);
        assert_eq!(depth10.bid_counts[2], 0);
        assert_eq!(depth10.asks[2], NULL_ORDER);
        assert_eq!(depth10.ask_counts[2], 0);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let quote =
            parse_book_snapshot_msg_as_quote(msg, price_precision, size_precision, instrument_id)
                .expect("Failed to parse book snapshot quote message");

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
    }

    #[rstest]
    fn test_parse_trade_message() {
        let json_data = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 0;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let trade = parse_trade_msg(msg, price_precision, size_precision, instrument_id)
            .expect("Failed to parse trade message");

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
    }

    #[rstest]
    fn test_parse_bar_message() {
        let json_data = load_test_json("bar.json");
        let msg: BarMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let bar = parse_bar_msg(msg, price_precision, size_precision, instrument_id).unwrap();

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_book_snapshot_routes_to_depth10() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
        let ws_msg = WsMessage::BookSnapshot(msg);

        let info = bitmex_info(InstrumentId::from("XBTUSD.BITMEX"));

        let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Depth10);

        assert!(result.is_some());
        assert!(matches!(result.unwrap(), Data::Depth10(_)));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_book_snapshot_routes_to_deltas() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
        let ws_msg = WsMessage::BookSnapshot(msg);

        let info = bitmex_info(InstrumentId::from("XBTUSD.BITMEX"));

        let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Deltas);

        assert!(result.is_some());
        assert!(matches!(result.unwrap(), Data::Deltas(_)));
    }
}