1use std::{ffi::c_char, num::NonZeroUsize};
48
49use databento::dbn::{self};
50use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
51use nautilus_model::{
52 data::{
53 Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
54 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
55 },
56 enums::{
57 AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
58 InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
59 },
60 identifiers::{InstrumentId, TradeId},
61 instruments::{
62 Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
63 },
64 types::{
65 Currency, Price, Quantity,
66 price::{PRICE_UNDEF, decode_raw_price_i64},
67 },
68};
69use ustr::Ustr;
70
71use super::{
72 enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
73 types::{DatabentoImbalance, DatabentoStatistics},
74};
75
76const STEP_ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
78
79const BAR_SPEC_1S: BarSpecification = BarSpecification {
80 step: STEP_ONE,
81 aggregation: BarAggregation::Second,
82 price_type: PriceType::Last,
83};
84const BAR_SPEC_1M: BarSpecification = BarSpecification {
85 step: STEP_ONE,
86 aggregation: BarAggregation::Minute,
87 price_type: PriceType::Last,
88};
89const BAR_SPEC_1H: BarSpecification = BarSpecification {
90 step: STEP_ONE,
91 aggregation: BarAggregation::Hour,
92 price_type: PriceType::Last,
93};
94const BAR_SPEC_1D: BarSpecification = BarSpecification {
95 step: STEP_ONE,
96 aggregation: BarAggregation::Day,
97 price_type: PriceType::Last,
98};
99
100const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
101const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
102const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
103const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
104
105#[must_use]
106pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
107 match c as u8 as char {
108 'Y' => Some(true),
109 'N' => Some(false),
110 _ => None,
111 }
112}
113
114#[must_use]
115pub const fn parse_order_side(c: c_char) -> OrderSide {
116 match c as u8 as char {
117 'A' => OrderSide::Sell,
118 'B' => OrderSide::Buy,
119 _ => OrderSide::NoOrderSide,
120 }
121}
122
123#[must_use]
124pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
125 match c as u8 as char {
126 'A' => AggressorSide::Seller,
127 'B' => AggressorSide::Buyer,
128 _ => AggressorSide::NoAggressor,
129 }
130}
131
132pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
138 match c as u8 as char {
139 'A' => Ok(BookAction::Add),
140 'C' => Ok(BookAction::Delete),
141 'F' => Ok(BookAction::Update),
142 'M' => Ok(BookAction::Update),
143 'R' => Ok(BookAction::Clear),
144 invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
145 }
146}
147
148pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
154 match c as u8 as char {
155 'C' => Ok(OptionKind::Call),
156 'P' => Ok(OptionKind::Put),
157 invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
158 }
159}
160
161fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
162 match value {
163 Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
164 log::warn!("Unknown currency code '{value}', defaulting to USD");
165 Currency::USD()
166 }),
167 Ok(_) => Currency::USD(),
168 Err(e) => {
169 log::error!("Error parsing currency: {e}");
170 Currency::USD()
171 }
172 }
173}
174
175pub fn parse_cfi_iso10926(
181 value: &str,
182) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
183 let chars: Vec<char> = value.chars().collect();
184 if chars.len() < 3 {
185 anyhow::bail!("Value string is too short");
186 }
187
188 let cfi_category = chars[0];
190 let cfi_group = chars[1];
191 let cfi_attribute1 = chars[2];
192 let mut asset_class = match cfi_category {
197 'D' => Some(AssetClass::Debt),
198 'E' => Some(AssetClass::Equity),
199 'S' => None,
200 _ => None,
201 };
202
203 let instrument_class = match cfi_group {
204 'I' => Some(InstrumentClass::Future),
205 _ => None,
206 };
207
208 if cfi_attribute1 == 'I' {
209 asset_class = Some(AssetClass::Index);
210 }
211
212 Ok((asset_class, instrument_class))
213}
214
215pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
223 let value_str = match value {
224 0 => return Ok(None),
225 1 => "Scheduled",
226 2 => "Surveillance intervention",
227 3 => "Market event",
228 4 => "Instrument activation",
229 5 => "Instrument expiration",
230 6 => "Recovery in process",
231 10 => "Regulatory",
232 11 => "Administrative",
233 12 => "Non-compliance",
234 13 => "Filings not current",
235 14 => "SEC trading suspension",
236 15 => "New issue",
237 16 => "Issue available",
238 17 => "Issues reviewed",
239 18 => "Filing requirements satisfied",
240 30 => "News pending",
241 31 => "News released",
242 32 => "News and resumption times",
243 33 => "News not forthcoming",
244 40 => "Order imbalance",
245 50 => "LULD pause",
246 60 => "Operational",
247 70 => "Additional information requested",
248 80 => "Merger effective",
249 90 => "ETF",
250 100 => "Corporate action",
251 110 => "New Security offering",
252 120 => "Market wide halt level 1",
253 121 => "Market wide halt level 2",
254 122 => "Market wide halt level 3",
255 123 => "Market wide halt carryover",
256 124 => "Market wide halt resumption",
257 130 => "Quotation not available",
258 invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
259 };
260
261 Ok(Some(Ustr::from(value_str)))
262}
263
264pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
270 let value_str = match value {
271 0 => return Ok(None),
272 1 => "No cancel",
273 2 => "Change trading session",
274 3 => "Implied matching on",
275 4 => "Implied matching off",
276 _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
277 };
278
279 Ok(Some(Ustr::from(value_str)))
280}
281
282#[inline(always)]
291pub fn decode_price(value: i64, precision: u8, field_name: &str) -> anyhow::Result<Price> {
292 if value == i64::MAX {
293 anyhow::bail!("Missing required price for `{field_name}`")
294 } else {
295 Ok(Price::from_raw(decode_raw_price_i64(value), precision))
296 }
297}
298
299#[inline(always)]
304#[must_use]
305pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
306 if value == i64::MAX {
307 None
308 } else {
309 Some(Price::from_raw(decode_raw_price_i64(value), precision))
310 }
311}
312
313#[inline(always)]
318#[must_use]
319pub fn decode_price_or_undef(value: i64, precision: u8) -> Price {
320 if value == i64::MAX {
321 Price::from_raw(PRICE_UNDEF, 0)
322 } else {
323 Price::from_raw(decode_raw_price_i64(value), precision)
324 }
325}
326
327#[inline(always)]
329#[must_use]
330pub fn decode_price_increment(value: i64, precision: u8) -> Price {
331 match value {
332 0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
333 _ => Price::from_raw(decode_raw_price_i64(value), precision),
334 }
335}
336
337#[inline(always)]
339#[must_use]
340pub fn decode_quantity(value: u64) -> Quantity {
341 Quantity::from(value)
342}
343
344#[inline(always)]
346#[must_use]
347pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
348 match value {
349 i64::MAX => None,
350 _ => Some(Quantity::from(value)),
351 }
352}
353
354#[inline(always)]
362pub fn decode_timestamp(value: u64, field_name: &str) -> anyhow::Result<UnixNanos> {
363 if value == dbn::UNDEF_TIMESTAMP {
364 anyhow::bail!("Missing required timestamp for `{field_name}`")
365 } else {
366 Ok(UnixNanos::from(value))
367 }
368}
369
370#[inline(always)]
374#[must_use]
375pub fn decode_optional_timestamp(value: u64) -> Option<UnixNanos> {
376 if value == dbn::UNDEF_TIMESTAMP {
377 None
378 } else {
379 Some(UnixNanos::from(value))
380 }
381}
382
383pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
390 const SCALE: u128 = 1_000_000_000;
391
392 match value {
393 0 | i64::MAX => Ok(Quantity::from(1)),
394 v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
395 v => {
396 let abs = v as u128;
399 let int_part = abs / SCALE;
400 let frac_part = abs % SCALE;
401
402 if frac_part == 0 {
405 Ok(Quantity::from(int_part as u64))
407 } else {
408 let mut frac_str = format!("{frac_part:09}");
409 while frac_str.ends_with('0') {
410 frac_str.pop();
411 }
412 let s = format!("{int_part}.{frac_str}");
413 Ok(Quantity::from(s))
414 }
415 }
416 }
417}
418
419#[inline(always)]
421#[must_use]
422pub fn decode_lot_size(value: i32) -> Quantity {
423 match value {
424 0 | i32::MAX => Quantity::from(1),
425 value => Quantity::from(value),
426 }
427}
428
429#[inline(always)]
430#[must_use]
431fn is_trade_msg(action: c_char) -> bool {
432 action as u8 as char == 'T'
433}
434
435#[inline(always)]
440#[must_use]
441fn has_valid_bid_ask(bid_px: i64, ask_px: i64) -> bool {
442 bid_px != i64::MAX && ask_px != i64::MAX
443}
444
445pub fn decode_mbo_msg(
454 msg: &dbn::MboMsg,
455 instrument_id: InstrumentId,
456 price_precision: u8,
457 ts_init: Option<UnixNanos>,
458 include_trades: bool,
459) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
460 let side = parse_order_side(msg.side);
461 if is_trade_msg(msg.action) {
462 if include_trades && msg.size > 0 {
463 let price = decode_price_or_undef(msg.price, price_precision);
464 let size = decode_quantity(msg.size as u64);
465 let aggressor_side = parse_aggressor_side(msg.side);
466 let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
467 let ts_event = msg.ts_recv.into();
468 let ts_init = ts_init.unwrap_or(ts_event);
469
470 let trade = TradeTick::new(
471 instrument_id,
472 price,
473 size,
474 aggressor_side,
475 trade_id,
476 ts_event,
477 ts_init,
478 );
479 return Ok((None, Some(trade)));
480 }
481
482 return Ok((None, None));
483 }
484
485 let action = parse_book_action(msg.action)?;
486 let price = decode_price_or_undef(msg.price, price_precision);
487 let size = decode_quantity(msg.size as u64);
488 let order = BookOrder::new(side, price, size, msg.order_id);
489
490 let ts_event = msg.ts_recv.into();
491 let ts_init = ts_init.unwrap_or(ts_event);
492
493 let delta = OrderBookDelta::new(
494 instrument_id,
495 action,
496 order,
497 msg.flags.raw(),
498 msg.sequence.into(),
499 ts_event,
500 ts_init,
501 );
502
503 Ok((Some(delta), None))
504}
505
506pub fn decode_trade_msg(
512 msg: &dbn::TradeMsg,
513 instrument_id: InstrumentId,
514 price_precision: u8,
515 ts_init: Option<UnixNanos>,
516) -> anyhow::Result<TradeTick> {
517 let ts_event = msg.ts_recv.into();
518 let ts_init = ts_init.unwrap_or(ts_event);
519
520 let trade = TradeTick::new(
521 instrument_id,
522 decode_price_or_undef(msg.price, price_precision),
523 decode_quantity(msg.size as u64),
524 parse_aggressor_side(msg.side),
525 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
526 ts_event,
527 ts_init,
528 );
529
530 Ok(trade)
531}
532
533pub fn decode_tbbo_msg(
542 msg: &dbn::TbboMsg,
543 instrument_id: InstrumentId,
544 price_precision: u8,
545 ts_init: Option<UnixNanos>,
546) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
547 let top_level = &msg.levels[0];
548 let ts_event = msg.ts_recv.into();
549 let ts_init = ts_init.unwrap_or(ts_event);
550
551 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
552 Some(QuoteTick::new(
553 instrument_id,
554 decode_price_or_undef(top_level.bid_px, price_precision),
555 decode_price_or_undef(top_level.ask_px, price_precision),
556 decode_quantity(top_level.bid_sz as u64),
557 decode_quantity(top_level.ask_sz as u64),
558 ts_event,
559 ts_init,
560 ))
561 } else {
562 None
563 };
564
565 let trade = TradeTick::new(
566 instrument_id,
567 decode_price_or_undef(msg.price, price_precision),
568 decode_quantity(msg.size as u64),
569 parse_aggressor_side(msg.side),
570 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
571 ts_event,
572 ts_init,
573 );
574
575 Ok((maybe_quote, trade))
576}
577
578pub fn decode_mbp1_msg(
586 msg: &dbn::Mbp1Msg,
587 instrument_id: InstrumentId,
588 price_precision: u8,
589 ts_init: Option<UnixNanos>,
590 include_trades: bool,
591) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
592 let top_level = &msg.levels[0];
593 let ts_event = msg.ts_recv.into();
594 let ts_init = ts_init.unwrap_or(ts_event);
595
596 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
597 Some(QuoteTick::new(
598 instrument_id,
599 decode_price_or_undef(top_level.bid_px, price_precision),
600 decode_price_or_undef(top_level.ask_px, price_precision),
601 decode_quantity(top_level.bid_sz as u64),
602 decode_quantity(top_level.ask_sz as u64),
603 ts_event,
604 ts_init,
605 ))
606 } else {
607 None
608 };
609
610 let maybe_trade = if include_trades && is_trade_msg(msg.action) {
611 Some(TradeTick::new(
612 instrument_id,
613 decode_price_or_undef(msg.price, price_precision),
614 decode_quantity(msg.size as u64),
615 parse_aggressor_side(msg.side),
616 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
617 ts_event,
618 ts_init,
619 ))
620 } else {
621 None
622 };
623
624 Ok((maybe_quote, maybe_trade))
625}
626
627pub fn decode_bbo_msg(
635 msg: &dbn::BboMsg,
636 instrument_id: InstrumentId,
637 price_precision: u8,
638 ts_init: Option<UnixNanos>,
639) -> anyhow::Result<Option<QuoteTick>> {
640 let top_level = &msg.levels[0];
641 if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
642 return Ok(None);
643 }
644
645 let ts_event = msg.ts_recv.into();
646 let ts_init = ts_init.unwrap_or(ts_event);
647
648 let quote = QuoteTick::new(
649 instrument_id,
650 decode_price_or_undef(top_level.bid_px, price_precision),
651 decode_price_or_undef(top_level.ask_px, price_precision),
652 decode_quantity(top_level.bid_sz as u64),
653 decode_quantity(top_level.ask_sz as u64),
654 ts_event,
655 ts_init,
656 );
657
658 Ok(Some(quote))
659}
660
661pub fn decode_mbp10_msg(
667 msg: &dbn::Mbp10Msg,
668 instrument_id: InstrumentId,
669 price_precision: u8,
670 ts_init: Option<UnixNanos>,
671) -> anyhow::Result<OrderBookDepth10> {
672 let mut bids = Vec::with_capacity(DEPTH10_LEN);
673 let mut asks = Vec::with_capacity(DEPTH10_LEN);
674 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
675 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
676
677 for level in &msg.levels {
678 let bid_order = BookOrder::new(
679 OrderSide::Buy,
680 decode_price_or_undef(level.bid_px, price_precision),
681 decode_quantity(level.bid_sz as u64),
682 0,
683 );
684
685 let ask_order = BookOrder::new(
686 OrderSide::Sell,
687 decode_price_or_undef(level.ask_px, price_precision),
688 decode_quantity(level.ask_sz as u64),
689 0,
690 );
691
692 bids.push(bid_order);
693 asks.push(ask_order);
694 bid_counts.push(level.bid_ct);
695 ask_counts.push(level.ask_ct);
696 }
697
698 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
699 anyhow::anyhow!(
700 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
701 v.len()
702 )
703 })?;
704
705 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
706 anyhow::anyhow!(
707 "Expected exactly {DEPTH10_LEN} ask levels, received {}",
708 v.len()
709 )
710 })?;
711
712 let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
713 anyhow::anyhow!(
714 "Expected exactly {DEPTH10_LEN} bid counts, received {}",
715 v.len()
716 )
717 })?;
718
719 let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
720 anyhow::anyhow!(
721 "Expected exactly {DEPTH10_LEN} ask counts, received {}",
722 v.len()
723 )
724 })?;
725
726 let ts_event = msg.ts_recv.into();
727 let ts_init = ts_init.unwrap_or(ts_event);
728
729 let depth = OrderBookDepth10::new(
730 instrument_id,
731 bids,
732 asks,
733 bid_counts,
734 ask_counts,
735 msg.flags.raw(),
736 msg.sequence.into(),
737 ts_event,
738 ts_init,
739 );
740
741 Ok(depth)
742}
743
744pub fn decode_cmbp1_msg(
753 msg: &dbn::Cmbp1Msg,
754 instrument_id: InstrumentId,
755 price_precision: u8,
756 ts_init: Option<UnixNanos>,
757 include_trades: bool,
758) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
759 let top_level = &msg.levels[0];
760 let ts_event = msg.ts_recv.into();
761 let ts_init = ts_init.unwrap_or(ts_event);
762
763 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
764 Some(QuoteTick::new(
765 instrument_id,
766 decode_price_or_undef(top_level.bid_px, price_precision),
767 decode_price_or_undef(top_level.ask_px, price_precision),
768 decode_quantity(top_level.bid_sz as u64),
769 decode_quantity(top_level.ask_sz as u64),
770 ts_event,
771 ts_init,
772 ))
773 } else {
774 None
775 };
776
777 let maybe_trade = if include_trades && is_trade_msg(msg.action) {
778 Some(TradeTick::new(
780 instrument_id,
781 decode_price_or_undef(msg.price, price_precision),
782 decode_quantity(msg.size as u64),
783 parse_aggressor_side(msg.side),
784 TradeId::new(UUID4::new().as_str()),
785 ts_event,
786 ts_init,
787 ))
788 } else {
789 None
790 };
791
792 Ok((maybe_quote, maybe_trade))
793}
794
795pub fn decode_cbbo_msg(
803 msg: &dbn::CbboMsg,
804 instrument_id: InstrumentId,
805 price_precision: u8,
806 ts_init: Option<UnixNanos>,
807) -> anyhow::Result<Option<QuoteTick>> {
808 let top_level = &msg.levels[0];
809 if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
810 return Ok(None);
811 }
812
813 let ts_event = msg.ts_recv.into();
814 let ts_init = ts_init.unwrap_or(ts_event);
815
816 let quote = QuoteTick::new(
817 instrument_id,
818 decode_price_or_undef(top_level.bid_px, price_precision),
819 decode_price_or_undef(top_level.ask_px, price_precision),
820 decode_quantity(top_level.bid_sz as u64),
821 decode_quantity(top_level.ask_sz as u64),
822 ts_event,
823 ts_init,
824 );
825
826 Ok(Some(quote))
827}
828
829pub fn decode_tcbbo_msg(
838 msg: &dbn::CbboMsg,
839 instrument_id: InstrumentId,
840 price_precision: u8,
841 ts_init: Option<UnixNanos>,
842) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
843 let top_level = &msg.levels[0];
844 let ts_event = msg.ts_recv.into();
845 let ts_init = ts_init.unwrap_or(ts_event);
846
847 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
848 Some(QuoteTick::new(
849 instrument_id,
850 decode_price_or_undef(top_level.bid_px, price_precision),
851 decode_price_or_undef(top_level.ask_px, price_precision),
852 decode_quantity(top_level.bid_sz as u64),
853 decode_quantity(top_level.ask_sz as u64),
854 ts_event,
855 ts_init,
856 ))
857 } else {
858 None
859 };
860
861 let trade = TradeTick::new(
863 instrument_id,
864 decode_price_or_undef(msg.price, price_precision),
865 decode_quantity(msg.size as u64),
866 parse_aggressor_side(msg.side),
867 TradeId::new(UUID4::new().as_str()),
868 ts_event,
869 ts_init,
870 );
871
872 Ok((maybe_quote, trade))
873}
874
875pub fn decode_bar_type(
879 msg: &dbn::OhlcvMsg,
880 instrument_id: InstrumentId,
881) -> anyhow::Result<BarType> {
882 let bar_type = match msg.hd.rtype {
883 32 => {
884 BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
886 }
887 33 => {
888 BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
890 }
891 34 => {
892 BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
894 }
895 35 => {
896 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
898 }
899 36 => {
900 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
902 }
903 _ => anyhow::bail!(
904 "`rtype` is not a supported bar aggregation, was {}",
905 msg.hd.rtype
906 ),
907 };
908
909 Ok(bar_type)
910}
911
912pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
916 let adjustment = match msg.hd.rtype {
917 32 => {
918 BAR_CLOSE_ADJUSTMENT_1S
920 }
921 33 => {
922 BAR_CLOSE_ADJUSTMENT_1M
924 }
925 34 => {
926 BAR_CLOSE_ADJUSTMENT_1H
928 }
929 35 | 36 => {
930 BAR_CLOSE_ADJUSTMENT_1D
932 }
933 _ => anyhow::bail!(
934 "`rtype` is not a supported bar aggregation, was {}",
935 msg.hd.rtype
936 ),
937 };
938
939 Ok(adjustment.into())
940}
941
942pub fn decode_ohlcv_msg(
946 msg: &dbn::OhlcvMsg,
947 instrument_id: InstrumentId,
948 price_precision: u8,
949 ts_init: Option<UnixNanos>,
950 timestamp_on_close: bool,
951) -> anyhow::Result<Bar> {
952 let bar_type = decode_bar_type(msg, instrument_id)?;
953 let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
954
955 let ts_event_raw = msg.hd.ts_event.into();
956 let ts_close = ts_event_raw + ts_event_adjustment;
957 let ts_init = ts_init.unwrap_or(ts_close); let ts_event = if timestamp_on_close {
960 ts_close
961 } else {
962 ts_event_raw
963 };
964
965 let bar = Bar::new(
966 bar_type,
967 decode_price_or_undef(msg.open, price_precision),
968 decode_price_or_undef(msg.high, price_precision),
969 decode_price_or_undef(msg.low, price_precision),
970 decode_price_or_undef(msg.close, price_precision),
971 decode_quantity(msg.volume),
972 ts_event,
973 ts_init,
974 );
975
976 Ok(bar)
977}
978
979pub fn decode_status_msg(
985 msg: &dbn::StatusMsg,
986 instrument_id: InstrumentId,
987 ts_init: Option<UnixNanos>,
988) -> anyhow::Result<InstrumentStatus> {
989 let ts_event = msg.hd.ts_event.into();
990 let ts_init = ts_init.unwrap_or(ts_event);
991
992 let action = MarketStatusAction::from_u16(msg.action)
993 .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
994
995 let status = InstrumentStatus::new(
996 instrument_id,
997 action,
998 ts_event,
999 ts_init,
1000 parse_status_reason(msg.reason)?,
1001 parse_status_trading_event(msg.trading_event)?,
1002 parse_optional_bool(msg.is_trading),
1003 parse_optional_bool(msg.is_quoting),
1004 parse_optional_bool(msg.is_short_sell_restricted),
1005 );
1006
1007 Ok(status)
1008}
1009
1010pub fn decode_record(
1014 record: &dbn::RecordRef,
1015 instrument_id: InstrumentId,
1016 price_precision: u8,
1017 ts_init: Option<UnixNanos>,
1018 include_trades: bool,
1019 bars_timestamp_on_close: bool,
1020) -> anyhow::Result<(Option<Data>, Option<Data>)> {
1021 let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
1025 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1026 let result = decode_mbo_msg(
1027 msg,
1028 instrument_id,
1029 price_precision,
1030 Some(ts_init),
1031 include_trades,
1032 )?;
1033 match result {
1034 (Some(delta), None) => (Some(Data::Delta(delta)), None),
1035 (None, Some(trade)) => (Some(Data::Trade(trade)), None),
1036 (None, None) => (None, None),
1037 _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
1038 }
1039 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
1040 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1041 let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1042 (Some(Data::Trade(trade)), None)
1043 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
1044 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1045 let (maybe_quote, maybe_trade) = decode_mbp1_msg(
1046 msg,
1047 instrument_id,
1048 price_precision,
1049 Some(ts_init),
1050 include_trades,
1051 )?;
1052 (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1053 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
1054 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1055 let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1056 (maybe_quote.map(Data::Quote), None)
1057 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
1058 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1059 let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1060 (maybe_quote.map(Data::Quote), None)
1061 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
1062 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1063 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1064 (Some(Data::from(depth)), None)
1065 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
1066 let bar = decode_ohlcv_msg(
1069 msg,
1070 instrument_id,
1071 price_precision,
1072 ts_init,
1073 bars_timestamp_on_close,
1074 )?;
1075 (Some(Data::Bar(bar)), None)
1076 } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
1077 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1078 let (maybe_quote, maybe_trade) = decode_cmbp1_msg(
1079 msg,
1080 instrument_id,
1081 price_precision,
1082 Some(ts_init),
1083 include_trades,
1084 )?;
1085 (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1086 } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
1087 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1089 let (maybe_quote, trade) =
1090 decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1091 (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1092 } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
1093 if msg.price != i64::MAX && msg.size > 0 {
1095 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1097 let (maybe_quote, trade) =
1098 decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1099 (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1100 } else {
1101 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1103 let maybe_quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1104 (maybe_quote.map(Data::Quote), None)
1105 }
1106 } else {
1107 anyhow::bail!("DBN message type is not currently supported")
1108 };
1109
1110 Ok(result)
1111}
1112
1113const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
1114 match ts_init {
1115 Some(ts_init) => ts_init,
1116 None => msg_timestamp,
1117 }
1118}
1119
1120pub fn decode_instrument_def_msg(
1124 msg: &dbn::InstrumentDefMsg,
1125 instrument_id: InstrumentId,
1126 ts_init: Option<UnixNanos>,
1127) -> anyhow::Result<InstrumentAny> {
1128 match msg.instrument_class as u8 as char {
1129 'K' => Ok(InstrumentAny::Equity(decode_equity(
1130 msg,
1131 instrument_id,
1132 ts_init,
1133 )?)),
1134 'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1135 msg,
1136 instrument_id,
1137 ts_init,
1138 )?)),
1139 'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1140 msg,
1141 instrument_id,
1142 ts_init,
1143 )?)),
1144 'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1145 msg,
1146 instrument_id,
1147 ts_init,
1148 )?)),
1149 'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1150 msg,
1151 instrument_id,
1152 ts_init,
1153 )?)),
1154 'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1155 'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1156 _ => anyhow::bail!(
1157 "Unsupported `instrument_class` '{}'",
1158 msg.instrument_class as u8 as char
1159 ),
1160 }
1161}
1162
1163pub fn decode_equity(
1169 msg: &dbn::InstrumentDefMsg,
1170 instrument_id: InstrumentId,
1171 ts_init: Option<UnixNanos>,
1172) -> anyhow::Result<Equity> {
1173 let currency = parse_currency_or_usd_default(msg.currency());
1174 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1175 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1176 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1178
1179 Ok(Equity::new(
1180 instrument_id,
1181 instrument_id.symbol,
1182 None, currency,
1184 price_increment.precision,
1185 price_increment,
1186 Some(lot_size),
1187 None, None, None, None, None, None, None, None, ts_event,
1196 ts_init,
1197 ))
1198}
1199
1200pub fn decode_futures_contract(
1206 msg: &dbn::InstrumentDefMsg,
1207 instrument_id: InstrumentId,
1208 ts_init: Option<UnixNanos>,
1209) -> anyhow::Result<FuturesContract> {
1210 let currency = parse_currency_or_usd_default(msg.currency());
1211 let exchange = Ustr::from(msg.exchange()?);
1212 let underlying = Ustr::from(msg.asset()?);
1213 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1214 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1215 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1216 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1217 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1219
1220 FuturesContract::new_checked(
1221 instrument_id,
1222 instrument_id.symbol,
1223 asset_class.unwrap_or(AssetClass::Commodity),
1224 Some(exchange),
1225 underlying,
1226 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1227 decode_timestamp(msg.expiration, "expiration")?,
1228 currency,
1229 price_increment.precision,
1230 price_increment,
1231 multiplier,
1232 lot_size,
1233 None, None, None, None, None, None, None, None, ts_event,
1242 ts_init,
1243 )
1244}
1245
1246pub fn decode_futures_spread(
1252 msg: &dbn::InstrumentDefMsg,
1253 instrument_id: InstrumentId,
1254 ts_init: Option<UnixNanos>,
1255) -> anyhow::Result<FuturesSpread> {
1256 let exchange = Ustr::from(msg.exchange()?);
1257 let underlying = Ustr::from(msg.asset()?);
1258 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1259 let strategy_type = Ustr::from(msg.secsubtype()?);
1260 let currency = parse_currency_or_usd_default(msg.currency());
1261 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1262 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1263 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1264 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1266
1267 FuturesSpread::new_checked(
1268 instrument_id,
1269 instrument_id.symbol,
1270 asset_class.unwrap_or(AssetClass::Commodity),
1271 Some(exchange),
1272 underlying,
1273 strategy_type,
1274 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1275 decode_timestamp(msg.expiration, "expiration")?,
1276 currency,
1277 price_increment.precision,
1278 price_increment,
1279 multiplier,
1280 lot_size,
1281 None, None, None, None, None, None, None, None, ts_event,
1290 ts_init,
1291 )
1292}
1293
1294pub fn decode_option_contract(
1300 msg: &dbn::InstrumentDefMsg,
1301 instrument_id: InstrumentId,
1302 ts_init: Option<UnixNanos>,
1303) -> anyhow::Result<OptionContract> {
1304 let currency = parse_currency_or_usd_default(msg.currency());
1305 let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1306 let exchange = Ustr::from(msg.exchange()?);
1307 let underlying = Ustr::from(msg.underlying()?);
1308 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1309 Some(AssetClass::Equity)
1310 } else {
1311 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1312 asset_class
1313 };
1314 let option_kind = parse_option_kind(msg.instrument_class)?;
1315 let strike_price = decode_price(
1316 msg.strike_price,
1317 strike_price_currency.precision,
1318 "strike_price",
1319 )?;
1320 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1321 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1322 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1323 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1325
1326 OptionContract::new_checked(
1327 instrument_id,
1328 instrument_id.symbol,
1329 asset_class_opt.unwrap_or(AssetClass::Commodity),
1330 Some(exchange),
1331 underlying,
1332 option_kind,
1333 strike_price,
1334 currency,
1335 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1336 decode_timestamp(msg.expiration, "expiration")?,
1337 price_increment.precision,
1338 price_increment,
1339 multiplier,
1340 lot_size,
1341 None, None, None, None, None, None, None, None, ts_event,
1350 ts_init,
1351 )
1352}
1353
1354pub fn decode_option_spread(
1360 msg: &dbn::InstrumentDefMsg,
1361 instrument_id: InstrumentId,
1362 ts_init: Option<UnixNanos>,
1363) -> anyhow::Result<OptionSpread> {
1364 let exchange = Ustr::from(msg.exchange()?);
1365 let underlying = Ustr::from(msg.underlying()?);
1366 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1367 Some(AssetClass::Equity)
1368 } else {
1369 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1370 asset_class
1371 };
1372 let strategy_type = Ustr::from(msg.secsubtype()?);
1373 let currency = parse_currency_or_usd_default(msg.currency());
1374 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1375 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1376 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1377 let ts_event = msg.ts_recv.into(); let ts_init = ts_init.unwrap_or(ts_event);
1379
1380 OptionSpread::new_checked(
1381 instrument_id,
1382 instrument_id.symbol,
1383 asset_class_opt.unwrap_or(AssetClass::Commodity),
1384 Some(exchange),
1385 underlying,
1386 strategy_type,
1387 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1388 decode_timestamp(msg.expiration, "expiration")?,
1389 currency,
1390 price_increment.precision,
1391 price_increment,
1392 multiplier,
1393 lot_size,
1394 None, None, None, None, None, None, None, None, ts_event,
1403 ts_init,
1404 )
1405}
1406
1407pub fn decode_imbalance_msg(
1413 msg: &dbn::ImbalanceMsg,
1414 instrument_id: InstrumentId,
1415 price_precision: u8,
1416 ts_init: Option<UnixNanos>,
1417) -> anyhow::Result<DatabentoImbalance> {
1418 let ts_event = msg.ts_recv.into();
1419 let ts_init = ts_init.unwrap_or(ts_event);
1420
1421 Ok(DatabentoImbalance::new(
1422 instrument_id,
1423 decode_price_or_undef(msg.ref_price, price_precision),
1424 decode_price_or_undef(msg.cont_book_clr_price, price_precision),
1425 decode_price_or_undef(msg.auct_interest_clr_price, price_precision),
1426 Quantity::new(f64::from(msg.paired_qty), 0),
1427 Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1428 parse_order_side(msg.side),
1429 msg.significant_imbalance as c_char,
1430 msg.hd.ts_event.into(),
1431 ts_event,
1432 ts_init,
1433 ))
1434}
1435
1436pub fn decode_statistics_msg(
1443 msg: &dbn::StatMsg,
1444 instrument_id: InstrumentId,
1445 price_precision: u8,
1446 ts_init: Option<UnixNanos>,
1447) -> anyhow::Result<DatabentoStatistics> {
1448 let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1449 .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1450 let update_action =
1451 DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1452 anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1453 })?;
1454 let ts_event = msg.ts_recv.into();
1455 let ts_init = ts_init.unwrap_or(ts_event);
1456
1457 Ok(DatabentoStatistics::new(
1458 instrument_id,
1459 stat_type,
1460 update_action,
1461 decode_optional_price(msg.price, price_precision),
1462 decode_optional_quantity(msg.quantity),
1463 msg.channel_id,
1464 msg.stat_flags,
1465 msg.sequence,
1466 msg.ts_ref.into(),
1467 msg.ts_in_delta,
1468 msg.hd.ts_event.into(),
1469 ts_event,
1470 ts_init,
1471 ))
1472}
1473
1474#[cfg(test)]
1475mod tests {
1476 use std::path::{Path, PathBuf};
1477
1478 use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1479 use fallible_streaming_iterator::FallibleStreamingIterator;
1480 use nautilus_model::instruments::Instrument;
1481 use rstest::*;
1482
1483 use super::*;
1484
1485 fn test_data_path() -> PathBuf {
1486 Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1487 }
1488
1489 #[rstest]
1490 #[case('Y' as c_char, Some(true))]
1491 #[case('N' as c_char, Some(false))]
1492 #[case('X' as c_char, None)]
1493 fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1494 assert_eq!(parse_optional_bool(input), expected);
1495 }
1496
1497 #[rstest]
1498 #[case('A' as c_char, OrderSide::Sell)]
1499 #[case('B' as c_char, OrderSide::Buy)]
1500 #[case('X' as c_char, OrderSide::NoOrderSide)]
1501 fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1502 assert_eq!(parse_order_side(input), expected);
1503 }
1504
1505 #[rstest]
1506 #[case('A' as c_char, AggressorSide::Seller)]
1507 #[case('B' as c_char, AggressorSide::Buyer)]
1508 #[case('X' as c_char, AggressorSide::NoAggressor)]
1509 fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1510 assert_eq!(parse_aggressor_side(input), expected);
1511 }
1512
1513 #[rstest]
1514 #[case('T' as c_char, true)]
1515 #[case('A' as c_char, false)]
1516 #[case('C' as c_char, false)]
1517 #[case('F' as c_char, false)]
1518 #[case('M' as c_char, false)]
1519 #[case('R' as c_char, false)]
1520 fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1521 assert_eq!(is_trade_msg(action), expected);
1522 }
1523
1524 #[rstest]
1525 #[case('A' as c_char, Ok(BookAction::Add))]
1526 #[case('C' as c_char, Ok(BookAction::Delete))]
1527 #[case('F' as c_char, Ok(BookAction::Update))]
1528 #[case('M' as c_char, Ok(BookAction::Update))]
1529 #[case('R' as c_char, Ok(BookAction::Clear))]
1530 #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1531 fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1532 match parse_book_action(input) {
1533 Ok(action) => assert_eq!(Ok(action), expected),
1534 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1535 }
1536 }
1537
1538 #[rstest]
1539 #[case('C' as c_char, Ok(OptionKind::Call))]
1540 #[case('P' as c_char, Ok(OptionKind::Put))]
1541 #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1542 fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1543 match parse_option_kind(input) {
1544 Ok(kind) => assert_eq!(Ok(kind), expected),
1545 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1546 }
1547 }
1548
1549 #[rstest]
1550 #[case(Ok("USD"), Currency::USD())]
1551 #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1552 #[case(Ok(""), Currency::USD())]
1553 #[case(Err("Error"), Currency::USD())]
1554 fn test_parse_currency_or_usd_default(
1555 #[case] input: Result<&str, &'static str>, #[case] expected: Currency,
1557 ) {
1558 let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1559 assert_eq!(actual, expected);
1560 }
1561
1562 #[rstest]
1563 #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1564 #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1565 #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1566 #[case("XXX", Ok((None, None)))]
1567 #[case("D", Err("Value string is too short"))]
1568 fn test_parse_cfi_iso10926(
1569 #[case] input: &str,
1570 #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1571 ) {
1572 match parse_cfi_iso10926(input) {
1573 Ok(result) => assert_eq!(Ok(result), expected),
1574 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1575 }
1576 }
1577
1578 #[rstest]
1579 #[case(0, 2, Price::from_raw(0, 2))]
1580 #[case(
1581 1_000_000_000,
1582 2,
1583 Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1584 )]
1585 fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1586 let actual = decode_price(value, precision, "test_field").unwrap();
1587 assert_eq!(actual, expected);
1588 }
1589
1590 #[rstest]
1591 fn test_decode_price_undefined_errors() {
1592 let result = decode_price(i64::MAX, 2, "strike_price");
1593 assert!(result.is_err());
1594 assert!(result.unwrap_err().to_string().contains("strike_price"));
1595 }
1596
1597 #[rstest]
1598 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(
1601 10_000_000_000,
1602 2,
1603 Price::from_raw(decode_raw_price_i64(10_000_000_000), 2)
1604 )]
1605 fn test_decode_price_increment(
1606 #[case] value: i64,
1607 #[case] precision: u8,
1608 #[case] expected: Price,
1609 ) {
1610 let actual = decode_price_increment(value, precision);
1611 assert_eq!(actual, expected);
1612 }
1613
1614 #[rstest]
1615 #[case(i64::MAX, 2, None)] #[case(0, 2, Some(Price::from_raw(0, 2)))] #[case(
1618 10_000_000_000,
1619 2,
1620 Some(Price::from_raw(decode_raw_price_i64(10_000_000_000), 2))
1621 )]
1622 fn test_decode_optional_price(
1623 #[case] value: i64,
1624 #[case] precision: u8,
1625 #[case] expected: Option<Price>,
1626 ) {
1627 let actual = decode_optional_price(value, precision);
1628 assert_eq!(actual, expected);
1629 }
1630
1631 #[rstest]
1632 #[case(0, 2, Price::from_raw(0, 2))]
1633 #[case(
1634 1_000_000_000,
1635 2,
1636 Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1637 )]
1638 #[case(i64::MAX, 2, Price::from_raw(PRICE_UNDEF, 0))] fn test_decode_price_or_undef(
1640 #[case] value: i64,
1641 #[case] precision: u8,
1642 #[case] expected: Price,
1643 ) {
1644 let actual = decode_price_or_undef(value, precision);
1645 assert_eq!(actual, expected);
1646 }
1647
1648 #[rstest]
1649 #[case(i64::MAX, None)] #[case(0, Some(Quantity::new(0.0, 0)))] #[case(10, Some(Quantity::new(10.0, 0)))] fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1653 let actual = decode_optional_quantity(value);
1654 assert_eq!(actual, expected);
1655 }
1656
1657 #[rstest]
1658 #[case(0, UnixNanos::from(0))]
1659 #[case(1_000_000_000, UnixNanos::from(1_000_000_000))]
1660 fn test_decode_timestamp(#[case] value: u64, #[case] expected: UnixNanos) {
1661 let actual = decode_timestamp(value, "test_field").unwrap();
1662 assert_eq!(actual, expected);
1663 }
1664
1665 #[rstest]
1666 fn test_decode_timestamp_undefined_errors() {
1667 let result = decode_timestamp(dbn::UNDEF_TIMESTAMP, "expiration");
1668 assert!(result.is_err());
1669 assert!(result.unwrap_err().to_string().contains("expiration"));
1670 }
1671
1672 #[rstest]
1673 #[case(0, Some(UnixNanos::from(0)))]
1674 #[case(1_000_000_000, Some(UnixNanos::from(1_000_000_000)))]
1675 #[case(dbn::UNDEF_TIMESTAMP, None)]
1676 fn test_decode_optional_timestamp(#[case] value: u64, #[case] expected: Option<UnixNanos>) {
1677 let actual = decode_optional_timestamp(value);
1678 assert_eq!(actual, expected);
1679 }
1680
1681 #[rstest]
1682 #[case(0, Quantity::from(1))] #[case(i64::MAX, Quantity::from(1))] #[case(50_000_000_000, Quantity::from("50"))] #[case(12_500_000_000, Quantity::from("12.5"))] #[case(1_000_000_000, Quantity::from("1"))] #[case(1, Quantity::from("0.000000001"))] #[case(1_000_000_001, Quantity::from("1.000000001"))] #[case(999_999_999, Quantity::from("0.999999999"))] #[case(123_456_789_000, Quantity::from("123.456789"))] fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1692 assert_eq!(decode_multiplier(raw).unwrap(), expected);
1693 }
1694
1695 #[rstest]
1696 #[case(-1_500_000_000)] #[case(-1)] #[case(-999_999_999)] fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1700 let result = decode_multiplier(raw);
1701 assert!(result.is_err());
1702 assert!(
1703 result
1704 .unwrap_err()
1705 .to_string()
1706 .contains("Invalid negative multiplier")
1707 );
1708 }
1709
1710 #[rstest]
1711 #[case(100, Quantity::from(100))]
1712 #[case(1000, Quantity::from(1000))]
1713 #[case(5, Quantity::from(5))]
1714 fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1715 assert_eq!(decode_quantity(value), expected);
1716 }
1717
1718 #[rstest]
1719 #[case(0, Quantity::from(1))] #[case(i32::MAX, Quantity::from(1))] #[case(100, Quantity::from(100))]
1722 #[case(1, Quantity::from(1))]
1723 #[case(1000, Quantity::from(1000))]
1724 fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1725 assert_eq!(decode_lot_size(value), expected);
1726 }
1727
1728 #[rstest]
1729 #[case(0, None)] #[case(1, Some(Ustr::from("Scheduled")))]
1731 #[case(2, Some(Ustr::from("Surveillance intervention")))]
1732 #[case(3, Some(Ustr::from("Market event")))]
1733 #[case(10, Some(Ustr::from("Regulatory")))]
1734 #[case(30, Some(Ustr::from("News pending")))]
1735 #[case(40, Some(Ustr::from("Order imbalance")))]
1736 #[case(50, Some(Ustr::from("LULD pause")))]
1737 #[case(60, Some(Ustr::from("Operational")))]
1738 #[case(100, Some(Ustr::from("Corporate action")))]
1739 #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1740 fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1741 assert_eq!(parse_status_reason(value).unwrap(), expected);
1742 }
1743
1744 #[rstest]
1745 #[case(999)] fn test_parse_status_reason_invalid(#[case] value: u16) {
1747 assert!(parse_status_reason(value).is_err());
1748 }
1749
1750 #[rstest]
1751 #[case(0, None)] #[case(1, Some(Ustr::from("No cancel")))]
1753 #[case(2, Some(Ustr::from("Change trading session")))]
1754 #[case(3, Some(Ustr::from("Implied matching on")))]
1755 #[case(4, Some(Ustr::from("Implied matching off")))]
1756 fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1757 assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1758 }
1759
1760 #[rstest]
1761 #[case(5)] #[case(100)] fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1764 assert!(parse_status_trading_event(value).is_err());
1765 }
1766
1767 #[rstest]
1768 fn test_decode_mbo_msg() {
1769 let path = test_data_path().join("test_data.mbo.dbn.zst");
1770 let mut dbn_stream = Decoder::from_zstd_file(path)
1771 .unwrap()
1772 .decode_stream::<dbn::MboMsg>();
1773 let msg = dbn_stream.next().unwrap().unwrap();
1774
1775 let instrument_id = InstrumentId::from("ESM4.GLBX");
1776 let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1777 let delta = delta.unwrap();
1778
1779 assert_eq!(delta.instrument_id, instrument_id);
1780 assert_eq!(delta.action, BookAction::Delete);
1781 assert_eq!(delta.order.side, OrderSide::Sell);
1782 assert_eq!(delta.order.price, Price::from("3722.75"));
1783 assert_eq!(delta.order.size, Quantity::from("1"));
1784 assert_eq!(delta.order.order_id, 647_784_973_705);
1785 assert_eq!(delta.flags, 128);
1786 assert_eq!(delta.sequence, 1_170_352);
1787 assert_eq!(delta.ts_event, msg.ts_recv);
1788 assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1789 assert_eq!(delta.ts_init, 0);
1790 }
1791
1792 #[rstest]
1793 fn test_decode_mbo_msg_clear_action() {
1794 let ts_recv = 1_609_160_400_000_000_000;
1796 let msg = dbn::MboMsg {
1797 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1798 order_id: 0,
1799 price: i64::MAX,
1800 size: 0,
1801 flags: dbn::FlagSet::empty(),
1802 channel_id: 0,
1803 action: 'R' as c_char,
1804 side: 'N' as c_char, ts_recv,
1806 ts_in_delta: 0,
1807 sequence: 1_000_000,
1808 };
1809
1810 let instrument_id = InstrumentId::from("ESM4.GLBX");
1811 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1812
1813 assert!(trade.is_none());
1815 let delta = delta.expect("Clear action should produce OrderBookDelta");
1816
1817 assert_eq!(delta.instrument_id, instrument_id);
1818 assert_eq!(delta.action, BookAction::Clear);
1819 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1820 assert_eq!(delta.order.size, Quantity::from("0"));
1821 assert_eq!(delta.order.order_id, 0);
1822 assert_eq!(delta.sequence, 1_000_000);
1823 assert_eq!(delta.ts_event, ts_recv);
1824 assert_eq!(delta.ts_init, 0);
1825 assert!(delta.order.price.is_undefined());
1826 assert_eq!(delta.order.price.precision, 0);
1827 }
1828
1829 #[rstest]
1830 fn test_decode_mbo_msg_price_undef_with_precision() {
1831 let ts_recv = 1_609_160_400_000_000_000;
1833 let msg = dbn::MboMsg {
1834 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1835 order_id: 0,
1836 price: i64::MAX, size: 0,
1838 flags: dbn::FlagSet::empty(),
1839 channel_id: 0,
1840 action: 'R' as c_char, side: 'N' as c_char, ts_recv,
1843 ts_in_delta: 0,
1844 sequence: 0,
1845 };
1846
1847 let instrument_id = InstrumentId::from("ESM4.GLBX");
1848 let (delta, _) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1849 let delta = delta.unwrap();
1850
1851 assert!(delta.order.price.is_undefined());
1852 assert_eq!(delta.order.price.precision, 0);
1853 assert_eq!(delta.order.price.raw, PRICE_UNDEF);
1854 }
1855
1856 #[rstest]
1857 fn test_decode_mbo_msg_no_order_side_update() {
1858 let ts_recv = 1_609_160_400_000_000_000;
1861 let msg = dbn::MboMsg {
1862 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1863 order_id: 123_456_789,
1864 price: 4_800_250_000_000, size: 1,
1866 flags: dbn::FlagSet::empty(),
1867 channel_id: 1,
1868 action: 'M' as c_char, side: 'N' as c_char, ts_recv,
1871 ts_in_delta: 0,
1872 sequence: 1_000_000,
1873 };
1874
1875 let instrument_id = InstrumentId::from("ESM4.GLBX");
1876 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1877
1878 assert!(delta.is_some());
1880 assert!(trade.is_none());
1881 let delta = delta.unwrap();
1882 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1883 assert_eq!(delta.order.order_id, 123_456_789);
1884 assert_eq!(delta.action, BookAction::Update);
1885 }
1886
1887 #[rstest]
1888 fn test_decode_mbp1_msg() {
1889 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1890 let mut dbn_stream = Decoder::from_zstd_file(path)
1891 .unwrap()
1892 .decode_stream::<dbn::Mbp1Msg>();
1893 let msg = dbn_stream.next().unwrap().unwrap();
1894
1895 let instrument_id = InstrumentId::from("ESM4.GLBX");
1896 let (maybe_quote, _) =
1897 decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1898 let quote = maybe_quote.expect("Expected valid quote");
1899
1900 assert_eq!(quote.instrument_id, instrument_id);
1901 assert_eq!(quote.bid_price, Price::from("3720.25"));
1902 assert_eq!(quote.ask_price, Price::from("3720.50"));
1903 assert_eq!(quote.bid_size, Quantity::from("24"));
1904 assert_eq!(quote.ask_size, Quantity::from("11"));
1905 assert_eq!(quote.ts_event, msg.ts_recv);
1906 assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1907 assert_eq!(quote.ts_init, 0);
1908 }
1909
1910 #[rstest]
1911 fn test_decode_mbp1_msg_undefined_ask_skips_quote() {
1912 let ts_recv = 1_609_160_400_000_000_000;
1913 let msg = dbn::Mbp1Msg {
1914 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1915 price: 3_720_250_000_000, size: 5,
1917 action: 'A' as c_char,
1918 side: 'B' as c_char,
1919 flags: dbn::FlagSet::empty(),
1920 depth: 0,
1921 ts_recv,
1922 ts_in_delta: 0,
1923 sequence: 1_170_352,
1924 levels: [dbn::BidAskPair {
1925 bid_px: 3_720_250_000_000, ask_px: i64::MAX, bid_sz: 24,
1928 ask_sz: 0,
1929 bid_ct: 1,
1930 ask_ct: 0,
1931 }],
1932 };
1933
1934 let instrument_id = InstrumentId::from("ESM4.GLBX");
1935 let (maybe_quote, _) =
1936 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1937
1938 assert!(maybe_quote.is_none());
1940 }
1941
1942 #[rstest]
1943 fn test_decode_mbp1_msg_undefined_bid_skips_quote() {
1944 let ts_recv = 1_609_160_400_000_000_000;
1945 let msg = dbn::Mbp1Msg {
1946 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1947 price: 3_720_500_000_000, size: 5,
1949 action: 'A' as c_char,
1950 side: 'A' as c_char,
1951 flags: dbn::FlagSet::empty(),
1952 depth: 0,
1953 ts_recv,
1954 ts_in_delta: 0,
1955 sequence: 1_170_352,
1956 levels: [dbn::BidAskPair {
1957 bid_px: i64::MAX, ask_px: 3_720_500_000_000, bid_sz: 0,
1960 ask_sz: 11,
1961 bid_ct: 0,
1962 ask_ct: 1,
1963 }],
1964 };
1965
1966 let instrument_id = InstrumentId::from("ESM4.GLBX");
1967 let (maybe_quote, _) =
1968 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1969
1970 assert!(maybe_quote.is_none());
1972 }
1973
1974 #[rstest]
1975 fn test_decode_mbp1_msg_trade_still_returned_with_undefined_prices() {
1976 let ts_recv = 1_609_160_400_000_000_000;
1977 let msg = dbn::Mbp1Msg {
1978 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1979 price: 3_720_250_000_000, size: 5,
1981 action: 'T' as c_char, side: 'A' as c_char,
1983 flags: dbn::FlagSet::empty(),
1984 depth: 0,
1985 ts_recv,
1986 ts_in_delta: 0,
1987 sequence: 1_170_352,
1988 levels: [dbn::BidAskPair {
1989 bid_px: i64::MAX, ask_px: i64::MAX, bid_sz: 0,
1992 ask_sz: 0,
1993 bid_ct: 0,
1994 ask_ct: 0,
1995 }],
1996 };
1997
1998 let instrument_id = InstrumentId::from("ESM4.GLBX");
1999 let (maybe_quote, maybe_trade) =
2000 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), true).unwrap();
2001
2002 assert!(maybe_quote.is_none());
2004
2005 let trade = maybe_trade.expect("Expected trade");
2007 assert_eq!(trade.instrument_id, instrument_id);
2008 assert_eq!(trade.price, Price::from("3720.25"));
2009 assert_eq!(trade.size, Quantity::from("5"));
2010 }
2011
2012 #[rstest]
2013 fn test_decode_bbo_1s_msg() {
2014 let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
2015 let mut dbn_stream = Decoder::from_zstd_file(path)
2016 .unwrap()
2017 .decode_stream::<dbn::BboMsg>();
2018 let msg = dbn_stream.next().unwrap().unwrap();
2019
2020 let instrument_id = InstrumentId::from("ESM4.GLBX");
2021 let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2022 let quote = maybe_quote.expect("Expected valid quote");
2023
2024 assert_eq!(quote.instrument_id, instrument_id);
2025 assert_eq!(quote.bid_price, Price::from("3702.25"));
2026 assert_eq!(quote.ask_price, Price::from("3702.75"));
2027 assert_eq!(quote.bid_size, Quantity::from("18"));
2028 assert_eq!(quote.ask_size, Quantity::from("13"));
2029 assert_eq!(quote.ts_event, msg.ts_recv);
2030 assert_eq!(quote.ts_event, 1609113600000000000);
2031 assert_eq!(quote.ts_init, 0);
2032 }
2033
2034 #[rstest]
2035 fn test_decode_bbo_1m_msg() {
2036 let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
2037 let mut dbn_stream = Decoder::from_zstd_file(path)
2038 .unwrap()
2039 .decode_stream::<dbn::BboMsg>();
2040 let msg = dbn_stream.next().unwrap().unwrap();
2041
2042 let instrument_id = InstrumentId::from("ESM4.GLBX");
2043 let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2044 let quote = maybe_quote.expect("Expected valid quote");
2045
2046 assert_eq!(quote.instrument_id, instrument_id);
2047 assert_eq!(quote.bid_price, Price::from("3702.25"));
2048 assert_eq!(quote.ask_price, Price::from("3702.75"));
2049 assert_eq!(quote.bid_size, Quantity::from("18"));
2050 assert_eq!(quote.ask_size, Quantity::from("13"));
2051 assert_eq!(quote.ts_event, msg.ts_recv);
2052 assert_eq!(quote.ts_event, 1609113600000000000);
2053 assert_eq!(quote.ts_init, 0);
2054 }
2055
2056 #[rstest]
2057 fn test_decode_mbp10_msg() {
2058 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
2059 let mut dbn_stream = Decoder::from_zstd_file(path)
2060 .unwrap()
2061 .decode_stream::<dbn::Mbp10Msg>();
2062 let msg = dbn_stream.next().unwrap().unwrap();
2063
2064 let instrument_id = InstrumentId::from("ESM4.GLBX");
2065 let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2066
2067 assert_eq!(depth10.instrument_id, instrument_id);
2068 assert_eq!(depth10.bids.len(), 10);
2069 assert_eq!(depth10.asks.len(), 10);
2070 assert_eq!(depth10.bid_counts.len(), 10);
2071 assert_eq!(depth10.ask_counts.len(), 10);
2072 assert_eq!(depth10.flags, 128);
2073 assert_eq!(depth10.sequence, 1_170_352);
2074 assert_eq!(depth10.ts_event, msg.ts_recv);
2075 assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
2076 assert_eq!(depth10.ts_init, 0);
2077 }
2078
2079 #[rstest]
2080 fn test_decode_trade_msg() {
2081 let path = test_data_path().join("test_data.trades.dbn.zst");
2082 let mut dbn_stream = Decoder::from_zstd_file(path)
2083 .unwrap()
2084 .decode_stream::<dbn::TradeMsg>();
2085 let msg = dbn_stream.next().unwrap().unwrap();
2086
2087 let instrument_id = InstrumentId::from("ESM4.GLBX");
2088 let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2089
2090 assert_eq!(trade.instrument_id, instrument_id);
2091 assert_eq!(trade.price, Price::from("3720.25"));
2092 assert_eq!(trade.size, Quantity::from("5"));
2093 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2094 assert_eq!(trade.trade_id.to_string(), "1170380");
2095 assert_eq!(trade.ts_event, msg.ts_recv);
2096 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2097 assert_eq!(trade.ts_init, 0);
2098 }
2099
2100 #[rstest]
2101 fn test_decode_tbbo_msg() {
2102 let path = test_data_path().join("test_data.tbbo.dbn.zst");
2103 let mut dbn_stream = Decoder::from_zstd_file(path)
2104 .unwrap()
2105 .decode_stream::<dbn::Mbp1Msg>();
2106 let msg = dbn_stream.next().unwrap().unwrap();
2107
2108 let instrument_id = InstrumentId::from("ESM4.GLBX");
2109 let (maybe_quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2110 let quote = maybe_quote.expect("Expected valid quote");
2111
2112 assert_eq!(quote.instrument_id, instrument_id);
2113 assert_eq!(quote.bid_price, Price::from("3720.25"));
2114 assert_eq!(quote.ask_price, Price::from("3720.50"));
2115 assert_eq!(quote.bid_size, Quantity::from("26"));
2116 assert_eq!(quote.ask_size, Quantity::from("7"));
2117 assert_eq!(quote.ts_event, msg.ts_recv);
2118 assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
2119 assert_eq!(quote.ts_init, 0);
2120
2121 assert_eq!(trade.instrument_id, instrument_id);
2122 assert_eq!(trade.price, Price::from("3720.25"));
2123 assert_eq!(trade.size, Quantity::from("5"));
2124 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2125 assert_eq!(trade.trade_id.to_string(), "1170380");
2126 assert_eq!(trade.ts_event, msg.ts_recv);
2127 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2128 assert_eq!(trade.ts_init, 0);
2129 }
2130
2131 #[rstest]
2132 fn test_decode_ohlcv_msg() {
2133 let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
2134 let mut dbn_stream = Decoder::from_zstd_file(path)
2135 .unwrap()
2136 .decode_stream::<dbn::OhlcvMsg>();
2137 let msg = dbn_stream.next().unwrap().unwrap();
2138
2139 let instrument_id = InstrumentId::from("ESM4.GLBX");
2140 let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2141
2142 assert_eq!(
2143 bar.bar_type,
2144 BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
2145 );
2146 assert_eq!(bar.open, Price::from("372025.00"));
2147 assert_eq!(bar.high, Price::from("372050.00"));
2148 assert_eq!(bar.low, Price::from("372025.00"));
2149 assert_eq!(bar.close, Price::from("372050.00"));
2150 assert_eq!(bar.volume, Quantity::from("57"));
2151 assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); assert_eq!(bar.ts_init, 0); }
2154
2155 #[rstest]
2156 fn test_decode_definition_msg() {
2157 let path = test_data_path().join("test_data.definition.dbn.zst");
2158 let mut dbn_stream = Decoder::from_zstd_file(path)
2159 .unwrap()
2160 .decode_stream::<dbn::InstrumentDefMsg>();
2161 let msg = dbn_stream.next().unwrap().unwrap();
2162
2163 let instrument_id = InstrumentId::from("ESM4.GLBX");
2164 let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
2165
2166 assert!(result.is_ok());
2167 assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
2168 }
2169
2170 #[rstest]
2171 fn test_decode_status_msg() {
2172 let path = test_data_path().join("test_data.status.dbn.zst");
2173 let mut dbn_stream = Decoder::from_zstd_file(path)
2174 .unwrap()
2175 .decode_stream::<dbn::StatusMsg>();
2176 let msg = dbn_stream.next().unwrap().unwrap();
2177
2178 let instrument_id = InstrumentId::from("ESM4.GLBX");
2179 let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
2180
2181 assert_eq!(status.instrument_id, instrument_id);
2182 assert_eq!(status.action, MarketStatusAction::Trading);
2183 assert_eq!(status.ts_event, msg.hd.ts_event);
2184 assert_eq!(status.ts_init, 0);
2185 assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
2186 assert_eq!(status.trading_event, None);
2187 assert_eq!(status.is_trading, Some(true));
2188 assert_eq!(status.is_quoting, Some(true));
2189 assert_eq!(status.is_short_sell_restricted, None);
2190 }
2191
2192 #[rstest]
2193 fn test_decode_imbalance_msg() {
2194 let path = test_data_path().join("test_data.imbalance.dbn.zst");
2195 let mut dbn_stream = Decoder::from_zstd_file(path)
2196 .unwrap()
2197 .decode_stream::<dbn::ImbalanceMsg>();
2198 let msg = dbn_stream.next().unwrap().unwrap();
2199
2200 let instrument_id = InstrumentId::from("ESM4.GLBX");
2201 let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2202
2203 assert_eq!(imbalance.instrument_id, instrument_id);
2204 assert_eq!(imbalance.ref_price, Price::from("229.43"));
2205 assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
2206 assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
2207 assert_eq!(imbalance.paired_qty, Quantity::from("0"));
2208 assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
2209 assert_eq!(imbalance.side, OrderSide::Buy);
2210 assert_eq!(imbalance.significant_imbalance, 126);
2211 assert_eq!(imbalance.ts_event, msg.hd.ts_event);
2212 assert_eq!(imbalance.ts_recv, msg.ts_recv);
2213 assert_eq!(imbalance.ts_init, 0);
2214 }
2215
2216 #[rstest]
2217 fn test_decode_statistics_msg() {
2218 let path = test_data_path().join("test_data.statistics.dbn.zst");
2219 let mut dbn_stream = Decoder::from_zstd_file(path)
2220 .unwrap()
2221 .decode_stream::<dbn::StatMsg>();
2222 let msg = dbn_stream.next().unwrap().unwrap();
2223
2224 let instrument_id = InstrumentId::from("ESM4.GLBX");
2225 let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2226
2227 assert_eq!(statistics.instrument_id, instrument_id);
2228 assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
2229 assert_eq!(
2230 statistics.update_action,
2231 DatabentoStatisticUpdateAction::Added
2232 );
2233 assert_eq!(statistics.price, Some(Price::from("100.00")));
2234 assert_eq!(statistics.quantity, None);
2235 assert_eq!(statistics.channel_id, 13);
2236 assert_eq!(statistics.stat_flags, 255);
2237 assert_eq!(statistics.sequence, 2);
2238 assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
2239 assert_eq!(statistics.ts_in_delta, 26961);
2240 assert_eq!(statistics.ts_event, msg.hd.ts_event);
2241 assert_eq!(statistics.ts_recv, msg.ts_recv);
2242 assert_eq!(statistics.ts_init, 0);
2243 }
2244
2245 #[rstest]
2246 fn test_decode_cmbp1_msg() {
2247 let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
2248 let mut dbn_stream = Decoder::from_zstd_file(path)
2249 .unwrap()
2250 .decode_stream::<dbn::Cmbp1Msg>();
2251 let msg = dbn_stream.next().unwrap().unwrap();
2252
2253 let instrument_id = InstrumentId::from("ESM4.GLBX");
2254 let (maybe_quote, trade) =
2255 decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2256 let quote = maybe_quote.expect("Expected valid quote");
2257
2258 assert_eq!(quote.instrument_id, instrument_id);
2259 assert!(quote.bid_price.raw > 0);
2260 assert!(quote.ask_price.raw > 0);
2261 assert!(quote.bid_size.raw > 0);
2262 assert!(quote.ask_size.raw > 0);
2263 assert_eq!(quote.ts_event, msg.ts_recv);
2264 assert_eq!(quote.ts_init, 0);
2265
2266 if is_trade_msg(msg.action) {
2268 assert!(trade.is_some());
2269 let trade = trade.unwrap();
2270 assert_eq!(trade.instrument_id, instrument_id);
2271 } else {
2272 assert!(trade.is_none());
2273 }
2274 }
2275
2276 #[rstest]
2277 fn test_decode_cbbo_1s_msg() {
2278 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2279 let mut dbn_stream = Decoder::from_zstd_file(path)
2280 .unwrap()
2281 .decode_stream::<dbn::CbboMsg>();
2282 let msg = dbn_stream.next().unwrap().unwrap();
2283
2284 let instrument_id = InstrumentId::from("ESM4.GLBX");
2285 let maybe_quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2286 let quote = maybe_quote.expect("Expected valid quote");
2287
2288 assert_eq!(quote.instrument_id, instrument_id);
2289 assert!(quote.bid_price.raw > 0);
2290 assert!(quote.ask_price.raw > 0);
2291 assert!(quote.bid_size.raw > 0);
2292 assert!(quote.ask_size.raw > 0);
2293 assert_eq!(quote.ts_event, msg.ts_recv);
2294 assert_eq!(quote.ts_init, 0);
2295 }
2296
2297 #[rstest]
2298 fn test_decode_mbp10_msg_with_all_levels() {
2299 let mut msg = dbn::Mbp10Msg::default();
2300 for i in 0..10 {
2301 msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
2302 msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
2303 msg.levels[i].bid_sz = 10 + i as u32;
2304 msg.levels[i].ask_sz = 10 + i as u32;
2305 msg.levels[i].bid_ct = 1 + i as u32;
2306 msg.levels[i].ask_ct = 1 + i as u32;
2307 }
2308 msg.ts_recv = 1_609_160_400_000_704_060;
2309
2310 let instrument_id = InstrumentId::from("TEST.VENUE");
2311 let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
2312
2313 assert!(result.is_ok());
2314 let depth = result.unwrap();
2315 assert_eq!(depth.bids.len(), 10);
2316 assert_eq!(depth.asks.len(), 10);
2317 assert_eq!(depth.bid_counts.len(), 10);
2318 assert_eq!(depth.ask_counts.len(), 10);
2319 }
2320
2321 #[rstest]
2322 fn test_array_conversion_error_handling() {
2323 let mut bids = Vec::new();
2324 let mut asks = Vec::new();
2325
2326 for i in 0..5 {
2328 bids.push(BookOrder::new(
2329 OrderSide::Buy,
2330 Price::from(format!("{}.00", 100 - i)),
2331 Quantity::from(10),
2332 i as u64,
2333 ));
2334 asks.push(BookOrder::new(
2335 OrderSide::Sell,
2336 Price::from(format!("{}.00", 101 + i)),
2337 Quantity::from(10),
2338 i as u64,
2339 ));
2340 }
2341
2342 let result: Result<[BookOrder; DEPTH10_LEN], _> =
2343 bids.try_into().map_err(|v: Vec<BookOrder>| {
2344 anyhow::anyhow!(
2345 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2346 v.len()
2347 )
2348 });
2349 assert!(result.is_err());
2350 assert!(
2351 result
2352 .unwrap_err()
2353 .to_string()
2354 .contains("Expected exactly 10 bid levels, received 5")
2355 );
2356 }
2357
2358 #[rstest]
2359 fn test_decode_tcbbo_msg() {
2360 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2362 let mut dbn_stream = Decoder::from_zstd_file(path)
2363 .unwrap()
2364 .decode_stream::<dbn::CbboMsg>();
2365 let msg = dbn_stream.next().unwrap().unwrap();
2366
2367 let mut tcbbo_msg = msg.clone();
2369 tcbbo_msg.price = 3702500000000;
2370 tcbbo_msg.size = 10;
2371
2372 let instrument_id = InstrumentId::from("ESM4.GLBX");
2373 let (maybe_quote, trade) =
2374 decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2375 let quote = maybe_quote.expect("Expected valid quote");
2376
2377 assert_eq!(quote.instrument_id, instrument_id);
2378 assert!(quote.bid_price.raw > 0);
2379 assert!(quote.ask_price.raw > 0);
2380 assert!(quote.bid_size.raw > 0);
2381 assert!(quote.ask_size.raw > 0);
2382 assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2383 assert_eq!(quote.ts_init, 0);
2384
2385 assert_eq!(trade.instrument_id, instrument_id);
2386 assert_eq!(trade.price, Price::from("3702.50"));
2387 assert_eq!(trade.size, Quantity::from(10));
2388 assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2389 assert_eq!(trade.ts_init, 0);
2390 }
2391
2392 #[rstest]
2393 fn test_decode_bar_type() {
2394 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2395 let instrument_id = InstrumentId::from("ESM4.GLBX");
2396
2397 msg.hd.rtype = 32;
2399 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2400 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2401
2402 msg.hd.rtype = 33;
2404 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2405 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2406
2407 msg.hd.rtype = 34;
2409 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2410 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2411
2412 msg.hd.rtype = 35;
2414 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2415 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2416
2417 msg.hd.rtype = 99;
2419 let result = decode_bar_type(&msg, instrument_id);
2420 assert!(result.is_err());
2421 }
2422
2423 #[rstest]
2424 fn test_decode_ts_event_adjustment() {
2425 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2426
2427 msg.hd.rtype = 32;
2429 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2430 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2431
2432 msg.hd.rtype = 33;
2434 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2435 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2436
2437 msg.hd.rtype = 34;
2439 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2440 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2441
2442 msg.hd.rtype = 35;
2444 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2445 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2446
2447 msg.hd.rtype = 36;
2449 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2450 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2451
2452 msg.hd.rtype = 99;
2454 let result = decode_ts_event_adjustment(&msg);
2455 assert!(result.is_err());
2456 }
2457
2458 #[rstest]
2459 fn test_decode_record() {
2460 let path = test_data_path().join("test_data.mbo.dbn.zst");
2462 let decoder = Decoder::from_zstd_file(path).unwrap();
2463 let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2464 let msg = dbn_stream.next().unwrap().unwrap();
2465
2466 let record_ref = dbn::RecordRef::from(msg);
2467 let instrument_id = InstrumentId::from("ESM4.GLBX");
2468
2469 let (data1, data2) =
2470 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2471
2472 assert!(data1.is_some());
2473 assert!(data2.is_none());
2474
2475 let path = test_data_path().join("test_data.trades.dbn.zst");
2477 let decoder = Decoder::from_zstd_file(path).unwrap();
2478 let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2479 let msg = dbn_stream.next().unwrap().unwrap();
2480
2481 let record_ref = dbn::RecordRef::from(msg);
2482
2483 let (data1, data2) =
2484 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2485
2486 assert!(data1.is_some());
2487 assert!(data2.is_none());
2488 assert!(matches!(data1.unwrap(), Data::Trade(_)));
2489 }
2490}