1use std::{ffi::c_char, num::NonZeroUsize};
48
49use databento::dbn::{self};
50use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
51use nautilus_model::{
52 data::{
53 Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
54 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
55 },
56 enums::{
57 AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
58 InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
59 },
60 identifiers::{InstrumentId, Symbol, TradeId},
61 instruments::{
62 Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
63 },
64 types::{
65 Currency, Price, Quantity,
66 price::{PRICE_UNDEF, decode_raw_price_i64},
67 },
68};
69use ustr::Ustr;
70
71use super::{
72 enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
73 types::{DatabentoImbalance, DatabentoStatistics},
74};
75
/// Bar step of exactly one, shared by every bar specification declared in this module.
const STEP_ONE: NonZeroUsize = NonZeroUsize::MIN;
78
79const BAR_SPEC_1S: BarSpecification = BarSpecification {
80 step: STEP_ONE,
81 aggregation: BarAggregation::Second,
82 price_type: PriceType::Last,
83};
84const BAR_SPEC_1M: BarSpecification = BarSpecification {
85 step: STEP_ONE,
86 aggregation: BarAggregation::Minute,
87 price_type: PriceType::Last,
88};
89const BAR_SPEC_1H: BarSpecification = BarSpecification {
90 step: STEP_ONE,
91 aggregation: BarAggregation::Hour,
92 price_type: PriceType::Last,
93};
94const BAR_SPEC_1D: BarSpecification = BarSpecification {
95 step: STEP_ONE,
96 aggregation: BarAggregation::Day,
97 price_type: PriceType::Last,
98};
99
100const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
101const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
102const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
103const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
104
/// Interprets a single-character yes/no flag.
///
/// Returns `Some(true)` for `'Y'`, `Some(false)` for `'N'`, and `None` for any
/// other byte (the comparison is case-sensitive).
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    match c as u8 {
        b'Y' => Some(true),
        b'N' => Some(false),
        _ => None,
    }
}
113
114#[must_use]
115pub const fn parse_order_side(c: c_char) -> OrderSide {
116 match c as u8 as char {
117 'A' => OrderSide::Sell,
118 'B' => OrderSide::Buy,
119 _ => OrderSide::NoOrderSide,
120 }
121}
122
123#[must_use]
124pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
125 match c as u8 as char {
126 'A' => AggressorSide::Seller,
127 'B' => AggressorSide::Buyer,
128 _ => AggressorSide::NoAggressor,
129 }
130}
131
132pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
138 match c as u8 as char {
139 'A' => Ok(BookAction::Add),
140 'C' => Ok(BookAction::Delete),
141 'F' => Ok(BookAction::Update),
142 'M' => Ok(BookAction::Update),
143 'R' => Ok(BookAction::Clear),
144 invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
145 }
146}
147
148pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
154 match c as u8 as char {
155 'C' => Ok(OptionKind::Call),
156 'P' => Ok(OptionKind::Put),
157 invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
158 }
159}
160
161fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
162 match value {
163 Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
164 log::warn!("Unknown currency code '{value}', defaulting to USD");
165 Currency::USD()
166 }),
167 Ok(_) => Currency::USD(),
168 Err(e) => {
169 log::error!("Error parsing currency: {e}");
170 Currency::USD()
171 }
172 }
173}
174
175#[must_use]
179pub fn parse_cfi_iso10926(value: &str) -> (Option<AssetClass>, Option<InstrumentClass>) {
180 let chars: Vec<char> = value.chars().collect();
181 if chars.len() < 3 {
182 return (None, None);
183 }
184
185 let cfi_category = chars[0];
187 let cfi_group = chars[1];
188 let cfi_attribute1 = chars[2];
189 let mut asset_class = match cfi_category {
194 'D' => Some(AssetClass::Debt),
195 'E' => Some(AssetClass::Equity),
196 'S' => None,
197 _ => None,
198 };
199
200 let instrument_class = match cfi_group {
201 'I' => Some(InstrumentClass::Future),
202 _ => None,
203 };
204
205 if cfi_attribute1 == 'I' {
206 asset_class = Some(AssetClass::Index);
207 }
208
209 (asset_class, instrument_class)
210}
211
212fn decode_underlying(underlying_str: &str, symbol: &Symbol) -> Ustr {
213 if underlying_str.is_empty() {
214 symbol
216 .as_str()
217 .split_whitespace()
218 .next()
219 .map_or_else(|| symbol.inner(), Ustr::from)
220 } else {
221 Ustr::from(underlying_str)
222 }
223}
224
225pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
233 let value_str = match value {
234 0 => return Ok(None),
235 1 => "Scheduled",
236 2 => "Surveillance intervention",
237 3 => "Market event",
238 4 => "Instrument activation",
239 5 => "Instrument expiration",
240 6 => "Recovery in process",
241 10 => "Regulatory",
242 11 => "Administrative",
243 12 => "Non-compliance",
244 13 => "Filings not current",
245 14 => "SEC trading suspension",
246 15 => "New issue",
247 16 => "Issue available",
248 17 => "Issues reviewed",
249 18 => "Filing requirements satisfied",
250 30 => "News pending",
251 31 => "News released",
252 32 => "News and resumption times",
253 33 => "News not forthcoming",
254 40 => "Order imbalance",
255 50 => "LULD pause",
256 60 => "Operational",
257 70 => "Additional information requested",
258 80 => "Merger effective",
259 90 => "ETF",
260 100 => "Corporate action",
261 110 => "New Security offering",
262 120 => "Market wide halt level 1",
263 121 => "Market wide halt level 2",
264 122 => "Market wide halt level 3",
265 123 => "Market wide halt carryover",
266 124 => "Market wide halt resumption",
267 130 => "Quotation not available",
268 invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
269 };
270
271 Ok(Some(Ustr::from(value_str)))
272}
273
274pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
280 let value_str = match value {
281 0 => return Ok(None),
282 1 => "No cancel",
283 2 => "Change trading session",
284 3 => "Implied matching on",
285 4 => "Implied matching off",
286 _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
287 };
288
289 Ok(Some(Ustr::from(value_str)))
290}
291
292#[inline(always)]
301pub fn decode_price(value: i64, precision: u8, field_name: &str) -> anyhow::Result<Price> {
302 if value == i64::MAX {
303 anyhow::bail!("Missing required price for `{field_name}`")
304 } else {
305 Ok(Price::from_raw(decode_raw_price_i64(value), precision))
306 }
307}
308
309#[inline(always)]
314#[must_use]
315pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
316 if value == i64::MAX {
317 None
318 } else {
319 Some(Price::from_raw(decode_raw_price_i64(value), precision))
320 }
321}
322
323#[inline(always)]
328#[must_use]
329pub fn decode_price_or_undef(value: i64, precision: u8) -> Price {
330 if value == i64::MAX {
331 Price::from_raw(PRICE_UNDEF, 0)
332 } else {
333 Price::from_raw(decode_raw_price_i64(value), precision)
334 }
335}
336
337#[inline(always)]
339#[must_use]
340pub fn decode_price_increment(value: i64, precision: u8) -> Price {
341 match value {
342 0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
343 _ => Price::from_raw(decode_raw_price_i64(value), precision),
344 }
345}
346
347#[inline(always)]
349#[must_use]
350pub fn decode_quantity(value: u64) -> Quantity {
351 Quantity::from(value)
352}
353
354#[inline(always)]
356#[must_use]
357pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
358 match value {
359 i64::MAX => None,
360 _ => Some(Quantity::from(value)),
361 }
362}
363
364#[inline(always)]
372pub fn decode_timestamp(value: u64, field_name: &str) -> anyhow::Result<UnixNanos> {
373 if value == dbn::UNDEF_TIMESTAMP {
374 anyhow::bail!("Missing required timestamp for `{field_name}`")
375 } else {
376 Ok(UnixNanos::from(value))
377 }
378}
379
380#[inline(always)]
384#[must_use]
385pub fn decode_optional_timestamp(value: u64) -> Option<UnixNanos> {
386 if value == dbn::UNDEF_TIMESTAMP {
387 None
388 } else {
389 Some(UnixNanos::from(value))
390 }
391}
392
393pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
400 const SCALE: u128 = 1_000_000_000;
401
402 match value {
403 0 | i64::MAX => Ok(Quantity::from(1)),
404 v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
405 v => {
406 let abs = v as u128;
409 let int_part = abs / SCALE;
410 let frac_part = abs % SCALE;
411
412 if frac_part == 0 {
415 Ok(Quantity::from(int_part as u64))
417 } else {
418 let mut frac_str = format!("{frac_part:09}");
419 while frac_str.ends_with('0') {
420 frac_str.pop();
421 }
422 let s = format!("{int_part}.{frac_str}");
423 Ok(Quantity::from(s))
424 }
425 }
426 }
427}
428
429#[inline(always)]
431#[must_use]
432pub fn decode_lot_size(value: i32) -> Quantity {
433 match value {
434 0 | i32::MAX => Quantity::from(1),
435 value => Quantity::from(value),
436 }
437}
438
/// Returns `true` when the action character is `'T'`.
#[inline(always)]
#[must_use]
fn is_trade_msg(action: c_char) -> bool {
    action as u8 == b'T'
}
444
/// Returns `true` when neither raw price is `i64::MAX` (treated as "missing").
#[inline(always)]
#[must_use]
fn has_valid_bid_ask(bid_px: i64, ask_px: i64) -> bool {
    !(bid_px == i64::MAX || ask_px == i64::MAX)
}
454
455pub fn decode_mbo_msg(
464 msg: &dbn::MboMsg,
465 instrument_id: InstrumentId,
466 price_precision: u8,
467 ts_init: Option<UnixNanos>,
468 include_trades: bool,
469) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
470 let side = parse_order_side(msg.side);
471 if is_trade_msg(msg.action) {
472 if include_trades && msg.size > 0 {
473 let price = decode_price_or_undef(msg.price, price_precision);
474 let size = decode_quantity(msg.size as u64);
475 let aggressor_side = parse_aggressor_side(msg.side);
476 let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
477 let ts_event = msg.ts_recv.into();
478 let ts_init = ts_init.unwrap_or(ts_event);
479
480 let trade = TradeTick::new(
481 instrument_id,
482 price,
483 size,
484 aggressor_side,
485 trade_id,
486 ts_event,
487 ts_init,
488 );
489 return Ok((None, Some(trade)));
490 }
491
492 return Ok((None, None));
493 }
494
495 let action = parse_book_action(msg.action)?;
496 let price = decode_price_or_undef(msg.price, price_precision);
497 let size = decode_quantity(msg.size as u64);
498 let order = BookOrder::new(side, price, size, msg.order_id);
499
500 let ts_event = msg.ts_recv.into();
501 let ts_init = ts_init.unwrap_or(ts_event);
502
503 let delta = OrderBookDelta::new(
504 instrument_id,
505 action,
506 order,
507 msg.flags.raw(),
508 msg.sequence.into(),
509 ts_event,
510 ts_init,
511 );
512
513 Ok((Some(delta), None))
514}
515
516pub fn decode_trade_msg(
522 msg: &dbn::TradeMsg,
523 instrument_id: InstrumentId,
524 price_precision: u8,
525 ts_init: Option<UnixNanos>,
526) -> anyhow::Result<TradeTick> {
527 let ts_event = msg.ts_recv.into();
528 let ts_init = ts_init.unwrap_or(ts_event);
529
530 let trade = TradeTick::new(
531 instrument_id,
532 decode_price_or_undef(msg.price, price_precision),
533 decode_quantity(msg.size as u64),
534 parse_aggressor_side(msg.side),
535 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
536 ts_event,
537 ts_init,
538 );
539
540 Ok(trade)
541}
542
543pub fn decode_tbbo_msg(
552 msg: &dbn::TbboMsg,
553 instrument_id: InstrumentId,
554 price_precision: u8,
555 ts_init: Option<UnixNanos>,
556) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
557 let top_level = &msg.levels[0];
558 let ts_event = msg.ts_recv.into();
559 let ts_init = ts_init.unwrap_or(ts_event);
560
561 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
562 Some(QuoteTick::new(
563 instrument_id,
564 decode_price_or_undef(top_level.bid_px, price_precision),
565 decode_price_or_undef(top_level.ask_px, price_precision),
566 decode_quantity(top_level.bid_sz as u64),
567 decode_quantity(top_level.ask_sz as u64),
568 ts_event,
569 ts_init,
570 ))
571 } else {
572 None
573 };
574
575 let trade = TradeTick::new(
576 instrument_id,
577 decode_price_or_undef(msg.price, price_precision),
578 decode_quantity(msg.size as u64),
579 parse_aggressor_side(msg.side),
580 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
581 ts_event,
582 ts_init,
583 );
584
585 Ok((maybe_quote, trade))
586}
587
588pub fn decode_mbp1_msg(
596 msg: &dbn::Mbp1Msg,
597 instrument_id: InstrumentId,
598 price_precision: u8,
599 ts_init: Option<UnixNanos>,
600 include_trades: bool,
601) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
602 let top_level = &msg.levels[0];
603 let ts_event = msg.ts_recv.into();
604 let ts_init = ts_init.unwrap_or(ts_event);
605
606 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
607 Some(QuoteTick::new(
608 instrument_id,
609 decode_price_or_undef(top_level.bid_px, price_precision),
610 decode_price_or_undef(top_level.ask_px, price_precision),
611 decode_quantity(top_level.bid_sz as u64),
612 decode_quantity(top_level.ask_sz as u64),
613 ts_event,
614 ts_init,
615 ))
616 } else {
617 None
618 };
619
620 let maybe_trade = if include_trades && is_trade_msg(msg.action) {
621 Some(TradeTick::new(
622 instrument_id,
623 decode_price_or_undef(msg.price, price_precision),
624 decode_quantity(msg.size as u64),
625 parse_aggressor_side(msg.side),
626 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
627 ts_event,
628 ts_init,
629 ))
630 } else {
631 None
632 };
633
634 Ok((maybe_quote, maybe_trade))
635}
636
637pub fn decode_bbo_msg(
645 msg: &dbn::BboMsg,
646 instrument_id: InstrumentId,
647 price_precision: u8,
648 ts_init: Option<UnixNanos>,
649) -> anyhow::Result<Option<QuoteTick>> {
650 let top_level = &msg.levels[0];
651 if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
652 return Ok(None);
653 }
654
655 let ts_event = msg.ts_recv.into();
656 let ts_init = ts_init.unwrap_or(ts_event);
657
658 let quote = QuoteTick::new(
659 instrument_id,
660 decode_price_or_undef(top_level.bid_px, price_precision),
661 decode_price_or_undef(top_level.ask_px, price_precision),
662 decode_quantity(top_level.bid_sz as u64),
663 decode_quantity(top_level.ask_sz as u64),
664 ts_event,
665 ts_init,
666 );
667
668 Ok(Some(quote))
669}
670
671pub fn decode_mbp10_msg(
677 msg: &dbn::Mbp10Msg,
678 instrument_id: InstrumentId,
679 price_precision: u8,
680 ts_init: Option<UnixNanos>,
681) -> anyhow::Result<OrderBookDepth10> {
682 let mut bids = Vec::with_capacity(DEPTH10_LEN);
683 let mut asks = Vec::with_capacity(DEPTH10_LEN);
684 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
685 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
686
687 for level in &msg.levels {
688 let bid_order = BookOrder::new(
689 OrderSide::Buy,
690 decode_price_or_undef(level.bid_px, price_precision),
691 decode_quantity(level.bid_sz as u64),
692 0,
693 );
694
695 let ask_order = BookOrder::new(
696 OrderSide::Sell,
697 decode_price_or_undef(level.ask_px, price_precision),
698 decode_quantity(level.ask_sz as u64),
699 0,
700 );
701
702 bids.push(bid_order);
703 asks.push(ask_order);
704 bid_counts.push(level.bid_ct);
705 ask_counts.push(level.ask_ct);
706 }
707
708 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
709 anyhow::anyhow!(
710 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
711 v.len()
712 )
713 })?;
714
715 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
716 anyhow::anyhow!(
717 "Expected exactly {DEPTH10_LEN} ask levels, received {}",
718 v.len()
719 )
720 })?;
721
722 let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
723 anyhow::anyhow!(
724 "Expected exactly {DEPTH10_LEN} bid counts, received {}",
725 v.len()
726 )
727 })?;
728
729 let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
730 anyhow::anyhow!(
731 "Expected exactly {DEPTH10_LEN} ask counts, received {}",
732 v.len()
733 )
734 })?;
735
736 let ts_event = msg.ts_recv.into();
737 let ts_init = ts_init.unwrap_or(ts_event);
738
739 let depth = OrderBookDepth10::new(
740 instrument_id,
741 bids,
742 asks,
743 bid_counts,
744 ask_counts,
745 msg.flags.raw(),
746 msg.sequence.into(),
747 ts_event,
748 ts_init,
749 );
750
751 Ok(depth)
752}
753
754pub fn decode_cmbp1_msg(
763 msg: &dbn::Cmbp1Msg,
764 instrument_id: InstrumentId,
765 price_precision: u8,
766 ts_init: Option<UnixNanos>,
767 include_trades: bool,
768) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
769 let top_level = &msg.levels[0];
770 let ts_event = msg.ts_recv.into();
771 let ts_init = ts_init.unwrap_or(ts_event);
772
773 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
774 Some(QuoteTick::new(
775 instrument_id,
776 decode_price_or_undef(top_level.bid_px, price_precision),
777 decode_price_or_undef(top_level.ask_px, price_precision),
778 decode_quantity(top_level.bid_sz as u64),
779 decode_quantity(top_level.ask_sz as u64),
780 ts_event,
781 ts_init,
782 ))
783 } else {
784 None
785 };
786
787 let maybe_trade = if include_trades && is_trade_msg(msg.action) {
788 Some(TradeTick::new(
790 instrument_id,
791 decode_price_or_undef(msg.price, price_precision),
792 decode_quantity(msg.size as u64),
793 parse_aggressor_side(msg.side),
794 TradeId::new(UUID4::new().as_str()),
795 ts_event,
796 ts_init,
797 ))
798 } else {
799 None
800 };
801
802 Ok((maybe_quote, maybe_trade))
803}
804
805pub fn decode_cbbo_msg(
813 msg: &dbn::CbboMsg,
814 instrument_id: InstrumentId,
815 price_precision: u8,
816 ts_init: Option<UnixNanos>,
817) -> anyhow::Result<Option<QuoteTick>> {
818 let top_level = &msg.levels[0];
819 if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
820 return Ok(None);
821 }
822
823 let ts_event = msg.ts_recv.into();
824 let ts_init = ts_init.unwrap_or(ts_event);
825
826 let quote = QuoteTick::new(
827 instrument_id,
828 decode_price_or_undef(top_level.bid_px, price_precision),
829 decode_price_or_undef(top_level.ask_px, price_precision),
830 decode_quantity(top_level.bid_sz as u64),
831 decode_quantity(top_level.ask_sz as u64),
832 ts_event,
833 ts_init,
834 );
835
836 Ok(Some(quote))
837}
838
839pub fn decode_tcbbo_msg(
848 msg: &dbn::CbboMsg,
849 instrument_id: InstrumentId,
850 price_precision: u8,
851 ts_init: Option<UnixNanos>,
852) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
853 let top_level = &msg.levels[0];
854 let ts_event = msg.ts_recv.into();
855 let ts_init = ts_init.unwrap_or(ts_event);
856
857 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
858 Some(QuoteTick::new(
859 instrument_id,
860 decode_price_or_undef(top_level.bid_px, price_precision),
861 decode_price_or_undef(top_level.ask_px, price_precision),
862 decode_quantity(top_level.bid_sz as u64),
863 decode_quantity(top_level.ask_sz as u64),
864 ts_event,
865 ts_init,
866 ))
867 } else {
868 None
869 };
870
871 let trade = TradeTick::new(
873 instrument_id,
874 decode_price_or_undef(msg.price, price_precision),
875 decode_quantity(msg.size as u64),
876 parse_aggressor_side(msg.side),
877 TradeId::new(UUID4::new().as_str()),
878 ts_event,
879 ts_init,
880 );
881
882 Ok((maybe_quote, trade))
883}
884
885pub fn decode_bar_type(
889 msg: &dbn::OhlcvMsg,
890 instrument_id: InstrumentId,
891) -> anyhow::Result<BarType> {
892 let bar_type = match msg.hd.rtype {
893 32 => {
894 BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
896 }
897 33 => {
898 BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
900 }
901 34 => {
902 BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
904 }
905 35 => {
906 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
908 }
909 36 => {
910 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
912 }
913 _ => anyhow::bail!(
914 "`rtype` is not a supported bar aggregation, was {}",
915 msg.hd.rtype
916 ),
917 };
918
919 Ok(bar_type)
920}
921
922pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
926 let adjustment = match msg.hd.rtype {
927 32 => {
928 BAR_CLOSE_ADJUSTMENT_1S
930 }
931 33 => {
932 BAR_CLOSE_ADJUSTMENT_1M
934 }
935 34 => {
936 BAR_CLOSE_ADJUSTMENT_1H
938 }
939 35 | 36 => {
940 BAR_CLOSE_ADJUSTMENT_1D
942 }
943 _ => anyhow::bail!(
944 "`rtype` is not a supported bar aggregation, was {}",
945 msg.hd.rtype
946 ),
947 };
948
949 Ok(adjustment.into())
950}
951
952pub fn decode_ohlcv_msg(
956 msg: &dbn::OhlcvMsg,
957 instrument_id: InstrumentId,
958 price_precision: u8,
959 ts_init: Option<UnixNanos>,
960 timestamp_on_close: bool,
961) -> anyhow::Result<Bar> {
962 let bar_type = decode_bar_type(msg, instrument_id)?;
963 let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
964
965 let ts_event_raw = msg.hd.ts_event.into();
966 let ts_close = ts_event_raw + ts_event_adjustment;
967 let ts_init = ts_init.unwrap_or(ts_close); let ts_event = if timestamp_on_close {
970 ts_close
971 } else {
972 ts_event_raw
973 };
974
975 let bar = Bar::new(
976 bar_type,
977 decode_price_or_undef(msg.open, price_precision),
978 decode_price_or_undef(msg.high, price_precision),
979 decode_price_or_undef(msg.low, price_precision),
980 decode_price_or_undef(msg.close, price_precision),
981 decode_quantity(msg.volume),
982 ts_event,
983 ts_init,
984 );
985
986 Ok(bar)
987}
988
989pub fn decode_status_msg(
995 msg: &dbn::StatusMsg,
996 instrument_id: InstrumentId,
997 ts_init: Option<UnixNanos>,
998) -> anyhow::Result<InstrumentStatus> {
999 let ts_event = msg.hd.ts_event.into();
1000 let ts_init = ts_init.unwrap_or(ts_event);
1001
1002 let action = MarketStatusAction::from_u16(msg.action)
1003 .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
1004
1005 let status = InstrumentStatus::new(
1006 instrument_id,
1007 action,
1008 ts_event,
1009 ts_init,
1010 parse_status_reason(msg.reason)?,
1011 parse_status_trading_event(msg.trading_event)?,
1012 parse_optional_bool(msg.is_trading),
1013 parse_optional_bool(msg.is_quoting),
1014 parse_optional_bool(msg.is_short_sell_restricted),
1015 );
1016
1017 Ok(status)
1018}
1019
1020pub fn decode_record(
1024 record: &dbn::RecordRef,
1025 instrument_id: InstrumentId,
1026 price_precision: u8,
1027 ts_init: Option<UnixNanos>,
1028 include_trades: bool,
1029 bars_timestamp_on_close: bool,
1030) -> anyhow::Result<(Option<Data>, Option<Data>)> {
1031 let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
1035 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1036 let result = decode_mbo_msg(
1037 msg,
1038 instrument_id,
1039 price_precision,
1040 Some(ts_init),
1041 include_trades,
1042 )?;
1043 match result {
1044 (Some(delta), None) => (Some(Data::Delta(delta)), None),
1045 (None, Some(trade)) => (Some(Data::Trade(trade)), None),
1046 (None, None) => (None, None),
1047 _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
1048 }
1049 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
1050 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1051 let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1052 (Some(Data::Trade(trade)), None)
1053 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
1054 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1055 let (maybe_quote, maybe_trade) = decode_mbp1_msg(
1056 msg,
1057 instrument_id,
1058 price_precision,
1059 Some(ts_init),
1060 include_trades,
1061 )?;
1062 (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1063 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
1064 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1065 let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1066 (maybe_quote.map(Data::Quote), None)
1067 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
1068 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1069 let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1070 (maybe_quote.map(Data::Quote), None)
1071 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
1072 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1073 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1074 (Some(Data::from(depth)), None)
1075 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
1076 let bar = decode_ohlcv_msg(
1079 msg,
1080 instrument_id,
1081 price_precision,
1082 ts_init,
1083 bars_timestamp_on_close,
1084 )?;
1085 (Some(Data::Bar(bar)), None)
1086 } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
1087 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1088 let (maybe_quote, maybe_trade) = decode_cmbp1_msg(
1089 msg,
1090 instrument_id,
1091 price_precision,
1092 Some(ts_init),
1093 include_trades,
1094 )?;
1095 (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1096 } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
1097 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1099 let (maybe_quote, trade) =
1100 decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1101 (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1102 } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
1103 if msg.price != i64::MAX && msg.size > 0 {
1105 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1107 let (maybe_quote, trade) =
1108 decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1109 (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1110 } else {
1111 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1113 let maybe_quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1114 (maybe_quote.map(Data::Quote), None)
1115 }
1116 } else {
1117 anyhow::bail!("DBN message type is not currently supported")
1118 };
1119
1120 Ok(result)
1121}
1122
1123const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
1124 match ts_init {
1125 Some(ts_init) => ts_init,
1126 None => msg_timestamp,
1127 }
1128}
1129
1130pub fn decode_instrument_def_msg(
1134 msg: &dbn::InstrumentDefMsg,
1135 instrument_id: InstrumentId,
1136 ts_init: Option<UnixNanos>,
1137) -> anyhow::Result<InstrumentAny> {
1138 match msg.instrument_class as u8 as char {
1139 'K' => Ok(InstrumentAny::Equity(decode_equity(
1140 msg,
1141 instrument_id,
1142 ts_init,
1143 )?)),
1144 'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1145 msg,
1146 instrument_id,
1147 ts_init,
1148 )?)),
1149 'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1150 msg,
1151 instrument_id,
1152 ts_init,
1153 )?)),
1154 'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1155 msg,
1156 instrument_id,
1157 ts_init,
1158 )?)),
1159 'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1160 msg,
1161 instrument_id,
1162 ts_init,
1163 )?)),
1164 'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1165 'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1166 _ => anyhow::bail!(
1167 "Unsupported `instrument_class` '{}'",
1168 msg.instrument_class as u8 as char
1169 ),
1170 }
1171}
1172
1173pub fn decode_equity(
1179 msg: &dbn::InstrumentDefMsg,
1180 instrument_id: InstrumentId,
1181 ts_init: Option<UnixNanos>,
1182) -> anyhow::Result<Equity> {
1183 let currency = parse_currency_or_usd_default(msg.currency());
1184 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1185 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1186 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1188
1189 Ok(Equity::new(
1190 instrument_id,
1191 instrument_id.symbol,
1192 None, currency,
1194 price_increment.precision,
1195 price_increment,
1196 Some(lot_size),
1197 None, None, None, None, None, None, None, None, ts_event,
1206 ts_init,
1207 ))
1208}
1209
1210pub fn decode_futures_contract(
1216 msg: &dbn::InstrumentDefMsg,
1217 instrument_id: InstrumentId,
1218 ts_init: Option<UnixNanos>,
1219) -> anyhow::Result<FuturesContract> {
1220 let currency = parse_currency_or_usd_default(msg.currency());
1221 let exchange = Ustr::from(msg.exchange()?);
1222 let underlying = decode_underlying(msg.asset()?, &instrument_id.symbol);
1223 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1224 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1225 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1226 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1227 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1229
1230 FuturesContract::new_checked(
1231 instrument_id,
1232 instrument_id.symbol,
1233 asset_class.unwrap_or(AssetClass::Commodity),
1234 Some(exchange),
1235 underlying,
1236 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1237 decode_timestamp(msg.expiration, "expiration")?,
1238 currency,
1239 price_increment.precision,
1240 price_increment,
1241 multiplier,
1242 lot_size,
1243 None, None, None, None, None, None, None, None, ts_event,
1252 ts_init,
1253 )
1254}
1255
1256pub fn decode_futures_spread(
1262 msg: &dbn::InstrumentDefMsg,
1263 instrument_id: InstrumentId,
1264 ts_init: Option<UnixNanos>,
1265) -> anyhow::Result<FuturesSpread> {
1266 let exchange = Ustr::from(msg.exchange()?);
1267 let underlying = decode_underlying(msg.asset()?, &instrument_id.symbol);
1268 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1269 let strategy_type = Ustr::from(msg.secsubtype()?);
1270 let currency = parse_currency_or_usd_default(msg.currency());
1271 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1272 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1273 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1274 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1276
1277 FuturesSpread::new_checked(
1278 instrument_id,
1279 instrument_id.symbol,
1280 asset_class.unwrap_or(AssetClass::Commodity),
1281 Some(exchange),
1282 underlying,
1283 strategy_type,
1284 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1285 decode_timestamp(msg.expiration, "expiration")?,
1286 currency,
1287 price_increment.precision,
1288 price_increment,
1289 multiplier,
1290 lot_size,
1291 None, None, None, None, None, None, None, None, ts_event,
1300 ts_init,
1301 )
1302}
1303
1304pub fn decode_option_contract(
1310 msg: &dbn::InstrumentDefMsg,
1311 instrument_id: InstrumentId,
1312 ts_init: Option<UnixNanos>,
1313) -> anyhow::Result<OptionContract> {
1314 let currency = parse_currency_or_usd_default(msg.currency());
1315 let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1316 let exchange = Ustr::from(msg.exchange()?);
1317 let underlying = decode_underlying(msg.underlying()?, &instrument_id.symbol);
1318 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1319 Some(AssetClass::Equity)
1320 } else {
1321 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1322 asset_class
1323 };
1324 let option_kind = parse_option_kind(msg.instrument_class)?;
1325 let strike_price = decode_price(
1326 msg.strike_price,
1327 strike_price_currency.precision,
1328 "strike_price",
1329 )?;
1330 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1331 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1332 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1333 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1335
1336 OptionContract::new_checked(
1337 instrument_id,
1338 instrument_id.symbol,
1339 asset_class_opt.unwrap_or(AssetClass::Commodity),
1340 Some(exchange),
1341 underlying,
1342 option_kind,
1343 strike_price,
1344 currency,
1345 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1346 decode_timestamp(msg.expiration, "expiration")?,
1347 price_increment.precision,
1348 price_increment,
1349 multiplier,
1350 lot_size,
1351 None, None, None, None, None, None, None, None, ts_event,
1360 ts_init,
1361 )
1362}
1363
1364pub fn decode_option_spread(
1370 msg: &dbn::InstrumentDefMsg,
1371 instrument_id: InstrumentId,
1372 ts_init: Option<UnixNanos>,
1373) -> anyhow::Result<OptionSpread> {
1374 let exchange = Ustr::from(msg.exchange()?);
1375 let underlying = decode_underlying(msg.underlying()?, &instrument_id.symbol);
1376 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1377 Some(AssetClass::Equity)
1378 } else {
1379 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1380 asset_class
1381 };
1382 let strategy_type = Ustr::from(msg.secsubtype()?);
1383 let currency = parse_currency_or_usd_default(msg.currency());
1384 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1385 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1386 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1387 let ts_event = msg.ts_recv.into(); let ts_init = ts_init.unwrap_or(ts_event);
1389
1390 OptionSpread::new_checked(
1391 instrument_id,
1392 instrument_id.symbol,
1393 asset_class_opt.unwrap_or(AssetClass::Commodity),
1394 Some(exchange),
1395 underlying,
1396 strategy_type,
1397 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1398 decode_timestamp(msg.expiration, "expiration")?,
1399 currency,
1400 price_increment.precision,
1401 price_increment,
1402 multiplier,
1403 lot_size,
1404 None, None, None, None, None, None, None, None, ts_event,
1413 ts_init,
1414 )
1415}
1416
1417pub fn decode_imbalance_msg(
1423 msg: &dbn::ImbalanceMsg,
1424 instrument_id: InstrumentId,
1425 price_precision: u8,
1426 ts_init: Option<UnixNanos>,
1427) -> anyhow::Result<DatabentoImbalance> {
1428 let ts_event = msg.ts_recv.into();
1429 let ts_init = ts_init.unwrap_or(ts_event);
1430
1431 Ok(DatabentoImbalance::new(
1432 instrument_id,
1433 decode_price_or_undef(msg.ref_price, price_precision),
1434 decode_price_or_undef(msg.cont_book_clr_price, price_precision),
1435 decode_price_or_undef(msg.auct_interest_clr_price, price_precision),
1436 Quantity::new(f64::from(msg.paired_qty), 0),
1437 Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1438 parse_order_side(msg.side),
1439 msg.significant_imbalance as c_char,
1440 msg.hd.ts_event.into(),
1441 ts_event,
1442 ts_init,
1443 ))
1444}
1445
1446pub fn decode_statistics_msg(
1453 msg: &dbn::StatMsg,
1454 instrument_id: InstrumentId,
1455 price_precision: u8,
1456 ts_init: Option<UnixNanos>,
1457) -> anyhow::Result<DatabentoStatistics> {
1458 let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1459 .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1460 let update_action =
1461 DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1462 anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1463 })?;
1464 let ts_event = msg.ts_recv.into();
1465 let ts_init = ts_init.unwrap_or(ts_event);
1466
1467 Ok(DatabentoStatistics::new(
1468 instrument_id,
1469 stat_type,
1470 update_action,
1471 decode_optional_price(msg.price, price_precision),
1472 decode_optional_quantity(msg.quantity),
1473 msg.channel_id,
1474 msg.stat_flags,
1475 msg.sequence,
1476 msg.ts_ref.into(),
1477 msg.ts_in_delta,
1478 msg.hd.ts_event.into(),
1479 ts_event,
1480 ts_init,
1481 ))
1482}
1483
1484#[cfg(test)]
1485mod tests {
1486 use std::path::{Path, PathBuf};
1487
1488 use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1489 use fallible_streaming_iterator::FallibleStreamingIterator;
1490 use nautilus_model::instruments::Instrument;
1491 use rstest::*;
1492
1493 use super::*;
1494
1495 fn test_data_path() -> PathBuf {
1496 Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1497 }
1498
1499 #[rstest]
1500 #[case('Y' as c_char, Some(true))]
1501 #[case('N' as c_char, Some(false))]
1502 #[case('X' as c_char, None)]
1503 fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1504 assert_eq!(parse_optional_bool(input), expected);
1505 }
1506
1507 #[rstest]
1508 #[case('A' as c_char, OrderSide::Sell)]
1509 #[case('B' as c_char, OrderSide::Buy)]
1510 #[case('X' as c_char, OrderSide::NoOrderSide)]
1511 fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1512 assert_eq!(parse_order_side(input), expected);
1513 }
1514
1515 #[rstest]
1516 #[case('A' as c_char, AggressorSide::Seller)]
1517 #[case('B' as c_char, AggressorSide::Buyer)]
1518 #[case('X' as c_char, AggressorSide::NoAggressor)]
1519 fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1520 assert_eq!(parse_aggressor_side(input), expected);
1521 }
1522
1523 #[rstest]
1524 #[case('T' as c_char, true)]
1525 #[case('A' as c_char, false)]
1526 #[case('C' as c_char, false)]
1527 #[case('F' as c_char, false)]
1528 #[case('M' as c_char, false)]
1529 #[case('R' as c_char, false)]
1530 fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1531 assert_eq!(is_trade_msg(action), expected);
1532 }
1533
1534 #[rstest]
1535 #[case('A' as c_char, Ok(BookAction::Add))]
1536 #[case('C' as c_char, Ok(BookAction::Delete))]
1537 #[case('F' as c_char, Ok(BookAction::Update))]
1538 #[case('M' as c_char, Ok(BookAction::Update))]
1539 #[case('R' as c_char, Ok(BookAction::Clear))]
1540 #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1541 fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1542 match parse_book_action(input) {
1543 Ok(action) => assert_eq!(Ok(action), expected),
1544 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1545 }
1546 }
1547
1548 #[rstest]
1549 #[case('C' as c_char, Ok(OptionKind::Call))]
1550 #[case('P' as c_char, Ok(OptionKind::Put))]
1551 #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1552 fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1553 match parse_option_kind(input) {
1554 Ok(kind) => assert_eq!(Ok(kind), expected),
1555 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1556 }
1557 }
1558
1559 #[rstest]
1560 #[case(Ok("USD"), Currency::USD())]
1561 #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1562 #[case(Ok(""), Currency::USD())]
1563 #[case(Err("Error"), Currency::USD())]
1564 fn test_parse_currency_or_usd_default(
1565 #[case] input: Result<&str, &'static str>, #[case] expected: Currency,
1567 ) {
1568 let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1569 assert_eq!(actual, expected);
1570 }
1571
1572 #[rstest]
1573 #[case("DII", (Some(AssetClass::Index), Some(InstrumentClass::Future)))]
1574 #[case("EII", (Some(AssetClass::Index), Some(InstrumentClass::Future)))]
1575 #[case("EIA", (Some(AssetClass::Equity), Some(InstrumentClass::Future)))]
1576 #[case("XXX", (None, None))]
1577 #[case("D", (None, None))]
1578 #[case("", (None, None))]
1579 fn test_parse_cfi_iso10926(
1580 #[case] input: &str,
1581 #[case] expected: (Option<AssetClass>, Option<InstrumentClass>),
1582 ) {
1583 let result = parse_cfi_iso10926(input);
1584 assert_eq!(result, expected);
1585 }
1586
1587 #[rstest]
1588 #[case(0, 2, Price::from_raw(0, 2))]
1589 #[case(
1590 1_000_000_000,
1591 2,
1592 Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1593 )]
1594 fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1595 let actual = decode_price(value, precision, "test_field").unwrap();
1596 assert_eq!(actual, expected);
1597 }
1598
1599 #[rstest]
1600 fn test_decode_price_undefined_errors() {
1601 let result = decode_price(i64::MAX, 2, "strike_price");
1602 assert!(result.is_err());
1603 assert!(result.unwrap_err().to_string().contains("strike_price"));
1604 }
1605
1606 #[rstest]
1607 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(
1610 10_000_000_000,
1611 2,
1612 Price::from_raw(decode_raw_price_i64(10_000_000_000), 2)
1613 )]
1614 fn test_decode_price_increment(
1615 #[case] value: i64,
1616 #[case] precision: u8,
1617 #[case] expected: Price,
1618 ) {
1619 let actual = decode_price_increment(value, precision);
1620 assert_eq!(actual, expected);
1621 }
1622
1623 #[rstest]
1624 #[case(i64::MAX, 2, None)] #[case(0, 2, Some(Price::from_raw(0, 2)))] #[case(
1627 10_000_000_000,
1628 2,
1629 Some(Price::from_raw(decode_raw_price_i64(10_000_000_000), 2))
1630 )]
1631 fn test_decode_optional_price(
1632 #[case] value: i64,
1633 #[case] precision: u8,
1634 #[case] expected: Option<Price>,
1635 ) {
1636 let actual = decode_optional_price(value, precision);
1637 assert_eq!(actual, expected);
1638 }
1639
1640 #[rstest]
1641 #[case(0, 2, Price::from_raw(0, 2))]
1642 #[case(
1643 1_000_000_000,
1644 2,
1645 Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1646 )]
1647 #[case(i64::MAX, 2, Price::from_raw(PRICE_UNDEF, 0))] fn test_decode_price_or_undef(
1649 #[case] value: i64,
1650 #[case] precision: u8,
1651 #[case] expected: Price,
1652 ) {
1653 let actual = decode_price_or_undef(value, precision);
1654 assert_eq!(actual, expected);
1655 }
1656
1657 #[rstest]
1658 #[case(i64::MAX, None)] #[case(0, Some(Quantity::new(0.0, 0)))] #[case(10, Some(Quantity::new(10.0, 0)))] fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1662 let actual = decode_optional_quantity(value);
1663 assert_eq!(actual, expected);
1664 }
1665
1666 #[rstest]
1667 #[case(0, UnixNanos::from(0))]
1668 #[case(1_000_000_000, UnixNanos::from(1_000_000_000))]
1669 fn test_decode_timestamp(#[case] value: u64, #[case] expected: UnixNanos) {
1670 let actual = decode_timestamp(value, "test_field").unwrap();
1671 assert_eq!(actual, expected);
1672 }
1673
1674 #[rstest]
1675 fn test_decode_timestamp_undefined_errors() {
1676 let result = decode_timestamp(dbn::UNDEF_TIMESTAMP, "expiration");
1677 assert!(result.is_err());
1678 assert!(result.unwrap_err().to_string().contains("expiration"));
1679 }
1680
1681 #[rstest]
1682 #[case(0, Some(UnixNanos::from(0)))]
1683 #[case(1_000_000_000, Some(UnixNanos::from(1_000_000_000)))]
1684 #[case(dbn::UNDEF_TIMESTAMP, None)]
1685 fn test_decode_optional_timestamp(#[case] value: u64, #[case] expected: Option<UnixNanos>) {
1686 let actual = decode_optional_timestamp(value);
1687 assert_eq!(actual, expected);
1688 }
1689
1690 #[rstest]
1691 #[case(0, Quantity::from(1))] #[case(i64::MAX, Quantity::from(1))] #[case(50_000_000_000, Quantity::from("50"))] #[case(12_500_000_000, Quantity::from("12.5"))] #[case(1_000_000_000, Quantity::from("1"))] #[case(1, Quantity::from("0.000000001"))] #[case(1_000_000_001, Quantity::from("1.000000001"))] #[case(999_999_999, Quantity::from("0.999999999"))] #[case(123_456_789_000, Quantity::from("123.456789"))] fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1701 assert_eq!(decode_multiplier(raw).unwrap(), expected);
1702 }
1703
1704 #[rstest]
1705 #[case(-1_500_000_000)] #[case(-1)] #[case(-999_999_999)] fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1709 let result = decode_multiplier(raw);
1710 assert!(result.is_err());
1711 assert!(
1712 result
1713 .unwrap_err()
1714 .to_string()
1715 .contains("Invalid negative multiplier")
1716 );
1717 }
1718
1719 #[rstest]
1720 #[case(100, Quantity::from(100))]
1721 #[case(1000, Quantity::from(1000))]
1722 #[case(5, Quantity::from(5))]
1723 fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1724 assert_eq!(decode_quantity(value), expected);
1725 }
1726
1727 #[rstest]
1728 #[case(0, Quantity::from(1))] #[case(i32::MAX, Quantity::from(1))] #[case(100, Quantity::from(100))]
1731 #[case(1, Quantity::from(1))]
1732 #[case(1000, Quantity::from(1000))]
1733 fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1734 assert_eq!(decode_lot_size(value), expected);
1735 }
1736
1737 #[rstest]
1738 #[case(0, None)] #[case(1, Some(Ustr::from("Scheduled")))]
1740 #[case(2, Some(Ustr::from("Surveillance intervention")))]
1741 #[case(3, Some(Ustr::from("Market event")))]
1742 #[case(10, Some(Ustr::from("Regulatory")))]
1743 #[case(30, Some(Ustr::from("News pending")))]
1744 #[case(40, Some(Ustr::from("Order imbalance")))]
1745 #[case(50, Some(Ustr::from("LULD pause")))]
1746 #[case(60, Some(Ustr::from("Operational")))]
1747 #[case(100, Some(Ustr::from("Corporate action")))]
1748 #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1749 fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1750 assert_eq!(parse_status_reason(value).unwrap(), expected);
1751 }
1752
1753 #[rstest]
1754 #[case(999)] fn test_parse_status_reason_invalid(#[case] value: u16) {
1756 assert!(parse_status_reason(value).is_err());
1757 }
1758
1759 #[rstest]
1760 #[case(0, None)] #[case(1, Some(Ustr::from("No cancel")))]
1762 #[case(2, Some(Ustr::from("Change trading session")))]
1763 #[case(3, Some(Ustr::from("Implied matching on")))]
1764 #[case(4, Some(Ustr::from("Implied matching off")))]
1765 fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1766 assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1767 }
1768
1769 #[rstest]
1770 #[case(5)] #[case(100)] fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1773 assert!(parse_status_trading_event(value).is_err());
1774 }
1775
1776 #[rstest]
1777 fn test_decode_mbo_msg() {
1778 let path = test_data_path().join("test_data.mbo.dbn.zst");
1779 let mut dbn_stream = Decoder::from_zstd_file(path)
1780 .unwrap()
1781 .decode_stream::<dbn::MboMsg>();
1782 let msg = dbn_stream.next().unwrap().unwrap();
1783
1784 let instrument_id = InstrumentId::from("ESM4.GLBX");
1785 let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1786 let delta = delta.unwrap();
1787
1788 assert_eq!(delta.instrument_id, instrument_id);
1789 assert_eq!(delta.action, BookAction::Delete);
1790 assert_eq!(delta.order.side, OrderSide::Sell);
1791 assert_eq!(delta.order.price, Price::from("3722.75"));
1792 assert_eq!(delta.order.size, Quantity::from("1"));
1793 assert_eq!(delta.order.order_id, 647_784_973_705);
1794 assert_eq!(delta.flags, 128);
1795 assert_eq!(delta.sequence, 1_170_352);
1796 assert_eq!(delta.ts_event, msg.ts_recv);
1797 assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1798 assert_eq!(delta.ts_init, 0);
1799 }
1800
1801 #[rstest]
1802 fn test_decode_mbo_msg_clear_action() {
1803 let ts_recv = 1_609_160_400_000_000_000;
1805 let msg = dbn::MboMsg {
1806 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1807 order_id: 0,
1808 price: i64::MAX,
1809 size: 0,
1810 flags: dbn::FlagSet::empty(),
1811 channel_id: 0,
1812 action: 'R' as c_char,
1813 side: 'N' as c_char, ts_recv,
1815 ts_in_delta: 0,
1816 sequence: 1_000_000,
1817 };
1818
1819 let instrument_id = InstrumentId::from("ESM4.GLBX");
1820 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1821
1822 assert!(trade.is_none());
1824 let delta = delta.expect("Clear action should produce OrderBookDelta");
1825
1826 assert_eq!(delta.instrument_id, instrument_id);
1827 assert_eq!(delta.action, BookAction::Clear);
1828 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1829 assert_eq!(delta.order.size, Quantity::from("0"));
1830 assert_eq!(delta.order.order_id, 0);
1831 assert_eq!(delta.sequence, 1_000_000);
1832 assert_eq!(delta.ts_event, ts_recv);
1833 assert_eq!(delta.ts_init, 0);
1834 assert!(delta.order.price.is_undefined());
1835 assert_eq!(delta.order.price.precision, 0);
1836 }
1837
1838 #[rstest]
1839 fn test_decode_mbo_msg_price_undef_with_precision() {
1840 let ts_recv = 1_609_160_400_000_000_000;
1842 let msg = dbn::MboMsg {
1843 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1844 order_id: 0,
1845 price: i64::MAX, size: 0,
1847 flags: dbn::FlagSet::empty(),
1848 channel_id: 0,
1849 action: 'R' as c_char, side: 'N' as c_char, ts_recv,
1852 ts_in_delta: 0,
1853 sequence: 0,
1854 };
1855
1856 let instrument_id = InstrumentId::from("ESM4.GLBX");
1857 let (delta, _) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1858 let delta = delta.unwrap();
1859
1860 assert!(delta.order.price.is_undefined());
1861 assert_eq!(delta.order.price.precision, 0);
1862 assert_eq!(delta.order.price.raw, PRICE_UNDEF);
1863 }
1864
1865 #[rstest]
1866 fn test_decode_mbo_msg_no_order_side_update() {
1867 let ts_recv = 1_609_160_400_000_000_000;
1870 let msg = dbn::MboMsg {
1871 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1872 order_id: 123_456_789,
1873 price: 4_800_250_000_000, size: 1,
1875 flags: dbn::FlagSet::empty(),
1876 channel_id: 1,
1877 action: 'M' as c_char, side: 'N' as c_char, ts_recv,
1880 ts_in_delta: 0,
1881 sequence: 1_000_000,
1882 };
1883
1884 let instrument_id = InstrumentId::from("ESM4.GLBX");
1885 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1886
1887 assert!(delta.is_some());
1889 assert!(trade.is_none());
1890 let delta = delta.unwrap();
1891 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1892 assert_eq!(delta.order.order_id, 123_456_789);
1893 assert_eq!(delta.action, BookAction::Update);
1894 }
1895
1896 #[rstest]
1897 fn test_decode_mbp1_msg() {
1898 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1899 let mut dbn_stream = Decoder::from_zstd_file(path)
1900 .unwrap()
1901 .decode_stream::<dbn::Mbp1Msg>();
1902 let msg = dbn_stream.next().unwrap().unwrap();
1903
1904 let instrument_id = InstrumentId::from("ESM4.GLBX");
1905 let (maybe_quote, _) =
1906 decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1907 let quote = maybe_quote.expect("Expected valid quote");
1908
1909 assert_eq!(quote.instrument_id, instrument_id);
1910 assert_eq!(quote.bid_price, Price::from("3720.25"));
1911 assert_eq!(quote.ask_price, Price::from("3720.50"));
1912 assert_eq!(quote.bid_size, Quantity::from("24"));
1913 assert_eq!(quote.ask_size, Quantity::from("11"));
1914 assert_eq!(quote.ts_event, msg.ts_recv);
1915 assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1916 assert_eq!(quote.ts_init, 0);
1917 }
1918
1919 #[rstest]
1920 fn test_decode_mbp1_msg_undefined_ask_skips_quote() {
1921 let ts_recv = 1_609_160_400_000_000_000;
1922 let msg = dbn::Mbp1Msg {
1923 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1924 price: 3_720_250_000_000, size: 5,
1926 action: 'A' as c_char,
1927 side: 'B' as c_char,
1928 flags: dbn::FlagSet::empty(),
1929 depth: 0,
1930 ts_recv,
1931 ts_in_delta: 0,
1932 sequence: 1_170_352,
1933 levels: [dbn::BidAskPair {
1934 bid_px: 3_720_250_000_000, ask_px: i64::MAX, bid_sz: 24,
1937 ask_sz: 0,
1938 bid_ct: 1,
1939 ask_ct: 0,
1940 }],
1941 };
1942
1943 let instrument_id = InstrumentId::from("ESM4.GLBX");
1944 let (maybe_quote, _) =
1945 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1946
1947 assert!(maybe_quote.is_none());
1949 }
1950
1951 #[rstest]
1952 fn test_decode_mbp1_msg_undefined_bid_skips_quote() {
1953 let ts_recv = 1_609_160_400_000_000_000;
1954 let msg = dbn::Mbp1Msg {
1955 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1956 price: 3_720_500_000_000, size: 5,
1958 action: 'A' as c_char,
1959 side: 'A' as c_char,
1960 flags: dbn::FlagSet::empty(),
1961 depth: 0,
1962 ts_recv,
1963 ts_in_delta: 0,
1964 sequence: 1_170_352,
1965 levels: [dbn::BidAskPair {
1966 bid_px: i64::MAX, ask_px: 3_720_500_000_000, bid_sz: 0,
1969 ask_sz: 11,
1970 bid_ct: 0,
1971 ask_ct: 1,
1972 }],
1973 };
1974
1975 let instrument_id = InstrumentId::from("ESM4.GLBX");
1976 let (maybe_quote, _) =
1977 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1978
1979 assert!(maybe_quote.is_none());
1981 }
1982
1983 #[rstest]
1984 fn test_decode_mbp1_msg_trade_still_returned_with_undefined_prices() {
1985 let ts_recv = 1_609_160_400_000_000_000;
1986 let msg = dbn::Mbp1Msg {
1987 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1988 price: 3_720_250_000_000, size: 5,
1990 action: 'T' as c_char, side: 'A' as c_char,
1992 flags: dbn::FlagSet::empty(),
1993 depth: 0,
1994 ts_recv,
1995 ts_in_delta: 0,
1996 sequence: 1_170_352,
1997 levels: [dbn::BidAskPair {
1998 bid_px: i64::MAX, ask_px: i64::MAX, bid_sz: 0,
2001 ask_sz: 0,
2002 bid_ct: 0,
2003 ask_ct: 0,
2004 }],
2005 };
2006
2007 let instrument_id = InstrumentId::from("ESM4.GLBX");
2008 let (maybe_quote, maybe_trade) =
2009 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), true).unwrap();
2010
2011 assert!(maybe_quote.is_none());
2013
2014 let trade = maybe_trade.expect("Expected trade");
2016 assert_eq!(trade.instrument_id, instrument_id);
2017 assert_eq!(trade.price, Price::from("3720.25"));
2018 assert_eq!(trade.size, Quantity::from("5"));
2019 }
2020
2021 #[rstest]
2022 fn test_decode_bbo_1s_msg() {
2023 let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
2024 let mut dbn_stream = Decoder::from_zstd_file(path)
2025 .unwrap()
2026 .decode_stream::<dbn::BboMsg>();
2027 let msg = dbn_stream.next().unwrap().unwrap();
2028
2029 let instrument_id = InstrumentId::from("ESM4.GLBX");
2030 let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2031 let quote = maybe_quote.expect("Expected valid quote");
2032
2033 assert_eq!(quote.instrument_id, instrument_id);
2034 assert_eq!(quote.bid_price, Price::from("3702.25"));
2035 assert_eq!(quote.ask_price, Price::from("3702.75"));
2036 assert_eq!(quote.bid_size, Quantity::from("18"));
2037 assert_eq!(quote.ask_size, Quantity::from("13"));
2038 assert_eq!(quote.ts_event, msg.ts_recv);
2039 assert_eq!(quote.ts_event, 1609113600000000000);
2040 assert_eq!(quote.ts_init, 0);
2041 }
2042
2043 #[rstest]
2044 fn test_decode_bbo_1m_msg() {
2045 let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
2046 let mut dbn_stream = Decoder::from_zstd_file(path)
2047 .unwrap()
2048 .decode_stream::<dbn::BboMsg>();
2049 let msg = dbn_stream.next().unwrap().unwrap();
2050
2051 let instrument_id = InstrumentId::from("ESM4.GLBX");
2052 let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2053 let quote = maybe_quote.expect("Expected valid quote");
2054
2055 assert_eq!(quote.instrument_id, instrument_id);
2056 assert_eq!(quote.bid_price, Price::from("3702.25"));
2057 assert_eq!(quote.ask_price, Price::from("3702.75"));
2058 assert_eq!(quote.bid_size, Quantity::from("18"));
2059 assert_eq!(quote.ask_size, Quantity::from("13"));
2060 assert_eq!(quote.ts_event, msg.ts_recv);
2061 assert_eq!(quote.ts_event, 1609113600000000000);
2062 assert_eq!(quote.ts_init, 0);
2063 }
2064
2065 #[rstest]
2066 fn test_decode_mbp10_msg() {
2067 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
2068 let mut dbn_stream = Decoder::from_zstd_file(path)
2069 .unwrap()
2070 .decode_stream::<dbn::Mbp10Msg>();
2071 let msg = dbn_stream.next().unwrap().unwrap();
2072
2073 let instrument_id = InstrumentId::from("ESM4.GLBX");
2074 let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2075
2076 assert_eq!(depth10.instrument_id, instrument_id);
2077 assert_eq!(depth10.bids.len(), 10);
2078 assert_eq!(depth10.asks.len(), 10);
2079 assert_eq!(depth10.bid_counts.len(), 10);
2080 assert_eq!(depth10.ask_counts.len(), 10);
2081 assert_eq!(depth10.flags, 128);
2082 assert_eq!(depth10.sequence, 1_170_352);
2083 assert_eq!(depth10.ts_event, msg.ts_recv);
2084 assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
2085 assert_eq!(depth10.ts_init, 0);
2086 }
2087
2088 #[rstest]
2089 fn test_decode_trade_msg() {
2090 let path = test_data_path().join("test_data.trades.dbn.zst");
2091 let mut dbn_stream = Decoder::from_zstd_file(path)
2092 .unwrap()
2093 .decode_stream::<dbn::TradeMsg>();
2094 let msg = dbn_stream.next().unwrap().unwrap();
2095
2096 let instrument_id = InstrumentId::from("ESM4.GLBX");
2097 let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2098
2099 assert_eq!(trade.instrument_id, instrument_id);
2100 assert_eq!(trade.price, Price::from("3720.25"));
2101 assert_eq!(trade.size, Quantity::from("5"));
2102 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2103 assert_eq!(trade.trade_id.to_string(), "1170380");
2104 assert_eq!(trade.ts_event, msg.ts_recv);
2105 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2106 assert_eq!(trade.ts_init, 0);
2107 }
2108
2109 #[rstest]
2110 fn test_decode_tbbo_msg() {
2111 let path = test_data_path().join("test_data.tbbo.dbn.zst");
2112 let mut dbn_stream = Decoder::from_zstd_file(path)
2113 .unwrap()
2114 .decode_stream::<dbn::Mbp1Msg>();
2115 let msg = dbn_stream.next().unwrap().unwrap();
2116
2117 let instrument_id = InstrumentId::from("ESM4.GLBX");
2118 let (maybe_quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2119 let quote = maybe_quote.expect("Expected valid quote");
2120
2121 assert_eq!(quote.instrument_id, instrument_id);
2122 assert_eq!(quote.bid_price, Price::from("3720.25"));
2123 assert_eq!(quote.ask_price, Price::from("3720.50"));
2124 assert_eq!(quote.bid_size, Quantity::from("26"));
2125 assert_eq!(quote.ask_size, Quantity::from("7"));
2126 assert_eq!(quote.ts_event, msg.ts_recv);
2127 assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
2128 assert_eq!(quote.ts_init, 0);
2129
2130 assert_eq!(trade.instrument_id, instrument_id);
2131 assert_eq!(trade.price, Price::from("3720.25"));
2132 assert_eq!(trade.size, Quantity::from("5"));
2133 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2134 assert_eq!(trade.trade_id.to_string(), "1170380");
2135 assert_eq!(trade.ts_event, msg.ts_recv);
2136 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2137 assert_eq!(trade.ts_init, 0);
2138 }
2139
2140 #[rstest]
2141 fn test_decode_ohlcv_msg() {
2142 let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
2143 let mut dbn_stream = Decoder::from_zstd_file(path)
2144 .unwrap()
2145 .decode_stream::<dbn::OhlcvMsg>();
2146 let msg = dbn_stream.next().unwrap().unwrap();
2147
2148 let instrument_id = InstrumentId::from("ESM4.GLBX");
2149 let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2150
2151 assert_eq!(
2152 bar.bar_type,
2153 BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
2154 );
2155 assert_eq!(bar.open, Price::from("372025.00"));
2156 assert_eq!(bar.high, Price::from("372050.00"));
2157 assert_eq!(bar.low, Price::from("372025.00"));
2158 assert_eq!(bar.close, Price::from("372050.00"));
2159 assert_eq!(bar.volume, Quantity::from("57"));
2160 assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); assert_eq!(bar.ts_init, 0); }
2163
2164 #[rstest]
2165 fn test_decode_definition_msg() {
2166 let path = test_data_path().join("test_data.definition.dbn.zst");
2167 let mut dbn_stream = Decoder::from_zstd_file(path)
2168 .unwrap()
2169 .decode_stream::<dbn::InstrumentDefMsg>();
2170 let msg = dbn_stream.next().unwrap().unwrap();
2171
2172 let instrument_id = InstrumentId::from("ESM4.GLBX");
2173 let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
2174
2175 assert!(result.is_ok());
2176 assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
2177 }
2178
2179 #[rstest]
2180 fn test_decode_status_msg() {
2181 let path = test_data_path().join("test_data.status.dbn.zst");
2182 let mut dbn_stream = Decoder::from_zstd_file(path)
2183 .unwrap()
2184 .decode_stream::<dbn::StatusMsg>();
2185 let msg = dbn_stream.next().unwrap().unwrap();
2186
2187 let instrument_id = InstrumentId::from("ESM4.GLBX");
2188 let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
2189
2190 assert_eq!(status.instrument_id, instrument_id);
2191 assert_eq!(status.action, MarketStatusAction::Trading);
2192 assert_eq!(status.ts_event, msg.hd.ts_event);
2193 assert_eq!(status.ts_init, 0);
2194 assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
2195 assert_eq!(status.trading_event, None);
2196 assert_eq!(status.is_trading, Some(true));
2197 assert_eq!(status.is_quoting, Some(true));
2198 assert_eq!(status.is_short_sell_restricted, None);
2199 }
2200
2201 #[rstest]
2202 fn test_decode_imbalance_msg() {
2203 let path = test_data_path().join("test_data.imbalance.dbn.zst");
2204 let mut dbn_stream = Decoder::from_zstd_file(path)
2205 .unwrap()
2206 .decode_stream::<dbn::ImbalanceMsg>();
2207 let msg = dbn_stream.next().unwrap().unwrap();
2208
2209 let instrument_id = InstrumentId::from("ESM4.GLBX");
2210 let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2211
2212 assert_eq!(imbalance.instrument_id, instrument_id);
2213 assert_eq!(imbalance.ref_price, Price::from("229.43"));
2214 assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
2215 assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
2216 assert_eq!(imbalance.paired_qty, Quantity::from("0"));
2217 assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
2218 assert_eq!(imbalance.side, OrderSide::Buy);
2219 assert_eq!(imbalance.significant_imbalance, 126);
2220 assert_eq!(imbalance.ts_event, msg.hd.ts_event);
2221 assert_eq!(imbalance.ts_recv, msg.ts_recv);
2222 assert_eq!(imbalance.ts_init, 0);
2223 }
2224
2225 #[rstest]
2226 fn test_decode_statistics_msg() {
2227 let path = test_data_path().join("test_data.statistics.dbn.zst");
2228 let mut dbn_stream = Decoder::from_zstd_file(path)
2229 .unwrap()
2230 .decode_stream::<dbn::StatMsg>();
2231 let msg = dbn_stream.next().unwrap().unwrap();
2232
2233 let instrument_id = InstrumentId::from("ESM4.GLBX");
2234 let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2235
2236 assert_eq!(statistics.instrument_id, instrument_id);
2237 assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
2238 assert_eq!(
2239 statistics.update_action,
2240 DatabentoStatisticUpdateAction::Added
2241 );
2242 assert_eq!(statistics.price, Some(Price::from("100.00")));
2243 assert_eq!(statistics.quantity, None);
2244 assert_eq!(statistics.channel_id, 13);
2245 assert_eq!(statistics.stat_flags, 255);
2246 assert_eq!(statistics.sequence, 2);
2247 assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
2248 assert_eq!(statistics.ts_in_delta, 26961);
2249 assert_eq!(statistics.ts_event, msg.hd.ts_event);
2250 assert_eq!(statistics.ts_recv, msg.ts_recv);
2251 assert_eq!(statistics.ts_init, 0);
2252 }
2253
2254 #[rstest]
2255 fn test_decode_cmbp1_msg() {
2256 let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
2257 let mut dbn_stream = Decoder::from_zstd_file(path)
2258 .unwrap()
2259 .decode_stream::<dbn::Cmbp1Msg>();
2260 let msg = dbn_stream.next().unwrap().unwrap();
2261
2262 let instrument_id = InstrumentId::from("ESM4.GLBX");
2263 let (maybe_quote, trade) =
2264 decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2265 let quote = maybe_quote.expect("Expected valid quote");
2266
2267 assert_eq!(quote.instrument_id, instrument_id);
2268 assert!(quote.bid_price.raw > 0);
2269 assert!(quote.ask_price.raw > 0);
2270 assert!(quote.bid_size.raw > 0);
2271 assert!(quote.ask_size.raw > 0);
2272 assert_eq!(quote.ts_event, msg.ts_recv);
2273 assert_eq!(quote.ts_init, 0);
2274
2275 if is_trade_msg(msg.action) {
2277 assert!(trade.is_some());
2278 let trade = trade.unwrap();
2279 assert_eq!(trade.instrument_id, instrument_id);
2280 } else {
2281 assert!(trade.is_none());
2282 }
2283 }
2284
2285 #[rstest]
2286 fn test_decode_cbbo_1s_msg() {
2287 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2288 let mut dbn_stream = Decoder::from_zstd_file(path)
2289 .unwrap()
2290 .decode_stream::<dbn::CbboMsg>();
2291 let msg = dbn_stream.next().unwrap().unwrap();
2292
2293 let instrument_id = InstrumentId::from("ESM4.GLBX");
2294 let maybe_quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2295 let quote = maybe_quote.expect("Expected valid quote");
2296
2297 assert_eq!(quote.instrument_id, instrument_id);
2298 assert!(quote.bid_price.raw > 0);
2299 assert!(quote.ask_price.raw > 0);
2300 assert!(quote.bid_size.raw > 0);
2301 assert!(quote.ask_size.raw > 0);
2302 assert_eq!(quote.ts_event, msg.ts_recv);
2303 assert_eq!(quote.ts_init, 0);
2304 }
2305
2306 #[rstest]
2307 fn test_decode_mbp10_msg_with_all_levels() {
2308 let mut msg = dbn::Mbp10Msg::default();
2309 for i in 0..10 {
2310 msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
2311 msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
2312 msg.levels[i].bid_sz = 10 + i as u32;
2313 msg.levels[i].ask_sz = 10 + i as u32;
2314 msg.levels[i].bid_ct = 1 + i as u32;
2315 msg.levels[i].ask_ct = 1 + i as u32;
2316 }
2317 msg.ts_recv = 1_609_160_400_000_704_060;
2318
2319 let instrument_id = InstrumentId::from("TEST.VENUE");
2320 let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
2321
2322 assert!(result.is_ok());
2323 let depth = result.unwrap();
2324 assert_eq!(depth.bids.len(), 10);
2325 assert_eq!(depth.asks.len(), 10);
2326 assert_eq!(depth.bid_counts.len(), 10);
2327 assert_eq!(depth.ask_counts.len(), 10);
2328 }
2329
2330 #[rstest]
2331 fn test_array_conversion_error_handling() {
2332 let mut bids = Vec::new();
2333 let mut asks = Vec::new();
2334
2335 for i in 0..5 {
2337 bids.push(BookOrder::new(
2338 OrderSide::Buy,
2339 Price::from(format!("{}.00", 100 - i)),
2340 Quantity::from(10),
2341 i as u64,
2342 ));
2343 asks.push(BookOrder::new(
2344 OrderSide::Sell,
2345 Price::from(format!("{}.00", 101 + i)),
2346 Quantity::from(10),
2347 i as u64,
2348 ));
2349 }
2350
2351 let result: Result<[BookOrder; DEPTH10_LEN], _> =
2352 bids.try_into().map_err(|v: Vec<BookOrder>| {
2353 anyhow::anyhow!(
2354 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2355 v.len()
2356 )
2357 });
2358 assert!(result.is_err());
2359 assert!(
2360 result
2361 .unwrap_err()
2362 .to_string()
2363 .contains("Expected exactly 10 bid levels, received 5")
2364 );
2365 }
2366
2367 #[rstest]
2368 fn test_decode_tcbbo_msg() {
2369 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2371 let mut dbn_stream = Decoder::from_zstd_file(path)
2372 .unwrap()
2373 .decode_stream::<dbn::CbboMsg>();
2374 let msg = dbn_stream.next().unwrap().unwrap();
2375
2376 let mut tcbbo_msg = msg.clone();
2378 tcbbo_msg.price = 3702500000000;
2379 tcbbo_msg.size = 10;
2380
2381 let instrument_id = InstrumentId::from("ESM4.GLBX");
2382 let (maybe_quote, trade) =
2383 decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2384 let quote = maybe_quote.expect("Expected valid quote");
2385
2386 assert_eq!(quote.instrument_id, instrument_id);
2387 assert!(quote.bid_price.raw > 0);
2388 assert!(quote.ask_price.raw > 0);
2389 assert!(quote.bid_size.raw > 0);
2390 assert!(quote.ask_size.raw > 0);
2391 assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2392 assert_eq!(quote.ts_init, 0);
2393
2394 assert_eq!(trade.instrument_id, instrument_id);
2395 assert_eq!(trade.price, Price::from("3702.50"));
2396 assert_eq!(trade.size, Quantity::from(10));
2397 assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2398 assert_eq!(trade.ts_init, 0);
2399 }
2400
2401 #[rstest]
2402 fn test_decode_bar_type() {
2403 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2404 let instrument_id = InstrumentId::from("ESM4.GLBX");
2405
2406 msg.hd.rtype = 32;
2408 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2409 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2410
2411 msg.hd.rtype = 33;
2413 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2414 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2415
2416 msg.hd.rtype = 34;
2418 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2419 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2420
2421 msg.hd.rtype = 35;
2423 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2424 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2425
2426 msg.hd.rtype = 99;
2428 let result = decode_bar_type(&msg, instrument_id);
2429 assert!(result.is_err());
2430 }
2431
2432 #[rstest]
2433 fn test_decode_ts_event_adjustment() {
2434 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2435
2436 msg.hd.rtype = 32;
2438 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2439 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2440
2441 msg.hd.rtype = 33;
2443 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2444 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2445
2446 msg.hd.rtype = 34;
2448 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2449 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2450
2451 msg.hd.rtype = 35;
2453 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2454 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2455
2456 msg.hd.rtype = 36;
2458 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2459 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2460
2461 msg.hd.rtype = 99;
2463 let result = decode_ts_event_adjustment(&msg);
2464 assert!(result.is_err());
2465 }
2466
2467 #[rstest]
2468 fn test_decode_record() {
2469 let path = test_data_path().join("test_data.mbo.dbn.zst");
2471 let decoder = Decoder::from_zstd_file(path).unwrap();
2472 let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2473 let msg = dbn_stream.next().unwrap().unwrap();
2474
2475 let record_ref = dbn::RecordRef::from(msg);
2476 let instrument_id = InstrumentId::from("ESM4.GLBX");
2477
2478 let (data1, data2) =
2479 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2480
2481 assert!(data1.is_some());
2482 assert!(data2.is_none());
2483
2484 let path = test_data_path().join("test_data.trades.dbn.zst");
2486 let decoder = Decoder::from_zstd_file(path).unwrap();
2487 let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2488 let msg = dbn_stream.next().unwrap().unwrap();
2489
2490 let record_ref = dbn::RecordRef::from(msg);
2491
2492 let (data1, data2) =
2493 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2494
2495 assert!(data1.is_some());
2496 assert!(data2.is_none());
2497 assert!(matches!(data1.unwrap(), Data::Trade(_)));
2498 }
2499}