1use std::{ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
20use nautilus_model::{
21 data::{
22 Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24 },
25 enums::{
26 AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27 InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28 },
29 identifiers::{InstrumentId, TradeId},
30 instruments::{
31 Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32 },
33 types::{
34 Currency, Price, Quantity,
35 price::{PRICE_UNDEF, PriceRaw, decode_raw_price_i64},
36 },
37};
38use ustr::Ustr;
39
40use super::{
41 enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
42 types::{DatabentoImbalance, DatabentoStatistics},
43};
44
/// Bar step of exactly one, shared by the fixed bar specifications in this module.
///
/// `NonZeroUsize::MIN` is the smallest non-zero `usize`, i.e. `1`.
const STEP_ONE: NonZeroUsize = NonZeroUsize::MIN;
47
48const BAR_SPEC_1S: BarSpecification = BarSpecification {
49 step: STEP_ONE,
50 aggregation: BarAggregation::Second,
51 price_type: PriceType::Last,
52};
53const BAR_SPEC_1M: BarSpecification = BarSpecification {
54 step: STEP_ONE,
55 aggregation: BarAggregation::Minute,
56 price_type: PriceType::Last,
57};
58const BAR_SPEC_1H: BarSpecification = BarSpecification {
59 step: STEP_ONE,
60 aggregation: BarAggregation::Hour,
61 price_type: PriceType::Last,
62};
63const BAR_SPEC_1D: BarSpecification = BarSpecification {
64 step: STEP_ONE,
65 aggregation: BarAggregation::Day,
66 price_type: PriceType::Last,
67};
68
69const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
70const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
71const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
72const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
73
/// Converts a single-character flag into an optional boolean.
///
/// `'Y'` yields `Some(true)`, `'N'` yields `Some(false)`, and every other
/// character yields `None`.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    // Compare on the raw byte so the sign of `c_char` on the platform is irrelevant.
    match c as u8 {
        b'Y' => Some(true),
        b'N' => Some(false),
        _ => None,
    }
}
82
83#[must_use]
84pub const fn parse_order_side(c: c_char) -> OrderSide {
85 match c as u8 as char {
86 'A' => OrderSide::Sell,
87 'B' => OrderSide::Buy,
88 _ => OrderSide::NoOrderSide,
89 }
90}
91
92#[must_use]
93pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
94 match c as u8 as char {
95 'A' => AggressorSide::Seller,
96 'B' => AggressorSide::Buyer,
97 _ => AggressorSide::NoAggressor,
98 }
99}
100
101pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
107 match c as u8 as char {
108 'A' => Ok(BookAction::Add),
109 'C' => Ok(BookAction::Delete),
110 'F' => Ok(BookAction::Update),
111 'M' => Ok(BookAction::Update),
112 'R' => Ok(BookAction::Clear),
113 invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
114 }
115}
116
117pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
123 match c as u8 as char {
124 'C' => Ok(OptionKind::Call),
125 'P' => Ok(OptionKind::Put),
126 invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
127 }
128}
129
130fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
131 match value {
132 Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
133 tracing::warn!("Unknown currency code '{value}', defaulting to USD");
134 Currency::USD()
135 }),
136 Ok(_) => Currency::USD(),
137 Err(e) => {
138 tracing::error!("Error parsing currency: {e}");
139 Currency::USD()
140 }
141 }
142}
143
144pub fn parse_cfi_iso10926(
150 value: &str,
151) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
152 let chars: Vec<char> = value.chars().collect();
153 if chars.len() < 3 {
154 anyhow::bail!("Value string is too short");
155 }
156
157 let cfi_category = chars[0];
159 let cfi_group = chars[1];
160 let cfi_attribute1 = chars[2];
161 let mut asset_class = match cfi_category {
166 'D' => Some(AssetClass::Debt),
167 'E' => Some(AssetClass::Equity),
168 'S' => None,
169 _ => None,
170 };
171
172 let instrument_class = match cfi_group {
173 'I' => Some(InstrumentClass::Future),
174 _ => None,
175 };
176
177 if cfi_attribute1 == 'I' {
178 asset_class = Some(AssetClass::Index);
179 }
180
181 Ok((asset_class, instrument_class))
182}
183
184pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
192 let value_str = match value {
193 0 => return Ok(None),
194 1 => "Scheduled",
195 2 => "Surveillance intervention",
196 3 => "Market event",
197 4 => "Instrument activation",
198 5 => "Instrument expiration",
199 6 => "Recovery in process",
200 10 => "Regulatory",
201 11 => "Administrative",
202 12 => "Non-compliance",
203 13 => "Filings not current",
204 14 => "SEC trading suspension",
205 15 => "New issue",
206 16 => "Issue available",
207 17 => "Issues reviewed",
208 18 => "Filing requirements satisfied",
209 30 => "News pending",
210 31 => "News released",
211 32 => "News and resumption times",
212 33 => "News not forthcoming",
213 40 => "Order imbalance",
214 50 => "LULD pause",
215 60 => "Operational",
216 70 => "Additional information requested",
217 80 => "Merger effective",
218 90 => "ETF",
219 100 => "Corporate action",
220 110 => "New Security offering",
221 120 => "Market wide halt level 1",
222 121 => "Market wide halt level 2",
223 122 => "Market wide halt level 3",
224 123 => "Market wide halt carryover",
225 124 => "Market wide halt resumption",
226 130 => "Quotation not available",
227 invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
228 };
229
230 Ok(Some(Ustr::from(value_str)))
231}
232
233pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
239 let value_str = match value {
240 0 => return Ok(None),
241 1 => "No cancel",
242 2 => "Change trading session",
243 3 => "Implied matching on",
244 4 => "Implied matching off",
245 _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
246 };
247
248 Ok(Some(Ustr::from(value_str)))
249}
250
251#[inline(always)]
259#[must_use]
260fn decode_raw_price_with_precision(value: i64, precision: u8) -> (PriceRaw, u8) {
261 let raw = if value == i64::MAX {
262 PRICE_UNDEF
263 } else {
264 decode_raw_price_i64(value)
265 };
266
267 let precision = if raw == PRICE_UNDEF { 0 } else { precision };
269 (raw, precision)
270}
271
272#[inline(always)]
274#[must_use]
275pub fn decode_price(value: i64, precision: u8) -> Price {
276 let (raw, precision) = decode_raw_price_with_precision(value, precision);
277 Price::from_raw(raw, precision)
278}
279
280#[inline(always)]
282#[must_use]
283pub fn decode_price_increment(value: i64, precision: u8) -> Price {
284 match value {
285 0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
286 _ => decode_price(value, precision),
287 }
288}
289
290#[inline(always)]
292#[must_use]
293pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
294 match value {
295 i64::MAX => None,
296 _ => Some(decode_price(value, precision)),
297 }
298}
299
300#[inline(always)]
302#[must_use]
303pub fn decode_quantity(value: u64) -> Quantity {
304 Quantity::from(value)
305}
306
307#[inline(always)]
309#[must_use]
310pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
311 match value {
312 i64::MAX => None,
313 _ => Some(Quantity::from(value)),
314 }
315}
316
317pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
324 match value {
325 0 | i64::MAX => Ok(Quantity::from(1)),
326 v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
327 v => {
328 let abs = v as u128;
331
332 const SCALE: u128 = 1_000_000_000;
333 let int_part = abs / SCALE;
334 let frac_part = abs % SCALE;
335
336 if frac_part == 0 {
339 Ok(Quantity::from(int_part as u64))
341 } else {
342 let mut frac_str = format!("{frac_part:09}");
343 while frac_str.ends_with('0') {
344 frac_str.pop();
345 }
346 let s = format!("{int_part}.{frac_str}");
347 Ok(Quantity::from(s))
348 }
349 }
350 }
351}
352
353#[inline(always)]
355#[must_use]
356pub fn decode_lot_size(value: i32) -> Quantity {
357 match value {
358 0 | i32::MAX => Quantity::from(1),
359 value => Quantity::from(value),
360 }
361}
362
/// Returns `true` when the action character is `'T'`.
#[inline(always)]
#[must_use]
fn is_trade_msg(action: c_char) -> bool {
    action as u8 == b'T'
}
368
/// Returns `true` when neither the bid nor the ask price is `i64::MAX`.
#[inline(always)]
#[must_use]
fn has_valid_bid_ask(bid_px: i64, ask_px: i64) -> bool {
    !(bid_px == i64::MAX || ask_px == i64::MAX)
}
378
379pub fn decode_mbo_msg(
388 msg: &dbn::MboMsg,
389 instrument_id: InstrumentId,
390 price_precision: u8,
391 ts_init: Option<UnixNanos>,
392 include_trades: bool,
393) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
394 let side = parse_order_side(msg.side);
395 if is_trade_msg(msg.action) {
396 if include_trades && msg.size > 0 {
397 let price = decode_price(msg.price, price_precision);
398 let size = decode_quantity(msg.size as u64);
399 let aggressor_side = parse_aggressor_side(msg.side);
400 let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
401 let ts_event = msg.ts_recv.into();
402 let ts_init = ts_init.unwrap_or(ts_event);
403
404 let trade = TradeTick::new(
405 instrument_id,
406 price,
407 size,
408 aggressor_side,
409 trade_id,
410 ts_event,
411 ts_init,
412 );
413 return Ok((None, Some(trade)));
414 }
415
416 return Ok((None, None));
417 }
418
419 let action = parse_book_action(msg.action)?;
420 let (raw_price, precision) = decode_raw_price_with_precision(msg.price, price_precision);
421 let price = Price::from_raw(raw_price, precision);
422 let size = decode_quantity(msg.size as u64);
423 let order = BookOrder::new(side, price, size, msg.order_id);
424
425 let ts_event = msg.ts_recv.into();
426 let ts_init = ts_init.unwrap_or(ts_event);
427
428 let delta = OrderBookDelta::new(
429 instrument_id,
430 action,
431 order,
432 msg.flags.raw(),
433 msg.sequence.into(),
434 ts_event,
435 ts_init,
436 );
437
438 Ok((Some(delta), None))
439}
440
441pub fn decode_trade_msg(
447 msg: &dbn::TradeMsg,
448 instrument_id: InstrumentId,
449 price_precision: u8,
450 ts_init: Option<UnixNanos>,
451) -> anyhow::Result<TradeTick> {
452 let ts_event = msg.ts_recv.into();
453 let ts_init = ts_init.unwrap_or(ts_event);
454
455 let trade = TradeTick::new(
456 instrument_id,
457 decode_price(msg.price, price_precision),
458 decode_quantity(msg.size as u64),
459 parse_aggressor_side(msg.side),
460 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
461 ts_event,
462 ts_init,
463 );
464
465 Ok(trade)
466}
467
468pub fn decode_tbbo_msg(
477 msg: &dbn::TbboMsg,
478 instrument_id: InstrumentId,
479 price_precision: u8,
480 ts_init: Option<UnixNanos>,
481) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
482 let top_level = &msg.levels[0];
483 let ts_event = msg.ts_recv.into();
484 let ts_init = ts_init.unwrap_or(ts_event);
485
486 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
487 Some(QuoteTick::new(
488 instrument_id,
489 decode_price(top_level.bid_px, price_precision),
490 decode_price(top_level.ask_px, price_precision),
491 decode_quantity(top_level.bid_sz as u64),
492 decode_quantity(top_level.ask_sz as u64),
493 ts_event,
494 ts_init,
495 ))
496 } else {
497 None
498 };
499
500 let trade = TradeTick::new(
501 instrument_id,
502 decode_price(msg.price, price_precision),
503 decode_quantity(msg.size as u64),
504 parse_aggressor_side(msg.side),
505 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
506 ts_event,
507 ts_init,
508 );
509
510 Ok((maybe_quote, trade))
511}
512
513pub fn decode_mbp1_msg(
521 msg: &dbn::Mbp1Msg,
522 instrument_id: InstrumentId,
523 price_precision: u8,
524 ts_init: Option<UnixNanos>,
525 include_trades: bool,
526) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
527 let top_level = &msg.levels[0];
528 let ts_event = msg.ts_recv.into();
529 let ts_init = ts_init.unwrap_or(ts_event);
530
531 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
532 Some(QuoteTick::new(
533 instrument_id,
534 decode_price(top_level.bid_px, price_precision),
535 decode_price(top_level.ask_px, price_precision),
536 decode_quantity(top_level.bid_sz as u64),
537 decode_quantity(top_level.ask_sz as u64),
538 ts_event,
539 ts_init,
540 ))
541 } else {
542 None
543 };
544
545 let maybe_trade = if include_trades && is_trade_msg(msg.action) {
546 Some(TradeTick::new(
547 instrument_id,
548 decode_price(msg.price, price_precision),
549 decode_quantity(msg.size as u64),
550 parse_aggressor_side(msg.side),
551 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
552 ts_event,
553 ts_init,
554 ))
555 } else {
556 None
557 };
558
559 Ok((maybe_quote, maybe_trade))
560}
561
562pub fn decode_bbo_msg(
570 msg: &dbn::BboMsg,
571 instrument_id: InstrumentId,
572 price_precision: u8,
573 ts_init: Option<UnixNanos>,
574) -> anyhow::Result<Option<QuoteTick>> {
575 let top_level = &msg.levels[0];
576 if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
577 return Ok(None);
578 }
579
580 let ts_event = msg.ts_recv.into();
581 let ts_init = ts_init.unwrap_or(ts_event);
582
583 let quote = QuoteTick::new(
584 instrument_id,
585 decode_price(top_level.bid_px, price_precision),
586 decode_price(top_level.ask_px, price_precision),
587 decode_quantity(top_level.bid_sz as u64),
588 decode_quantity(top_level.ask_sz as u64),
589 ts_event,
590 ts_init,
591 );
592
593 Ok(Some(quote))
594}
595
596pub fn decode_mbp10_msg(
602 msg: &dbn::Mbp10Msg,
603 instrument_id: InstrumentId,
604 price_precision: u8,
605 ts_init: Option<UnixNanos>,
606) -> anyhow::Result<OrderBookDepth10> {
607 let mut bids = Vec::with_capacity(DEPTH10_LEN);
608 let mut asks = Vec::with_capacity(DEPTH10_LEN);
609 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
610 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
611
612 for level in &msg.levels {
613 let bid_order = BookOrder::new(
614 OrderSide::Buy,
615 decode_price(level.bid_px, price_precision),
616 decode_quantity(level.bid_sz as u64),
617 0,
618 );
619
620 let ask_order = BookOrder::new(
621 OrderSide::Sell,
622 decode_price(level.ask_px, price_precision),
623 decode_quantity(level.ask_sz as u64),
624 0,
625 );
626
627 bids.push(bid_order);
628 asks.push(ask_order);
629 bid_counts.push(level.bid_ct);
630 ask_counts.push(level.ask_ct);
631 }
632
633 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
634 anyhow::anyhow!(
635 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
636 v.len()
637 )
638 })?;
639
640 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
641 anyhow::anyhow!(
642 "Expected exactly {DEPTH10_LEN} ask levels, received {}",
643 v.len()
644 )
645 })?;
646
647 let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
648 anyhow::anyhow!(
649 "Expected exactly {DEPTH10_LEN} bid counts, received {}",
650 v.len()
651 )
652 })?;
653
654 let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
655 anyhow::anyhow!(
656 "Expected exactly {DEPTH10_LEN} ask counts, received {}",
657 v.len()
658 )
659 })?;
660
661 let ts_event = msg.ts_recv.into();
662 let ts_init = ts_init.unwrap_or(ts_event);
663
664 let depth = OrderBookDepth10::new(
665 instrument_id,
666 bids,
667 asks,
668 bid_counts,
669 ask_counts,
670 msg.flags.raw(),
671 msg.sequence.into(),
672 ts_event,
673 ts_init,
674 );
675
676 Ok(depth)
677}
678
679pub fn decode_cmbp1_msg(
688 msg: &dbn::Cmbp1Msg,
689 instrument_id: InstrumentId,
690 price_precision: u8,
691 ts_init: Option<UnixNanos>,
692 include_trades: bool,
693) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
694 let top_level = &msg.levels[0];
695 let ts_event = msg.ts_recv.into();
696 let ts_init = ts_init.unwrap_or(ts_event);
697
698 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
699 Some(QuoteTick::new(
700 instrument_id,
701 decode_price(top_level.bid_px, price_precision),
702 decode_price(top_level.ask_px, price_precision),
703 decode_quantity(top_level.bid_sz as u64),
704 decode_quantity(top_level.ask_sz as u64),
705 ts_event,
706 ts_init,
707 ))
708 } else {
709 None
710 };
711
712 let maybe_trade = if include_trades && is_trade_msg(msg.action) {
713 Some(TradeTick::new(
715 instrument_id,
716 decode_price(msg.price, price_precision),
717 decode_quantity(msg.size as u64),
718 parse_aggressor_side(msg.side),
719 TradeId::new(UUID4::new().as_str()),
720 ts_event,
721 ts_init,
722 ))
723 } else {
724 None
725 };
726
727 Ok((maybe_quote, maybe_trade))
728}
729
730pub fn decode_cbbo_msg(
738 msg: &dbn::CbboMsg,
739 instrument_id: InstrumentId,
740 price_precision: u8,
741 ts_init: Option<UnixNanos>,
742) -> anyhow::Result<Option<QuoteTick>> {
743 let top_level = &msg.levels[0];
744 if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
745 return Ok(None);
746 }
747
748 let ts_event = msg.ts_recv.into();
749 let ts_init = ts_init.unwrap_or(ts_event);
750
751 let quote = QuoteTick::new(
752 instrument_id,
753 decode_price(top_level.bid_px, price_precision),
754 decode_price(top_level.ask_px, price_precision),
755 decode_quantity(top_level.bid_sz as u64),
756 decode_quantity(top_level.ask_sz as u64),
757 ts_event,
758 ts_init,
759 );
760
761 Ok(Some(quote))
762}
763
764pub fn decode_tcbbo_msg(
773 msg: &dbn::CbboMsg,
774 instrument_id: InstrumentId,
775 price_precision: u8,
776 ts_init: Option<UnixNanos>,
777) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
778 let top_level = &msg.levels[0];
779 let ts_event = msg.ts_recv.into();
780 let ts_init = ts_init.unwrap_or(ts_event);
781
782 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
783 Some(QuoteTick::new(
784 instrument_id,
785 decode_price(top_level.bid_px, price_precision),
786 decode_price(top_level.ask_px, price_precision),
787 decode_quantity(top_level.bid_sz as u64),
788 decode_quantity(top_level.ask_sz as u64),
789 ts_event,
790 ts_init,
791 ))
792 } else {
793 None
794 };
795
796 let trade = TradeTick::new(
798 instrument_id,
799 decode_price(msg.price, price_precision),
800 decode_quantity(msg.size as u64),
801 parse_aggressor_side(msg.side),
802 TradeId::new(UUID4::new().as_str()),
803 ts_event,
804 ts_init,
805 );
806
807 Ok((maybe_quote, trade))
808}
809
810pub fn decode_bar_type(
814 msg: &dbn::OhlcvMsg,
815 instrument_id: InstrumentId,
816) -> anyhow::Result<BarType> {
817 let bar_type = match msg.hd.rtype {
818 32 => {
819 BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
821 }
822 33 => {
823 BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
825 }
826 34 => {
827 BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
829 }
830 35 => {
831 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
833 }
834 36 => {
835 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
837 }
838 _ => anyhow::bail!(
839 "`rtype` is not a supported bar aggregation, was {}",
840 msg.hd.rtype
841 ),
842 };
843
844 Ok(bar_type)
845}
846
847pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
851 let adjustment = match msg.hd.rtype {
852 32 => {
853 BAR_CLOSE_ADJUSTMENT_1S
855 }
856 33 => {
857 BAR_CLOSE_ADJUSTMENT_1M
859 }
860 34 => {
861 BAR_CLOSE_ADJUSTMENT_1H
863 }
864 35 | 36 => {
865 BAR_CLOSE_ADJUSTMENT_1D
867 }
868 _ => anyhow::bail!(
869 "`rtype` is not a supported bar aggregation, was {}",
870 msg.hd.rtype
871 ),
872 };
873
874 Ok(adjustment.into())
875}
876
877pub fn decode_ohlcv_msg(
881 msg: &dbn::OhlcvMsg,
882 instrument_id: InstrumentId,
883 price_precision: u8,
884 ts_init: Option<UnixNanos>,
885 timestamp_on_close: bool,
886) -> anyhow::Result<Bar> {
887 let bar_type = decode_bar_type(msg, instrument_id)?;
888 let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
889
890 let ts_event_raw = msg.hd.ts_event.into();
891 let ts_close = ts_event_raw + ts_event_adjustment;
892 let ts_init = ts_init.unwrap_or(ts_close); let ts_event = if timestamp_on_close {
895 ts_close
896 } else {
897 ts_event_raw
898 };
899
900 let bar = Bar::new(
901 bar_type,
902 decode_price(msg.open, price_precision),
903 decode_price(msg.high, price_precision),
904 decode_price(msg.low, price_precision),
905 decode_price(msg.close, price_precision),
906 decode_quantity(msg.volume),
907 ts_event,
908 ts_init,
909 );
910
911 Ok(bar)
912}
913
914pub fn decode_status_msg(
920 msg: &dbn::StatusMsg,
921 instrument_id: InstrumentId,
922 ts_init: Option<UnixNanos>,
923) -> anyhow::Result<InstrumentStatus> {
924 let ts_event = msg.hd.ts_event.into();
925 let ts_init = ts_init.unwrap_or(ts_event);
926
927 let action = MarketStatusAction::from_u16(msg.action)
928 .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
929
930 let status = InstrumentStatus::new(
931 instrument_id,
932 action,
933 ts_event,
934 ts_init,
935 parse_status_reason(msg.reason)?,
936 parse_status_trading_event(msg.trading_event)?,
937 parse_optional_bool(msg.is_trading),
938 parse_optional_bool(msg.is_quoting),
939 parse_optional_bool(msg.is_short_sell_restricted),
940 );
941
942 Ok(status)
943}
944
945pub fn decode_record(
949 record: &dbn::RecordRef,
950 instrument_id: InstrumentId,
951 price_precision: u8,
952 ts_init: Option<UnixNanos>,
953 include_trades: bool,
954 bars_timestamp_on_close: bool,
955) -> anyhow::Result<(Option<Data>, Option<Data>)> {
956 let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
960 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
961 let result = decode_mbo_msg(
962 msg,
963 instrument_id,
964 price_precision,
965 Some(ts_init),
966 include_trades,
967 )?;
968 match result {
969 (Some(delta), None) => (Some(Data::Delta(delta)), None),
970 (None, Some(trade)) => (Some(Data::Trade(trade)), None),
971 (None, None) => (None, None),
972 _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
973 }
974 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
975 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
976 let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
977 (Some(Data::Trade(trade)), None)
978 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
979 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
980 let (maybe_quote, maybe_trade) = decode_mbp1_msg(
981 msg,
982 instrument_id,
983 price_precision,
984 Some(ts_init),
985 include_trades,
986 )?;
987 (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
988 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
989 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
990 let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
991 (maybe_quote.map(Data::Quote), None)
992 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
993 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
994 let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
995 (maybe_quote.map(Data::Quote), None)
996 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
997 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
998 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
999 (Some(Data::from(depth)), None)
1000 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
1001 let bar = decode_ohlcv_msg(
1004 msg,
1005 instrument_id,
1006 price_precision,
1007 ts_init,
1008 bars_timestamp_on_close,
1009 )?;
1010 (Some(Data::Bar(bar)), None)
1011 } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
1012 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1013 let (maybe_quote, maybe_trade) = decode_cmbp1_msg(
1014 msg,
1015 instrument_id,
1016 price_precision,
1017 Some(ts_init),
1018 include_trades,
1019 )?;
1020 (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1021 } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
1022 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1024 let (maybe_quote, trade) =
1025 decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1026 (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1027 } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
1028 if msg.price != i64::MAX && msg.size > 0 {
1030 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1032 let (maybe_quote, trade) =
1033 decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1034 (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1035 } else {
1036 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1038 let maybe_quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1039 (maybe_quote.map(Data::Quote), None)
1040 }
1041 } else {
1042 anyhow::bail!("DBN message type is not currently supported")
1043 };
1044
1045 Ok(result)
1046}
1047
1048const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
1049 match ts_init {
1050 Some(ts_init) => ts_init,
1051 None => msg_timestamp,
1052 }
1053}
1054
1055pub fn decode_instrument_def_msg(
1059 msg: &dbn::InstrumentDefMsg,
1060 instrument_id: InstrumentId,
1061 ts_init: Option<UnixNanos>,
1062) -> anyhow::Result<InstrumentAny> {
1063 match msg.instrument_class as u8 as char {
1064 'K' => Ok(InstrumentAny::Equity(decode_equity(
1065 msg,
1066 instrument_id,
1067 ts_init,
1068 )?)),
1069 'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1070 msg,
1071 instrument_id,
1072 ts_init,
1073 )?)),
1074 'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1075 msg,
1076 instrument_id,
1077 ts_init,
1078 )?)),
1079 'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1080 msg,
1081 instrument_id,
1082 ts_init,
1083 )?)),
1084 'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1085 msg,
1086 instrument_id,
1087 ts_init,
1088 )?)),
1089 'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1090 'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1091 _ => anyhow::bail!(
1092 "Unsupported `instrument_class` '{}'",
1093 msg.instrument_class as u8 as char
1094 ),
1095 }
1096}
1097
1098pub fn decode_equity(
1104 msg: &dbn::InstrumentDefMsg,
1105 instrument_id: InstrumentId,
1106 ts_init: Option<UnixNanos>,
1107) -> anyhow::Result<Equity> {
1108 let currency = parse_currency_or_usd_default(msg.currency());
1109 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1110 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1111 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1113
1114 Ok(Equity::new(
1115 instrument_id,
1116 instrument_id.symbol,
1117 None, currency,
1119 price_increment.precision,
1120 price_increment,
1121 Some(lot_size),
1122 None, None, None, None, None, None, None, None, ts_event,
1131 ts_init,
1132 ))
1133}
1134
1135pub fn decode_futures_contract(
1141 msg: &dbn::InstrumentDefMsg,
1142 instrument_id: InstrumentId,
1143 ts_init: Option<UnixNanos>,
1144) -> anyhow::Result<FuturesContract> {
1145 let currency = parse_currency_or_usd_default(msg.currency());
1146 let exchange = Ustr::from(msg.exchange()?);
1147 let underlying = Ustr::from(msg.asset()?);
1148 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1149 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1150 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1151 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1152 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1154
1155 FuturesContract::new_checked(
1156 instrument_id,
1157 instrument_id.symbol,
1158 asset_class.unwrap_or(AssetClass::Commodity),
1159 Some(exchange),
1160 underlying,
1161 msg.activation.into(),
1162 msg.expiration.into(),
1163 currency,
1164 price_increment.precision,
1165 price_increment,
1166 multiplier,
1167 lot_size,
1168 None, None, None, None, None, None, None, None, ts_event,
1177 ts_init,
1178 )
1179}
1180
1181pub fn decode_futures_spread(
1187 msg: &dbn::InstrumentDefMsg,
1188 instrument_id: InstrumentId,
1189 ts_init: Option<UnixNanos>,
1190) -> anyhow::Result<FuturesSpread> {
1191 let exchange = Ustr::from(msg.exchange()?);
1192 let underlying = Ustr::from(msg.asset()?);
1193 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1194 let strategy_type = Ustr::from(msg.secsubtype()?);
1195 let currency = parse_currency_or_usd_default(msg.currency());
1196 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1197 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1198 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1199 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1201
1202 FuturesSpread::new_checked(
1203 instrument_id,
1204 instrument_id.symbol,
1205 asset_class.unwrap_or(AssetClass::Commodity),
1206 Some(exchange),
1207 underlying,
1208 strategy_type,
1209 msg.activation.into(),
1210 msg.expiration.into(),
1211 currency,
1212 price_increment.precision,
1213 price_increment,
1214 multiplier,
1215 lot_size,
1216 None, None, None, None, None, None, None, None, ts_event,
1225 ts_init,
1226 )
1227}
1228
1229pub fn decode_option_contract(
1235 msg: &dbn::InstrumentDefMsg,
1236 instrument_id: InstrumentId,
1237 ts_init: Option<UnixNanos>,
1238) -> anyhow::Result<OptionContract> {
1239 let currency = parse_currency_or_usd_default(msg.currency());
1240 let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1241 let exchange = Ustr::from(msg.exchange()?);
1242 let underlying = Ustr::from(msg.underlying()?);
1243 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1244 Some(AssetClass::Equity)
1245 } else {
1246 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1247 asset_class
1248 };
1249 let option_kind = parse_option_kind(msg.instrument_class)?;
1250 let strike_price = Price::from_raw(
1251 decode_raw_price_i64(msg.strike_price),
1252 strike_price_currency.precision,
1253 );
1254 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1255 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1256 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1257 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1259
1260 OptionContract::new_checked(
1261 instrument_id,
1262 instrument_id.symbol,
1263 asset_class_opt.unwrap_or(AssetClass::Commodity),
1264 Some(exchange),
1265 underlying,
1266 option_kind,
1267 strike_price,
1268 currency,
1269 msg.activation.into(),
1270 msg.expiration.into(),
1271 price_increment.precision,
1272 price_increment,
1273 multiplier,
1274 lot_size,
1275 None, None, None, None, None, None, None, None, ts_event,
1284 ts_init,
1285 )
1286}
1287
1288pub fn decode_option_spread(
1294 msg: &dbn::InstrumentDefMsg,
1295 instrument_id: InstrumentId,
1296 ts_init: Option<UnixNanos>,
1297) -> anyhow::Result<OptionSpread> {
1298 let exchange = Ustr::from(msg.exchange()?);
1299 let underlying = Ustr::from(msg.underlying()?);
1300 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1301 Some(AssetClass::Equity)
1302 } else {
1303 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1304 asset_class
1305 };
1306 let strategy_type = Ustr::from(msg.secsubtype()?);
1307 let currency = parse_currency_or_usd_default(msg.currency());
1308 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1309 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1310 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1311 let ts_event = msg.ts_recv.into(); let ts_init = ts_init.unwrap_or(ts_event);
1313
1314 OptionSpread::new_checked(
1315 instrument_id,
1316 instrument_id.symbol,
1317 asset_class_opt.unwrap_or(AssetClass::Commodity),
1318 Some(exchange),
1319 underlying,
1320 strategy_type,
1321 msg.activation.into(),
1322 msg.expiration.into(),
1323 currency,
1324 price_increment.precision,
1325 price_increment,
1326 multiplier,
1327 lot_size,
1328 None, None, None, None, None, None, None, None, ts_event,
1337 ts_init,
1338 )
1339}
1340
1341pub fn decode_imbalance_msg(
1347 msg: &dbn::ImbalanceMsg,
1348 instrument_id: InstrumentId,
1349 price_precision: u8,
1350 ts_init: Option<UnixNanos>,
1351) -> anyhow::Result<DatabentoImbalance> {
1352 let ts_event = msg.ts_recv.into();
1353 let ts_init = ts_init.unwrap_or(ts_event);
1354
1355 Ok(DatabentoImbalance::new(
1356 instrument_id,
1357 decode_price(msg.ref_price, price_precision),
1358 decode_price(msg.cont_book_clr_price, price_precision),
1359 decode_price(msg.auct_interest_clr_price, price_precision),
1360 Quantity::new(f64::from(msg.paired_qty), 0),
1361 Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1362 parse_order_side(msg.side),
1363 msg.significant_imbalance as c_char,
1364 msg.hd.ts_event.into(),
1365 ts_event,
1366 ts_init,
1367 ))
1368}
1369
1370pub fn decode_statistics_msg(
1377 msg: &dbn::StatMsg,
1378 instrument_id: InstrumentId,
1379 price_precision: u8,
1380 ts_init: Option<UnixNanos>,
1381) -> anyhow::Result<DatabentoStatistics> {
1382 let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1383 .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1384 let update_action =
1385 DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1386 anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1387 })?;
1388 let ts_event = msg.ts_recv.into();
1389 let ts_init = ts_init.unwrap_or(ts_event);
1390
1391 Ok(DatabentoStatistics::new(
1392 instrument_id,
1393 stat_type,
1394 update_action,
1395 decode_optional_price(msg.price, price_precision),
1396 decode_optional_quantity(msg.quantity),
1397 msg.channel_id,
1398 msg.stat_flags,
1399 msg.sequence,
1400 msg.ts_ref.into(),
1401 msg.ts_in_delta,
1402 msg.hd.ts_event.into(),
1403 ts_event,
1404 ts_init,
1405 ))
1406}
1407
1408#[cfg(test)]
1409mod tests {
1410 use std::path::{Path, PathBuf};
1411
1412 use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1413 use fallible_streaming_iterator::FallibleStreamingIterator;
1414 use nautilus_model::instruments::Instrument;
1415 use rstest::*;
1416
1417 use super::*;
1418
1419 fn test_data_path() -> PathBuf {
1420 Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1421 }
1422
1423 #[rstest]
1424 #[case('Y' as c_char, Some(true))]
1425 #[case('N' as c_char, Some(false))]
1426 #[case('X' as c_char, None)]
1427 fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1428 assert_eq!(parse_optional_bool(input), expected);
1429 }
1430
1431 #[rstest]
1432 #[case('A' as c_char, OrderSide::Sell)]
1433 #[case('B' as c_char, OrderSide::Buy)]
1434 #[case('X' as c_char, OrderSide::NoOrderSide)]
1435 fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1436 assert_eq!(parse_order_side(input), expected);
1437 }
1438
1439 #[rstest]
1440 #[case('A' as c_char, AggressorSide::Seller)]
1441 #[case('B' as c_char, AggressorSide::Buyer)]
1442 #[case('X' as c_char, AggressorSide::NoAggressor)]
1443 fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1444 assert_eq!(parse_aggressor_side(input), expected);
1445 }
1446
1447 #[rstest]
1448 #[case('T' as c_char, true)]
1449 #[case('A' as c_char, false)]
1450 #[case('C' as c_char, false)]
1451 #[case('F' as c_char, false)]
1452 #[case('M' as c_char, false)]
1453 #[case('R' as c_char, false)]
1454 fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1455 assert_eq!(is_trade_msg(action), expected);
1456 }
1457
1458 #[rstest]
1459 #[case('A' as c_char, Ok(BookAction::Add))]
1460 #[case('C' as c_char, Ok(BookAction::Delete))]
1461 #[case('F' as c_char, Ok(BookAction::Update))]
1462 #[case('M' as c_char, Ok(BookAction::Update))]
1463 #[case('R' as c_char, Ok(BookAction::Clear))]
1464 #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1465 fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1466 match parse_book_action(input) {
1467 Ok(action) => assert_eq!(Ok(action), expected),
1468 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1469 }
1470 }
1471
1472 #[rstest]
1473 #[case('C' as c_char, Ok(OptionKind::Call))]
1474 #[case('P' as c_char, Ok(OptionKind::Put))]
1475 #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1476 fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1477 match parse_option_kind(input) {
1478 Ok(kind) => assert_eq!(Ok(kind), expected),
1479 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1480 }
1481 }
1482
1483 #[rstest]
1484 #[case(Ok("USD"), Currency::USD())]
1485 #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1486 #[case(Ok(""), Currency::USD())]
1487 #[case(Err("Error"), Currency::USD())]
1488 fn test_parse_currency_or_usd_default(
1489 #[case] input: Result<&str, &'static str>, #[case] expected: Currency,
1491 ) {
1492 let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1493 assert_eq!(actual, expected);
1494 }
1495
1496 #[rstest]
1497 #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1498 #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1499 #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1500 #[case("XXX", Ok((None, None)))]
1501 #[case("D", Err("Value string is too short"))]
1502 fn test_parse_cfi_iso10926(
1503 #[case] input: &str,
1504 #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1505 ) {
1506 match parse_cfi_iso10926(input) {
1507 Ok(result) => assert_eq!(Ok(result), expected),
1508 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1509 }
1510 }
1511
1512 #[rstest]
1513 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(
1516 10_000_000_000,
1517 2,
1518 Price::from_raw(decode_raw_price_i64(10_000_000_000), 2)
1519 )]
1520 fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1521 let actual = decode_price_increment(value, precision);
1522 assert_eq!(actual, expected);
1523 }
1524
1525 #[rstest]
1526 #[case(i64::MAX, 2, None)] #[case(0, 2, Some(Price::from_raw(0, 2)))] #[case(
1529 10_000_000_000,
1530 2,
1531 Some(Price::from_raw(decode_raw_price_i64(10_000_000_000), 2))
1532 )]
1533 fn test_decode_optional_price(
1534 #[case] value: i64,
1535 #[case] precision: u8,
1536 #[case] expected: Option<Price>,
1537 ) {
1538 let actual = decode_optional_price(value, precision);
1539 assert_eq!(actual, expected);
1540 }
1541
1542 #[rstest]
1543 #[case(i64::MAX, None)] #[case(0, Some(Quantity::new(0.0, 0)))] #[case(10, Some(Quantity::new(10.0, 0)))] fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1547 let actual = decode_optional_quantity(value);
1548 assert_eq!(actual, expected);
1549 }
1550
1551 #[rstest]
1552 #[case(0, Quantity::from(1))] #[case(i64::MAX, Quantity::from(1))] #[case(50_000_000_000, Quantity::from("50"))] #[case(12_500_000_000, Quantity::from("12.5"))] #[case(1_000_000_000, Quantity::from("1"))] #[case(1, Quantity::from("0.000000001"))] #[case(1_000_000_001, Quantity::from("1.000000001"))] #[case(999_999_999, Quantity::from("0.999999999"))] #[case(123_456_789_000, Quantity::from("123.456789"))] fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1562 assert_eq!(decode_multiplier(raw).unwrap(), expected);
1563 }
1564
1565 #[rstest]
1566 #[case(-1_500_000_000)] #[case(-1)] #[case(-999_999_999)] fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1570 let result = decode_multiplier(raw);
1571 assert!(result.is_err());
1572 assert!(
1573 result
1574 .unwrap_err()
1575 .to_string()
1576 .contains("Invalid negative multiplier")
1577 );
1578 }
1579
1580 #[rstest]
1581 #[case(100, Quantity::from(100))]
1582 #[case(1000, Quantity::from(1000))]
1583 #[case(5, Quantity::from(5))]
1584 fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1585 assert_eq!(decode_quantity(value), expected);
1586 }
1587
1588 #[rstest]
1589 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(
1592 10_000_000_000,
1593 2,
1594 Price::from_raw(decode_raw_price_i64(10_000_000_000), 2)
1595 )]
1596 fn test_decode_price_increment(
1597 #[case] value: i64,
1598 #[case] precision: u8,
1599 #[case] expected: Price,
1600 ) {
1601 assert_eq!(decode_price_increment(value, precision), expected);
1602 }
1603
1604 #[rstest]
1605 #[case(0, Quantity::from(1))] #[case(i32::MAX, Quantity::from(1))] #[case(100, Quantity::from(100))]
1608 #[case(1, Quantity::from(1))]
1609 #[case(1000, Quantity::from(1000))]
1610 fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1611 assert_eq!(decode_lot_size(value), expected);
1612 }
1613
1614 #[rstest]
1615 #[case(0, None)] #[case(1, Some(Ustr::from("Scheduled")))]
1617 #[case(2, Some(Ustr::from("Surveillance intervention")))]
1618 #[case(3, Some(Ustr::from("Market event")))]
1619 #[case(10, Some(Ustr::from("Regulatory")))]
1620 #[case(30, Some(Ustr::from("News pending")))]
1621 #[case(40, Some(Ustr::from("Order imbalance")))]
1622 #[case(50, Some(Ustr::from("LULD pause")))]
1623 #[case(60, Some(Ustr::from("Operational")))]
1624 #[case(100, Some(Ustr::from("Corporate action")))]
1625 #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1626 fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1627 assert_eq!(parse_status_reason(value).unwrap(), expected);
1628 }
1629
1630 #[rstest]
1631 #[case(999)] fn test_parse_status_reason_invalid(#[case] value: u16) {
1633 assert!(parse_status_reason(value).is_err());
1634 }
1635
1636 #[rstest]
1637 #[case(0, None)] #[case(1, Some(Ustr::from("No cancel")))]
1639 #[case(2, Some(Ustr::from("Change trading session")))]
1640 #[case(3, Some(Ustr::from("Implied matching on")))]
1641 #[case(4, Some(Ustr::from("Implied matching off")))]
1642 fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1643 assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1644 }
1645
1646 #[rstest]
1647 #[case(5)] #[case(100)] fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1650 assert!(parse_status_trading_event(value).is_err());
1651 }
1652
1653 #[rstest]
1654 fn test_decode_mbo_msg() {
1655 let path = test_data_path().join("test_data.mbo.dbn.zst");
1656 let mut dbn_stream = Decoder::from_zstd_file(path)
1657 .unwrap()
1658 .decode_stream::<dbn::MboMsg>();
1659 let msg = dbn_stream.next().unwrap().unwrap();
1660
1661 let instrument_id = InstrumentId::from("ESM4.GLBX");
1662 let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1663 let delta = delta.unwrap();
1664
1665 assert_eq!(delta.instrument_id, instrument_id);
1666 assert_eq!(delta.action, BookAction::Delete);
1667 assert_eq!(delta.order.side, OrderSide::Sell);
1668 assert_eq!(delta.order.price, Price::from("3722.75"));
1669 assert_eq!(delta.order.size, Quantity::from("1"));
1670 assert_eq!(delta.order.order_id, 647_784_973_705);
1671 assert_eq!(delta.flags, 128);
1672 assert_eq!(delta.sequence, 1_170_352);
1673 assert_eq!(delta.ts_event, msg.ts_recv);
1674 assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1675 assert_eq!(delta.ts_init, 0);
1676 }
1677
1678 #[rstest]
1679 fn test_decode_mbo_msg_clear_action() {
1680 let ts_recv = 1_609_160_400_000_000_000;
1682 let msg = dbn::MboMsg {
1683 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1684 order_id: 0,
1685 price: i64::MAX,
1686 size: 0,
1687 flags: dbn::FlagSet::empty(),
1688 channel_id: 0,
1689 action: 'R' as c_char,
1690 side: 'N' as c_char, ts_recv,
1692 ts_in_delta: 0,
1693 sequence: 1_000_000,
1694 };
1695
1696 let instrument_id = InstrumentId::from("ESM4.GLBX");
1697 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1698
1699 assert!(trade.is_none());
1701 let delta = delta.expect("Clear action should produce OrderBookDelta");
1702
1703 assert_eq!(delta.instrument_id, instrument_id);
1704 assert_eq!(delta.action, BookAction::Clear);
1705 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1706 assert_eq!(delta.order.size, Quantity::from("0"));
1707 assert_eq!(delta.order.order_id, 0);
1708 assert_eq!(delta.sequence, 1_000_000);
1709 assert_eq!(delta.ts_event, ts_recv);
1710 assert_eq!(delta.ts_init, 0);
1711 assert!(delta.order.price.is_undefined());
1712 assert_eq!(delta.order.price.precision, 0);
1713 }
1714
1715 #[rstest]
1716 fn test_decode_mbo_msg_price_undef_with_precision() {
1717 let ts_recv = 1_609_160_400_000_000_000;
1719 let msg = dbn::MboMsg {
1720 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1721 order_id: 0,
1722 price: i64::MAX, size: 0,
1724 flags: dbn::FlagSet::empty(),
1725 channel_id: 0,
1726 action: 'R' as c_char, side: 'N' as c_char, ts_recv,
1729 ts_in_delta: 0,
1730 sequence: 0,
1731 };
1732
1733 let instrument_id = InstrumentId::from("ESM4.GLBX");
1734 let (delta, _) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1735 let delta = delta.unwrap();
1736
1737 assert!(delta.order.price.is_undefined());
1738 assert_eq!(delta.order.price.precision, 0);
1739 assert_eq!(delta.order.price.raw, PRICE_UNDEF);
1740 }
1741
1742 #[rstest]
1743 fn test_decode_mbo_msg_no_order_side_update() {
1744 let ts_recv = 1_609_160_400_000_000_000;
1747 let msg = dbn::MboMsg {
1748 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1749 order_id: 123_456_789,
1750 price: 4_800_250_000_000, size: 1,
1752 flags: dbn::FlagSet::empty(),
1753 channel_id: 1,
1754 action: 'M' as c_char, side: 'N' as c_char, ts_recv,
1757 ts_in_delta: 0,
1758 sequence: 1_000_000,
1759 };
1760
1761 let instrument_id = InstrumentId::from("ESM4.GLBX");
1762 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1763
1764 assert!(delta.is_some());
1766 assert!(trade.is_none());
1767 let delta = delta.unwrap();
1768 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1769 assert_eq!(delta.order.order_id, 123_456_789);
1770 assert_eq!(delta.action, BookAction::Update);
1771 }
1772
1773 #[rstest]
1774 fn test_decode_mbp1_msg() {
1775 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1776 let mut dbn_stream = Decoder::from_zstd_file(path)
1777 .unwrap()
1778 .decode_stream::<dbn::Mbp1Msg>();
1779 let msg = dbn_stream.next().unwrap().unwrap();
1780
1781 let instrument_id = InstrumentId::from("ESM4.GLBX");
1782 let (maybe_quote, _) =
1783 decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1784 let quote = maybe_quote.expect("Expected valid quote");
1785
1786 assert_eq!(quote.instrument_id, instrument_id);
1787 assert_eq!(quote.bid_price, Price::from("3720.25"));
1788 assert_eq!(quote.ask_price, Price::from("3720.50"));
1789 assert_eq!(quote.bid_size, Quantity::from("24"));
1790 assert_eq!(quote.ask_size, Quantity::from("11"));
1791 assert_eq!(quote.ts_event, msg.ts_recv);
1792 assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1793 assert_eq!(quote.ts_init, 0);
1794 }
1795
1796 #[rstest]
1797 fn test_decode_mbp1_msg_undefined_ask_skips_quote() {
1798 let ts_recv = 1_609_160_400_000_000_000;
1799 let msg = dbn::Mbp1Msg {
1800 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1801 price: 3_720_250_000_000, size: 5,
1803 action: 'A' as c_char,
1804 side: 'B' as c_char,
1805 flags: dbn::FlagSet::empty(),
1806 depth: 0,
1807 ts_recv,
1808 ts_in_delta: 0,
1809 sequence: 1_170_352,
1810 levels: [dbn::BidAskPair {
1811 bid_px: 3_720_250_000_000, ask_px: i64::MAX, bid_sz: 24,
1814 ask_sz: 0,
1815 bid_ct: 1,
1816 ask_ct: 0,
1817 }],
1818 };
1819
1820 let instrument_id = InstrumentId::from("ESM4.GLBX");
1821 let (maybe_quote, _) =
1822 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1823
1824 assert!(maybe_quote.is_none());
1826 }
1827
1828 #[rstest]
1829 fn test_decode_mbp1_msg_undefined_bid_skips_quote() {
1830 let ts_recv = 1_609_160_400_000_000_000;
1831 let msg = dbn::Mbp1Msg {
1832 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1833 price: 3_720_500_000_000, size: 5,
1835 action: 'A' as c_char,
1836 side: 'A' as c_char,
1837 flags: dbn::FlagSet::empty(),
1838 depth: 0,
1839 ts_recv,
1840 ts_in_delta: 0,
1841 sequence: 1_170_352,
1842 levels: [dbn::BidAskPair {
1843 bid_px: i64::MAX, ask_px: 3_720_500_000_000, bid_sz: 0,
1846 ask_sz: 11,
1847 bid_ct: 0,
1848 ask_ct: 1,
1849 }],
1850 };
1851
1852 let instrument_id = InstrumentId::from("ESM4.GLBX");
1853 let (maybe_quote, _) =
1854 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1855
1856 assert!(maybe_quote.is_none());
1858 }
1859
1860 #[rstest]
1861 fn test_decode_mbp1_msg_trade_still_returned_with_undefined_prices() {
1862 let ts_recv = 1_609_160_400_000_000_000;
1863 let msg = dbn::Mbp1Msg {
1864 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1865 price: 3_720_250_000_000, size: 5,
1867 action: 'T' as c_char, side: 'A' as c_char,
1869 flags: dbn::FlagSet::empty(),
1870 depth: 0,
1871 ts_recv,
1872 ts_in_delta: 0,
1873 sequence: 1_170_352,
1874 levels: [dbn::BidAskPair {
1875 bid_px: i64::MAX, ask_px: i64::MAX, bid_sz: 0,
1878 ask_sz: 0,
1879 bid_ct: 0,
1880 ask_ct: 0,
1881 }],
1882 };
1883
1884 let instrument_id = InstrumentId::from("ESM4.GLBX");
1885 let (maybe_quote, maybe_trade) =
1886 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), true).unwrap();
1887
1888 assert!(maybe_quote.is_none());
1890
1891 let trade = maybe_trade.expect("Expected trade");
1893 assert_eq!(trade.instrument_id, instrument_id);
1894 assert_eq!(trade.price, Price::from("3720.25"));
1895 assert_eq!(trade.size, Quantity::from("5"));
1896 }
1897
1898 #[rstest]
1899 fn test_decode_bbo_1s_msg() {
1900 let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1901 let mut dbn_stream = Decoder::from_zstd_file(path)
1902 .unwrap()
1903 .decode_stream::<dbn::BboMsg>();
1904 let msg = dbn_stream.next().unwrap().unwrap();
1905
1906 let instrument_id = InstrumentId::from("ESM4.GLBX");
1907 let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1908 let quote = maybe_quote.expect("Expected valid quote");
1909
1910 assert_eq!(quote.instrument_id, instrument_id);
1911 assert_eq!(quote.bid_price, Price::from("3702.25"));
1912 assert_eq!(quote.ask_price, Price::from("3702.75"));
1913 assert_eq!(quote.bid_size, Quantity::from("18"));
1914 assert_eq!(quote.ask_size, Quantity::from("13"));
1915 assert_eq!(quote.ts_event, msg.ts_recv);
1916 assert_eq!(quote.ts_event, 1609113600000000000);
1917 assert_eq!(quote.ts_init, 0);
1918 }
1919
1920 #[rstest]
1921 fn test_decode_bbo_1m_msg() {
1922 let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1923 let mut dbn_stream = Decoder::from_zstd_file(path)
1924 .unwrap()
1925 .decode_stream::<dbn::BboMsg>();
1926 let msg = dbn_stream.next().unwrap().unwrap();
1927
1928 let instrument_id = InstrumentId::from("ESM4.GLBX");
1929 let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1930 let quote = maybe_quote.expect("Expected valid quote");
1931
1932 assert_eq!(quote.instrument_id, instrument_id);
1933 assert_eq!(quote.bid_price, Price::from("3702.25"));
1934 assert_eq!(quote.ask_price, Price::from("3702.75"));
1935 assert_eq!(quote.bid_size, Quantity::from("18"));
1936 assert_eq!(quote.ask_size, Quantity::from("13"));
1937 assert_eq!(quote.ts_event, msg.ts_recv);
1938 assert_eq!(quote.ts_event, 1609113600000000000);
1939 assert_eq!(quote.ts_init, 0);
1940 }
1941
1942 #[rstest]
1943 fn test_decode_mbp10_msg() {
1944 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1945 let mut dbn_stream = Decoder::from_zstd_file(path)
1946 .unwrap()
1947 .decode_stream::<dbn::Mbp10Msg>();
1948 let msg = dbn_stream.next().unwrap().unwrap();
1949
1950 let instrument_id = InstrumentId::from("ESM4.GLBX");
1951 let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1952
1953 assert_eq!(depth10.instrument_id, instrument_id);
1954 assert_eq!(depth10.bids.len(), 10);
1955 assert_eq!(depth10.asks.len(), 10);
1956 assert_eq!(depth10.bid_counts.len(), 10);
1957 assert_eq!(depth10.ask_counts.len(), 10);
1958 assert_eq!(depth10.flags, 128);
1959 assert_eq!(depth10.sequence, 1_170_352);
1960 assert_eq!(depth10.ts_event, msg.ts_recv);
1961 assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1962 assert_eq!(depth10.ts_init, 0);
1963 }
1964
1965 #[rstest]
1966 fn test_decode_trade_msg() {
1967 let path = test_data_path().join("test_data.trades.dbn.zst");
1968 let mut dbn_stream = Decoder::from_zstd_file(path)
1969 .unwrap()
1970 .decode_stream::<dbn::TradeMsg>();
1971 let msg = dbn_stream.next().unwrap().unwrap();
1972
1973 let instrument_id = InstrumentId::from("ESM4.GLBX");
1974 let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1975
1976 assert_eq!(trade.instrument_id, instrument_id);
1977 assert_eq!(trade.price, Price::from("3720.25"));
1978 assert_eq!(trade.size, Quantity::from("5"));
1979 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1980 assert_eq!(trade.trade_id.to_string(), "1170380");
1981 assert_eq!(trade.ts_event, msg.ts_recv);
1982 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1983 assert_eq!(trade.ts_init, 0);
1984 }
1985
1986 #[rstest]
1987 fn test_decode_tbbo_msg() {
1988 let path = test_data_path().join("test_data.tbbo.dbn.zst");
1989 let mut dbn_stream = Decoder::from_zstd_file(path)
1990 .unwrap()
1991 .decode_stream::<dbn::Mbp1Msg>();
1992 let msg = dbn_stream.next().unwrap().unwrap();
1993
1994 let instrument_id = InstrumentId::from("ESM4.GLBX");
1995 let (maybe_quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1996 let quote = maybe_quote.expect("Expected valid quote");
1997
1998 assert_eq!(quote.instrument_id, instrument_id);
1999 assert_eq!(quote.bid_price, Price::from("3720.25"));
2000 assert_eq!(quote.ask_price, Price::from("3720.50"));
2001 assert_eq!(quote.bid_size, Quantity::from("26"));
2002 assert_eq!(quote.ask_size, Quantity::from("7"));
2003 assert_eq!(quote.ts_event, msg.ts_recv);
2004 assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
2005 assert_eq!(quote.ts_init, 0);
2006
2007 assert_eq!(trade.instrument_id, instrument_id);
2008 assert_eq!(trade.price, Price::from("3720.25"));
2009 assert_eq!(trade.size, Quantity::from("5"));
2010 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2011 assert_eq!(trade.trade_id.to_string(), "1170380");
2012 assert_eq!(trade.ts_event, msg.ts_recv);
2013 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2014 assert_eq!(trade.ts_init, 0);
2015 }
2016
2017 #[rstest]
2018 fn test_decode_ohlcv_msg() {
2019 let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
2020 let mut dbn_stream = Decoder::from_zstd_file(path)
2021 .unwrap()
2022 .decode_stream::<dbn::OhlcvMsg>();
2023 let msg = dbn_stream.next().unwrap().unwrap();
2024
2025 let instrument_id = InstrumentId::from("ESM4.GLBX");
2026 let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2027
2028 assert_eq!(
2029 bar.bar_type,
2030 BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
2031 );
2032 assert_eq!(bar.open, Price::from("372025.00"));
2033 assert_eq!(bar.high, Price::from("372050.00"));
2034 assert_eq!(bar.low, Price::from("372025.00"));
2035 assert_eq!(bar.close, Price::from("372050.00"));
2036 assert_eq!(bar.volume, Quantity::from("57"));
2037 assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); assert_eq!(bar.ts_init, 0); }
2040
2041 #[rstest]
2042 fn test_decode_definition_msg() {
2043 let path = test_data_path().join("test_data.definition.dbn.zst");
2044 let mut dbn_stream = Decoder::from_zstd_file(path)
2045 .unwrap()
2046 .decode_stream::<dbn::InstrumentDefMsg>();
2047 let msg = dbn_stream.next().unwrap().unwrap();
2048
2049 let instrument_id = InstrumentId::from("ESM4.GLBX");
2050 let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
2051
2052 assert!(result.is_ok());
2053 assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
2054 }
2055
2056 #[rstest]
2057 fn test_decode_status_msg() {
2058 let path = test_data_path().join("test_data.status.dbn.zst");
2059 let mut dbn_stream = Decoder::from_zstd_file(path)
2060 .unwrap()
2061 .decode_stream::<dbn::StatusMsg>();
2062 let msg = dbn_stream.next().unwrap().unwrap();
2063
2064 let instrument_id = InstrumentId::from("ESM4.GLBX");
2065 let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
2066
2067 assert_eq!(status.instrument_id, instrument_id);
2068 assert_eq!(status.action, MarketStatusAction::Trading);
2069 assert_eq!(status.ts_event, msg.hd.ts_event);
2070 assert_eq!(status.ts_init, 0);
2071 assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
2072 assert_eq!(status.trading_event, None);
2073 assert_eq!(status.is_trading, Some(true));
2074 assert_eq!(status.is_quoting, Some(true));
2075 assert_eq!(status.is_short_sell_restricted, None);
2076 }
2077
2078 #[rstest]
2079 fn test_decode_imbalance_msg() {
2080 let path = test_data_path().join("test_data.imbalance.dbn.zst");
2081 let mut dbn_stream = Decoder::from_zstd_file(path)
2082 .unwrap()
2083 .decode_stream::<dbn::ImbalanceMsg>();
2084 let msg = dbn_stream.next().unwrap().unwrap();
2085
2086 let instrument_id = InstrumentId::from("ESM4.GLBX");
2087 let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2088
2089 assert_eq!(imbalance.instrument_id, instrument_id);
2090 assert_eq!(imbalance.ref_price, Price::from("229.43"));
2091 assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
2092 assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
2093 assert_eq!(imbalance.paired_qty, Quantity::from("0"));
2094 assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
2095 assert_eq!(imbalance.side, OrderSide::Buy);
2096 assert_eq!(imbalance.significant_imbalance, 126);
2097 assert_eq!(imbalance.ts_event, msg.hd.ts_event);
2098 assert_eq!(imbalance.ts_recv, msg.ts_recv);
2099 assert_eq!(imbalance.ts_init, 0);
2100 }
2101
2102 #[rstest]
2103 fn test_decode_statistics_msg() {
2104 let path = test_data_path().join("test_data.statistics.dbn.zst");
2105 let mut dbn_stream = Decoder::from_zstd_file(path)
2106 .unwrap()
2107 .decode_stream::<dbn::StatMsg>();
2108 let msg = dbn_stream.next().unwrap().unwrap();
2109
2110 let instrument_id = InstrumentId::from("ESM4.GLBX");
2111 let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2112
2113 assert_eq!(statistics.instrument_id, instrument_id);
2114 assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
2115 assert_eq!(
2116 statistics.update_action,
2117 DatabentoStatisticUpdateAction::Added
2118 );
2119 assert_eq!(statistics.price, Some(Price::from("100.00")));
2120 assert_eq!(statistics.quantity, None);
2121 assert_eq!(statistics.channel_id, 13);
2122 assert_eq!(statistics.stat_flags, 255);
2123 assert_eq!(statistics.sequence, 2);
2124 assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
2125 assert_eq!(statistics.ts_in_delta, 26961);
2126 assert_eq!(statistics.ts_event, msg.hd.ts_event);
2127 assert_eq!(statistics.ts_recv, msg.ts_recv);
2128 assert_eq!(statistics.ts_init, 0);
2129 }
2130
2131 #[rstest]
2132 fn test_decode_cmbp1_msg() {
2133 let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
2134 let mut dbn_stream = Decoder::from_zstd_file(path)
2135 .unwrap()
2136 .decode_stream::<dbn::Cmbp1Msg>();
2137 let msg = dbn_stream.next().unwrap().unwrap();
2138
2139 let instrument_id = InstrumentId::from("ESM4.GLBX");
2140 let (maybe_quote, trade) =
2141 decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2142 let quote = maybe_quote.expect("Expected valid quote");
2143
2144 assert_eq!(quote.instrument_id, instrument_id);
2145 assert!(quote.bid_price.raw > 0);
2146 assert!(quote.ask_price.raw > 0);
2147 assert!(quote.bid_size.raw > 0);
2148 assert!(quote.ask_size.raw > 0);
2149 assert_eq!(quote.ts_event, msg.ts_recv);
2150 assert_eq!(quote.ts_init, 0);
2151
2152 if is_trade_msg(msg.action) {
2154 assert!(trade.is_some());
2155 let trade = trade.unwrap();
2156 assert_eq!(trade.instrument_id, instrument_id);
2157 } else {
2158 assert!(trade.is_none());
2159 }
2160 }
2161
2162 #[rstest]
2163 fn test_decode_cbbo_1s_msg() {
2164 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2165 let mut dbn_stream = Decoder::from_zstd_file(path)
2166 .unwrap()
2167 .decode_stream::<dbn::CbboMsg>();
2168 let msg = dbn_stream.next().unwrap().unwrap();
2169
2170 let instrument_id = InstrumentId::from("ESM4.GLBX");
2171 let maybe_quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2172 let quote = maybe_quote.expect("Expected valid quote");
2173
2174 assert_eq!(quote.instrument_id, instrument_id);
2175 assert!(quote.bid_price.raw > 0);
2176 assert!(quote.ask_price.raw > 0);
2177 assert!(quote.bid_size.raw > 0);
2178 assert!(quote.ask_size.raw > 0);
2179 assert_eq!(quote.ts_event, msg.ts_recv);
2180 assert_eq!(quote.ts_init, 0);
2181 }
2182
2183 #[rstest]
2184 fn test_decode_mbp10_msg_with_all_levels() {
2185 let mut msg = dbn::Mbp10Msg::default();
2186 for i in 0..10 {
2187 msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
2188 msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
2189 msg.levels[i].bid_sz = 10 + i as u32;
2190 msg.levels[i].ask_sz = 10 + i as u32;
2191 msg.levels[i].bid_ct = 1 + i as u32;
2192 msg.levels[i].ask_ct = 1 + i as u32;
2193 }
2194 msg.ts_recv = 1_609_160_400_000_704_060;
2195
2196 let instrument_id = InstrumentId::from("TEST.VENUE");
2197 let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
2198
2199 assert!(result.is_ok());
2200 let depth = result.unwrap();
2201 assert_eq!(depth.bids.len(), 10);
2202 assert_eq!(depth.asks.len(), 10);
2203 assert_eq!(depth.bid_counts.len(), 10);
2204 assert_eq!(depth.ask_counts.len(), 10);
2205 }
2206
2207 #[rstest]
2208 fn test_array_conversion_error_handling() {
2209 let mut bids = Vec::new();
2210 let mut asks = Vec::new();
2211
2212 for i in 0..5 {
2214 bids.push(BookOrder::new(
2215 OrderSide::Buy,
2216 Price::from(format!("{}.00", 100 - i)),
2217 Quantity::from(10),
2218 i as u64,
2219 ));
2220 asks.push(BookOrder::new(
2221 OrderSide::Sell,
2222 Price::from(format!("{}.00", 101 + i)),
2223 Quantity::from(10),
2224 i as u64,
2225 ));
2226 }
2227
2228 let result: Result<[BookOrder; DEPTH10_LEN], _> =
2229 bids.try_into().map_err(|v: Vec<BookOrder>| {
2230 anyhow::anyhow!(
2231 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2232 v.len()
2233 )
2234 });
2235 assert!(result.is_err());
2236 assert!(
2237 result
2238 .unwrap_err()
2239 .to_string()
2240 .contains("Expected exactly 10 bid levels, received 5")
2241 );
2242 }
2243
2244 #[rstest]
2245 fn test_decode_tcbbo_msg() {
2246 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2248 let mut dbn_stream = Decoder::from_zstd_file(path)
2249 .unwrap()
2250 .decode_stream::<dbn::CbboMsg>();
2251 let msg = dbn_stream.next().unwrap().unwrap();
2252
2253 let mut tcbbo_msg = msg.clone();
2255 tcbbo_msg.price = 3702500000000;
2256 tcbbo_msg.size = 10;
2257
2258 let instrument_id = InstrumentId::from("ESM4.GLBX");
2259 let (maybe_quote, trade) =
2260 decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2261 let quote = maybe_quote.expect("Expected valid quote");
2262
2263 assert_eq!(quote.instrument_id, instrument_id);
2264 assert!(quote.bid_price.raw > 0);
2265 assert!(quote.ask_price.raw > 0);
2266 assert!(quote.bid_size.raw > 0);
2267 assert!(quote.ask_size.raw > 0);
2268 assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2269 assert_eq!(quote.ts_init, 0);
2270
2271 assert_eq!(trade.instrument_id, instrument_id);
2272 assert_eq!(trade.price, Price::from("3702.50"));
2273 assert_eq!(trade.size, Quantity::from(10));
2274 assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2275 assert_eq!(trade.ts_init, 0);
2276 }
2277
2278 #[rstest]
2279 fn test_decode_bar_type() {
2280 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2281 let instrument_id = InstrumentId::from("ESM4.GLBX");
2282
2283 msg.hd.rtype = 32;
2285 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2286 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2287
2288 msg.hd.rtype = 33;
2290 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2291 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2292
2293 msg.hd.rtype = 34;
2295 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2296 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2297
2298 msg.hd.rtype = 35;
2300 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2301 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2302
2303 msg.hd.rtype = 99;
2305 let result = decode_bar_type(&msg, instrument_id);
2306 assert!(result.is_err());
2307 }
2308
2309 #[rstest]
2310 fn test_decode_ts_event_adjustment() {
2311 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2312
2313 msg.hd.rtype = 32;
2315 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2316 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2317
2318 msg.hd.rtype = 33;
2320 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2321 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2322
2323 msg.hd.rtype = 34;
2325 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2326 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2327
2328 msg.hd.rtype = 35;
2330 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2331 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2332
2333 msg.hd.rtype = 36;
2335 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2336 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2337
2338 msg.hd.rtype = 99;
2340 let result = decode_ts_event_adjustment(&msg);
2341 assert!(result.is_err());
2342 }
2343
2344 #[rstest]
2345 fn test_decode_record() {
2346 let path = test_data_path().join("test_data.mbo.dbn.zst");
2348 let decoder = Decoder::from_zstd_file(path).unwrap();
2349 let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2350 let msg = dbn_stream.next().unwrap().unwrap();
2351
2352 let record_ref = dbn::RecordRef::from(msg);
2353 let instrument_id = InstrumentId::from("ESM4.GLBX");
2354
2355 let (data1, data2) =
2356 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2357
2358 assert!(data1.is_some());
2359 assert!(data2.is_none());
2360
2361 let path = test_data_path().join("test_data.trades.dbn.zst");
2363 let decoder = Decoder::from_zstd_file(path).unwrap();
2364 let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2365 let msg = dbn_stream.next().unwrap().unwrap();
2366
2367 let record_ref = dbn::RecordRef::from(msg);
2368
2369 let (data1, data2) =
2370 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2371
2372 assert!(data1.is_some());
2373 assert!(data2.is_none());
2374 assert!(matches!(data1.unwrap(), Data::Trade(_)));
2375 }
2376}