1use std::{ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
20use nautilus_model::{
21 data::{
22 Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24 },
25 enums::{
26 AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27 InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28 },
29 identifiers::{InstrumentId, TradeId},
30 instruments::{
31 Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32 },
33 types::{Currency, Price, Quantity, price::decode_raw_price_i64},
34};
35use ustr::Ustr;
36
37use super::{
38 enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
39 types::{DatabentoImbalance, DatabentoStatistics},
40};
41
42const STEP_ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
44
45const BAR_SPEC_1S: BarSpecification = BarSpecification {
46 step: STEP_ONE,
47 aggregation: BarAggregation::Second,
48 price_type: PriceType::Last,
49};
50const BAR_SPEC_1M: BarSpecification = BarSpecification {
51 step: STEP_ONE,
52 aggregation: BarAggregation::Minute,
53 price_type: PriceType::Last,
54};
55const BAR_SPEC_1H: BarSpecification = BarSpecification {
56 step: STEP_ONE,
57 aggregation: BarAggregation::Hour,
58 price_type: PriceType::Last,
59};
60const BAR_SPEC_1D: BarSpecification = BarSpecification {
61 step: STEP_ONE,
62 aggregation: BarAggregation::Day,
63 price_type: PriceType::Last,
64};
65
66const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
67const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
68const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
69const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
70
71#[must_use]
72pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
73 match c as u8 as char {
74 'Y' => Some(true),
75 'N' => Some(false),
76 _ => None,
77 }
78}
79
80#[must_use]
81pub const fn parse_order_side(c: c_char) -> OrderSide {
82 match c as u8 as char {
83 'A' => OrderSide::Sell,
84 'B' => OrderSide::Buy,
85 _ => OrderSide::NoOrderSide,
86 }
87}
88
89#[must_use]
90pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
91 match c as u8 as char {
92 'A' => AggressorSide::Seller,
93 'B' => AggressorSide::Buyer,
94 _ => AggressorSide::NoAggressor,
95 }
96}
97
98pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
104 match c as u8 as char {
105 'A' => Ok(BookAction::Add),
106 'C' => Ok(BookAction::Delete),
107 'F' => Ok(BookAction::Update),
108 'M' => Ok(BookAction::Update),
109 'R' => Ok(BookAction::Clear),
110 invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
111 }
112}
113
114pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
120 match c as u8 as char {
121 'C' => Ok(OptionKind::Call),
122 'P' => Ok(OptionKind::Put),
123 invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
124 }
125}
126
127fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
128 match value {
129 Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
130 tracing::warn!("Unknown currency code '{value}', defaulting to USD");
131 Currency::USD()
132 }),
133 Ok(_) => Currency::USD(),
134 Err(e) => {
135 tracing::error!("Error parsing currency: {e}");
136 Currency::USD()
137 }
138 }
139}
140
141pub fn parse_cfi_iso10926(
147 value: &str,
148) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
149 let chars: Vec<char> = value.chars().collect();
150 if chars.len() < 3 {
151 anyhow::bail!("Value string is too short");
152 }
153
154 let cfi_category = chars[0];
156 let cfi_group = chars[1];
157 let cfi_attribute1 = chars[2];
158 let mut asset_class = match cfi_category {
163 'D' => Some(AssetClass::Debt),
164 'E' => Some(AssetClass::Equity),
165 'S' => None,
166 _ => None,
167 };
168
169 let instrument_class = match cfi_group {
170 'I' => Some(InstrumentClass::Future),
171 _ => None,
172 };
173
174 if cfi_attribute1 == 'I' {
175 asset_class = Some(AssetClass::Index);
176 }
177
178 Ok((asset_class, instrument_class))
179}
180
181pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
189 let value_str = match value {
190 0 => return Ok(None),
191 1 => "Scheduled",
192 2 => "Surveillance intervention",
193 3 => "Market event",
194 4 => "Instrument activation",
195 5 => "Instrument expiration",
196 6 => "Recovery in process",
197 10 => "Regulatory",
198 11 => "Administrative",
199 12 => "Non-compliance",
200 13 => "Filings not current",
201 14 => "SEC trading suspension",
202 15 => "New issue",
203 16 => "Issue available",
204 17 => "Issues reviewed",
205 18 => "Filing requirements satisfied",
206 30 => "News pending",
207 31 => "News released",
208 32 => "News and resumption times",
209 33 => "News not forthcoming",
210 40 => "Order imbalance",
211 50 => "LULD pause",
212 60 => "Operational",
213 70 => "Additional information requested",
214 80 => "Merger effective",
215 90 => "ETF",
216 100 => "Corporate action",
217 110 => "New Security offering",
218 120 => "Market wide halt level 1",
219 121 => "Market wide halt level 2",
220 122 => "Market wide halt level 3",
221 123 => "Market wide halt carryover",
222 124 => "Market wide halt resumption",
223 130 => "Quotation not available",
224 invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
225 };
226
227 Ok(Some(Ustr::from(value_str)))
228}
229
230pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
236 let value_str = match value {
237 0 => return Ok(None),
238 1 => "No cancel",
239 2 => "Change trading session",
240 3 => "Implied matching on",
241 4 => "Implied matching off",
242 _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
243 };
244
245 Ok(Some(Ustr::from(value_str)))
246}
247
248#[must_use]
250pub fn decode_price(value: i64, precision: u8) -> Price {
251 Price::from_raw(decode_raw_price_i64(value), precision)
252}
253
254#[must_use]
256pub fn decode_quantity(value: u64) -> Quantity {
257 Quantity::from(value)
258}
259
260#[must_use]
262pub fn decode_price_increment(value: i64, precision: u8) -> Price {
263 match value {
264 0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
265 _ => decode_price(value, precision),
266 }
267}
268
269#[must_use]
271pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
272 match value {
273 i64::MAX => None,
274 _ => Some(decode_price(value, precision)),
275 }
276}
277
278#[must_use]
280pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
281 match value {
282 i64::MAX => None,
283 _ => Some(Quantity::from(value)),
284 }
285}
286
287pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
294 match value {
295 0 | i64::MAX => Ok(Quantity::from(1)),
296 v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
297 v => {
298 let abs = v as u128;
301
302 const SCALE: u128 = 1_000_000_000;
303 let int_part = abs / SCALE;
304 let frac_part = abs % SCALE;
305
306 if frac_part == 0 {
309 Ok(Quantity::from(int_part as u64))
311 } else {
312 let mut frac_str = format!("{frac_part:09}");
313 while frac_str.ends_with('0') {
314 frac_str.pop();
315 }
316 let s = format!("{int_part}.{frac_str}");
317 Ok(Quantity::from(s))
318 }
319 }
320 }
321}
322
323#[must_use]
325pub fn decode_lot_size(value: i32) -> Quantity {
326 match value {
327 0 | i32::MAX => Quantity::from(1),
328 value => Quantity::from(value),
329 }
330}
331
332#[must_use]
333fn is_trade_msg(order_side: OrderSide, action: c_char) -> bool {
334 order_side == OrderSide::NoOrderSide || action as u8 as char == 'T'
335}
336
337pub fn decode_mbo_msg(
346 msg: &dbn::MboMsg,
347 instrument_id: InstrumentId,
348 price_precision: u8,
349 ts_init: Option<UnixNanos>,
350 include_trades: bool,
351) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
352 let side = parse_order_side(msg.side);
353 if is_trade_msg(side, msg.action) {
354 if include_trades && msg.size > 0 {
355 let ts_event = msg.ts_recv.into();
356 let ts_init = ts_init.unwrap_or(ts_event);
357
358 let trade = TradeTick::new(
359 instrument_id,
360 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
361 Quantity::from(msg.size),
362 parse_aggressor_side(msg.side),
363 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
364 ts_event,
365 ts_init,
366 );
367 return Ok((None, Some(trade)));
368 }
369
370 return Ok((None, None));
371 }
372
373 let order = BookOrder::new(
374 side,
375 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
376 Quantity::from(msg.size),
377 msg.order_id,
378 );
379 let ts_event = msg.ts_recv.into();
380 let ts_init = ts_init.unwrap_or(ts_event);
381
382 let delta = OrderBookDelta::new(
383 instrument_id,
384 parse_book_action(msg.action)?,
385 order,
386 msg.flags.raw(),
387 msg.sequence.into(),
388 ts_event,
389 ts_init,
390 );
391
392 Ok((Some(delta), None))
393}
394
395pub fn decode_trade_msg(
401 msg: &dbn::TradeMsg,
402 instrument_id: InstrumentId,
403 price_precision: u8,
404 ts_init: Option<UnixNanos>,
405) -> anyhow::Result<TradeTick> {
406 let ts_event = msg.ts_recv.into();
407 let ts_init = ts_init.unwrap_or(ts_event);
408
409 let trade = TradeTick::new(
410 instrument_id,
411 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
412 Quantity::from(msg.size),
413 parse_aggressor_side(msg.side),
414 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
415 ts_event,
416 ts_init,
417 );
418
419 Ok(trade)
420}
421
422pub fn decode_tbbo_msg(
428 msg: &dbn::TbboMsg,
429 instrument_id: InstrumentId,
430 price_precision: u8,
431 ts_init: Option<UnixNanos>,
432) -> anyhow::Result<(QuoteTick, TradeTick)> {
433 let top_level = &msg.levels[0];
434 let ts_event = msg.ts_recv.into();
435 let ts_init = ts_init.unwrap_or(ts_event);
436
437 let quote = QuoteTick::new(
438 instrument_id,
439 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
440 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
441 Quantity::from(top_level.bid_sz),
442 Quantity::from(top_level.ask_sz),
443 ts_event,
444 ts_init,
445 );
446
447 let trade = TradeTick::new(
448 instrument_id,
449 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
450 Quantity::from(msg.size),
451 parse_aggressor_side(msg.side),
452 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
453 ts_event,
454 ts_init,
455 );
456
457 Ok((quote, trade))
458}
459
460pub fn decode_mbp1_msg(
466 msg: &dbn::Mbp1Msg,
467 instrument_id: InstrumentId,
468 price_precision: u8,
469 ts_init: Option<UnixNanos>,
470 include_trades: bool,
471) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
472 let top_level = &msg.levels[0];
473 let ts_event = msg.ts_recv.into();
474 let ts_init = ts_init.unwrap_or(ts_event);
475
476 let quote = QuoteTick::new(
477 instrument_id,
478 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
479 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
480 Quantity::from(top_level.bid_sz),
481 Quantity::from(top_level.ask_sz),
482 ts_event,
483 ts_init,
484 );
485
486 let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
487 Some(TradeTick::new(
488 instrument_id,
489 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
490 Quantity::from(msg.size),
491 parse_aggressor_side(msg.side),
492 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
493 ts_event,
494 ts_init,
495 ))
496 } else {
497 None
498 };
499
500 Ok((quote, maybe_trade))
501}
502
503pub fn decode_bbo_msg(
509 msg: &dbn::BboMsg,
510 instrument_id: InstrumentId,
511 price_precision: u8,
512 ts_init: Option<UnixNanos>,
513) -> anyhow::Result<QuoteTick> {
514 let top_level = &msg.levels[0];
515 let ts_event = msg.ts_recv.into();
516 let ts_init = ts_init.unwrap_or(ts_event);
517
518 let quote = QuoteTick::new(
519 instrument_id,
520 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
521 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
522 Quantity::from(top_level.bid_sz),
523 Quantity::from(top_level.ask_sz),
524 ts_event,
525 ts_init,
526 );
527
528 Ok(quote)
529}
530
531pub fn decode_mbp10_msg(
537 msg: &dbn::Mbp10Msg,
538 instrument_id: InstrumentId,
539 price_precision: u8,
540 ts_init: Option<UnixNanos>,
541) -> anyhow::Result<OrderBookDepth10> {
542 let mut bids = Vec::with_capacity(DEPTH10_LEN);
543 let mut asks = Vec::with_capacity(DEPTH10_LEN);
544 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
545 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
546
547 for level in &msg.levels {
548 let bid_order = BookOrder::new(
549 OrderSide::Buy,
550 Price::from_raw(decode_raw_price_i64(level.bid_px), price_precision),
551 Quantity::from(level.bid_sz),
552 0,
553 );
554
555 let ask_order = BookOrder::new(
556 OrderSide::Sell,
557 Price::from_raw(decode_raw_price_i64(level.ask_px), price_precision),
558 Quantity::from(level.ask_sz),
559 0,
560 );
561
562 bids.push(bid_order);
563 asks.push(ask_order);
564 bid_counts.push(level.bid_ct);
565 ask_counts.push(level.ask_ct);
566 }
567
568 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
569 anyhow::anyhow!(
570 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
571 v.len()
572 )
573 })?;
574
575 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
576 anyhow::anyhow!(
577 "Expected exactly {DEPTH10_LEN} ask levels, received {}",
578 v.len()
579 )
580 })?;
581
582 let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
583 anyhow::anyhow!(
584 "Expected exactly {DEPTH10_LEN} bid counts, received {}",
585 v.len()
586 )
587 })?;
588
589 let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
590 anyhow::anyhow!(
591 "Expected exactly {DEPTH10_LEN} ask counts, received {}",
592 v.len()
593 )
594 })?;
595
596 let ts_event = msg.ts_recv.into();
597 let ts_init = ts_init.unwrap_or(ts_event);
598
599 let depth = OrderBookDepth10::new(
600 instrument_id,
601 bids,
602 asks,
603 bid_counts,
604 ask_counts,
605 msg.flags.raw(),
606 msg.sequence.into(),
607 ts_event,
608 ts_init,
609 );
610
611 Ok(depth)
612}
613
614pub fn decode_cmbp1_msg(
622 msg: &dbn::Cmbp1Msg,
623 instrument_id: InstrumentId,
624 price_precision: u8,
625 ts_init: Option<UnixNanos>,
626 include_trades: bool,
627) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
628 let top_level = &msg.levels[0];
629 let ts_event = msg.ts_recv.into();
630 let ts_init = ts_init.unwrap_or(ts_event);
631
632 let quote = QuoteTick::new(
633 instrument_id,
634 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
635 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
636 Quantity::from(top_level.bid_sz),
637 Quantity::from(top_level.ask_sz),
638 ts_event,
639 ts_init,
640 );
641
642 let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
643 Some(TradeTick::new(
645 instrument_id,
646 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
647 Quantity::from(msg.size),
648 parse_aggressor_side(msg.side),
649 TradeId::new(UUID4::new().to_string()),
650 ts_event,
651 ts_init,
652 ))
653 } else {
654 None
655 };
656
657 Ok((quote, maybe_trade))
658}
659
660pub fn decode_cbbo_msg(
668 msg: &dbn::CbboMsg,
669 instrument_id: InstrumentId,
670 price_precision: u8,
671 ts_init: Option<UnixNanos>,
672) -> anyhow::Result<QuoteTick> {
673 let top_level = &msg.levels[0];
674 let ts_event = msg.ts_recv.into();
675 let ts_init = ts_init.unwrap_or(ts_event);
676
677 let quote = QuoteTick::new(
678 instrument_id,
679 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
680 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
681 Quantity::from(top_level.bid_sz),
682 Quantity::from(top_level.ask_sz),
683 ts_event,
684 ts_init,
685 );
686
687 Ok(quote)
688}
689
690pub fn decode_tcbbo_msg(
698 msg: &dbn::CbboMsg,
699 instrument_id: InstrumentId,
700 price_precision: u8,
701 ts_init: Option<UnixNanos>,
702) -> anyhow::Result<(QuoteTick, TradeTick)> {
703 let top_level = &msg.levels[0];
704 let ts_event = msg.ts_recv.into();
705 let ts_init = ts_init.unwrap_or(ts_event);
706
707 let quote = QuoteTick::new(
708 instrument_id,
709 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
710 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
711 Quantity::from(top_level.bid_sz),
712 Quantity::from(top_level.ask_sz),
713 ts_event,
714 ts_init,
715 );
716
717 let trade = TradeTick::new(
719 instrument_id,
720 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
721 Quantity::from(msg.size),
722 parse_aggressor_side(msg.side),
723 TradeId::new(UUID4::new().to_string()),
724 ts_event,
725 ts_init,
726 );
727
728 Ok((quote, trade))
729}
730
731pub fn decode_bar_type(
735 msg: &dbn::OhlcvMsg,
736 instrument_id: InstrumentId,
737) -> anyhow::Result<BarType> {
738 let bar_type = match msg.hd.rtype {
739 32 => {
740 BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
742 }
743 33 => {
744 BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
746 }
747 34 => {
748 BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
750 }
751 35 => {
752 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
754 }
755 36 => {
756 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
758 }
759 _ => anyhow::bail!(
760 "`rtype` is not a supported bar aggregation, was {}",
761 msg.hd.rtype
762 ),
763 };
764
765 Ok(bar_type)
766}
767
768pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
772 let adjustment = match msg.hd.rtype {
773 32 => {
774 BAR_CLOSE_ADJUSTMENT_1S
776 }
777 33 => {
778 BAR_CLOSE_ADJUSTMENT_1M
780 }
781 34 => {
782 BAR_CLOSE_ADJUSTMENT_1H
784 }
785 35 | 36 => {
786 BAR_CLOSE_ADJUSTMENT_1D
788 }
789 _ => anyhow::bail!(
790 "`rtype` is not a supported bar aggregation, was {}",
791 msg.hd.rtype
792 ),
793 };
794
795 Ok(adjustment.into())
796}
797
798pub fn decode_ohlcv_msg(
802 msg: &dbn::OhlcvMsg,
803 instrument_id: InstrumentId,
804 price_precision: u8,
805 ts_init: Option<UnixNanos>,
806 timestamp_on_close: bool,
807) -> anyhow::Result<Bar> {
808 let bar_type = decode_bar_type(msg, instrument_id)?;
809 let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
810
811 let ts_event_raw = msg.hd.ts_event.into();
812 let ts_close = ts_event_raw + ts_event_adjustment;
813 let ts_init = ts_init.unwrap_or(ts_close); let ts_event = if timestamp_on_close {
816 ts_close
817 } else {
818 ts_event_raw
819 };
820
821 let bar = Bar::new(
822 bar_type,
823 Price::from_raw(decode_raw_price_i64(msg.open), price_precision),
824 Price::from_raw(decode_raw_price_i64(msg.high), price_precision),
825 Price::from_raw(decode_raw_price_i64(msg.low), price_precision),
826 Price::from_raw(decode_raw_price_i64(msg.close), price_precision),
827 Quantity::from(msg.volume),
828 ts_event,
829 ts_init,
830 );
831
832 Ok(bar)
833}
834
835pub fn decode_status_msg(
841 msg: &dbn::StatusMsg,
842 instrument_id: InstrumentId,
843 ts_init: Option<UnixNanos>,
844) -> anyhow::Result<InstrumentStatus> {
845 let ts_event = msg.hd.ts_event.into();
846 let ts_init = ts_init.unwrap_or(ts_event);
847
848 let action = MarketStatusAction::from_u16(msg.action)
849 .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
850
851 let status = InstrumentStatus::new(
852 instrument_id,
853 action,
854 ts_event,
855 ts_init,
856 parse_status_reason(msg.reason)?,
857 parse_status_trading_event(msg.trading_event)?,
858 parse_optional_bool(msg.is_trading),
859 parse_optional_bool(msg.is_quoting),
860 parse_optional_bool(msg.is_short_sell_restricted),
861 );
862
863 Ok(status)
864}
865
866pub fn decode_record(
870 record: &dbn::RecordRef,
871 instrument_id: InstrumentId,
872 price_precision: u8,
873 ts_init: Option<UnixNanos>,
874 include_trades: bool,
875 bars_timestamp_on_close: bool,
876) -> anyhow::Result<(Option<Data>, Option<Data>)> {
877 let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
881 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
882 let result = decode_mbo_msg(
883 msg,
884 instrument_id,
885 price_precision,
886 Some(ts_init),
887 include_trades,
888 )?;
889 match result {
890 (Some(delta), None) => (Some(Data::Delta(delta)), None),
891 (None, Some(trade)) => (Some(Data::Trade(trade)), None),
892 (None, None) => (None, None),
893 _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
894 }
895 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
896 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
897 let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
898 (Some(Data::Trade(trade)), None)
899 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
900 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
901 let result = decode_mbp1_msg(
902 msg,
903 instrument_id,
904 price_precision,
905 Some(ts_init),
906 include_trades,
907 )?;
908 match result {
909 (quote, None) => (Some(Data::Quote(quote)), None),
910 (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
911 }
912 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
913 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
914 let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
915 (Some(Data::Quote(quote)), None)
916 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
917 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
918 let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
919 (Some(Data::Quote(quote)), None)
920 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
921 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
922 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
923 (Some(Data::from(depth)), None)
924 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
925 let bar = decode_ohlcv_msg(
928 msg,
929 instrument_id,
930 price_precision,
931 ts_init,
932 bars_timestamp_on_close,
933 )?;
934 (Some(Data::Bar(bar)), None)
935 } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
936 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
937 let result = decode_cmbp1_msg(
938 msg,
939 instrument_id,
940 price_precision,
941 Some(ts_init),
942 include_trades,
943 )?;
944 match result {
945 (quote, None) => (Some(Data::Quote(quote)), None),
946 (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
947 }
948 } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
949 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
951 let (quote, trade) = decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
952 (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
953 } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
954 if msg.price != i64::MAX && msg.size > 0 {
956 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
958 let (quote, trade) =
959 decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
960 (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
961 } else {
962 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
964 let quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
965 (Some(Data::Quote(quote)), None)
966 }
967 } else {
968 anyhow::bail!("DBN message type is not currently supported")
969 };
970
971 Ok(result)
972}
973
974const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
975 match ts_init {
976 Some(ts_init) => ts_init,
977 None => msg_timestamp,
978 }
979}
980
981pub fn decode_instrument_def_msg(
985 msg: &dbn::InstrumentDefMsg,
986 instrument_id: InstrumentId,
987 ts_init: Option<UnixNanos>,
988) -> anyhow::Result<InstrumentAny> {
989 match msg.instrument_class as u8 as char {
990 'K' => Ok(InstrumentAny::Equity(decode_equity(
991 msg,
992 instrument_id,
993 ts_init,
994 )?)),
995 'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
996 msg,
997 instrument_id,
998 ts_init,
999 )?)),
1000 'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1001 msg,
1002 instrument_id,
1003 ts_init,
1004 )?)),
1005 'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1006 msg,
1007 instrument_id,
1008 ts_init,
1009 )?)),
1010 'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1011 msg,
1012 instrument_id,
1013 ts_init,
1014 )?)),
1015 'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1016 'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1017 _ => anyhow::bail!(
1018 "Unsupported `instrument_class` '{}'",
1019 msg.instrument_class as u8 as char
1020 ),
1021 }
1022}
1023
1024pub fn decode_equity(
1030 msg: &dbn::InstrumentDefMsg,
1031 instrument_id: InstrumentId,
1032 ts_init: Option<UnixNanos>,
1033) -> anyhow::Result<Equity> {
1034 let currency = parse_currency_or_usd_default(msg.currency());
1035 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1036 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1037 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1039
1040 Ok(Equity::new(
1041 instrument_id,
1042 instrument_id.symbol,
1043 None, currency,
1045 price_increment.precision,
1046 price_increment,
1047 Some(lot_size),
1048 None, None, None, None, None, None, None, None, ts_event,
1057 ts_init,
1058 ))
1059}
1060
1061pub fn decode_futures_contract(
1067 msg: &dbn::InstrumentDefMsg,
1068 instrument_id: InstrumentId,
1069 ts_init: Option<UnixNanos>,
1070) -> anyhow::Result<FuturesContract> {
1071 let currency = parse_currency_or_usd_default(msg.currency());
1072 let exchange = Ustr::from(msg.exchange()?);
1073 let underlying = Ustr::from(msg.asset()?);
1074 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1075 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1076 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1077 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1078 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1080
1081 FuturesContract::new_checked(
1082 instrument_id,
1083 instrument_id.symbol,
1084 asset_class.unwrap_or(AssetClass::Commodity),
1085 Some(exchange),
1086 underlying,
1087 msg.activation.into(),
1088 msg.expiration.into(),
1089 currency,
1090 price_increment.precision,
1091 price_increment,
1092 multiplier,
1093 lot_size,
1094 None, None, None, None, None, None, None, None, ts_event,
1103 ts_init,
1104 )
1105}
1106
1107pub fn decode_futures_spread(
1113 msg: &dbn::InstrumentDefMsg,
1114 instrument_id: InstrumentId,
1115 ts_init: Option<UnixNanos>,
1116) -> anyhow::Result<FuturesSpread> {
1117 let exchange = Ustr::from(msg.exchange()?);
1118 let underlying = Ustr::from(msg.asset()?);
1119 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1120 let strategy_type = Ustr::from(msg.secsubtype()?);
1121 let currency = parse_currency_or_usd_default(msg.currency());
1122 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1123 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1124 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1125 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1127
1128 FuturesSpread::new_checked(
1129 instrument_id,
1130 instrument_id.symbol,
1131 asset_class.unwrap_or(AssetClass::Commodity),
1132 Some(exchange),
1133 underlying,
1134 strategy_type,
1135 msg.activation.into(),
1136 msg.expiration.into(),
1137 currency,
1138 price_increment.precision,
1139 price_increment,
1140 multiplier,
1141 lot_size,
1142 None, None, None, None, None, None, None, None, ts_event,
1151 ts_init,
1152 )
1153}
1154
1155pub fn decode_option_contract(
1161 msg: &dbn::InstrumentDefMsg,
1162 instrument_id: InstrumentId,
1163 ts_init: Option<UnixNanos>,
1164) -> anyhow::Result<OptionContract> {
1165 let currency = parse_currency_or_usd_default(msg.currency());
1166 let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1167 let exchange = Ustr::from(msg.exchange()?);
1168 let underlying = Ustr::from(msg.underlying()?);
1169 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1170 Some(AssetClass::Equity)
1171 } else {
1172 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1173 asset_class
1174 };
1175 let option_kind = parse_option_kind(msg.instrument_class)?;
1176 let strike_price = Price::from_raw(
1177 decode_raw_price_i64(msg.strike_price),
1178 strike_price_currency.precision,
1179 );
1180 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1181 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1182 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1183 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1185
1186 OptionContract::new_checked(
1187 instrument_id,
1188 instrument_id.symbol,
1189 asset_class_opt.unwrap_or(AssetClass::Commodity),
1190 Some(exchange),
1191 underlying,
1192 option_kind,
1193 strike_price,
1194 currency,
1195 msg.activation.into(),
1196 msg.expiration.into(),
1197 price_increment.precision,
1198 price_increment,
1199 multiplier,
1200 lot_size,
1201 None, None, None, None, None, None, None, None, ts_event,
1210 ts_init,
1211 )
1212}
1213
1214pub fn decode_option_spread(
1220 msg: &dbn::InstrumentDefMsg,
1221 instrument_id: InstrumentId,
1222 ts_init: Option<UnixNanos>,
1223) -> anyhow::Result<OptionSpread> {
1224 let exchange = Ustr::from(msg.exchange()?);
1225 let underlying = Ustr::from(msg.underlying()?);
1226 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1227 Some(AssetClass::Equity)
1228 } else {
1229 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1230 asset_class
1231 };
1232 let strategy_type = Ustr::from(msg.secsubtype()?);
1233 let currency = parse_currency_or_usd_default(msg.currency());
1234 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1235 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1236 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1237 let ts_event = msg.ts_recv.into(); let ts_init = ts_init.unwrap_or(ts_event);
1239
1240 OptionSpread::new_checked(
1241 instrument_id,
1242 instrument_id.symbol,
1243 asset_class_opt.unwrap_or(AssetClass::Commodity),
1244 Some(exchange),
1245 underlying,
1246 strategy_type,
1247 msg.activation.into(),
1248 msg.expiration.into(),
1249 currency,
1250 price_increment.precision,
1251 price_increment,
1252 multiplier,
1253 lot_size,
1254 None, None, None, None, None, None, None, None, ts_event,
1263 ts_init,
1264 )
1265}
1266
1267pub fn decode_imbalance_msg(
1273 msg: &dbn::ImbalanceMsg,
1274 instrument_id: InstrumentId,
1275 price_precision: u8,
1276 ts_init: Option<UnixNanos>,
1277) -> anyhow::Result<DatabentoImbalance> {
1278 let ts_event = msg.ts_recv.into();
1279 let ts_init = ts_init.unwrap_or(ts_event);
1280
1281 Ok(DatabentoImbalance::new(
1282 instrument_id,
1283 Price::from_raw(decode_raw_price_i64(msg.ref_price), price_precision),
1284 Price::from_raw(
1285 decode_raw_price_i64(msg.cont_book_clr_price),
1286 price_precision,
1287 ),
1288 Price::from_raw(
1289 decode_raw_price_i64(msg.auct_interest_clr_price),
1290 price_precision,
1291 ),
1292 Quantity::new(f64::from(msg.paired_qty), 0),
1293 Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1294 parse_order_side(msg.side),
1295 msg.significant_imbalance as c_char,
1296 msg.hd.ts_event.into(),
1297 ts_event,
1298 ts_init,
1299 ))
1300}
1301
1302pub fn decode_statistics_msg(
1309 msg: &dbn::StatMsg,
1310 instrument_id: InstrumentId,
1311 price_precision: u8,
1312 ts_init: Option<UnixNanos>,
1313) -> anyhow::Result<DatabentoStatistics> {
1314 let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1315 .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1316 let update_action =
1317 DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1318 anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1319 })?;
1320 let ts_event = msg.ts_recv.into();
1321 let ts_init = ts_init.unwrap_or(ts_event);
1322
1323 Ok(DatabentoStatistics::new(
1324 instrument_id,
1325 stat_type,
1326 update_action,
1327 decode_optional_price(msg.price, price_precision),
1328 decode_optional_quantity(msg.quantity),
1329 msg.channel_id,
1330 msg.stat_flags,
1331 msg.sequence,
1332 msg.ts_ref.into(),
1333 msg.ts_in_delta,
1334 msg.hd.ts_event.into(),
1335 ts_event,
1336 ts_init,
1337 ))
1338}
1339
1340#[cfg(test)]
1344mod tests {
1345 use std::path::{Path, PathBuf};
1346
1347 use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1348 use fallible_streaming_iterator::FallibleStreamingIterator;
1349 use nautilus_model::instruments::Instrument;
1350 use rstest::*;
1351
1352 use super::*;
1353
1354 fn test_data_path() -> PathBuf {
1355 Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1356 }
1357
1358 #[rstest]
1359 #[case('Y' as c_char, Some(true))]
1360 #[case('N' as c_char, Some(false))]
1361 #[case('X' as c_char, None)]
1362 fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1363 assert_eq!(parse_optional_bool(input), expected);
1364 }
1365
1366 #[rstest]
1367 #[case('A' as c_char, OrderSide::Sell)]
1368 #[case('B' as c_char, OrderSide::Buy)]
1369 #[case('X' as c_char, OrderSide::NoOrderSide)]
1370 fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1371 assert_eq!(parse_order_side(input), expected);
1372 }
1373
1374 #[rstest]
1375 #[case('A' as c_char, AggressorSide::Seller)]
1376 #[case('B' as c_char, AggressorSide::Buyer)]
1377 #[case('X' as c_char, AggressorSide::NoAggressor)]
1378 fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1379 assert_eq!(parse_aggressor_side(input), expected);
1380 }
1381
1382 #[rstest]
1383 #[case('A' as c_char, Ok(BookAction::Add))]
1384 #[case('C' as c_char, Ok(BookAction::Delete))]
1385 #[case('F' as c_char, Ok(BookAction::Update))]
1386 #[case('M' as c_char, Ok(BookAction::Update))]
1387 #[case('R' as c_char, Ok(BookAction::Clear))]
1388 #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1389 fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1390 match parse_book_action(input) {
1391 Ok(action) => assert_eq!(Ok(action), expected),
1392 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1393 }
1394 }
1395
1396 #[rstest]
1397 #[case('C' as c_char, Ok(OptionKind::Call))]
1398 #[case('P' as c_char, Ok(OptionKind::Put))]
1399 #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1400 fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1401 match parse_option_kind(input) {
1402 Ok(kind) => assert_eq!(Ok(kind), expected),
1403 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1404 }
1405 }
1406
1407 #[rstest]
1408 #[case(Ok("USD"), Currency::USD())]
1409 #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1410 #[case(Ok(""), Currency::USD())]
1411 #[case(Err("Error"), Currency::USD())]
1412 fn test_parse_currency_or_usd_default(
1413 #[case] input: Result<&str, &'static str>, #[case] expected: Currency,
1415 ) {
1416 let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1417 assert_eq!(actual, expected);
1418 }
1419
1420 #[rstest]
1421 #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1422 #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1423 #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1424 #[case("XXX", Ok((None, None)))]
1425 #[case("D", Err("Value string is too short"))]
1426 fn test_parse_cfi_iso10926(
1427 #[case] input: &str,
1428 #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1429 ) {
1430 match parse_cfi_iso10926(input) {
1431 Ok(result) => assert_eq!(Ok(result), expected),
1432 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1433 }
1434 }
1435
1436 #[rstest]
1437 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1441 let actual = decode_price_increment(value, precision);
1442 assert_eq!(actual, expected);
1443 }
1444
1445 #[rstest]
1446 #[case(i64::MAX, 2, None)] #[case(0, 2, Some(Price::from_raw(0, 2)))] #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] fn test_decode_optional_price(
1450 #[case] value: i64,
1451 #[case] precision: u8,
1452 #[case] expected: Option<Price>,
1453 ) {
1454 let actual = decode_optional_price(value, precision);
1455 assert_eq!(actual, expected);
1456 }
1457
1458 #[rstest]
1459 #[case(i64::MAX, None)] #[case(0, Some(Quantity::new(0.0, 0)))] #[case(10, Some(Quantity::new(10.0, 0)))] fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1463 let actual = decode_optional_quantity(value);
1464 assert_eq!(actual, expected);
1465 }
1466
1467 #[rstest]
1468 #[case(0, Quantity::from(1))] #[case(i64::MAX, Quantity::from(1))] #[case(50_000_000_000, Quantity::from("50"))] #[case(12_500_000_000, Quantity::from("12.5"))] #[case(1_000_000_000, Quantity::from("1"))] #[case(1, Quantity::from("0.000000001"))] #[case(1_000_000_001, Quantity::from("1.000000001"))] #[case(999_999_999, Quantity::from("0.999999999"))] #[case(123_456_789_000, Quantity::from("123.456789"))] fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1478 assert_eq!(decode_multiplier(raw).unwrap(), expected);
1479 }
1480
1481 #[rstest]
1482 #[case(-1_500_000_000)] #[case(-1)] #[case(-999_999_999)] fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1486 let result = decode_multiplier(raw);
1487 assert!(result.is_err());
1488 assert!(
1489 result
1490 .unwrap_err()
1491 .to_string()
1492 .contains("Invalid negative multiplier")
1493 );
1494 }
1495
1496 #[rstest]
1497 fn test_decode_mbo_msg() {
1498 let path = test_data_path().join("test_data.mbo.dbn.zst");
1499 let mut dbn_stream = Decoder::from_zstd_file(path)
1500 .unwrap()
1501 .decode_stream::<dbn::MboMsg>();
1502 let msg = dbn_stream.next().unwrap().unwrap();
1503
1504 let instrument_id = InstrumentId::from("ESM4.GLBX");
1505 let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1506 let delta = delta.unwrap();
1507
1508 assert_eq!(delta.instrument_id, instrument_id);
1509 assert_eq!(delta.action, BookAction::Delete);
1510 assert_eq!(delta.order.side, OrderSide::Sell);
1511 assert_eq!(delta.order.price, Price::from("3722.75"));
1512 assert_eq!(delta.order.size, Quantity::from("1"));
1513 assert_eq!(delta.order.order_id, 647_784_973_705);
1514 assert_eq!(delta.flags, 128);
1515 assert_eq!(delta.sequence, 1_170_352);
1516 assert_eq!(delta.ts_event, msg.ts_recv);
1517 assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1518 assert_eq!(delta.ts_init, 0);
1519 }
1520
1521 #[rstest]
1522 fn test_decode_mbp1_msg() {
1523 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1524 let mut dbn_stream = Decoder::from_zstd_file(path)
1525 .unwrap()
1526 .decode_stream::<dbn::Mbp1Msg>();
1527 let msg = dbn_stream.next().unwrap().unwrap();
1528
1529 let instrument_id = InstrumentId::from("ESM4.GLBX");
1530 let (quote, _) = decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1531
1532 assert_eq!(quote.instrument_id, instrument_id);
1533 assert_eq!(quote.bid_price, Price::from("3720.25"));
1534 assert_eq!(quote.ask_price, Price::from("3720.50"));
1535 assert_eq!(quote.bid_size, Quantity::from("24"));
1536 assert_eq!(quote.ask_size, Quantity::from("11"));
1537 assert_eq!(quote.ts_event, msg.ts_recv);
1538 assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1539 assert_eq!(quote.ts_init, 0);
1540 }
1541
1542 #[rstest]
1543 fn test_decode_bbo_1s_msg() {
1544 let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1545 let mut dbn_stream = Decoder::from_zstd_file(path)
1546 .unwrap()
1547 .decode_stream::<dbn::BboMsg>();
1548 let msg = dbn_stream.next().unwrap().unwrap();
1549
1550 let instrument_id = InstrumentId::from("ESM4.GLBX");
1551 let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1552
1553 assert_eq!(quote.instrument_id, instrument_id);
1554 assert_eq!(quote.bid_price, Price::from("5199.50"));
1555 assert_eq!(quote.ask_price, Price::from("5199.75"));
1556 assert_eq!(quote.bid_size, Quantity::from("26"));
1557 assert_eq!(quote.ask_size, Quantity::from("23"));
1558 assert_eq!(quote.ts_event, msg.ts_recv);
1559 assert_eq!(quote.ts_event, 1715248801000000000);
1560 assert_eq!(quote.ts_init, 0);
1561 }
1562
1563 #[rstest]
1564 fn test_decode_bbo_1m_msg() {
1565 let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1566 let mut dbn_stream = Decoder::from_zstd_file(path)
1567 .unwrap()
1568 .decode_stream::<dbn::BboMsg>();
1569 let msg = dbn_stream.next().unwrap().unwrap();
1570
1571 let instrument_id = InstrumentId::from("ESM4.GLBX");
1572 let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1573
1574 assert_eq!(quote.instrument_id, instrument_id);
1575 assert_eq!(quote.bid_price, Price::from("5199.50"));
1576 assert_eq!(quote.ask_price, Price::from("5199.75"));
1577 assert_eq!(quote.bid_size, Quantity::from("33"));
1578 assert_eq!(quote.ask_size, Quantity::from("17"));
1579 assert_eq!(quote.ts_event, msg.ts_recv);
1580 assert_eq!(quote.ts_event, 1715248800000000000);
1581 assert_eq!(quote.ts_init, 0);
1582 }
1583
1584 #[rstest]
1585 fn test_decode_mbp10_msg() {
1586 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1587 let mut dbn_stream = Decoder::from_zstd_file(path)
1588 .unwrap()
1589 .decode_stream::<dbn::Mbp10Msg>();
1590 let msg = dbn_stream.next().unwrap().unwrap();
1591
1592 let instrument_id = InstrumentId::from("ESM4.GLBX");
1593 let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1594
1595 assert_eq!(depth10.instrument_id, instrument_id);
1596 assert_eq!(depth10.bids.len(), 10);
1597 assert_eq!(depth10.asks.len(), 10);
1598 assert_eq!(depth10.bid_counts.len(), 10);
1599 assert_eq!(depth10.ask_counts.len(), 10);
1600 assert_eq!(depth10.flags, 128);
1601 assert_eq!(depth10.sequence, 1_170_352);
1602 assert_eq!(depth10.ts_event, msg.ts_recv);
1603 assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1604 assert_eq!(depth10.ts_init, 0);
1605 }
1606
1607 #[rstest]
1608 fn test_decode_trade_msg() {
1609 let path = test_data_path().join("test_data.trades.dbn.zst");
1610 let mut dbn_stream = Decoder::from_zstd_file(path)
1611 .unwrap()
1612 .decode_stream::<dbn::TradeMsg>();
1613 let msg = dbn_stream.next().unwrap().unwrap();
1614
1615 let instrument_id = InstrumentId::from("ESM4.GLBX");
1616 let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1617
1618 assert_eq!(trade.instrument_id, instrument_id);
1619 assert_eq!(trade.price, Price::from("3720.25"));
1620 assert_eq!(trade.size, Quantity::from("5"));
1621 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1622 assert_eq!(trade.trade_id.to_string(), "1170380");
1623 assert_eq!(trade.ts_event, msg.ts_recv);
1624 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1625 assert_eq!(trade.ts_init, 0);
1626 }
1627
1628 #[rstest]
1629 fn test_decode_tbbo_msg() {
1630 let path = test_data_path().join("test_data.tbbo.dbn.zst");
1631 let mut dbn_stream = Decoder::from_zstd_file(path)
1632 .unwrap()
1633 .decode_stream::<dbn::Mbp1Msg>();
1634 let msg = dbn_stream.next().unwrap().unwrap();
1635
1636 let instrument_id = InstrumentId::from("ESM4.GLBX");
1637 let (quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1638
1639 assert_eq!(quote.instrument_id, instrument_id);
1640 assert_eq!(quote.bid_price, Price::from("3720.25"));
1641 assert_eq!(quote.ask_price, Price::from("3720.50"));
1642 assert_eq!(quote.bid_size, Quantity::from("26"));
1643 assert_eq!(quote.ask_size, Quantity::from("7"));
1644 assert_eq!(quote.ts_event, msg.ts_recv);
1645 assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
1646 assert_eq!(quote.ts_init, 0);
1647
1648 assert_eq!(trade.instrument_id, instrument_id);
1649 assert_eq!(trade.price, Price::from("3720.25"));
1650 assert_eq!(trade.size, Quantity::from("5"));
1651 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1652 assert_eq!(trade.trade_id.to_string(), "1170380");
1653 assert_eq!(trade.ts_event, msg.ts_recv);
1654 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1655 assert_eq!(trade.ts_init, 0);
1656 }
1657
1658 #[ignore = "Requires updated test data"]
1659 #[rstest]
1660 fn test_decode_ohlcv_msg() {
1661 let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
1662 let mut dbn_stream = Decoder::from_zstd_file(path)
1663 .unwrap()
1664 .decode_stream::<dbn::OhlcvMsg>();
1665 let msg = dbn_stream.next().unwrap().unwrap();
1666
1667 let instrument_id = InstrumentId::from("ESM4.GLBX");
1668 let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1669
1670 assert_eq!(
1671 bar.bar_type,
1672 BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
1673 );
1674 assert_eq!(bar.open, Price::from("3720.25"));
1675 assert_eq!(bar.high, Price::from("3720.50"));
1676 assert_eq!(bar.low, Price::from("3720.25"));
1677 assert_eq!(bar.close, Price::from("3720.50"));
1678 assert_eq!(bar.ts_event, 1_609_160_400_000_000_000);
1679 assert_eq!(bar.ts_init, 1_609_160_401_000_000_000); }
1681
1682 #[rstest]
1683 fn test_decode_definition_msg() {
1684 let path = test_data_path().join("test_data.definition.dbn.zst");
1685 let mut dbn_stream = Decoder::from_zstd_file(path)
1686 .unwrap()
1687 .decode_stream::<dbn::InstrumentDefMsg>();
1688 let msg = dbn_stream.next().unwrap().unwrap();
1689
1690 let instrument_id = InstrumentId::from("ESM4.GLBX");
1691 let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
1692
1693 assert!(result.is_ok());
1694 assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
1695 }
1696
1697 #[rstest]
1698 fn test_decode_status_msg() {
1699 let path = test_data_path().join("test_data.status.dbn.zst");
1700 let mut dbn_stream = Decoder::from_zstd_file(path)
1701 .unwrap()
1702 .decode_stream::<dbn::StatusMsg>();
1703 let msg = dbn_stream.next().unwrap().unwrap();
1704
1705 let instrument_id = InstrumentId::from("ESM4.GLBX");
1706 let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
1707
1708 assert_eq!(status.instrument_id, instrument_id);
1709 assert_eq!(status.action, MarketStatusAction::Trading);
1710 assert_eq!(status.ts_event, msg.hd.ts_event);
1711 assert_eq!(status.ts_init, 0);
1712 assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
1713 assert_eq!(status.trading_event, None);
1714 assert_eq!(status.is_trading, Some(true));
1715 assert_eq!(status.is_quoting, Some(true));
1716 assert_eq!(status.is_short_sell_restricted, None);
1717 }
1718
1719 #[rstest]
1720 fn test_decode_imbalance_msg() {
1721 let path = test_data_path().join("test_data.imbalance.dbn.zst");
1722 let mut dbn_stream = Decoder::from_zstd_file(path)
1723 .unwrap()
1724 .decode_stream::<dbn::ImbalanceMsg>();
1725 let msg = dbn_stream.next().unwrap().unwrap();
1726
1727 let instrument_id = InstrumentId::from("ESM4.GLBX");
1728 let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1729
1730 assert_eq!(imbalance.instrument_id, instrument_id);
1731 assert_eq!(imbalance.ref_price, Price::from("229.43"));
1732 assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
1733 assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
1734 assert_eq!(imbalance.paired_qty, Quantity::from("0"));
1735 assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
1736 assert_eq!(imbalance.side, OrderSide::Buy);
1737 assert_eq!(imbalance.significant_imbalance, 126);
1738 assert_eq!(imbalance.ts_event, msg.hd.ts_event);
1739 assert_eq!(imbalance.ts_recv, msg.ts_recv);
1740 assert_eq!(imbalance.ts_init, 0);
1741 }
1742
1743 #[rstest]
1744 fn test_decode_statistics_msg() {
1745 let path = test_data_path().join("test_data.statistics.dbn.zst");
1746 let mut dbn_stream = Decoder::from_zstd_file(path)
1747 .unwrap()
1748 .decode_stream::<dbn::StatMsg>();
1749 let msg = dbn_stream.next().unwrap().unwrap();
1750
1751 let instrument_id = InstrumentId::from("ESM4.GLBX");
1752 let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1753
1754 assert_eq!(statistics.instrument_id, instrument_id);
1755 assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
1756 assert_eq!(
1757 statistics.update_action,
1758 DatabentoStatisticUpdateAction::Added
1759 );
1760 assert_eq!(statistics.price, Some(Price::from("100.00")));
1761 assert_eq!(statistics.quantity, None);
1762 assert_eq!(statistics.channel_id, 13);
1763 assert_eq!(statistics.stat_flags, 255);
1764 assert_eq!(statistics.sequence, 2);
1765 assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
1766 assert_eq!(statistics.ts_in_delta, 26961);
1767 assert_eq!(statistics.ts_event, msg.hd.ts_event);
1768 assert_eq!(statistics.ts_recv, msg.ts_recv);
1769 assert_eq!(statistics.ts_init, 0);
1770 }
1771
1772 #[rstest]
1773 fn test_decode_cmbp1_msg() {
1774 let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
1775 let mut dbn_stream = Decoder::from_zstd_file(path)
1776 .unwrap()
1777 .decode_stream::<dbn::Cmbp1Msg>();
1778 let msg = dbn_stream.next().unwrap().unwrap();
1779
1780 let instrument_id = InstrumentId::from("ESM4.GLBX");
1781 let (quote, trade) = decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1782
1783 assert_eq!(quote.instrument_id, instrument_id);
1784 assert!(quote.bid_price.raw > 0);
1785 assert!(quote.ask_price.raw > 0);
1786 assert!(quote.bid_size.raw > 0);
1787 assert!(quote.ask_size.raw > 0);
1788 assert_eq!(quote.ts_event, msg.ts_recv);
1789 assert_eq!(quote.ts_init, 0);
1790
1791 if msg.action as u8 as char == 'T' {
1793 assert!(trade.is_some());
1794 let trade = trade.unwrap();
1795 assert_eq!(trade.instrument_id, instrument_id);
1796 } else {
1797 assert!(trade.is_none());
1798 }
1799 }
1800
1801 #[rstest]
1804 #[ignore]
1805 fn test_decode_cbbo_msg() {
1806 let path = test_data_path().join("test_data.cbbo.dbn.zst");
1807 let mut dbn_stream = Decoder::from_zstd_file(path)
1808 .unwrap()
1809 .decode_stream::<dbn::CbboMsg>();
1810 let msg = dbn_stream.next().unwrap().unwrap();
1811
1812 let instrument_id = InstrumentId::from("ESM4.GLBX");
1813 let quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1814
1815 assert_eq!(quote.instrument_id, instrument_id);
1816 assert!(quote.bid_price.raw > 0);
1817 assert!(quote.ask_price.raw > 0);
1818 assert!(quote.bid_size.raw > 0);
1819 assert!(quote.ask_size.raw > 0);
1820 assert_eq!(quote.ts_event, msg.ts_recv);
1821 assert_eq!(quote.ts_init, 0);
1822 }
1823
1824 #[rstest]
1825 #[ignore]
1826 fn test_decode_cbbo_1s_msg() {
1827 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
1828 let mut dbn_stream = Decoder::from_zstd_file(path)
1829 .unwrap()
1830 .decode_stream::<dbn::CbboMsg>();
1831 let msg = dbn_stream.next().unwrap().unwrap();
1832
1833 let instrument_id = InstrumentId::from("ESM4.GLBX");
1834 let quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1835
1836 assert_eq!(quote.instrument_id, instrument_id);
1837 assert!(quote.bid_price.raw > 0);
1838 assert!(quote.ask_price.raw > 0);
1839 assert!(quote.bid_size.raw > 0);
1840 assert!(quote.ask_size.raw > 0);
1841 assert_eq!(quote.ts_event, msg.ts_recv);
1842 assert_eq!(quote.ts_init, 0);
1843 }
1844
1845 #[rstest]
1847 fn test_decode_mbp10_msg_with_all_levels() {
1848 let mut msg = dbn::Mbp10Msg::default();
1849 for i in 0..10 {
1850 msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
1851 msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
1852 msg.levels[i].bid_sz = 10 + i as u32;
1853 msg.levels[i].ask_sz = 10 + i as u32;
1854 msg.levels[i].bid_ct = 1 + i as u32;
1855 msg.levels[i].ask_ct = 1 + i as u32;
1856 }
1857 msg.ts_recv = 1_609_160_400_000_704_060;
1858
1859 let instrument_id = InstrumentId::from("TEST.VENUE");
1860 let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
1861
1862 assert!(result.is_ok());
1863 let depth = result.unwrap();
1864 assert_eq!(depth.bids.len(), 10);
1865 assert_eq!(depth.asks.len(), 10);
1866 assert_eq!(depth.bid_counts.len(), 10);
1867 assert_eq!(depth.ask_counts.len(), 10);
1868 }
1869
1870 #[rstest]
1871 fn test_array_conversion_error_handling() {
1872 use nautilus_model::{data::BookOrder, enums::OrderSide};
1873
1874 let mut bids = Vec::new();
1875 let mut asks = Vec::new();
1876
1877 for i in 0..5 {
1879 bids.push(BookOrder::new(
1880 OrderSide::Buy,
1881 Price::from(format!("{}.00", 100 - i)),
1882 Quantity::from(10),
1883 i as u64,
1884 ));
1885 asks.push(BookOrder::new(
1886 OrderSide::Sell,
1887 Price::from(format!("{}.00", 101 + i)),
1888 Quantity::from(10),
1889 i as u64,
1890 ));
1891 }
1892
1893 let result: Result<[BookOrder; DEPTH10_LEN], _> =
1894 bids.try_into().map_err(|v: Vec<BookOrder>| {
1895 anyhow::anyhow!(
1896 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
1897 v.len()
1898 )
1899 });
1900 assert!(result.is_err());
1901 assert!(
1902 result
1903 .unwrap_err()
1904 .to_string()
1905 .contains("Expected exactly 10 bid levels, received 5")
1906 );
1907 }
1908
1909 #[rstest]
1910 #[ignore]
1911 fn test_decode_tcbbo_msg() {
1912 let path = test_data_path().join("test_data.cbbo.dbn.zst");
1913 let mut dbn_stream = Decoder::from_zstd_file(path)
1914 .unwrap()
1915 .decode_stream::<dbn::CbboMsg>();
1916 let msg = dbn_stream.next().unwrap().unwrap();
1917
1918 let mut tcbbo_msg = msg.clone();
1920 tcbbo_msg.price = 372025;
1921 tcbbo_msg.size = 10;
1922
1923 let instrument_id = InstrumentId::from("ESM4.GLBX");
1924 let (quote, trade) =
1925 decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
1926
1927 assert_eq!(quote.instrument_id, instrument_id);
1928 assert!(quote.bid_price.raw > 0);
1929 assert!(quote.ask_price.raw > 0);
1930 assert!(quote.bid_size.raw > 0);
1931 assert!(quote.ask_size.raw > 0);
1932 assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
1933 assert_eq!(quote.ts_init, 0);
1934
1935 assert_eq!(trade.instrument_id, instrument_id);
1936 assert_eq!(trade.price, Price::from_raw(372025, 2));
1937 assert_eq!(trade.size, Quantity::from(10));
1938 assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
1939 assert_eq!(trade.ts_init, 0);
1940 }
1941}