1use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::{
23 Bar, BarType, BookOrder, DEPTH10_LEN, Data, FundingRateUpdate, NULL_ORDER, OrderBookDelta,
24 OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10, QuoteTick, TradeTick,
25 },
26 enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
27 identifiers::{InstrumentId, TradeId},
28 types::{Price, Quantity},
29};
30use uuid::Uuid;
31
32use super::{
33 message::{
34 BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, DerivativeTickerMsg, TradeMsg, WsMessage,
35 },
36 types::TardisInstrumentMiniInfo,
37};
38use crate::{
39 config::BookSnapshotOutput,
40 parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action},
41};
42
43#[must_use]
44pub fn parse_tardis_ws_message(
45 msg: WsMessage,
46 info: Arc<TardisInstrumentMiniInfo>,
47 book_snapshot_output: &BookSnapshotOutput,
48) -> Option<Data> {
49 match msg {
50 WsMessage::BookChange(msg) => {
51 if msg.bids.is_empty() && msg.asks.is_empty() {
52 let exchange = msg.exchange;
53 let symbol = &msg.symbol;
54 log::error!("Invalid book change for {exchange} {symbol} (empty bids and asks)");
55 return None;
56 }
57
58 match parse_book_change_msg_as_deltas(
59 msg,
60 info.price_precision,
61 info.size_precision,
62 info.instrument_id,
63 ) {
64 Ok(deltas) => Some(Data::Deltas(deltas)),
65 Err(e) => {
66 log::error!("Failed to parse book change message: {e}");
67 None
68 }
69 }
70 }
71 WsMessage::BookSnapshot(msg) => match msg.bids.len() {
72 1 => {
73 match parse_book_snapshot_msg_as_quote(
74 msg,
75 info.price_precision,
76 info.size_precision,
77 info.instrument_id,
78 ) {
79 Ok(quote) => Some(Data::Quote(quote)),
80 Err(e) => {
81 log::error!("Failed to parse book snapshot quote message: {e}");
82 None
83 }
84 }
85 }
86 _ => match book_snapshot_output {
87 BookSnapshotOutput::Depth10 => {
88 match parse_book_snapshot_msg_as_depth10(
89 msg,
90 info.price_precision,
91 info.size_precision,
92 info.instrument_id,
93 ) {
94 Ok(depth10) => Some(Data::Depth10(Box::new(depth10))),
95 Err(e) => {
96 log::error!("Failed to parse book snapshot as depth10: {e}");
97 None
98 }
99 }
100 }
101 BookSnapshotOutput::Deltas => {
102 match parse_book_snapshot_msg_as_deltas(
103 msg,
104 info.price_precision,
105 info.size_precision,
106 info.instrument_id,
107 ) {
108 Ok(deltas) => Some(Data::Deltas(deltas)),
109 Err(e) => {
110 log::error!("Failed to parse book snapshot as deltas: {e}");
111 None
112 }
113 }
114 }
115 },
116 },
117 WsMessage::Trade(msg) => {
118 match parse_trade_msg(
119 msg,
120 info.price_precision,
121 info.size_precision,
122 info.instrument_id,
123 ) {
124 Ok(trade) => Some(Data::Trade(trade)),
125 Err(e) => {
126 log::error!("Failed to parse trade message: {e}");
127 None
128 }
129 }
130 }
131 WsMessage::TradeBar(msg) => {
132 match parse_bar_msg(
133 msg,
134 info.price_precision,
135 info.size_precision,
136 info.instrument_id,
137 ) {
138 Ok(bar) => Some(Data::Bar(bar)),
139 Err(e) => {
140 log::error!("Failed to parse bar message: {e}");
141 None
142 }
143 }
144 }
145 WsMessage::DerivativeTicker(_) => None,
148 WsMessage::Disconnect(_) => None,
149 }
150}
151
152#[must_use]
155pub fn parse_tardis_ws_message_funding_rate(
156 msg: WsMessage,
157 info: Arc<TardisInstrumentMiniInfo>,
158) -> Option<FundingRateUpdate> {
159 match msg {
160 WsMessage::DerivativeTicker(msg) => {
161 match parse_derivative_ticker_msg(msg, info.instrument_id) {
162 Ok(funding_rate) => funding_rate,
163 Err(e) => {
164 log::error!("Failed to parse derivative ticker message for funding rate: {e}");
165 None
166 }
167 }
168 }
169 _ => None, }
171}
172
173pub fn parse_book_change_msg_as_deltas(
180 msg: BookChangeMsg,
181 price_precision: u8,
182 size_precision: u8,
183 instrument_id: InstrumentId,
184) -> anyhow::Result<OrderBookDeltas_API> {
185 parse_book_msg_as_deltas(
186 msg.bids,
187 msg.asks,
188 msg.is_snapshot,
189 price_precision,
190 size_precision,
191 instrument_id,
192 msg.timestamp,
193 msg.local_timestamp,
194 )
195}
196
197pub fn parse_book_snapshot_msg_as_deltas(
204 msg: BookSnapshotMsg,
205 price_precision: u8,
206 size_precision: u8,
207 instrument_id: InstrumentId,
208) -> anyhow::Result<OrderBookDeltas_API> {
209 parse_book_msg_as_deltas(
210 msg.bids,
211 msg.asks,
212 true,
213 price_precision,
214 size_precision,
215 instrument_id,
216 msg.timestamp,
217 msg.local_timestamp,
218 )
219}
220
221pub fn parse_book_snapshot_msg_as_depth10(
227 msg: BookSnapshotMsg,
228 price_precision: u8,
229 size_precision: u8,
230 instrument_id: InstrumentId,
231) -> anyhow::Result<OrderBookDepth10> {
232 let ts_event_nanos = msg
233 .timestamp
234 .timestamp_nanos_opt()
235 .context("invalid timestamp: cannot extract event nanoseconds")?;
236 anyhow::ensure!(
237 ts_event_nanos >= 0,
238 "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
239 );
240 let ts_event = UnixNanos::from(ts_event_nanos as u64);
241
242 let ts_init_nanos = msg
243 .local_timestamp
244 .timestamp_nanos_opt()
245 .context("invalid timestamp: cannot extract init nanoseconds")?;
246 anyhow::ensure!(
247 ts_init_nanos >= 0,
248 "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
249 );
250 let ts_init = UnixNanos::from(ts_init_nanos as u64);
251
252 let mut bids = [NULL_ORDER; DEPTH10_LEN];
253 let mut asks = [NULL_ORDER; DEPTH10_LEN];
254 let mut bid_counts = [0u32; DEPTH10_LEN];
255 let mut ask_counts = [0u32; DEPTH10_LEN];
256
257 for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
258 bids[i] = BookOrder::new(
259 OrderSide::Buy,
260 Price::new(level.price, price_precision),
261 Quantity::new(level.amount, size_precision),
262 0,
263 );
264 bid_counts[i] = 1;
265 }
266
267 for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
268 asks[i] = BookOrder::new(
269 OrderSide::Sell,
270 Price::new(level.price, price_precision),
271 Quantity::new(level.amount, size_precision),
272 0,
273 );
274 ask_counts[i] = 1;
275 }
276
277 Ok(OrderBookDepth10::new(
278 instrument_id,
279 bids,
280 asks,
281 bid_counts,
282 ask_counts,
283 RecordFlag::F_SNAPSHOT.value(),
284 0, ts_event,
286 ts_init,
287 ))
288}
289
290#[allow(clippy::too_many_arguments)]
292pub fn parse_book_msg_as_deltas(
298 bids: Vec<BookLevel>,
299 asks: Vec<BookLevel>,
300 is_snapshot: bool,
301 price_precision: u8,
302 size_precision: u8,
303 instrument_id: InstrumentId,
304 timestamp: DateTime<Utc>,
305 local_timestamp: DateTime<Utc>,
306) -> anyhow::Result<OrderBookDeltas_API> {
307 let event_nanos = timestamp
308 .timestamp_nanos_opt()
309 .context("invalid timestamp: cannot extract event nanoseconds")?;
310 anyhow::ensure!(
311 event_nanos >= 0,
312 "invalid timestamp: event nanoseconds {event_nanos} is before UNIX epoch"
313 );
314 let ts_event = UnixNanos::from(event_nanos as u64);
315 let init_nanos = local_timestamp
316 .timestamp_nanos_opt()
317 .context("invalid timestamp: cannot extract init nanoseconds")?;
318 anyhow::ensure!(
319 init_nanos >= 0,
320 "invalid timestamp: init nanoseconds {init_nanos} is before UNIX epoch"
321 );
322 let ts_init = UnixNanos::from(init_nanos as u64);
323
324 let capacity = if is_snapshot {
325 bids.len() + asks.len() + 1
326 } else {
327 bids.len() + asks.len()
328 };
329 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(capacity);
330
331 if is_snapshot {
332 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_init));
333 }
334
335 for level in bids {
336 match parse_book_level(
337 instrument_id,
338 price_precision,
339 size_precision,
340 OrderSide::Buy,
341 level,
342 is_snapshot,
343 ts_event,
344 ts_init,
345 ) {
346 Ok(delta) => deltas.push(delta),
347 Err(e) => log::warn!("Skipping invalid bid level for {instrument_id}: {e}"),
348 }
349 }
350
351 for level in asks {
352 match parse_book_level(
353 instrument_id,
354 price_precision,
355 size_precision,
356 OrderSide::Sell,
357 level,
358 is_snapshot,
359 ts_event,
360 ts_init,
361 ) {
362 Ok(delta) => deltas.push(delta),
363 Err(e) => log::warn!("Skipping invalid ask level for {instrument_id}: {e}"),
364 }
365 }
366
367 if let Some(last_delta) = deltas.last_mut() {
368 last_delta.flags |= RecordFlag::F_LAST.value();
369 }
370
371 Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
373 instrument_id,
374 deltas,
375 )))
376}
377
378#[allow(clippy::too_many_arguments)]
384pub fn parse_book_level(
385 instrument_id: InstrumentId,
386 price_precision: u8,
387 size_precision: u8,
388 side: OrderSide,
389 level: BookLevel,
390 is_snapshot: bool,
391 ts_event: UnixNanos,
392 ts_init: UnixNanos,
393) -> anyhow::Result<OrderBookDelta> {
394 let amount = normalize_amount(level.amount, size_precision);
395 let action = parse_book_action(is_snapshot, amount);
396 let price = Price::new(level.price, price_precision);
397 let size = Quantity::new(amount, size_precision);
398 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
400 let flags = if is_snapshot {
401 RecordFlag::F_SNAPSHOT.value()
402 } else {
403 0
404 };
405 let sequence = 0; anyhow::ensure!(
408 !(action != BookAction::Delete && size.is_zero()),
409 "Invalid zero size for {action}"
410 );
411
412 Ok(OrderBookDelta::new(
413 instrument_id,
414 action,
415 order,
416 flags,
417 sequence,
418 ts_event,
419 ts_init,
420 ))
421}
422
423pub fn parse_book_snapshot_msg_as_quote(
430 msg: BookSnapshotMsg,
431 price_precision: u8,
432 size_precision: u8,
433 instrument_id: InstrumentId,
434) -> anyhow::Result<QuoteTick> {
435 let ts_event = UnixNanos::from(msg.timestamp);
436 let ts_init = UnixNanos::from(msg.local_timestamp);
437
438 let best_bid = msg
439 .bids
440 .first()
441 .context("missing best bid level for quote message")?;
442 let bid_price = Price::new(best_bid.price, price_precision);
443 let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
444 .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
445
446 let best_ask = msg
447 .asks
448 .first()
449 .context("missing best ask level for quote message")?;
450 let ask_price = Price::new(best_ask.price, price_precision);
451 let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
452 .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
453
454 Ok(QuoteTick::new(
455 instrument_id,
456 bid_price,
457 ask_price,
458 bid_size,
459 ask_size,
460 ts_event,
461 ts_init,
462 ))
463}
464
465pub fn parse_trade_msg(
472 msg: TradeMsg,
473 price_precision: u8,
474 size_precision: u8,
475 instrument_id: InstrumentId,
476) -> anyhow::Result<TradeTick> {
477 let price = Price::new(msg.price, price_precision);
478 let size = Quantity::non_zero_checked(msg.amount, size_precision)
479 .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
480 let aggressor_side = parse_aggressor_side(&msg.side);
481 let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
482 let ts_event = UnixNanos::from(msg.timestamp);
483 let ts_init = UnixNanos::from(msg.local_timestamp);
484
485 Ok(TradeTick::new(
486 instrument_id,
487 price,
488 size,
489 aggressor_side,
490 trade_id,
491 ts_event,
492 ts_init,
493 ))
494}
495
496pub fn parse_bar_msg(
502 msg: BarMsg,
503 price_precision: u8,
504 size_precision: u8,
505 instrument_id: InstrumentId,
506) -> anyhow::Result<Bar> {
507 let spec = parse_bar_spec(&msg.name)?;
508 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
509
510 let open = Price::new(msg.open, price_precision);
511 let high = Price::new(msg.high, price_precision);
512 let low = Price::new(msg.low, price_precision);
513 let close = Price::new(msg.close, price_precision);
514 let volume = Quantity::non_zero(msg.volume, size_precision);
515 let ts_event = UnixNanos::from(msg.timestamp);
516 let ts_init = UnixNanos::from(msg.local_timestamp);
517
518 Ok(Bar::new(
519 bar_type, open, high, low, close, volume, ts_event, ts_init,
520 ))
521}
522
523pub fn parse_derivative_ticker_msg(
529 msg: DerivativeTickerMsg,
530 instrument_id: InstrumentId,
531) -> anyhow::Result<Option<FundingRateUpdate>> {
532 let funding_rate = match msg.funding_rate {
534 Some(rate) => rate,
535 None => return Ok(None), };
537
538 let ts_event_nanos = msg
539 .timestamp
540 .timestamp_nanos_opt()
541 .context("invalid timestamp: cannot extract event nanoseconds")?;
542 anyhow::ensure!(
543 ts_event_nanos >= 0,
544 "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
545 );
546 let ts_event = UnixNanos::from(ts_event_nanos as u64);
547
548 let ts_init_nanos = msg
549 .local_timestamp
550 .timestamp_nanos_opt()
551 .context("invalid timestamp: cannot extract init nanoseconds")?;
552 anyhow::ensure!(
553 ts_init_nanos >= 0,
554 "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
555 );
556 let ts_init = UnixNanos::from(ts_init_nanos as u64);
557
558 let rate = rust_decimal::Decimal::try_from(funding_rate)
559 .with_context(|| format!("Failed to convert funding rate {funding_rate} to Decimal"))?;
560
561 let next_funding_ns = None;
563
564 Ok(Some(FundingRateUpdate::new(
565 instrument_id,
566 rate,
567 next_funding_ns,
568 ts_event,
569 ts_init,
570 )))
571}
572
573#[cfg(test)]
574mod tests {
575 use nautilus_model::enums::AggressorSide;
576 use rstest::rstest;
577
578 use super::*;
579 use crate::{common::testing::load_test_json, enums::TardisExchange};
580
581 #[rstest]
582 fn test_parse_book_change_message() {
583 let json_data = load_test_json("book_change.json");
584 let msg: BookChangeMsg = serde_json::from_str(&json_data).unwrap();
585
586 let price_precision = 0;
587 let size_precision = 0;
588 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
589 let deltas =
590 parse_book_change_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
591 .unwrap();
592
593 assert_eq!(deltas.deltas.len(), 1);
594 assert_eq!(deltas.instrument_id, instrument_id);
595 assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
596 assert_eq!(deltas.sequence, 0);
597 assert_eq!(deltas.ts_event, UnixNanos::from(1571830193469000000));
598 assert_eq!(deltas.ts_init, UnixNanos::from(1571830193469000000));
599 assert_eq!(
600 deltas.deltas[0].instrument_id,
601 InstrumentId::from("XBTUSD.BITMEX")
602 );
603 assert_eq!(deltas.deltas[0].action, BookAction::Update);
604 assert_eq!(deltas.deltas[0].order.price, Price::from("7985"));
605 assert_eq!(deltas.deltas[0].order.size, Quantity::from(283318));
606 assert_eq!(deltas.deltas[0].order.order_id, 0);
607 assert_eq!(deltas.deltas[0].flags, RecordFlag::F_LAST.value());
608 assert_eq!(deltas.deltas[0].sequence, 0);
609 assert_eq!(
610 deltas.deltas[0].ts_event,
611 UnixNanos::from(1571830193469000000)
612 );
613 assert_eq!(
614 deltas.deltas[0].ts_init,
615 UnixNanos::from(1571830193469000000)
616 );
617 }
618
619 #[rstest]
620 fn test_parse_book_snapshot_message_as_deltas() {
621 let json_data = load_test_json("book_snapshot.json");
622 let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
623
624 let price_precision = 1;
625 let size_precision = 0;
626 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
627 let deltas =
628 parse_book_snapshot_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
629 .unwrap();
630
631 let clear_delta = deltas.deltas[0];
632 let bid_delta = deltas.deltas[1];
633 let ask_delta = deltas.deltas[3];
634
635 assert_eq!(deltas.deltas.len(), 5);
636 assert_eq!(deltas.instrument_id, instrument_id);
637 assert_eq!(
638 deltas.flags,
639 RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
640 );
641 assert_eq!(deltas.sequence, 0);
642 assert_eq!(deltas.ts_event, UnixNanos::from(1572010786950000000));
643 assert_eq!(deltas.ts_init, UnixNanos::from(1572010786961000000));
644
645 assert_eq!(clear_delta.instrument_id, instrument_id);
647 assert_eq!(clear_delta.action, BookAction::Clear);
648 assert_eq!(clear_delta.flags, RecordFlag::F_SNAPSHOT.value());
649 assert_eq!(clear_delta.sequence, 0);
650 assert_eq!(clear_delta.ts_event, UnixNanos::from(1572010786950000000));
651 assert_eq!(clear_delta.ts_init, UnixNanos::from(1572010786961000000));
652
653 assert_eq!(bid_delta.instrument_id, instrument_id);
655 assert_eq!(bid_delta.action, BookAction::Add);
656 assert_eq!(bid_delta.order.side, OrderSide::Buy);
657 assert_eq!(bid_delta.order.price, Price::from("7633.5"));
658 assert_eq!(bid_delta.order.size, Quantity::from(1906067));
659 assert_eq!(bid_delta.order.order_id, 0);
660 assert_eq!(bid_delta.flags, RecordFlag::F_SNAPSHOT.value());
661 assert_eq!(bid_delta.sequence, 0);
662 assert_eq!(bid_delta.ts_event, UnixNanos::from(1572010786950000000));
663 assert_eq!(bid_delta.ts_init, UnixNanos::from(1572010786961000000));
664
665 assert_eq!(ask_delta.instrument_id, instrument_id);
667 assert_eq!(ask_delta.action, BookAction::Add);
668 assert_eq!(ask_delta.order.side, OrderSide::Sell);
669 assert_eq!(ask_delta.order.price, Price::from("7634.0"));
670 assert_eq!(ask_delta.order.size, Quantity::from(1467849));
671 assert_eq!(ask_delta.order.order_id, 0);
672 assert_eq!(ask_delta.flags, RecordFlag::F_SNAPSHOT.value());
673 assert_eq!(ask_delta.sequence, 0);
674 assert_eq!(ask_delta.ts_event, UnixNanos::from(1572010786950000000));
675 assert_eq!(ask_delta.ts_init, UnixNanos::from(1572010786961000000));
676 }
677
678 #[rstest]
679 fn test_parse_book_snapshot_message_as_depth10() {
680 let json_data = load_test_json("book_snapshot.json");
681 let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
682
683 let price_precision = 1;
684 let size_precision = 0;
685 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
686
687 let depth10 =
688 parse_book_snapshot_msg_as_depth10(msg, price_precision, size_precision, instrument_id)
689 .unwrap();
690
691 assert_eq!(depth10.instrument_id, instrument_id);
692 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT.value());
693 assert_eq!(depth10.sequence, 0);
694 assert_eq!(depth10.ts_event, UnixNanos::from(1572010786950000000));
695 assert_eq!(depth10.ts_init, UnixNanos::from(1572010786961000000));
696
697 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
699 assert_eq!(depth10.bids[0].price, Price::from("7633.5"));
700 assert_eq!(depth10.bids[0].size, Quantity::from(1906067));
701 assert_eq!(depth10.bids[0].order_id, 0);
702 assert_eq!(depth10.bid_counts[0], 1);
703
704 assert_eq!(depth10.bids[1].side, OrderSide::Buy);
706 assert_eq!(depth10.bids[1].price, Price::from("7633.0"));
707 assert_eq!(depth10.bids[1].size, Quantity::from(65319));
708 assert_eq!(depth10.bid_counts[1], 1);
709
710 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
712 assert_eq!(depth10.asks[0].price, Price::from("7634.0"));
713 assert_eq!(depth10.asks[0].size, Quantity::from(1467849));
714 assert_eq!(depth10.asks[0].order_id, 0);
715 assert_eq!(depth10.ask_counts[0], 1);
716
717 assert_eq!(depth10.asks[1].side, OrderSide::Sell);
719 assert_eq!(depth10.asks[1].price, Price::from("7634.5"));
720 assert_eq!(depth10.asks[1].size, Quantity::from(67939));
721 assert_eq!(depth10.ask_counts[1], 1);
722
723 assert_eq!(depth10.bids[2], NULL_ORDER);
725 assert_eq!(depth10.bid_counts[2], 0);
726 assert_eq!(depth10.asks[2], NULL_ORDER);
727 assert_eq!(depth10.ask_counts[2], 0);
728 }
729
730 #[rstest]
731 fn test_parse_book_snapshot_message_as_quote() {
732 let json_data = load_test_json("book_snapshot.json");
733 let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
734
735 let price_precision = 1;
736 let size_precision = 0;
737 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
738 let quote =
739 parse_book_snapshot_msg_as_quote(msg, price_precision, size_precision, instrument_id)
740 .expect("Failed to parse book snapshot quote message");
741
742 assert_eq!(quote.instrument_id, instrument_id);
743 assert_eq!(quote.bid_price, Price::from("7633.5"));
744 assert_eq!(quote.bid_size, Quantity::from(1906067));
745 assert_eq!(quote.ask_price, Price::from("7634.0"));
746 assert_eq!(quote.ask_size, Quantity::from(1467849));
747 assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
748 assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
749 }
750
751 #[rstest]
752 fn test_parse_trade_message() {
753 let json_data = load_test_json("trade.json");
754 let msg: TradeMsg = serde_json::from_str(&json_data).unwrap();
755
756 let price_precision = 0;
757 let size_precision = 0;
758 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
759 let trade = parse_trade_msg(msg, price_precision, size_precision, instrument_id)
760 .expect("Failed to parse trade message");
761
762 assert_eq!(trade.instrument_id, instrument_id);
763 assert_eq!(trade.price, Price::from("7996"));
764 assert_eq!(trade.size, Quantity::from(50));
765 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
766 assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
767 assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
768 }
769
770 #[rstest]
771 fn test_parse_bar_message() {
772 let json_data = load_test_json("bar.json");
773 let msg: BarMsg = serde_json::from_str(&json_data).unwrap();
774
775 let price_precision = 1;
776 let size_precision = 0;
777 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
778 let bar = parse_bar_msg(msg, price_precision, size_precision, instrument_id).unwrap();
779
780 assert_eq!(
781 bar.bar_type,
782 BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
783 );
784 assert_eq!(bar.open, Price::from("7623.5"));
785 assert_eq!(bar.high, Price::from("7623.5"));
786 assert_eq!(bar.low, Price::from("7623"));
787 assert_eq!(bar.close, Price::from("7623.5"));
788 assert_eq!(bar.volume, Quantity::from(37034));
789 assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
790 assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
791 }
792
793 #[rstest]
794 fn test_parse_tardis_ws_message_book_snapshot_routes_to_depth10() {
795 let json_data = load_test_json("book_snapshot.json");
796 let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
797 let ws_msg = WsMessage::BookSnapshot(msg);
798
799 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
800 let info = Arc::new(TardisInstrumentMiniInfo::new(
801 instrument_id,
802 None,
803 TardisExchange::Bitmex,
804 1,
805 0,
806 ));
807
808 let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Depth10);
809
810 assert!(result.is_some());
811 assert!(matches!(result.unwrap(), Data::Depth10(_)));
812 }
813
814 #[rstest]
815 fn test_parse_tardis_ws_message_book_snapshot_routes_to_deltas() {
816 let json_data = load_test_json("book_snapshot.json");
817 let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
818 let ws_msg = WsMessage::BookSnapshot(msg);
819
820 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
821 let info = Arc::new(TardisInstrumentMiniInfo::new(
822 instrument_id,
823 None,
824 TardisExchange::Bitmex,
825 1,
826 0,
827 ));
828
829 let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Deltas);
830
831 assert!(result.is_some());
832 assert!(matches!(result.unwrap(), Data::Deltas(_)));
833 }
834}