1use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::{
23 Bar, BarType, BookOrder, Data, FundingRateUpdate, OrderBookDelta, OrderBookDeltas,
24 OrderBookDeltas_API, QuoteTick, TradeTick,
25 },
26 enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
27 identifiers::{InstrumentId, TradeId},
28 types::{Price, Quantity},
29};
30use uuid::Uuid;
31
32use super::{
33 message::{
34 BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, DerivativeTickerMsg, TradeMsg, WsMessage,
35 },
36 types::TardisInstrumentMiniInfo,
37};
38use crate::parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action};
39
40#[must_use]
41pub fn parse_tardis_ws_message(
42 msg: WsMessage,
43 info: Arc<TardisInstrumentMiniInfo>,
44) -> Option<Data> {
45 match msg {
46 WsMessage::BookChange(msg) => {
47 if msg.bids.is_empty() && msg.asks.is_empty() {
48 tracing::error!(
49 "Invalid book change for {} {} (empty bids and asks)",
50 msg.exchange,
51 msg.symbol
52 );
53 return None;
54 }
55
56 match parse_book_change_msg_as_deltas(
57 msg,
58 info.price_precision,
59 info.size_precision,
60 info.instrument_id,
61 ) {
62 Ok(deltas) => Some(Data::Deltas(deltas)),
63 Err(e) => {
64 tracing::error!("Failed to parse book change message: {e}");
65 None
66 }
67 }
68 }
69 WsMessage::BookSnapshot(msg) => match msg.bids.len() {
70 1 => {
71 match parse_book_snapshot_msg_as_quote(
72 msg,
73 info.price_precision,
74 info.size_precision,
75 info.instrument_id,
76 ) {
77 Ok(quote) => Some(Data::Quote(quote)),
78 Err(e) => {
79 tracing::error!("Failed to parse book snapshot quote message: {e}");
80 None
81 }
82 }
83 }
84 _ => {
85 match parse_book_snapshot_msg_as_deltas(
86 msg,
87 info.price_precision,
88 info.size_precision,
89 info.instrument_id,
90 ) {
91 Ok(deltas) => Some(Data::Deltas(deltas)),
92 Err(e) => {
93 tracing::error!("Failed to parse book snapshot message: {e}");
94 None
95 }
96 }
97 }
98 },
99 WsMessage::Trade(msg) => {
100 match parse_trade_msg(
101 msg,
102 info.price_precision,
103 info.size_precision,
104 info.instrument_id,
105 ) {
106 Ok(trade) => Some(Data::Trade(trade)),
107 Err(e) => {
108 tracing::error!("Failed to parse trade message: {e}");
109 None
110 }
111 }
112 }
113 WsMessage::TradeBar(msg) => Some(Data::Bar(parse_bar_msg(
114 msg,
115 info.price_precision,
116 info.size_precision,
117 info.instrument_id,
118 ))),
119 WsMessage::DerivativeTicker(_) => None,
122 WsMessage::Disconnect(_) => None,
123 }
124}
125
126#[must_use]
129pub fn parse_tardis_ws_message_funding_rate(
130 msg: WsMessage,
131 info: Arc<TardisInstrumentMiniInfo>,
132) -> Option<FundingRateUpdate> {
133 match msg {
134 WsMessage::DerivativeTicker(msg) => {
135 match parse_derivative_ticker_msg(msg, info.instrument_id) {
136 Ok(funding_rate) => funding_rate,
137 Err(e) => {
138 tracing::error!(
139 "Failed to parse derivative ticker message for funding rate: {e}"
140 );
141 None
142 }
143 }
144 }
145 _ => None, }
147}
148
149pub fn parse_book_change_msg_as_deltas(
156 msg: BookChangeMsg,
157 price_precision: u8,
158 size_precision: u8,
159 instrument_id: InstrumentId,
160) -> anyhow::Result<OrderBookDeltas_API> {
161 parse_book_msg_as_deltas(
162 msg.bids,
163 msg.asks,
164 msg.is_snapshot,
165 price_precision,
166 size_precision,
167 instrument_id,
168 msg.timestamp,
169 msg.local_timestamp,
170 )
171}
172
173pub fn parse_book_snapshot_msg_as_deltas(
180 msg: BookSnapshotMsg,
181 price_precision: u8,
182 size_precision: u8,
183 instrument_id: InstrumentId,
184) -> anyhow::Result<OrderBookDeltas_API> {
185 parse_book_msg_as_deltas(
186 msg.bids,
187 msg.asks,
188 true,
189 price_precision,
190 size_precision,
191 instrument_id,
192 msg.timestamp,
193 msg.local_timestamp,
194 )
195}
196
197#[allow(clippy::too_many_arguments)]
199pub fn parse_book_msg_as_deltas(
205 bids: Vec<BookLevel>,
206 asks: Vec<BookLevel>,
207 is_snapshot: bool,
208 price_precision: u8,
209 size_precision: u8,
210 instrument_id: InstrumentId,
211 timestamp: DateTime<Utc>,
212 local_timestamp: DateTime<Utc>,
213) -> anyhow::Result<OrderBookDeltas_API> {
214 let event_nanos = timestamp
215 .timestamp_nanos_opt()
216 .context("invalid timestamp: cannot extract event nanoseconds")?;
217 let ts_event = UnixNanos::from(event_nanos as u64);
218 let init_nanos = local_timestamp
219 .timestamp_nanos_opt()
220 .context("invalid timestamp: cannot extract init nanoseconds")?;
221 let ts_init = UnixNanos::from(init_nanos as u64);
222
223 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(bids.len() + asks.len());
224
225 for level in bids {
226 deltas.push(parse_book_level(
227 instrument_id,
228 price_precision,
229 size_precision,
230 OrderSide::Buy,
231 level,
232 is_snapshot,
233 ts_event,
234 ts_init,
235 ));
236 }
237
238 for level in asks {
239 deltas.push(parse_book_level(
240 instrument_id,
241 price_precision,
242 size_precision,
243 OrderSide::Sell,
244 level,
245 is_snapshot,
246 ts_event,
247 ts_init,
248 ));
249 }
250
251 if let Some(last_delta) = deltas.last_mut() {
252 last_delta.flags += RecordFlag::F_LAST.value();
253 }
254
255 Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
257 instrument_id,
258 deltas,
259 )))
260}
261
262#[must_use]
263#[allow(clippy::too_many_arguments)]
269pub fn parse_book_level(
270 instrument_id: InstrumentId,
271 price_precision: u8,
272 size_precision: u8,
273 side: OrderSide,
274 level: BookLevel,
275 is_snapshot: bool,
276 ts_event: UnixNanos,
277 ts_init: UnixNanos,
278) -> OrderBookDelta {
279 let amount = normalize_amount(level.amount, size_precision);
280 let action = parse_book_action(is_snapshot, amount);
281 let price = Price::new(level.price, price_precision);
282 let size = Quantity::new(amount, size_precision);
283 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
285 let flags = if is_snapshot {
286 RecordFlag::F_SNAPSHOT.value()
287 } else {
288 0
289 };
290 let sequence = 0; assert!(
293 !(action != BookAction::Delete && size.is_zero()),
294 "Invalid zero size for {action}"
295 );
296
297 OrderBookDelta::new(
298 instrument_id,
299 action,
300 order,
301 flags,
302 sequence,
303 ts_event,
304 ts_init,
305 )
306}
307
308pub fn parse_book_snapshot_msg_as_quote(
315 msg: BookSnapshotMsg,
316 price_precision: u8,
317 size_precision: u8,
318 instrument_id: InstrumentId,
319) -> anyhow::Result<QuoteTick> {
320 let ts_event = UnixNanos::from(msg.timestamp);
321 let ts_init = UnixNanos::from(msg.local_timestamp);
322
323 let best_bid = msg
324 .bids
325 .first()
326 .context("missing best bid level for quote message")?;
327 let bid_price = Price::new(best_bid.price, price_precision);
328 let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
329 .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
330
331 let best_ask = msg
332 .asks
333 .first()
334 .context("missing best ask level for quote message")?;
335 let ask_price = Price::new(best_ask.price, price_precision);
336 let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
337 .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
338
339 Ok(QuoteTick::new(
340 instrument_id,
341 bid_price,
342 ask_price,
343 bid_size,
344 ask_size,
345 ts_event,
346 ts_init,
347 ))
348}
349
350pub fn parse_trade_msg(
357 msg: TradeMsg,
358 price_precision: u8,
359 size_precision: u8,
360 instrument_id: InstrumentId,
361) -> anyhow::Result<TradeTick> {
362 let price = Price::new(msg.price, price_precision);
363 let size = Quantity::non_zero_checked(msg.amount, size_precision)
364 .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
365 let aggressor_side = parse_aggressor_side(&msg.side);
366 let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
367 let ts_event = UnixNanos::from(msg.timestamp);
368 let ts_init = UnixNanos::from(msg.local_timestamp);
369
370 Ok(TradeTick::new(
371 instrument_id,
372 price,
373 size,
374 aggressor_side,
375 trade_id,
376 ts_event,
377 ts_init,
378 ))
379}
380
381#[must_use]
382pub fn parse_bar_msg(
383 msg: BarMsg,
384 price_precision: u8,
385 size_precision: u8,
386 instrument_id: InstrumentId,
387) -> Bar {
388 let spec = parse_bar_spec(&msg.name);
389 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
390
391 let open = Price::new(msg.open, price_precision);
392 let high = Price::new(msg.high, price_precision);
393 let low = Price::new(msg.low, price_precision);
394 let close = Price::new(msg.close, price_precision);
395 let volume = Quantity::non_zero(msg.volume, size_precision);
396 let ts_event = UnixNanos::from(msg.timestamp);
397 let ts_init = UnixNanos::from(msg.local_timestamp);
398
399 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
400}
401
402pub fn parse_derivative_ticker_msg(
408 msg: DerivativeTickerMsg,
409 instrument_id: InstrumentId,
410) -> anyhow::Result<Option<FundingRateUpdate>> {
411 let funding_rate = match msg.funding_rate {
413 Some(rate) => rate,
414 None => return Ok(None), };
416
417 let ts_event = msg
418 .timestamp
419 .timestamp_nanos_opt()
420 .context("invalid timestamp: cannot extract event nanoseconds")?;
421 let ts_event = UnixNanos::from(ts_event as u64);
422
423 let ts_init = msg
424 .local_timestamp
425 .timestamp_nanos_opt()
426 .context("invalid timestamp: cannot extract init nanoseconds")?;
427 let ts_init = UnixNanos::from(ts_init as u64);
428
429 let rate = rust_decimal::Decimal::try_from(funding_rate)
430 .with_context(|| format!("Failed to convert funding rate {funding_rate} to Decimal"))?
431 .normalize();
432
433 let next_funding_ns = None;
435
436 Ok(Some(FundingRateUpdate::new(
437 instrument_id,
438 rate,
439 next_funding_ns,
440 ts_event,
441 ts_init,
442 )))
443}
444
445#[cfg(test)]
450mod tests {
451 use nautilus_model::enums::{AggressorSide, BookAction};
452 use rstest::rstest;
453
454 use super::*;
455 use crate::tests::load_test_json;
456
457 #[rstest]
458 fn test_parse_book_change_message() {
459 let json_data = load_test_json("book_change.json");
460 let msg: BookChangeMsg = serde_json::from_str(&json_data).unwrap();
461
462 let price_precision = 0;
463 let size_precision = 0;
464 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
465 let deltas =
466 parse_book_change_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
467 .unwrap();
468
469 assert_eq!(deltas.deltas.len(), 1);
470 assert_eq!(deltas.instrument_id, instrument_id);
471 assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
472 assert_eq!(deltas.sequence, 0);
473 assert_eq!(deltas.ts_event, UnixNanos::from(1571830193469000000));
474 assert_eq!(deltas.ts_init, UnixNanos::from(1571830193469000000));
475 assert_eq!(
476 deltas.deltas[0].instrument_id,
477 InstrumentId::from("XBTUSD.BITMEX")
478 );
479 assert_eq!(deltas.deltas[0].action, BookAction::Update);
480 assert_eq!(deltas.deltas[0].order.price, Price::from("7985"));
481 assert_eq!(deltas.deltas[0].order.size, Quantity::from(283318));
482 assert_eq!(deltas.deltas[0].order.order_id, 0);
483 assert_eq!(deltas.deltas[0].flags, RecordFlag::F_LAST.value());
484 assert_eq!(deltas.deltas[0].sequence, 0);
485 assert_eq!(
486 deltas.deltas[0].ts_event,
487 UnixNanos::from(1571830193469000000)
488 );
489 assert_eq!(
490 deltas.deltas[0].ts_init,
491 UnixNanos::from(1571830193469000000)
492 );
493 }
494
495 #[rstest]
496 fn test_parse_book_snapshot_message_as_deltas() {
497 let json_data = load_test_json("book_snapshot.json");
498 let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
499
500 let price_precision = 1;
501 let size_precision = 0;
502 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
503 let deltas =
504 parse_book_snapshot_msg_as_deltas(msg, price_precision, size_precision, instrument_id)
505 .unwrap();
506 let delta_0 = deltas.deltas[0];
507 let delta_2 = deltas.deltas[2];
508
509 assert_eq!(deltas.deltas.len(), 4);
510 assert_eq!(deltas.instrument_id, instrument_id);
511 assert_eq!(
512 deltas.flags,
513 RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
514 );
515 assert_eq!(deltas.sequence, 0);
516 assert_eq!(deltas.ts_event, UnixNanos::from(1572010786950000000));
517 assert_eq!(deltas.ts_init, UnixNanos::from(1572010786961000000));
518 assert_eq!(delta_0.instrument_id, instrument_id);
519 assert_eq!(delta_0.action, BookAction::Add);
520 assert_eq!(delta_0.order.side, OrderSide::Buy);
521 assert_eq!(delta_0.order.price, Price::from("7633.5"));
522 assert_eq!(delta_0.order.size, Quantity::from(1906067));
523 assert_eq!(delta_0.order.order_id, 0);
524 assert_eq!(delta_0.flags, RecordFlag::F_SNAPSHOT.value());
525 assert_eq!(delta_0.sequence, 0);
526 assert_eq!(delta_0.ts_event, UnixNanos::from(1572010786950000000));
527 assert_eq!(delta_0.ts_init, UnixNanos::from(1572010786961000000));
528 assert_eq!(delta_2.instrument_id, instrument_id);
529 assert_eq!(delta_2.action, BookAction::Add);
530 assert_eq!(delta_2.order.side, OrderSide::Sell);
531 assert_eq!(delta_2.order.price, Price::from("7634.0"));
532 assert_eq!(delta_2.order.size, Quantity::from(1467849));
533 assert_eq!(delta_2.order.order_id, 0);
534 assert_eq!(delta_2.flags, RecordFlag::F_SNAPSHOT.value());
535 assert_eq!(delta_2.sequence, 0);
536 assert_eq!(delta_2.ts_event, UnixNanos::from(1572010786950000000));
537 assert_eq!(delta_2.ts_init, UnixNanos::from(1572010786961000000));
538 }
539
540 #[rstest]
541 fn test_parse_book_snapshot_message_as_quote() {
542 let json_data = load_test_json("book_snapshot.json");
543 let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
544
545 let price_precision = 1;
546 let size_precision = 0;
547 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
548 let quote =
549 parse_book_snapshot_msg_as_quote(msg, price_precision, size_precision, instrument_id)
550 .expect("Failed to parse book snapshot quote message");
551
552 assert_eq!(quote.instrument_id, instrument_id);
553 assert_eq!(quote.bid_price, Price::from("7633.5"));
554 assert_eq!(quote.bid_size, Quantity::from(1906067));
555 assert_eq!(quote.ask_price, Price::from("7634.0"));
556 assert_eq!(quote.ask_size, Quantity::from(1467849));
557 assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
558 assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
559 }
560
561 #[rstest]
562 fn test_parse_trade_message() {
563 let json_data = load_test_json("trade.json");
564 let msg: TradeMsg = serde_json::from_str(&json_data).unwrap();
565
566 let price_precision = 0;
567 let size_precision = 0;
568 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
569 let trade = parse_trade_msg(msg, price_precision, size_precision, instrument_id)
570 .expect("Failed to parse trade message");
571
572 assert_eq!(trade.instrument_id, instrument_id);
573 assert_eq!(trade.price, Price::from("7996"));
574 assert_eq!(trade.size, Quantity::from(50));
575 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
576 assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
577 assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
578 }
579
580 #[rstest]
581 fn test_parse_bar_message() {
582 let json_data = load_test_json("bar.json");
583 let msg: BarMsg = serde_json::from_str(&json_data).unwrap();
584
585 let price_precision = 1;
586 let size_precision = 0;
587 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
588 let bar = parse_bar_msg(msg, price_precision, size_precision, instrument_id);
589
590 assert_eq!(
591 bar.bar_type,
592 BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
593 );
594 assert_eq!(bar.open, Price::from("7623.5"));
595 assert_eq!(bar.high, Price::from("7623.5"));
596 assert_eq!(bar.low, Price::from("7623"));
597 assert_eq!(bar.close, Price::from("7623.5"));
598 assert_eq!(bar.volume, Quantity::from(37034));
599 assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
600 assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
601 }
602}