1use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::{
23 Bar, BarType, BookOrder, DEPTH10_LEN, Data, FundingRateUpdate, NULL_ORDER, OrderBookDelta,
24 OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10, QuoteTick, TradeTick,
25 },
26 enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
27 identifiers::{InstrumentId, TradeId},
28 types::{Price, Quantity},
29};
30use uuid::Uuid;
31
32use super::{
33 message::{
34 BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, DerivativeTickerMsg, TradeMsg, WsMessage,
35 },
36 types::TardisInstrumentMiniInfo,
37};
38use crate::{
39 config::BookSnapshotOutput,
40 parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action},
41};
42
43#[must_use]
44pub fn parse_tardis_ws_message(
45 msg: WsMessage,
46 info: Arc<TardisInstrumentMiniInfo>,
47 book_snapshot_output: &BookSnapshotOutput,
48) -> Option<Data> {
49 match msg {
50 WsMessage::BookChange(msg) => {
51 if msg.bids.is_empty() && msg.asks.is_empty() {
52 let exchange = msg.exchange;
53 let symbol = &msg.symbol;
54 log::error!("Invalid book change for {exchange} {symbol} (empty bids and asks)");
55 return None;
56 }
57
58 match parse_book_change_msg_as_deltas(
59 msg,
60 info.price_precision,
61 info.size_precision,
62 info.instrument_id,
63 ) {
64 Ok(deltas) => Some(Data::Deltas(deltas)),
65 Err(e) => {
66 log::error!("Failed to parse book change message: {e}");
67 None
68 }
69 }
70 }
71 WsMessage::BookSnapshot(msg) => match msg.bids.len() {
72 1 => {
73 match parse_book_snapshot_msg_as_quote(
74 msg,
75 info.price_precision,
76 info.size_precision,
77 info.instrument_id,
78 ) {
79 Ok(quote) => Some(Data::Quote(quote)),
80 Err(e) => {
81 log::error!("Failed to parse book snapshot quote message: {e}");
82 None
83 }
84 }
85 }
86 _ => match book_snapshot_output {
87 BookSnapshotOutput::Depth10 => {
88 match parse_book_snapshot_msg_as_depth10(
89 msg,
90 info.price_precision,
91 info.size_precision,
92 info.instrument_id,
93 ) {
94 Ok(depth10) => Some(Data::Depth10(Box::new(depth10))),
95 Err(e) => {
96 log::error!("Failed to parse book snapshot as depth10: {e}");
97 None
98 }
99 }
100 }
101 BookSnapshotOutput::Deltas => {
102 match parse_book_snapshot_msg_as_deltas(
103 msg,
104 info.price_precision,
105 info.size_precision,
106 info.instrument_id,
107 ) {
108 Ok(deltas) => Some(Data::Deltas(deltas)),
109 Err(e) => {
110 log::error!("Failed to parse book snapshot as deltas: {e}");
111 None
112 }
113 }
114 }
115 },
116 },
117 WsMessage::Trade(msg) => {
118 match parse_trade_msg(
119 msg,
120 info.price_precision,
121 info.size_precision,
122 info.instrument_id,
123 ) {
124 Ok(trade) => Some(Data::Trade(trade)),
125 Err(e) => {
126 log::error!("Failed to parse trade message: {e}");
127 None
128 }
129 }
130 }
131 WsMessage::TradeBar(msg) => {
132 match parse_bar_msg(
133 msg,
134 info.price_precision,
135 info.size_precision,
136 info.instrument_id,
137 ) {
138 Ok(bar) => Some(Data::Bar(bar)),
139 Err(e) => {
140 log::error!("Failed to parse bar message: {e}");
141 None
142 }
143 }
144 }
145 WsMessage::DerivativeTicker(_) => None,
148 WsMessage::Disconnect(_) => None,
149 }
150}
151
152#[must_use]
155pub fn parse_tardis_ws_message_funding_rate(
156 msg: WsMessage,
157 info: Arc<TardisInstrumentMiniInfo>,
158) -> Option<FundingRateUpdate> {
159 match msg {
160 WsMessage::DerivativeTicker(msg) => {
161 match parse_derivative_ticker_msg(msg, info.instrument_id) {
162 Ok(funding_rate) => funding_rate,
163 Err(e) => {
164 log::error!("Failed to parse derivative ticker message for funding rate: {e}");
165 None
166 }
167 }
168 }
169 _ => None, }
171}
172
173pub fn parse_book_change_msg_as_deltas(
180 msg: BookChangeMsg,
181 price_precision: u8,
182 size_precision: u8,
183 instrument_id: InstrumentId,
184) -> anyhow::Result<OrderBookDeltas_API> {
185 parse_book_msg_as_deltas(
186 msg.bids,
187 msg.asks,
188 msg.is_snapshot,
189 price_precision,
190 size_precision,
191 instrument_id,
192 msg.timestamp,
193 msg.local_timestamp,
194 )
195}
196
197pub fn parse_book_snapshot_msg_as_deltas(
204 msg: BookSnapshotMsg,
205 price_precision: u8,
206 size_precision: u8,
207 instrument_id: InstrumentId,
208) -> anyhow::Result<OrderBookDeltas_API> {
209 parse_book_msg_as_deltas(
210 msg.bids,
211 msg.asks,
212 true,
213 price_precision,
214 size_precision,
215 instrument_id,
216 msg.timestamp,
217 msg.local_timestamp,
218 )
219}
220
221pub fn parse_book_snapshot_msg_as_depth10(
227 msg: BookSnapshotMsg,
228 price_precision: u8,
229 size_precision: u8,
230 instrument_id: InstrumentId,
231) -> anyhow::Result<OrderBookDepth10> {
232 let ts_event_nanos = msg
233 .timestamp
234 .timestamp_nanos_opt()
235 .context("invalid timestamp: cannot extract event nanoseconds")?;
236 anyhow::ensure!(
237 ts_event_nanos >= 0,
238 "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
239 );
240 let ts_event = UnixNanos::from(ts_event_nanos as u64);
241
242 let ts_init_nanos = msg
243 .local_timestamp
244 .timestamp_nanos_opt()
245 .context("invalid timestamp: cannot extract init nanoseconds")?;
246 anyhow::ensure!(
247 ts_init_nanos >= 0,
248 "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
249 );
250 let ts_init = UnixNanos::from(ts_init_nanos as u64);
251
252 let mut bids = [NULL_ORDER; DEPTH10_LEN];
253 let mut asks = [NULL_ORDER; DEPTH10_LEN];
254 let mut bid_counts = [0u32; DEPTH10_LEN];
255 let mut ask_counts = [0u32; DEPTH10_LEN];
256
257 for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
258 bids[i] = BookOrder::new(
259 OrderSide::Buy,
260 Price::new(level.price, price_precision),
261 Quantity::new(level.amount, size_precision),
262 0,
263 );
264 bid_counts[i] = 1;
265 }
266
267 for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
268 asks[i] = BookOrder::new(
269 OrderSide::Sell,
270 Price::new(level.price, price_precision),
271 Quantity::new(level.amount, size_precision),
272 0,
273 );
274 ask_counts[i] = 1;
275 }
276
277 Ok(OrderBookDepth10::new(
278 instrument_id,
279 bids,
280 asks,
281 bid_counts,
282 ask_counts,
283 RecordFlag::F_SNAPSHOT.value(),
284 0, ts_event,
286 ts_init,
287 ))
288}
289
290#[allow(clippy::too_many_arguments)]
292pub fn parse_book_msg_as_deltas(
298 bids: Vec<BookLevel>,
299 asks: Vec<BookLevel>,
300 is_snapshot: bool,
301 price_precision: u8,
302 size_precision: u8,
303 instrument_id: InstrumentId,
304 timestamp: DateTime<Utc>,
305 local_timestamp: DateTime<Utc>,
306) -> anyhow::Result<OrderBookDeltas_API> {
307 let event_nanos = timestamp
308 .timestamp_nanos_opt()
309 .context("invalid timestamp: cannot extract event nanoseconds")?;
310 anyhow::ensure!(
311 event_nanos >= 0,
312 "invalid timestamp: event nanoseconds {event_nanos} is before UNIX epoch"
313 );
314 let ts_event = UnixNanos::from(event_nanos as u64);
315 let init_nanos = local_timestamp
316 .timestamp_nanos_opt()
317 .context("invalid timestamp: cannot extract init nanoseconds")?;
318 anyhow::ensure!(
319 init_nanos >= 0,
320 "invalid timestamp: init nanoseconds {init_nanos} is before UNIX epoch"
321 );
322 let ts_init = UnixNanos::from(init_nanos as u64);
323
324 let capacity = if is_snapshot {
325 bids.len() + asks.len() + 1
326 } else {
327 bids.len() + asks.len()
328 };
329 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(capacity);
330
331 if is_snapshot {
332 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_init));
333 }
334
335 for level in bids {
336 match parse_book_level(
337 instrument_id,
338 price_precision,
339 size_precision,
340 OrderSide::Buy,
341 level,
342 is_snapshot,
343 ts_event,
344 ts_init,
345 ) {
346 Ok(delta) => deltas.push(delta),
347 Err(e) => log::warn!("Skipping invalid bid level for {instrument_id}: {e}"),
348 }
349 }
350
351 for level in asks {
352 match parse_book_level(
353 instrument_id,
354 price_precision,
355 size_precision,
356 OrderSide::Sell,
357 level,
358 is_snapshot,
359 ts_event,
360 ts_init,
361 ) {
362 Ok(delta) => deltas.push(delta),
363 Err(e) => log::warn!("Skipping invalid ask level for {instrument_id}: {e}"),
364 }
365 }
366
367 if let Some(last_delta) = deltas.last_mut() {
368 last_delta.flags |= RecordFlag::F_LAST.value();
369 }
370
371 Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
373 instrument_id,
374 deltas,
375 )))
376}
377
378#[allow(clippy::too_many_arguments)]
384pub fn parse_book_level(
385 instrument_id: InstrumentId,
386 price_precision: u8,
387 size_precision: u8,
388 side: OrderSide,
389 level: BookLevel,
390 is_snapshot: bool,
391 ts_event: UnixNanos,
392 ts_init: UnixNanos,
393) -> anyhow::Result<OrderBookDelta> {
394 let amount = normalize_amount(level.amount, size_precision);
395 let action = parse_book_action(is_snapshot, amount);
396 let price = Price::new(level.price, price_precision);
397 let size = Quantity::new(amount, size_precision);
398 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
400 let flags = if is_snapshot {
401 RecordFlag::F_SNAPSHOT.value()
402 } else {
403 0
404 };
405 let sequence = 0; anyhow::ensure!(
408 !(action != BookAction::Delete && size.is_zero()),
409 "Invalid zero size for {action}"
410 );
411
412 Ok(OrderBookDelta::new(
413 instrument_id,
414 action,
415 order,
416 flags,
417 sequence,
418 ts_event,
419 ts_init,
420 ))
421}
422
423pub fn parse_book_snapshot_msg_as_quote(
430 msg: BookSnapshotMsg,
431 price_precision: u8,
432 size_precision: u8,
433 instrument_id: InstrumentId,
434) -> anyhow::Result<QuoteTick> {
435 let ts_event = UnixNanos::from(msg.timestamp);
436 let ts_init = UnixNanos::from(msg.local_timestamp);
437
438 let best_bid = msg
439 .bids
440 .first()
441 .context("missing best bid level for quote message")?;
442 let bid_price = Price::new(best_bid.price, price_precision);
443 let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
444 .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
445
446 let best_ask = msg
447 .asks
448 .first()
449 .context("missing best ask level for quote message")?;
450 let ask_price = Price::new(best_ask.price, price_precision);
451 let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
452 .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
453
454 Ok(QuoteTick::new(
455 instrument_id,
456 bid_price,
457 ask_price,
458 bid_size,
459 ask_size,
460 ts_event,
461 ts_init,
462 ))
463}
464
465pub fn parse_trade_msg(
472 msg: TradeMsg,
473 price_precision: u8,
474 size_precision: u8,
475 instrument_id: InstrumentId,
476) -> anyhow::Result<TradeTick> {
477 let price = Price::new(msg.price, price_precision);
478 let size = Quantity::non_zero_checked(msg.amount, size_precision)
479 .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
480 let aggressor_side = parse_aggressor_side(&msg.side);
481 let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
482 let ts_event = UnixNanos::from(msg.timestamp);
483 let ts_init = UnixNanos::from(msg.local_timestamp);
484
485 Ok(TradeTick::new(
486 instrument_id,
487 price,
488 size,
489 aggressor_side,
490 trade_id,
491 ts_event,
492 ts_init,
493 ))
494}
495
496pub fn parse_bar_msg(
502 msg: BarMsg,
503 price_precision: u8,
504 size_precision: u8,
505 instrument_id: InstrumentId,
506) -> anyhow::Result<Bar> {
507 let spec = parse_bar_spec(&msg.name)?;
508 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
509
510 let open = Price::new(msg.open, price_precision);
511 let high = Price::new(msg.high, price_precision);
512 let low = Price::new(msg.low, price_precision);
513 let close = Price::new(msg.close, price_precision);
514 let volume = Quantity::non_zero(msg.volume, size_precision);
515 let ts_event = UnixNanos::from(msg.timestamp);
516 let ts_init = UnixNanos::from(msg.local_timestamp);
517
518 Ok(Bar::new(
519 bar_type, open, high, low, close, volume, ts_event, ts_init,
520 ))
521}
522
523pub fn parse_derivative_ticker_msg(
529 msg: DerivativeTickerMsg,
530 instrument_id: InstrumentId,
531) -> anyhow::Result<Option<FundingRateUpdate>> {
532 let funding_rate = match msg.funding_rate {
534 Some(rate) => rate,
535 None => return Ok(None), };
537
538 let ts_event_nanos = msg
539 .timestamp
540 .timestamp_nanos_opt()
541 .context("invalid timestamp: cannot extract event nanoseconds")?;
542 anyhow::ensure!(
543 ts_event_nanos >= 0,
544 "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
545 );
546 let ts_event = UnixNanos::from(ts_event_nanos as u64);
547
548 let ts_init_nanos = msg
549 .local_timestamp
550 .timestamp_nanos_opt()
551 .context("invalid timestamp: cannot extract init nanoseconds")?;
552 anyhow::ensure!(
553 ts_init_nanos >= 0,
554 "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
555 );
556 let ts_init = UnixNanos::from(ts_init_nanos as u64);
557
558 let rate = rust_decimal::Decimal::try_from(funding_rate)
559 .with_context(|| format!("Failed to convert funding rate {funding_rate} to Decimal"))?
560 .normalize();
561
562 let next_funding_ns = None;
564
565 Ok(Some(FundingRateUpdate::new(
566 instrument_id,
567 rate,
568 next_funding_ns,
569 ts_event,
570 ts_init,
571 )))
572}
573
#[cfg(test)]
mod tests {
    use nautilus_model::enums::AggressorSide;
    use rstest::rstest;

