1use std::{ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
20use nautilus_model::{
21 data::{
22 Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24 },
25 enums::{
26 AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27 InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28 },
29 identifiers::{InstrumentId, TradeId},
30 instruments::{
31 Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32 },
33 types::{
34 Currency, Price, Quantity,
35 price::{PRICE_UNDEF, PriceRaw, decode_raw_price_i64},
36 },
37};
38use ustr::Ustr;
39
40use super::{
41 enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
42 types::{DatabentoImbalance, DatabentoStatistics},
43};
44
45const STEP_ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
47
48const BAR_SPEC_1S: BarSpecification = BarSpecification {
49 step: STEP_ONE,
50 aggregation: BarAggregation::Second,
51 price_type: PriceType::Last,
52};
53const BAR_SPEC_1M: BarSpecification = BarSpecification {
54 step: STEP_ONE,
55 aggregation: BarAggregation::Minute,
56 price_type: PriceType::Last,
57};
58const BAR_SPEC_1H: BarSpecification = BarSpecification {
59 step: STEP_ONE,
60 aggregation: BarAggregation::Hour,
61 price_type: PriceType::Last,
62};
63const BAR_SPEC_1D: BarSpecification = BarSpecification {
64 step: STEP_ONE,
65 aggregation: BarAggregation::Day,
66 price_type: PriceType::Last,
67};
68
69const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
70const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
71const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
72const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
73
74#[must_use]
75pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
76 match c as u8 as char {
77 'Y' => Some(true),
78 'N' => Some(false),
79 _ => None,
80 }
81}
82
83#[must_use]
84pub const fn parse_order_side(c: c_char) -> OrderSide {
85 match c as u8 as char {
86 'A' => OrderSide::Sell,
87 'B' => OrderSide::Buy,
88 _ => OrderSide::NoOrderSide,
89 }
90}
91
92#[must_use]
93pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
94 match c as u8 as char {
95 'A' => AggressorSide::Seller,
96 'B' => AggressorSide::Buyer,
97 _ => AggressorSide::NoAggressor,
98 }
99}
100
101pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
107 match c as u8 as char {
108 'A' => Ok(BookAction::Add),
109 'C' => Ok(BookAction::Delete),
110 'F' => Ok(BookAction::Update),
111 'M' => Ok(BookAction::Update),
112 'R' => Ok(BookAction::Clear),
113 invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
114 }
115}
116
117pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
123 match c as u8 as char {
124 'C' => Ok(OptionKind::Call),
125 'P' => Ok(OptionKind::Put),
126 invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
127 }
128}
129
130fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
131 match value {
132 Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
133 tracing::warn!("Unknown currency code '{value}', defaulting to USD");
134 Currency::USD()
135 }),
136 Ok(_) => Currency::USD(),
137 Err(e) => {
138 tracing::error!("Error parsing currency: {e}");
139 Currency::USD()
140 }
141 }
142}
143
144pub fn parse_cfi_iso10926(
150 value: &str,
151) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
152 let chars: Vec<char> = value.chars().collect();
153 if chars.len() < 3 {
154 anyhow::bail!("Value string is too short");
155 }
156
157 let cfi_category = chars[0];
159 let cfi_group = chars[1];
160 let cfi_attribute1 = chars[2];
161 let mut asset_class = match cfi_category {
166 'D' => Some(AssetClass::Debt),
167 'E' => Some(AssetClass::Equity),
168 'S' => None,
169 _ => None,
170 };
171
172 let instrument_class = match cfi_group {
173 'I' => Some(InstrumentClass::Future),
174 _ => None,
175 };
176
177 if cfi_attribute1 == 'I' {
178 asset_class = Some(AssetClass::Index);
179 }
180
181 Ok((asset_class, instrument_class))
182}
183
184pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
192 let value_str = match value {
193 0 => return Ok(None),
194 1 => "Scheduled",
195 2 => "Surveillance intervention",
196 3 => "Market event",
197 4 => "Instrument activation",
198 5 => "Instrument expiration",
199 6 => "Recovery in process",
200 10 => "Regulatory",
201 11 => "Administrative",
202 12 => "Non-compliance",
203 13 => "Filings not current",
204 14 => "SEC trading suspension",
205 15 => "New issue",
206 16 => "Issue available",
207 17 => "Issues reviewed",
208 18 => "Filing requirements satisfied",
209 30 => "News pending",
210 31 => "News released",
211 32 => "News and resumption times",
212 33 => "News not forthcoming",
213 40 => "Order imbalance",
214 50 => "LULD pause",
215 60 => "Operational",
216 70 => "Additional information requested",
217 80 => "Merger effective",
218 90 => "ETF",
219 100 => "Corporate action",
220 110 => "New Security offering",
221 120 => "Market wide halt level 1",
222 121 => "Market wide halt level 2",
223 122 => "Market wide halt level 3",
224 123 => "Market wide halt carryover",
225 124 => "Market wide halt resumption",
226 130 => "Quotation not available",
227 invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
228 };
229
230 Ok(Some(Ustr::from(value_str)))
231}
232
233pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
239 let value_str = match value {
240 0 => return Ok(None),
241 1 => "No cancel",
242 2 => "Change trading session",
243 3 => "Implied matching on",
244 4 => "Implied matching off",
245 _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
246 };
247
248 Ok(Some(Ustr::from(value_str)))
249}
250
251#[inline(always)]
259#[must_use]
260fn decode_raw_price_with_precision(value: i64, precision: u8) -> (PriceRaw, u8) {
261 let raw = if value == i64::MAX {
262 PRICE_UNDEF
263 } else {
264 decode_raw_price_i64(value)
265 };
266
267 let precision = if raw == PRICE_UNDEF { 0 } else { precision };
269 (raw, precision)
270}
271
272#[inline(always)]
274#[must_use]
275pub fn decode_price(value: i64, precision: u8) -> Price {
276 let (raw, precision) = decode_raw_price_with_precision(value, precision);
277 Price::from_raw(raw, precision)
278}
279
280#[inline(always)]
282#[must_use]
283pub fn decode_price_increment(value: i64, precision: u8) -> Price {
284 match value {
285 0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
286 _ => decode_price(value, precision),
287 }
288}
289
290#[inline(always)]
292#[must_use]
293pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
294 match value {
295 i64::MAX => None,
296 _ => Some(decode_price(value, precision)),
297 }
298}
299
300#[inline(always)]
302#[must_use]
303pub fn decode_quantity(value: u64) -> Quantity {
304 Quantity::from(value)
305}
306
307#[inline(always)]
309#[must_use]
310pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
311 match value {
312 i64::MAX => None,
313 _ => Some(Quantity::from(value)),
314 }
315}
316
317pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
324 match value {
325 0 | i64::MAX => Ok(Quantity::from(1)),
326 v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
327 v => {
328 let abs = v as u128;
331
332 const SCALE: u128 = 1_000_000_000;
333 let int_part = abs / SCALE;
334 let frac_part = abs % SCALE;
335
336 if frac_part == 0 {
339 Ok(Quantity::from(int_part as u64))
341 } else {
342 let mut frac_str = format!("{frac_part:09}");
343 while frac_str.ends_with('0') {
344 frac_str.pop();
345 }
346 let s = format!("{int_part}.{frac_str}");
347 Ok(Quantity::from(s))
348 }
349 }
350 }
351}
352
353#[inline(always)]
355#[must_use]
356pub fn decode_lot_size(value: i32) -> Quantity {
357 match value {
358 0 | i32::MAX => Quantity::from(1),
359 value => Quantity::from(value),
360 }
361}
362
363#[inline(always)]
364#[must_use]
365fn is_trade_msg(action: c_char) -> bool {
366 action as u8 as char == 'T'
367}
368
369pub fn decode_mbo_msg(
378 msg: &dbn::MboMsg,
379 instrument_id: InstrumentId,
380 price_precision: u8,
381 ts_init: Option<UnixNanos>,
382 include_trades: bool,
383) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
384 let side = parse_order_side(msg.side);
385 if is_trade_msg(msg.action) {
386 if include_trades && msg.size > 0 {
387 let price = decode_price(msg.price, price_precision);
388 let size = decode_quantity(msg.size as u64);
389 let aggressor_side = parse_aggressor_side(msg.side);
390 let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
391 let ts_event = msg.ts_recv.into();
392 let ts_init = ts_init.unwrap_or(ts_event);
393
394 let trade = TradeTick::new(
395 instrument_id,
396 price,
397 size,
398 aggressor_side,
399 trade_id,
400 ts_event,
401 ts_init,
402 );
403 return Ok((None, Some(trade)));
404 }
405
406 return Ok((None, None));
407 }
408
409 let action = parse_book_action(msg.action)?;
410 let (raw_price, precision) = decode_raw_price_with_precision(msg.price, price_precision);
411 let price = Price::from_raw(raw_price, precision);
412 let size = decode_quantity(msg.size as u64);
413 let order = BookOrder::new(side, price, size, msg.order_id);
414
415 let ts_event = msg.ts_recv.into();
416 let ts_init = ts_init.unwrap_or(ts_event);
417
418 let delta = OrderBookDelta::new(
419 instrument_id,
420 action,
421 order,
422 msg.flags.raw(),
423 msg.sequence.into(),
424 ts_event,
425 ts_init,
426 );
427
428 Ok((Some(delta), None))
429}
430
431pub fn decode_trade_msg(
437 msg: &dbn::TradeMsg,
438 instrument_id: InstrumentId,
439 price_precision: u8,
440 ts_init: Option<UnixNanos>,
441) -> anyhow::Result<TradeTick> {
442 let ts_event = msg.ts_recv.into();
443 let ts_init = ts_init.unwrap_or(ts_event);
444
445 let trade = TradeTick::new(
446 instrument_id,
447 decode_price(msg.price, price_precision),
448 decode_quantity(msg.size as u64),
449 parse_aggressor_side(msg.side),
450 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
451 ts_event,
452 ts_init,
453 );
454
455 Ok(trade)
456}
457
458pub fn decode_tbbo_msg(
464 msg: &dbn::TbboMsg,
465 instrument_id: InstrumentId,
466 price_precision: u8,
467 ts_init: Option<UnixNanos>,
468) -> anyhow::Result<(QuoteTick, TradeTick)> {
469 let top_level = &msg.levels[0];
470 let ts_event = msg.ts_recv.into();
471 let ts_init = ts_init.unwrap_or(ts_event);
472
473 let quote = QuoteTick::new(
474 instrument_id,
475 decode_price(top_level.bid_px, price_precision),
476 decode_price(top_level.ask_px, price_precision),
477 decode_quantity(top_level.bid_sz as u64),
478 decode_quantity(top_level.ask_sz as u64),
479 ts_event,
480 ts_init,
481 );
482
483 let trade = TradeTick::new(
484 instrument_id,
485 decode_price(msg.price, price_precision),
486 decode_quantity(msg.size as u64),
487 parse_aggressor_side(msg.side),
488 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
489 ts_event,
490 ts_init,
491 );
492
493 Ok((quote, trade))
494}
495
496pub fn decode_mbp1_msg(
502 msg: &dbn::Mbp1Msg,
503 instrument_id: InstrumentId,
504 price_precision: u8,
505 ts_init: Option<UnixNanos>,
506 include_trades: bool,
507) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
508 let top_level = &msg.levels[0];
509 let ts_event = msg.ts_recv.into();
510 let ts_init = ts_init.unwrap_or(ts_event);
511
512 let quote = QuoteTick::new(
513 instrument_id,
514 decode_price(top_level.bid_px, price_precision),
515 decode_price(top_level.ask_px, price_precision),
516 decode_quantity(top_level.bid_sz as u64),
517 decode_quantity(top_level.ask_sz as u64),
518 ts_event,
519 ts_init,
520 );
521
522 let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
523 Some(TradeTick::new(
524 instrument_id,
525 decode_price(msg.price, price_precision),
526 decode_quantity(msg.size as u64),
527 parse_aggressor_side(msg.side),
528 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
529 ts_event,
530 ts_init,
531 ))
532 } else {
533 None
534 };
535
536 Ok((quote, maybe_trade))
537}
538
539pub fn decode_bbo_msg(
545 msg: &dbn::BboMsg,
546 instrument_id: InstrumentId,
547 price_precision: u8,
548 ts_init: Option<UnixNanos>,
549) -> anyhow::Result<QuoteTick> {
550 let top_level = &msg.levels[0];
551 let ts_event = msg.ts_recv.into();
552 let ts_init = ts_init.unwrap_or(ts_event);
553
554 let quote = QuoteTick::new(
555 instrument_id,
556 decode_price(top_level.bid_px, price_precision),
557 decode_price(top_level.ask_px, price_precision),
558 decode_quantity(top_level.bid_sz as u64),
559 decode_quantity(top_level.ask_sz as u64),
560 ts_event,
561 ts_init,
562 );
563
564 Ok(quote)
565}
566
567pub fn decode_mbp10_msg(
573 msg: &dbn::Mbp10Msg,
574 instrument_id: InstrumentId,
575 price_precision: u8,
576 ts_init: Option<UnixNanos>,
577) -> anyhow::Result<OrderBookDepth10> {
578 let mut bids = Vec::with_capacity(DEPTH10_LEN);
579 let mut asks = Vec::with_capacity(DEPTH10_LEN);
580 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
581 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
582
583 for level in &msg.levels {
584 let bid_order = BookOrder::new(
585 OrderSide::Buy,
586 decode_price(level.bid_px, price_precision),
587 decode_quantity(level.bid_sz as u64),
588 0,
589 );
590
591 let ask_order = BookOrder::new(
592 OrderSide::Sell,
593 decode_price(level.ask_px, price_precision),
594 decode_quantity(level.ask_sz as u64),
595 0,
596 );
597
598 bids.push(bid_order);
599 asks.push(ask_order);
600 bid_counts.push(level.bid_ct);
601 ask_counts.push(level.ask_ct);
602 }
603
604 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
605 anyhow::anyhow!(
606 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
607 v.len()
608 )
609 })?;
610
611 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
612 anyhow::anyhow!(
613 "Expected exactly {DEPTH10_LEN} ask levels, received {}",
614 v.len()
615 )
616 })?;
617
618 let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
619 anyhow::anyhow!(
620 "Expected exactly {DEPTH10_LEN} bid counts, received {}",
621 v.len()
622 )
623 })?;
624
625 let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
626 anyhow::anyhow!(
627 "Expected exactly {DEPTH10_LEN} ask counts, received {}",
628 v.len()
629 )
630 })?;
631
632 let ts_event = msg.ts_recv.into();
633 let ts_init = ts_init.unwrap_or(ts_event);
634
635 let depth = OrderBookDepth10::new(
636 instrument_id,
637 bids,
638 asks,
639 bid_counts,
640 ask_counts,
641 msg.flags.raw(),
642 msg.sequence.into(),
643 ts_event,
644 ts_init,
645 );
646
647 Ok(depth)
648}
649
650pub fn decode_cmbp1_msg(
658 msg: &dbn::Cmbp1Msg,
659 instrument_id: InstrumentId,
660 price_precision: u8,
661 ts_init: Option<UnixNanos>,
662 include_trades: bool,
663) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
664 let top_level = &msg.levels[0];
665 let ts_event = msg.ts_recv.into();
666 let ts_init = ts_init.unwrap_or(ts_event);
667
668 let quote = QuoteTick::new(
669 instrument_id,
670 decode_price(top_level.bid_px, price_precision),
671 decode_price(top_level.ask_px, price_precision),
672 decode_quantity(top_level.bid_sz as u64),
673 decode_quantity(top_level.ask_sz as u64),
674 ts_event,
675 ts_init,
676 );
677
678 let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
679 Some(TradeTick::new(
681 instrument_id,
682 decode_price(msg.price, price_precision),
683 decode_quantity(msg.size as u64),
684 parse_aggressor_side(msg.side),
685 TradeId::new(UUID4::new().to_string()),
686 ts_event,
687 ts_init,
688 ))
689 } else {
690 None
691 };
692
693 Ok((quote, maybe_trade))
694}
695
696pub fn decode_cbbo_msg(
704 msg: &dbn::CbboMsg,
705 instrument_id: InstrumentId,
706 price_precision: u8,
707 ts_init: Option<UnixNanos>,
708) -> anyhow::Result<QuoteTick> {
709 let top_level = &msg.levels[0];
710 let ts_event = msg.ts_recv.into();
711 let ts_init = ts_init.unwrap_or(ts_event);
712
713 let quote = QuoteTick::new(
714 instrument_id,
715 decode_price(top_level.bid_px, price_precision),
716 decode_price(top_level.ask_px, price_precision),
717 decode_quantity(top_level.bid_sz as u64),
718 decode_quantity(top_level.ask_sz as u64),
719 ts_event,
720 ts_init,
721 );
722
723 Ok(quote)
724}
725
726pub fn decode_tcbbo_msg(
734 msg: &dbn::CbboMsg,
735 instrument_id: InstrumentId,
736 price_precision: u8,
737 ts_init: Option<UnixNanos>,
738) -> anyhow::Result<(QuoteTick, TradeTick)> {
739 let top_level = &msg.levels[0];
740 let ts_event = msg.ts_recv.into();
741 let ts_init = ts_init.unwrap_or(ts_event);
742
743 let quote = QuoteTick::new(
744 instrument_id,
745 decode_price(top_level.bid_px, price_precision),
746 decode_price(top_level.ask_px, price_precision),
747 decode_quantity(top_level.bid_sz as u64),
748 decode_quantity(top_level.ask_sz as u64),
749 ts_event,
750 ts_init,
751 );
752
753 let trade = TradeTick::new(
755 instrument_id,
756 decode_price(msg.price, price_precision),
757 decode_quantity(msg.size as u64),
758 parse_aggressor_side(msg.side),
759 TradeId::new(UUID4::new().to_string()),
760 ts_event,
761 ts_init,
762 );
763
764 Ok((quote, trade))
765}
766
767pub fn decode_bar_type(
771 msg: &dbn::OhlcvMsg,
772 instrument_id: InstrumentId,
773) -> anyhow::Result<BarType> {
774 let bar_type = match msg.hd.rtype {
775 32 => {
776 BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
778 }
779 33 => {
780 BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
782 }
783 34 => {
784 BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
786 }
787 35 => {
788 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
790 }
791 36 => {
792 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
794 }
795 _ => anyhow::bail!(
796 "`rtype` is not a supported bar aggregation, was {}",
797 msg.hd.rtype
798 ),
799 };
800
801 Ok(bar_type)
802}
803
804pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
808 let adjustment = match msg.hd.rtype {
809 32 => {
810 BAR_CLOSE_ADJUSTMENT_1S
812 }
813 33 => {
814 BAR_CLOSE_ADJUSTMENT_1M
816 }
817 34 => {
818 BAR_CLOSE_ADJUSTMENT_1H
820 }
821 35 | 36 => {
822 BAR_CLOSE_ADJUSTMENT_1D
824 }
825 _ => anyhow::bail!(
826 "`rtype` is not a supported bar aggregation, was {}",
827 msg.hd.rtype
828 ),
829 };
830
831 Ok(adjustment.into())
832}
833
834pub fn decode_ohlcv_msg(
838 msg: &dbn::OhlcvMsg,
839 instrument_id: InstrumentId,
840 price_precision: u8,
841 ts_init: Option<UnixNanos>,
842 timestamp_on_close: bool,
843) -> anyhow::Result<Bar> {
844 let bar_type = decode_bar_type(msg, instrument_id)?;
845 let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
846
847 let ts_event_raw = msg.hd.ts_event.into();
848 let ts_close = ts_event_raw + ts_event_adjustment;
849 let ts_init = ts_init.unwrap_or(ts_close); let ts_event = if timestamp_on_close {
852 ts_close
853 } else {
854 ts_event_raw
855 };
856
857 let bar = Bar::new(
858 bar_type,
859 decode_price(msg.open, price_precision),
860 decode_price(msg.high, price_precision),
861 decode_price(msg.low, price_precision),
862 decode_price(msg.close, price_precision),
863 decode_quantity(msg.volume),
864 ts_event,
865 ts_init,
866 );
867
868 Ok(bar)
869}
870
871pub fn decode_status_msg(
877 msg: &dbn::StatusMsg,
878 instrument_id: InstrumentId,
879 ts_init: Option<UnixNanos>,
880) -> anyhow::Result<InstrumentStatus> {
881 let ts_event = msg.hd.ts_event.into();
882 let ts_init = ts_init.unwrap_or(ts_event);
883
884 let action = MarketStatusAction::from_u16(msg.action)
885 .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
886
887 let status = InstrumentStatus::new(
888 instrument_id,
889 action,
890 ts_event,
891 ts_init,
892 parse_status_reason(msg.reason)?,
893 parse_status_trading_event(msg.trading_event)?,
894 parse_optional_bool(msg.is_trading),
895 parse_optional_bool(msg.is_quoting),
896 parse_optional_bool(msg.is_short_sell_restricted),
897 );
898
899 Ok(status)
900}
901
902pub fn decode_record(
906 record: &dbn::RecordRef,
907 instrument_id: InstrumentId,
908 price_precision: u8,
909 ts_init: Option<UnixNanos>,
910 include_trades: bool,
911 bars_timestamp_on_close: bool,
912) -> anyhow::Result<(Option<Data>, Option<Data>)> {
913 let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
917 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
918 let result = decode_mbo_msg(
919 msg,
920 instrument_id,
921 price_precision,
922 Some(ts_init),
923 include_trades,
924 )?;
925 match result {
926 (Some(delta), None) => (Some(Data::Delta(delta)), None),
927 (None, Some(trade)) => (Some(Data::Trade(trade)), None),
928 (None, None) => (None, None),
929 _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
930 }
931 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
932 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
933 let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
934 (Some(Data::Trade(trade)), None)
935 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
936 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
937 let result = decode_mbp1_msg(
938 msg,
939 instrument_id,
940 price_precision,
941 Some(ts_init),
942 include_trades,
943 )?;
944 match result {
945 (quote, None) => (Some(Data::Quote(quote)), None),
946 (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
947 }
948 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
949 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
950 let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
951 (Some(Data::Quote(quote)), None)
952 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
953 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
954 let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
955 (Some(Data::Quote(quote)), None)
956 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
957 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
958 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
959 (Some(Data::from(depth)), None)
960 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
961 let bar = decode_ohlcv_msg(
964 msg,
965 instrument_id,
966 price_precision,
967 ts_init,
968 bars_timestamp_on_close,
969 )?;
970 (Some(Data::Bar(bar)), None)
971 } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
972 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
973 let result = decode_cmbp1_msg(
974 msg,
975 instrument_id,
976 price_precision,
977 Some(ts_init),
978 include_trades,
979 )?;
980 match result {
981 (quote, None) => (Some(Data::Quote(quote)), None),
982 (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
983 }
984 } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
985 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
987 let (quote, trade) = decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
988 (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
989 } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
990 if msg.price != i64::MAX && msg.size > 0 {
992 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
994 let (quote, trade) =
995 decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
996 (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
997 } else {
998 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1000 let quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1001 (Some(Data::Quote(quote)), None)
1002 }
1003 } else {
1004 anyhow::bail!("DBN message type is not currently supported")
1005 };
1006
1007 Ok(result)
1008}
1009
1010const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
1011 match ts_init {
1012 Some(ts_init) => ts_init,
1013 None => msg_timestamp,
1014 }
1015}
1016
1017pub fn decode_instrument_def_msg(
1021 msg: &dbn::InstrumentDefMsg,
1022 instrument_id: InstrumentId,
1023 ts_init: Option<UnixNanos>,
1024) -> anyhow::Result<InstrumentAny> {
1025 match msg.instrument_class as u8 as char {
1026 'K' => Ok(InstrumentAny::Equity(decode_equity(
1027 msg,
1028 instrument_id,
1029 ts_init,
1030 )?)),
1031 'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1032 msg,
1033 instrument_id,
1034 ts_init,
1035 )?)),
1036 'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1037 msg,
1038 instrument_id,
1039 ts_init,
1040 )?)),
1041 'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1042 msg,
1043 instrument_id,
1044 ts_init,
1045 )?)),
1046 'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1047 msg,
1048 instrument_id,
1049 ts_init,
1050 )?)),
1051 'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1052 'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1053 _ => anyhow::bail!(
1054 "Unsupported `instrument_class` '{}'",
1055 msg.instrument_class as u8 as char
1056 ),
1057 }
1058}
1059
1060pub fn decode_equity(
1066 msg: &dbn::InstrumentDefMsg,
1067 instrument_id: InstrumentId,
1068 ts_init: Option<UnixNanos>,
1069) -> anyhow::Result<Equity> {
1070 let currency = parse_currency_or_usd_default(msg.currency());
1071 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1072 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1073 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1075
1076 Ok(Equity::new(
1077 instrument_id,
1078 instrument_id.symbol,
1079 None, currency,
1081 price_increment.precision,
1082 price_increment,
1083 Some(lot_size),
1084 None, None, None, None, None, None, None, None, ts_event,
1093 ts_init,
1094 ))
1095}
1096
1097pub fn decode_futures_contract(
1103 msg: &dbn::InstrumentDefMsg,
1104 instrument_id: InstrumentId,
1105 ts_init: Option<UnixNanos>,
1106) -> anyhow::Result<FuturesContract> {
1107 let currency = parse_currency_or_usd_default(msg.currency());
1108 let exchange = Ustr::from(msg.exchange()?);
1109 let underlying = Ustr::from(msg.asset()?);
1110 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1111 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1112 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1113 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1114 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1116
1117 FuturesContract::new_checked(
1118 instrument_id,
1119 instrument_id.symbol,
1120 asset_class.unwrap_or(AssetClass::Commodity),
1121 Some(exchange),
1122 underlying,
1123 msg.activation.into(),
1124 msg.expiration.into(),
1125 currency,
1126 price_increment.precision,
1127 price_increment,
1128 multiplier,
1129 lot_size,
1130 None, None, None, None, None, None, None, None, ts_event,
1139 ts_init,
1140 )
1141}
1142
1143pub fn decode_futures_spread(
1149 msg: &dbn::InstrumentDefMsg,
1150 instrument_id: InstrumentId,
1151 ts_init: Option<UnixNanos>,
1152) -> anyhow::Result<FuturesSpread> {
1153 let exchange = Ustr::from(msg.exchange()?);
1154 let underlying = Ustr::from(msg.asset()?);
1155 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1156 let strategy_type = Ustr::from(msg.secsubtype()?);
1157 let currency = parse_currency_or_usd_default(msg.currency());
1158 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1159 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1160 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1161 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1163
1164 FuturesSpread::new_checked(
1165 instrument_id,
1166 instrument_id.symbol,
1167 asset_class.unwrap_or(AssetClass::Commodity),
1168 Some(exchange),
1169 underlying,
1170 strategy_type,
1171 msg.activation.into(),
1172 msg.expiration.into(),
1173 currency,
1174 price_increment.precision,
1175 price_increment,
1176 multiplier,
1177 lot_size,
1178 None, None, None, None, None, None, None, None, ts_event,
1187 ts_init,
1188 )
1189}
1190
1191pub fn decode_option_contract(
1197 msg: &dbn::InstrumentDefMsg,
1198 instrument_id: InstrumentId,
1199 ts_init: Option<UnixNanos>,
1200) -> anyhow::Result<OptionContract> {
1201 let currency = parse_currency_or_usd_default(msg.currency());
1202 let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1203 let exchange = Ustr::from(msg.exchange()?);
1204 let underlying = Ustr::from(msg.underlying()?);
1205 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1206 Some(AssetClass::Equity)
1207 } else {
1208 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1209 asset_class
1210 };
1211 let option_kind = parse_option_kind(msg.instrument_class)?;
1212 let strike_price = Price::from_raw(
1213 decode_raw_price_i64(msg.strike_price),
1214 strike_price_currency.precision,
1215 );
1216 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1217 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1218 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1219 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1221
1222 OptionContract::new_checked(
1223 instrument_id,
1224 instrument_id.symbol,
1225 asset_class_opt.unwrap_or(AssetClass::Commodity),
1226 Some(exchange),
1227 underlying,
1228 option_kind,
1229 strike_price,
1230 currency,
1231 msg.activation.into(),
1232 msg.expiration.into(),
1233 price_increment.precision,
1234 price_increment,
1235 multiplier,
1236 lot_size,
1237 None, None, None, None, None, None, None, None, ts_event,
1246 ts_init,
1247 )
1248}
1249
1250pub fn decode_option_spread(
1256 msg: &dbn::InstrumentDefMsg,
1257 instrument_id: InstrumentId,
1258 ts_init: Option<UnixNanos>,
1259) -> anyhow::Result<OptionSpread> {
1260 let exchange = Ustr::from(msg.exchange()?);
1261 let underlying = Ustr::from(msg.underlying()?);
1262 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1263 Some(AssetClass::Equity)
1264 } else {
1265 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1266 asset_class
1267 };
1268 let strategy_type = Ustr::from(msg.secsubtype()?);
1269 let currency = parse_currency_or_usd_default(msg.currency());
1270 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1271 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1272 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1273 let ts_event = msg.ts_recv.into(); let ts_init = ts_init.unwrap_or(ts_event);
1275
1276 OptionSpread::new_checked(
1277 instrument_id,
1278 instrument_id.symbol,
1279 asset_class_opt.unwrap_or(AssetClass::Commodity),
1280 Some(exchange),
1281 underlying,
1282 strategy_type,
1283 msg.activation.into(),
1284 msg.expiration.into(),
1285 currency,
1286 price_increment.precision,
1287 price_increment,
1288 multiplier,
1289 lot_size,
1290 None, None, None, None, None, None, None, None, ts_event,
1299 ts_init,
1300 )
1301}
1302
1303pub fn decode_imbalance_msg(
1309 msg: &dbn::ImbalanceMsg,
1310 instrument_id: InstrumentId,
1311 price_precision: u8,
1312 ts_init: Option<UnixNanos>,
1313) -> anyhow::Result<DatabentoImbalance> {
1314 let ts_event = msg.ts_recv.into();
1315 let ts_init = ts_init.unwrap_or(ts_event);
1316
1317 Ok(DatabentoImbalance::new(
1318 instrument_id,
1319 decode_price(msg.ref_price, price_precision),
1320 decode_price(msg.cont_book_clr_price, price_precision),
1321 decode_price(msg.auct_interest_clr_price, price_precision),
1322 Quantity::new(f64::from(msg.paired_qty), 0),
1323 Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1324 parse_order_side(msg.side),
1325 msg.significant_imbalance as c_char,
1326 msg.hd.ts_event.into(),
1327 ts_event,
1328 ts_init,
1329 ))
1330}
1331
1332pub fn decode_statistics_msg(
1339 msg: &dbn::StatMsg,
1340 instrument_id: InstrumentId,
1341 price_precision: u8,
1342 ts_init: Option<UnixNanos>,
1343) -> anyhow::Result<DatabentoStatistics> {
1344 let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1345 .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1346 let update_action =
1347 DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1348 anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1349 })?;
1350 let ts_event = msg.ts_recv.into();
1351 let ts_init = ts_init.unwrap_or(ts_event);
1352
1353 Ok(DatabentoStatistics::new(
1354 instrument_id,
1355 stat_type,
1356 update_action,
1357 decode_optional_price(msg.price, price_precision),
1358 decode_optional_quantity(msg.quantity),
1359 msg.channel_id,
1360 msg.stat_flags,
1361 msg.sequence,
1362 msg.ts_ref.into(),
1363 msg.ts_in_delta,
1364 msg.hd.ts_event.into(),
1365 ts_event,
1366 ts_init,
1367 ))
1368}
1369
1370#[cfg(test)]
1374mod tests {
1375 use std::path::{Path, PathBuf};
1376
1377 use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1378 use fallible_streaming_iterator::FallibleStreamingIterator;
1379 use nautilus_model::instruments::Instrument;
1380 use rstest::*;
1381
1382 use super::*;
1383
1384 fn test_data_path() -> PathBuf {
1385 Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1386 }
1387
1388 #[rstest]
1389 #[case('Y' as c_char, Some(true))]
1390 #[case('N' as c_char, Some(false))]
1391 #[case('X' as c_char, None)]
1392 fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1393 assert_eq!(parse_optional_bool(input), expected);
1394 }
1395
1396 #[rstest]
1397 #[case('A' as c_char, OrderSide::Sell)]
1398 #[case('B' as c_char, OrderSide::Buy)]
1399 #[case('X' as c_char, OrderSide::NoOrderSide)]
1400 fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1401 assert_eq!(parse_order_side(input), expected);
1402 }
1403
1404 #[rstest]
1405 #[case('A' as c_char, AggressorSide::Seller)]
1406 #[case('B' as c_char, AggressorSide::Buyer)]
1407 #[case('X' as c_char, AggressorSide::NoAggressor)]
1408 fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1409 assert_eq!(parse_aggressor_side(input), expected);
1410 }
1411
1412 #[rstest]
1413 #[case('T' as c_char, true)]
1414 #[case('A' as c_char, false)]
1415 #[case('C' as c_char, false)]
1416 #[case('F' as c_char, false)]
1417 #[case('M' as c_char, false)]
1418 #[case('R' as c_char, false)]
1419 fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1420 assert_eq!(is_trade_msg(action), expected);
1421 }
1422
1423 #[rstest]
1424 #[case('A' as c_char, Ok(BookAction::Add))]
1425 #[case('C' as c_char, Ok(BookAction::Delete))]
1426 #[case('F' as c_char, Ok(BookAction::Update))]
1427 #[case('M' as c_char, Ok(BookAction::Update))]
1428 #[case('R' as c_char, Ok(BookAction::Clear))]
1429 #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1430 fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1431 match parse_book_action(input) {
1432 Ok(action) => assert_eq!(Ok(action), expected),
1433 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1434 }
1435 }
1436
1437 #[rstest]
1438 #[case('C' as c_char, Ok(OptionKind::Call))]
1439 #[case('P' as c_char, Ok(OptionKind::Put))]
1440 #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1441 fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1442 match parse_option_kind(input) {
1443 Ok(kind) => assert_eq!(Ok(kind), expected),
1444 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1445 }
1446 }
1447
1448 #[rstest]
1449 #[case(Ok("USD"), Currency::USD())]
1450 #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1451 #[case(Ok(""), Currency::USD())]
1452 #[case(Err("Error"), Currency::USD())]
1453 fn test_parse_currency_or_usd_default(
1454 #[case] input: Result<&str, &'static str>, #[case] expected: Currency,
1456 ) {
1457 let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1458 assert_eq!(actual, expected);
1459 }
1460
1461 #[rstest]
1462 #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1463 #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1464 #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1465 #[case("XXX", Ok((None, None)))]
1466 #[case("D", Err("Value string is too short"))]
1467 fn test_parse_cfi_iso10926(
1468 #[case] input: &str,
1469 #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1470 ) {
1471 match parse_cfi_iso10926(input) {
1472 Ok(result) => assert_eq!(Ok(result), expected),
1473 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1474 }
1475 }
1476
1477 #[rstest]
1478 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1482 let actual = decode_price_increment(value, precision);
1483 assert_eq!(actual, expected);
1484 }
1485
1486 #[rstest]
1487 #[case(i64::MAX, 2, None)] #[case(0, 2, Some(Price::from_raw(0, 2)))] #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] fn test_decode_optional_price(
1491 #[case] value: i64,
1492 #[case] precision: u8,
1493 #[case] expected: Option<Price>,
1494 ) {
1495 let actual = decode_optional_price(value, precision);
1496 assert_eq!(actual, expected);
1497 }
1498
1499 #[rstest]
1500 #[case(i64::MAX, None)] #[case(0, Some(Quantity::new(0.0, 0)))] #[case(10, Some(Quantity::new(10.0, 0)))] fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1504 let actual = decode_optional_quantity(value);
1505 assert_eq!(actual, expected);
1506 }
1507
1508 #[rstest]
1509 #[case(0, Quantity::from(1))] #[case(i64::MAX, Quantity::from(1))] #[case(50_000_000_000, Quantity::from("50"))] #[case(12_500_000_000, Quantity::from("12.5"))] #[case(1_000_000_000, Quantity::from("1"))] #[case(1, Quantity::from("0.000000001"))] #[case(1_000_000_001, Quantity::from("1.000000001"))] #[case(999_999_999, Quantity::from("0.999999999"))] #[case(123_456_789_000, Quantity::from("123.456789"))] fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1519 assert_eq!(decode_multiplier(raw).unwrap(), expected);
1520 }
1521
1522 #[rstest]
1523 #[case(-1_500_000_000)] #[case(-1)] #[case(-999_999_999)] fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1527 let result = decode_multiplier(raw);
1528 assert!(result.is_err());
1529 assert!(
1530 result
1531 .unwrap_err()
1532 .to_string()
1533 .contains("Invalid negative multiplier")
1534 );
1535 }
1536
1537 #[rstest]
1538 #[case(100, Quantity::from(100))]
1539 #[case(1000, Quantity::from(1000))]
1540 #[case(5, Quantity::from(5))]
1541 fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1542 assert_eq!(decode_quantity(value), expected);
1543 }
1544
1545 #[rstest]
1546 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] fn test_decode_price_increment(
1550 #[case] value: i64,
1551 #[case] precision: u8,
1552 #[case] expected: Price,
1553 ) {
1554 assert_eq!(decode_price_increment(value, precision), expected);
1555 }
1556
1557 #[rstest]
1558 #[case(0, Quantity::from(1))] #[case(i32::MAX, Quantity::from(1))] #[case(100, Quantity::from(100))]
1561 #[case(1, Quantity::from(1))]
1562 #[case(1000, Quantity::from(1000))]
1563 fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1564 assert_eq!(decode_lot_size(value), expected);
1565 }
1566
1567 #[rstest]
1568 #[case(0, None)] #[case(1, Some(Ustr::from("Scheduled")))]
1570 #[case(2, Some(Ustr::from("Surveillance intervention")))]
1571 #[case(3, Some(Ustr::from("Market event")))]
1572 #[case(10, Some(Ustr::from("Regulatory")))]
1573 #[case(30, Some(Ustr::from("News pending")))]
1574 #[case(40, Some(Ustr::from("Order imbalance")))]
1575 #[case(50, Some(Ustr::from("LULD pause")))]
1576 #[case(60, Some(Ustr::from("Operational")))]
1577 #[case(100, Some(Ustr::from("Corporate action")))]
1578 #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1579 fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1580 assert_eq!(parse_status_reason(value).unwrap(), expected);
1581 }
1582
1583 #[rstest]
1584 #[case(999)] fn test_parse_status_reason_invalid(#[case] value: u16) {
1586 assert!(parse_status_reason(value).is_err());
1587 }
1588
1589 #[rstest]
1590 #[case(0, None)] #[case(1, Some(Ustr::from("No cancel")))]
1592 #[case(2, Some(Ustr::from("Change trading session")))]
1593 #[case(3, Some(Ustr::from("Implied matching on")))]
1594 #[case(4, Some(Ustr::from("Implied matching off")))]
1595 fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1596 assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1597 }
1598
1599 #[rstest]
1600 #[case(5)] #[case(100)] fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1603 assert!(parse_status_trading_event(value).is_err());
1604 }
1605
1606 #[rstest]
1607 fn test_decode_mbo_msg() {
1608 let path = test_data_path().join("test_data.mbo.dbn.zst");
1609 let mut dbn_stream = Decoder::from_zstd_file(path)
1610 .unwrap()
1611 .decode_stream::<dbn::MboMsg>();
1612 let msg = dbn_stream.next().unwrap().unwrap();
1613
1614 let instrument_id = InstrumentId::from("ESM4.GLBX");
1615 let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1616 let delta = delta.unwrap();
1617
1618 assert_eq!(delta.instrument_id, instrument_id);
1619 assert_eq!(delta.action, BookAction::Delete);
1620 assert_eq!(delta.order.side, OrderSide::Sell);
1621 assert_eq!(delta.order.price, Price::from("3722.75"));
1622 assert_eq!(delta.order.size, Quantity::from("1"));
1623 assert_eq!(delta.order.order_id, 647_784_973_705);
1624 assert_eq!(delta.flags, 128);
1625 assert_eq!(delta.sequence, 1_170_352);
1626 assert_eq!(delta.ts_event, msg.ts_recv);
1627 assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1628 assert_eq!(delta.ts_init, 0);
1629 }
1630
1631 #[rstest]
1632 fn test_decode_mbo_msg_clear_action() {
1633 let ts_recv = 1_609_160_400_000_000_000;
1635 let msg = dbn::MboMsg {
1636 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1637 order_id: 0,
1638 price: i64::MAX,
1639 size: 0,
1640 flags: dbn::FlagSet::empty(),
1641 channel_id: 0,
1642 action: 'R' as c_char,
1643 side: 'N' as c_char, ts_recv,
1645 ts_in_delta: 0,
1646 sequence: 1_000_000,
1647 };
1648
1649 let instrument_id = InstrumentId::from("ESM4.GLBX");
1650 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1651
1652 assert!(trade.is_none());
1654 let delta = delta.expect("Clear action should produce OrderBookDelta");
1655
1656 assert_eq!(delta.instrument_id, instrument_id);
1657 assert_eq!(delta.action, BookAction::Clear);
1658 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1659 assert_eq!(delta.order.size, Quantity::from("0"));
1660 assert_eq!(delta.order.order_id, 0);
1661 assert_eq!(delta.sequence, 1_000_000);
1662 assert_eq!(delta.ts_event, ts_recv);
1663 assert_eq!(delta.ts_init, 0);
1664 assert!(delta.order.price.is_undefined());
1665 assert_eq!(delta.order.price.precision, 0);
1666 }
1667
1668 #[rstest]
1669 fn test_decode_mbo_msg_price_undef_with_precision() {
1670 let ts_recv = 1_609_160_400_000_000_000;
1672 let msg = dbn::MboMsg {
1673 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1674 order_id: 0,
1675 price: i64::MAX, size: 0,
1677 flags: dbn::FlagSet::empty(),
1678 channel_id: 0,
1679 action: 'R' as c_char, side: 'N' as c_char, ts_recv,
1682 ts_in_delta: 0,
1683 sequence: 0,
1684 };
1685
1686 let instrument_id = InstrumentId::from("ESM4.GLBX");
1687 let (delta, _) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1688 let delta = delta.unwrap();
1689
1690 assert!(delta.order.price.is_undefined());
1691 assert_eq!(delta.order.price.precision, 0);
1692 assert_eq!(delta.order.price.raw, PRICE_UNDEF);
1693 }
1694
1695 #[rstest]
1696 fn test_decode_mbo_msg_no_order_side_update() {
1697 let ts_recv = 1_609_160_400_000_000_000;
1700 let msg = dbn::MboMsg {
1701 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1702 order_id: 123_456_789,
1703 price: 4_800_250_000_000, size: 1,
1705 flags: dbn::FlagSet::empty(),
1706 channel_id: 1,
1707 action: 'M' as c_char, side: 'N' as c_char, ts_recv,
1710 ts_in_delta: 0,
1711 sequence: 1_000_000,
1712 };
1713
1714 let instrument_id = InstrumentId::from("ESM4.GLBX");
1715 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1716
1717 assert!(delta.is_some());
1719 assert!(trade.is_none());
1720 let delta = delta.unwrap();
1721 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1722 assert_eq!(delta.order.order_id, 123_456_789);
1723 assert_eq!(delta.action, BookAction::Update);
1724 }
1725
1726 #[rstest]
1727 fn test_decode_mbp1_msg() {
1728 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1729 let mut dbn_stream = Decoder::from_zstd_file(path)
1730 .unwrap()
1731 .decode_stream::<dbn::Mbp1Msg>();
1732 let msg = dbn_stream.next().unwrap().unwrap();
1733
1734 let instrument_id = InstrumentId::from("ESM4.GLBX");
1735 let (quote, _) = decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1736
1737 assert_eq!(quote.instrument_id, instrument_id);
1738 assert_eq!(quote.bid_price, Price::from("3720.25"));
1739 assert_eq!(quote.ask_price, Price::from("3720.50"));
1740 assert_eq!(quote.bid_size, Quantity::from("24"));
1741 assert_eq!(quote.ask_size, Quantity::from("11"));
1742 assert_eq!(quote.ts_event, msg.ts_recv);
1743 assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1744 assert_eq!(quote.ts_init, 0);
1745 }
1746
1747 #[rstest]
1748 fn test_decode_bbo_1s_msg() {
1749 let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1750 let mut dbn_stream = Decoder::from_zstd_file(path)
1751 .unwrap()
1752 .decode_stream::<dbn::BboMsg>();
1753 let msg = dbn_stream.next().unwrap().unwrap();
1754
1755 let instrument_id = InstrumentId::from("ESM4.GLBX");
1756 let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1757
1758 assert_eq!(quote.instrument_id, instrument_id);
1759 assert_eq!(quote.bid_price, Price::from("3702.25"));
1760 assert_eq!(quote.ask_price, Price::from("3702.75"));
1761 assert_eq!(quote.bid_size, Quantity::from("18"));
1762 assert_eq!(quote.ask_size, Quantity::from("13"));
1763 assert_eq!(quote.ts_event, msg.ts_recv);
1764 assert_eq!(quote.ts_event, 1609113600000000000);
1765 assert_eq!(quote.ts_init, 0);
1766 }
1767
1768 #[rstest]
1769 fn test_decode_bbo_1m_msg() {
1770 let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1771 let mut dbn_stream = Decoder::from_zstd_file(path)
1772 .unwrap()
1773 .decode_stream::<dbn::BboMsg>();
1774 let msg = dbn_stream.next().unwrap().unwrap();
1775
1776 let instrument_id = InstrumentId::from("ESM4.GLBX");
1777 let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1778
1779 assert_eq!(quote.instrument_id, instrument_id);
1780 assert_eq!(quote.bid_price, Price::from("3702.25"));
1781 assert_eq!(quote.ask_price, Price::from("3702.75"));
1782 assert_eq!(quote.bid_size, Quantity::from("18"));
1783 assert_eq!(quote.ask_size, Quantity::from("13"));
1784 assert_eq!(quote.ts_event, msg.ts_recv);
1785 assert_eq!(quote.ts_event, 1609113600000000000);
1786 assert_eq!(quote.ts_init, 0);
1787 }
1788
1789 #[rstest]
1790 fn test_decode_mbp10_msg() {
1791 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1792 let mut dbn_stream = Decoder::from_zstd_file(path)
1793 .unwrap()
1794 .decode_stream::<dbn::Mbp10Msg>();
1795 let msg = dbn_stream.next().unwrap().unwrap();
1796
1797 let instrument_id = InstrumentId::from("ESM4.GLBX");
1798 let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1799
1800 assert_eq!(depth10.instrument_id, instrument_id);
1801 assert_eq!(depth10.bids.len(), 10);
1802 assert_eq!(depth10.asks.len(), 10);
1803 assert_eq!(depth10.bid_counts.len(), 10);
1804 assert_eq!(depth10.ask_counts.len(), 10);
1805 assert_eq!(depth10.flags, 128);
1806 assert_eq!(depth10.sequence, 1_170_352);
1807 assert_eq!(depth10.ts_event, msg.ts_recv);
1808 assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1809 assert_eq!(depth10.ts_init, 0);
1810 }
1811
1812 #[rstest]
1813 fn test_decode_trade_msg() {
1814 let path = test_data_path().join("test_data.trades.dbn.zst");
1815 let mut dbn_stream = Decoder::from_zstd_file(path)
1816 .unwrap()
1817 .decode_stream::<dbn::TradeMsg>();
1818 let msg = dbn_stream.next().unwrap().unwrap();
1819
1820 let instrument_id = InstrumentId::from("ESM4.GLBX");
1821 let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1822
1823 assert_eq!(trade.instrument_id, instrument_id);
1824 assert_eq!(trade.price, Price::from("3720.25"));
1825 assert_eq!(trade.size, Quantity::from("5"));
1826 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1827 assert_eq!(trade.trade_id.to_string(), "1170380");
1828 assert_eq!(trade.ts_event, msg.ts_recv);
1829 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1830 assert_eq!(trade.ts_init, 0);
1831 }
1832
1833 #[rstest]
1834 fn test_decode_tbbo_msg() {
1835 let path = test_data_path().join("test_data.tbbo.dbn.zst");
1836 let mut dbn_stream = Decoder::from_zstd_file(path)
1837 .unwrap()
1838 .decode_stream::<dbn::Mbp1Msg>();
1839 let msg = dbn_stream.next().unwrap().unwrap();
1840
1841 let instrument_id = InstrumentId::from("ESM4.GLBX");
1842 let (quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1843
1844 assert_eq!(quote.instrument_id, instrument_id);
1845 assert_eq!(quote.bid_price, Price::from("3720.25"));
1846 assert_eq!(quote.ask_price, Price::from("3720.50"));
1847 assert_eq!(quote.bid_size, Quantity::from("26"));
1848 assert_eq!(quote.ask_size, Quantity::from("7"));
1849 assert_eq!(quote.ts_event, msg.ts_recv);
1850 assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
1851 assert_eq!(quote.ts_init, 0);
1852
1853 assert_eq!(trade.instrument_id, instrument_id);
1854 assert_eq!(trade.price, Price::from("3720.25"));
1855 assert_eq!(trade.size, Quantity::from("5"));
1856 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1857 assert_eq!(trade.trade_id.to_string(), "1170380");
1858 assert_eq!(trade.ts_event, msg.ts_recv);
1859 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1860 assert_eq!(trade.ts_init, 0);
1861 }
1862
1863 #[rstest]
1864 fn test_decode_ohlcv_msg() {
1865 let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
1866 let mut dbn_stream = Decoder::from_zstd_file(path)
1867 .unwrap()
1868 .decode_stream::<dbn::OhlcvMsg>();
1869 let msg = dbn_stream.next().unwrap().unwrap();
1870
1871 let instrument_id = InstrumentId::from("ESM4.GLBX");
1872 let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1873
1874 assert_eq!(
1875 bar.bar_type,
1876 BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
1877 );
1878 assert_eq!(bar.open, Price::from("372025.00"));
1879 assert_eq!(bar.high, Price::from("372050.00"));
1880 assert_eq!(bar.low, Price::from("372025.00"));
1881 assert_eq!(bar.close, Price::from("372050.00"));
1882 assert_eq!(bar.volume, Quantity::from("57"));
1883 assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); assert_eq!(bar.ts_init, 0); }
1886
1887 #[rstest]
1888 fn test_decode_definition_msg() {
1889 let path = test_data_path().join("test_data.definition.dbn.zst");
1890 let mut dbn_stream = Decoder::from_zstd_file(path)
1891 .unwrap()
1892 .decode_stream::<dbn::InstrumentDefMsg>();
1893 let msg = dbn_stream.next().unwrap().unwrap();
1894
1895 let instrument_id = InstrumentId::from("ESM4.GLBX");
1896 let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
1897
1898 assert!(result.is_ok());
1899 assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
1900 }
1901
1902 #[rstest]
1903 fn test_decode_status_msg() {
1904 let path = test_data_path().join("test_data.status.dbn.zst");
1905 let mut dbn_stream = Decoder::from_zstd_file(path)
1906 .unwrap()
1907 .decode_stream::<dbn::StatusMsg>();
1908 let msg = dbn_stream.next().unwrap().unwrap();
1909
1910 let instrument_id = InstrumentId::from("ESM4.GLBX");
1911 let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
1912
1913 assert_eq!(status.instrument_id, instrument_id);
1914 assert_eq!(status.action, MarketStatusAction::Trading);
1915 assert_eq!(status.ts_event, msg.hd.ts_event);
1916 assert_eq!(status.ts_init, 0);
1917 assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
1918 assert_eq!(status.trading_event, None);
1919 assert_eq!(status.is_trading, Some(true));
1920 assert_eq!(status.is_quoting, Some(true));
1921 assert_eq!(status.is_short_sell_restricted, None);
1922 }
1923
1924 #[rstest]
1925 fn test_decode_imbalance_msg() {
1926 let path = test_data_path().join("test_data.imbalance.dbn.zst");
1927 let mut dbn_stream = Decoder::from_zstd_file(path)
1928 .unwrap()
1929 .decode_stream::<dbn::ImbalanceMsg>();
1930 let msg = dbn_stream.next().unwrap().unwrap();
1931
1932 let instrument_id = InstrumentId::from("ESM4.GLBX");
1933 let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1934
1935 assert_eq!(imbalance.instrument_id, instrument_id);
1936 assert_eq!(imbalance.ref_price, Price::from("229.43"));
1937 assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
1938 assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
1939 assert_eq!(imbalance.paired_qty, Quantity::from("0"));
1940 assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
1941 assert_eq!(imbalance.side, OrderSide::Buy);
1942 assert_eq!(imbalance.significant_imbalance, 126);
1943 assert_eq!(imbalance.ts_event, msg.hd.ts_event);
1944 assert_eq!(imbalance.ts_recv, msg.ts_recv);
1945 assert_eq!(imbalance.ts_init, 0);
1946 }
1947
1948 #[rstest]
1949 fn test_decode_statistics_msg() {
1950 let path = test_data_path().join("test_data.statistics.dbn.zst");
1951 let mut dbn_stream = Decoder::from_zstd_file(path)
1952 .unwrap()
1953 .decode_stream::<dbn::StatMsg>();
1954 let msg = dbn_stream.next().unwrap().unwrap();
1955
1956 let instrument_id = InstrumentId::from("ESM4.GLBX");
1957 let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1958
1959 assert_eq!(statistics.instrument_id, instrument_id);
1960 assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
1961 assert_eq!(
1962 statistics.update_action,
1963 DatabentoStatisticUpdateAction::Added
1964 );
1965 assert_eq!(statistics.price, Some(Price::from("100.00")));
1966 assert_eq!(statistics.quantity, None);
1967 assert_eq!(statistics.channel_id, 13);
1968 assert_eq!(statistics.stat_flags, 255);
1969 assert_eq!(statistics.sequence, 2);
1970 assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
1971 assert_eq!(statistics.ts_in_delta, 26961);
1972 assert_eq!(statistics.ts_event, msg.hd.ts_event);
1973 assert_eq!(statistics.ts_recv, msg.ts_recv);
1974 assert_eq!(statistics.ts_init, 0);
1975 }
1976
1977 #[rstest]
1978 fn test_decode_cmbp1_msg() {
1979 let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
1980 let mut dbn_stream = Decoder::from_zstd_file(path)
1981 .unwrap()
1982 .decode_stream::<dbn::Cmbp1Msg>();
1983 let msg = dbn_stream.next().unwrap().unwrap();
1984
1985 let instrument_id = InstrumentId::from("ESM4.GLBX");
1986 let (quote, trade) = decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1987
1988 assert_eq!(quote.instrument_id, instrument_id);
1989 assert!(quote.bid_price.raw > 0);
1990 assert!(quote.ask_price.raw > 0);
1991 assert!(quote.bid_size.raw > 0);
1992 assert!(quote.ask_size.raw > 0);
1993 assert_eq!(quote.ts_event, msg.ts_recv);
1994 assert_eq!(quote.ts_init, 0);
1995
1996 if msg.action as u8 as char == 'T' {
1998 assert!(trade.is_some());
1999 let trade = trade.unwrap();
2000 assert_eq!(trade.instrument_id, instrument_id);
2001 } else {
2002 assert!(trade.is_none());
2003 }
2004 }
2005
2006 #[rstest]
2007 fn test_decode_cbbo_1s_msg() {
2008 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2009 let mut dbn_stream = Decoder::from_zstd_file(path)
2010 .unwrap()
2011 .decode_stream::<dbn::CbboMsg>();
2012 let msg = dbn_stream.next().unwrap().unwrap();
2013
2014 let instrument_id = InstrumentId::from("ESM4.GLBX");
2015 let quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2016
2017 assert_eq!(quote.instrument_id, instrument_id);
2018 assert!(quote.bid_price.raw > 0);
2019 assert!(quote.ask_price.raw > 0);
2020 assert!(quote.bid_size.raw > 0);
2021 assert!(quote.ask_size.raw > 0);
2022 assert_eq!(quote.ts_event, msg.ts_recv);
2023 assert_eq!(quote.ts_init, 0);
2024 }
2025
2026 #[rstest]
2027 fn test_decode_mbp10_msg_with_all_levels() {
2028 let mut msg = dbn::Mbp10Msg::default();
2029 for i in 0..10 {
2030 msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
2031 msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
2032 msg.levels[i].bid_sz = 10 + i as u32;
2033 msg.levels[i].ask_sz = 10 + i as u32;
2034 msg.levels[i].bid_ct = 1 + i as u32;
2035 msg.levels[i].ask_ct = 1 + i as u32;
2036 }
2037 msg.ts_recv = 1_609_160_400_000_704_060;
2038
2039 let instrument_id = InstrumentId::from("TEST.VENUE");
2040 let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
2041
2042 assert!(result.is_ok());
2043 let depth = result.unwrap();
2044 assert_eq!(depth.bids.len(), 10);
2045 assert_eq!(depth.asks.len(), 10);
2046 assert_eq!(depth.bid_counts.len(), 10);
2047 assert_eq!(depth.ask_counts.len(), 10);
2048 }
2049
2050 #[rstest]
2051 fn test_array_conversion_error_handling() {
2052 let mut bids = Vec::new();
2053 let mut asks = Vec::new();
2054
2055 for i in 0..5 {
2057 bids.push(BookOrder::new(
2058 OrderSide::Buy,
2059 Price::from(format!("{}.00", 100 - i)),
2060 Quantity::from(10),
2061 i as u64,
2062 ));
2063 asks.push(BookOrder::new(
2064 OrderSide::Sell,
2065 Price::from(format!("{}.00", 101 + i)),
2066 Quantity::from(10),
2067 i as u64,
2068 ));
2069 }
2070
2071 let result: Result<[BookOrder; DEPTH10_LEN], _> =
2072 bids.try_into().map_err(|v: Vec<BookOrder>| {
2073 anyhow::anyhow!(
2074 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2075 v.len()
2076 )
2077 });
2078 assert!(result.is_err());
2079 assert!(
2080 result
2081 .unwrap_err()
2082 .to_string()
2083 .contains("Expected exactly 10 bid levels, received 5")
2084 );
2085 }
2086
2087 #[rstest]
2088 fn test_decode_tcbbo_msg() {
2089 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2091 let mut dbn_stream = Decoder::from_zstd_file(path)
2092 .unwrap()
2093 .decode_stream::<dbn::CbboMsg>();
2094 let msg = dbn_stream.next().unwrap().unwrap();
2095
2096 let mut tcbbo_msg = msg.clone();
2098 tcbbo_msg.price = 3702500000000;
2099 tcbbo_msg.size = 10;
2100
2101 let instrument_id = InstrumentId::from("ESM4.GLBX");
2102 let (quote, trade) =
2103 decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2104
2105 assert_eq!(quote.instrument_id, instrument_id);
2106 assert!(quote.bid_price.raw > 0);
2107 assert!(quote.ask_price.raw > 0);
2108 assert!(quote.bid_size.raw > 0);
2109 assert!(quote.ask_size.raw > 0);
2110 assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2111 assert_eq!(quote.ts_init, 0);
2112
2113 assert_eq!(trade.instrument_id, instrument_id);
2114 assert_eq!(trade.price, Price::from("3702.50"));
2115 assert_eq!(trade.size, Quantity::from(10));
2116 assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2117 assert_eq!(trade.ts_init, 0);
2118 }
2119
2120 #[rstest]
2121 fn test_decode_bar_type() {
2122 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2123 let instrument_id = InstrumentId::from("ESM4.GLBX");
2124
2125 msg.hd.rtype = 32;
2127 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2128 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2129
2130 msg.hd.rtype = 33;
2132 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2133 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2134
2135 msg.hd.rtype = 34;
2137 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2138 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2139
2140 msg.hd.rtype = 35;
2142 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2143 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2144
2145 msg.hd.rtype = 99;
2147 let result = decode_bar_type(&msg, instrument_id);
2148 assert!(result.is_err());
2149 }
2150
2151 #[rstest]
2152 fn test_decode_ts_event_adjustment() {
2153 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2154
2155 msg.hd.rtype = 32;
2157 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2158 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2159
2160 msg.hd.rtype = 33;
2162 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2163 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2164
2165 msg.hd.rtype = 34;
2167 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2168 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2169
2170 msg.hd.rtype = 35;
2172 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2173 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2174
2175 msg.hd.rtype = 36;
2177 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2178 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2179
2180 msg.hd.rtype = 99;
2182 let result = decode_ts_event_adjustment(&msg);
2183 assert!(result.is_err());
2184 }
2185
2186 #[rstest]
2187 fn test_decode_record() {
2188 let path = test_data_path().join("test_data.mbo.dbn.zst");
2190 let decoder = Decoder::from_zstd_file(path).unwrap();
2191 let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2192 let msg = dbn_stream.next().unwrap().unwrap();
2193
2194 let record_ref = dbn::RecordRef::from(msg);
2195 let instrument_id = InstrumentId::from("ESM4.GLBX");
2196
2197 let (data1, data2) =
2198 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2199
2200 assert!(data1.is_some());
2201 assert!(data2.is_none());
2202
2203 let path = test_data_path().join("test_data.trades.dbn.zst");
2205 let decoder = Decoder::from_zstd_file(path).unwrap();
2206 let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2207 let msg = dbn_stream.next().unwrap().unwrap();
2208
2209 let record_ref = dbn::RecordRef::from(msg);
2210
2211 let (data1, data2) =
2212 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2213
2214 assert!(data1.is_some());
2215 assert!(data2.is_none());
2216 assert!(matches!(data1.unwrap(), Data::Trade(_)));
2217 }
2218}