    use super::*;
    use crate::{enums::TardisExchange, tests::load_test_json};

    /// Builds the instrument info shared by the message routing tests
    /// (price precision 1, size precision 0).
    fn xbtusd_bitmex_info(instrument_id: InstrumentId) -> Arc<TardisInstrumentMiniInfo> {
        Arc::new(TardisInstrumentMiniInfo::new(
            instrument_id,
            None,
            TardisExchange::Bitmex,
            1,
            0,
        ))
    }

    /// A book change fixture parses into a single update delta flagged as last.
    #[rstest]
    fn test_parse_book_change_message() {
        let json_data = load_test_json("book_change.json");
        let msg: BookChangeMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 0;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let deltas =
            parse_book_change_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
                .unwrap();

        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, UnixNanos::from(1571830193469000000));
        assert_eq!(deltas.ts_init, UnixNanos::from(1571830193469000000));
        assert_eq!(
            deltas.deltas[0].instrument_id,
            InstrumentId::from("XBTUSD.BITMEX")
        );
        assert_eq!(deltas.deltas[0].action, BookAction::Update);
        assert_eq!(deltas.deltas[0].order.price, Price::from("7985"));
        assert_eq!(deltas.deltas[0].order.size, Quantity::from(283318));
        assert_eq!(deltas.deltas[0].order.order_id, 0);
        assert_eq!(deltas.deltas[0].flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.deltas[0].sequence, 0);
        assert_eq!(
            deltas.deltas[0].ts_event,
            UnixNanos::from(1571830193469000000)
        );
        assert_eq!(
            deltas.deltas[0].ts_init,
            UnixNanos::from(1571830193469000000)
        );
    }

    /// A book snapshot fixture parses into a clear delta followed by bid and ask adds.
    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let deltas =
            parse_book_snapshot_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
                .unwrap();

        // Check the length before indexing so a fixture mismatch fails with a clear
        // assertion message instead of an out-of-bounds panic.
        assert_eq!(deltas.deltas.len(), 5);

        let clear_delta = deltas.deltas[0];
        let bid_delta = deltas.deltas[1];
        let ask_delta = deltas.deltas[3];

        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(
            deltas.flags,
            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
        );
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(deltas.ts_init, UnixNanos::from(1572010786961000000));

        // Leading clear delta
        assert_eq!(clear_delta.instrument_id, instrument_id);
        assert_eq!(clear_delta.action, BookAction::Clear);
        assert_eq!(clear_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(clear_delta.sequence, 0);
        assert_eq!(clear_delta.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(clear_delta.ts_init, UnixNanos::from(1572010786961000000));

        // First bid delta
        assert_eq!(bid_delta.instrument_id, instrument_id);
        assert_eq!(bid_delta.action, BookAction::Add);
        assert_eq!(bid_delta.order.side, OrderSide::Buy);
        assert_eq!(bid_delta.order.price, Price::from("7633.5"));
        assert_eq!(bid_delta.order.size, Quantity::from(1906067));
        assert_eq!(bid_delta.order.order_id, 0);
        assert_eq!(bid_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(bid_delta.sequence, 0);
        assert_eq!(bid_delta.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(bid_delta.ts_init, UnixNanos::from(1572010786961000000));

        // First ask delta
        assert_eq!(ask_delta.instrument_id, instrument_id);
        assert_eq!(ask_delta.action, BookAction::Add);
        assert_eq!(ask_delta.order.side, OrderSide::Sell);
        assert_eq!(ask_delta.order.price, Price::from("7634.0"));
        assert_eq!(ask_delta.order.size, Quantity::from(1467849));
        assert_eq!(ask_delta.order.order_id, 0);
        assert_eq!(ask_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(ask_delta.sequence, 0);
        assert_eq!(ask_delta.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(ask_delta.ts_init, UnixNanos::from(1572010786961000000));
    }

    /// A book snapshot fixture fills the leading depth10 slots and leaves the rest null.
    #[rstest]
    fn test_parse_book_snapshot_message_as_depth10() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");

        let depth10 =
            parse_book_snapshot_msg_as_depth10(msg, price_precision, size_precision, instrument_id)
                .unwrap();

        assert_eq!(depth10.instrument_id, instrument_id);
        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(depth10.sequence, 0);
        assert_eq!(depth10.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(depth10.ts_init, UnixNanos::from(1572010786961000000));

        // First bid level
        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
        assert_eq!(depth10.bids[0].price, Price::from("7633.5"));
        assert_eq!(depth10.bids[0].size, Quantity::from(1906067));
        assert_eq!(depth10.bids[0].order_id, 0);
        assert_eq!(depth10.bid_counts[0], 1);

        // Second bid level
        assert_eq!(depth10.bids[1].side, OrderSide::Buy);
        assert_eq!(depth10.bids[1].price, Price::from("7633.0"));
        assert_eq!(depth10.bids[1].size, Quantity::from(65319));
        assert_eq!(depth10.bid_counts[1], 1);

        // First ask level
        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
        assert_eq!(depth10.asks[0].price, Price::from("7634.0"));
        assert_eq!(depth10.asks[0].size, Quantity::from(1467849));
        assert_eq!(depth10.asks[0].order_id, 0);
        assert_eq!(depth10.ask_counts[0], 1);

        // Second ask level
        assert_eq!(depth10.asks[1].side, OrderSide::Sell);
        assert_eq!(depth10.asks[1].price, Price::from("7634.5"));
        assert_eq!(depth10.asks[1].size, Quantity::from(67939));
        assert_eq!(depth10.ask_counts[1], 1);

        // Slots beyond the fixture's levels stay null
        assert_eq!(depth10.bids[2], NULL_ORDER);
        assert_eq!(depth10.bid_counts[2], 0);
        assert_eq!(depth10.asks[2], NULL_ORDER);
        assert_eq!(depth10.ask_counts[2], 0);
    }

    /// A book snapshot fixture parses into a quote built from the best bid and ask.
    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let quote =
            parse_book_snapshot_msg_as_quote(msg, price_precision, size_precision, instrument_id)
                .expect("Failed to parse book snapshot quote message");

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
    }

    /// A trade fixture parses into a trade tick with the expected fields.
    #[rstest]
    fn test_parse_trade_message() {
        let json_data = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 0;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let trade = parse_trade_msg(msg, price_precision, size_precision, instrument_id)
            .expect("Failed to parse trade message");

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
    }

    /// A bar fixture parses into a bar with the expected type, prices and volume.
    #[rstest]
    fn test_parse_bar_message() {
        let json_data = load_test_json("bar.json");
        let msg: BarMsg = serde_json::from_str(&json_data).unwrap();

        let price_precision = 1;
        let size_precision = 0;
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let bar = parse_bar_msg(msg, price_precision, size_precision, instrument_id).unwrap();

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
    }

    /// With `BookSnapshotOutput::Depth10`, a multi-level snapshot is routed to depth10.
    #[rstest]
    fn test_parse_tardis_ws_message_book_snapshot_routes_to_depth10() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
        let ws_msg = WsMessage::BookSnapshot(msg);

        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let info = xbtusd_bitmex_info(instrument_id);

        let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Depth10);

        assert!(result.is_some());
        assert!(matches!(result.unwrap(), Data::Depth10(_)));
    }

    /// With `BookSnapshotOutput::Deltas`, a multi-level snapshot is routed to deltas.
    #[rstest]
    fn test_parse_tardis_ws_message_book_snapshot_routes_to_deltas() {
        let json_data = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
        let ws_msg = WsMessage::BookSnapshot(msg);

        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let info = xbtusd_bitmex_info(instrument_id);

        let result = parse_tardis_ws_message(ws_msg, info, &BookSnapshotOutput::Deltas);

        assert!(result.is_some());
        assert!(matches!(result.unwrap(), Data::Deltas(_)));
    }
}