1use std::{ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
20use nautilus_model::{
21 data::{
22 Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24 },
25 enums::{
26 AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27 InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28 },
29 identifiers::{InstrumentId, TradeId},
30 instruments::{
31 Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32 },
33 types::{
34 Currency, Price, Quantity,
35 price::{PRICE_UNDEF, decode_raw_price_i64},
36 },
37};
38use ustr::Ustr;
39
40use super::{
41 enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
42 types::{DatabentoImbalance, DatabentoStatistics},
43};
44
45const STEP_ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
47
48const BAR_SPEC_1S: BarSpecification = BarSpecification {
49 step: STEP_ONE,
50 aggregation: BarAggregation::Second,
51 price_type: PriceType::Last,
52};
53const BAR_SPEC_1M: BarSpecification = BarSpecification {
54 step: STEP_ONE,
55 aggregation: BarAggregation::Minute,
56 price_type: PriceType::Last,
57};
58const BAR_SPEC_1H: BarSpecification = BarSpecification {
59 step: STEP_ONE,
60 aggregation: BarAggregation::Hour,
61 price_type: PriceType::Last,
62};
63const BAR_SPEC_1D: BarSpecification = BarSpecification {
64 step: STEP_ONE,
65 aggregation: BarAggregation::Day,
66 price_type: PriceType::Last,
67};
68
69const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
70const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
71const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
72const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
73
74#[must_use]
75pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
76 match c as u8 as char {
77 'Y' => Some(true),
78 'N' => Some(false),
79 _ => None,
80 }
81}
82
83#[must_use]
84pub const fn parse_order_side(c: c_char) -> OrderSide {
85 match c as u8 as char {
86 'A' => OrderSide::Sell,
87 'B' => OrderSide::Buy,
88 _ => OrderSide::NoOrderSide,
89 }
90}
91
92#[must_use]
93pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
94 match c as u8 as char {
95 'A' => AggressorSide::Seller,
96 'B' => AggressorSide::Buyer,
97 _ => AggressorSide::NoAggressor,
98 }
99}
100
101pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
107 match c as u8 as char {
108 'A' => Ok(BookAction::Add),
109 'C' => Ok(BookAction::Delete),
110 'F' => Ok(BookAction::Update),
111 'M' => Ok(BookAction::Update),
112 'R' => Ok(BookAction::Clear),
113 invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
114 }
115}
116
117pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
123 match c as u8 as char {
124 'C' => Ok(OptionKind::Call),
125 'P' => Ok(OptionKind::Put),
126 invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
127 }
128}
129
130fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
131 match value {
132 Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
133 tracing::warn!("Unknown currency code '{value}', defaulting to USD");
134 Currency::USD()
135 }),
136 Ok(_) => Currency::USD(),
137 Err(e) => {
138 tracing::error!("Error parsing currency: {e}");
139 Currency::USD()
140 }
141 }
142}
143
144pub fn parse_cfi_iso10926(
150 value: &str,
151) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
152 let chars: Vec<char> = value.chars().collect();
153 if chars.len() < 3 {
154 anyhow::bail!("Value string is too short");
155 }
156
157 let cfi_category = chars[0];
159 let cfi_group = chars[1];
160 let cfi_attribute1 = chars[2];
161 let mut asset_class = match cfi_category {
166 'D' => Some(AssetClass::Debt),
167 'E' => Some(AssetClass::Equity),
168 'S' => None,
169 _ => None,
170 };
171
172 let instrument_class = match cfi_group {
173 'I' => Some(InstrumentClass::Future),
174 _ => None,
175 };
176
177 if cfi_attribute1 == 'I' {
178 asset_class = Some(AssetClass::Index);
179 }
180
181 Ok((asset_class, instrument_class))
182}
183
184pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
192 let value_str = match value {
193 0 => return Ok(None),
194 1 => "Scheduled",
195 2 => "Surveillance intervention",
196 3 => "Market event",
197 4 => "Instrument activation",
198 5 => "Instrument expiration",
199 6 => "Recovery in process",
200 10 => "Regulatory",
201 11 => "Administrative",
202 12 => "Non-compliance",
203 13 => "Filings not current",
204 14 => "SEC trading suspension",
205 15 => "New issue",
206 16 => "Issue available",
207 17 => "Issues reviewed",
208 18 => "Filing requirements satisfied",
209 30 => "News pending",
210 31 => "News released",
211 32 => "News and resumption times",
212 33 => "News not forthcoming",
213 40 => "Order imbalance",
214 50 => "LULD pause",
215 60 => "Operational",
216 70 => "Additional information requested",
217 80 => "Merger effective",
218 90 => "ETF",
219 100 => "Corporate action",
220 110 => "New Security offering",
221 120 => "Market wide halt level 1",
222 121 => "Market wide halt level 2",
223 122 => "Market wide halt level 3",
224 123 => "Market wide halt carryover",
225 124 => "Market wide halt resumption",
226 130 => "Quotation not available",
227 invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
228 };
229
230 Ok(Some(Ustr::from(value_str)))
231}
232
233pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
239 let value_str = match value {
240 0 => return Ok(None),
241 1 => "No cancel",
242 2 => "Change trading session",
243 3 => "Implied matching on",
244 4 => "Implied matching off",
245 _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
246 };
247
248 Ok(Some(Ustr::from(value_str)))
249}
250
251#[must_use]
253pub fn decode_price(value: i64, precision: u8) -> Price {
254 Price::from_raw(decode_raw_price_i64(value), precision)
255}
256
257#[must_use]
259pub fn decode_quantity(value: u64) -> Quantity {
260 Quantity::from(value)
261}
262
263#[must_use]
265pub fn decode_price_increment(value: i64, precision: u8) -> Price {
266 match value {
267 0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
268 _ => decode_price(value, precision),
269 }
270}
271
272#[must_use]
274pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
275 match value {
276 i64::MAX => None,
277 _ => Some(decode_price(value, precision)),
278 }
279}
280
281#[must_use]
283pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
284 match value {
285 i64::MAX => None,
286 _ => Some(Quantity::from(value)),
287 }
288}
289
290pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
297 match value {
298 0 | i64::MAX => Ok(Quantity::from(1)),
299 v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
300 v => {
301 let abs = v as u128;
304
305 const SCALE: u128 = 1_000_000_000;
306 let int_part = abs / SCALE;
307 let frac_part = abs % SCALE;
308
309 if frac_part == 0 {
312 Ok(Quantity::from(int_part as u64))
314 } else {
315 let mut frac_str = format!("{frac_part:09}");
316 while frac_str.ends_with('0') {
317 frac_str.pop();
318 }
319 let s = format!("{int_part}.{frac_str}");
320 Ok(Quantity::from(s))
321 }
322 }
323 }
324}
325
326#[must_use]
328pub fn decode_lot_size(value: i32) -> Quantity {
329 match value {
330 0 | i32::MAX => Quantity::from(1),
331 value => Quantity::from(value),
332 }
333}
334
335#[must_use]
336fn is_trade_msg(action: c_char) -> bool {
337 action as u8 as char == 'T'
338}
339
340pub fn decode_mbo_msg(
349 msg: &dbn::MboMsg,
350 instrument_id: InstrumentId,
351 price_precision: u8,
352 ts_init: Option<UnixNanos>,
353 include_trades: bool,
354) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
355 let side = parse_order_side(msg.side);
356 if is_trade_msg(msg.action) {
357 if include_trades && msg.size > 0 {
358 let price = Price::from_raw(decode_raw_price_i64(msg.price), price_precision);
359 let size = Quantity::from(msg.size);
360 let aggressor_side = parse_aggressor_side(msg.side);
361 let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
362 let ts_event = msg.ts_recv.into();
363 let ts_init = ts_init.unwrap_or(ts_event);
364
365 let trade = TradeTick::new(
366 instrument_id,
367 price,
368 size,
369 aggressor_side,
370 trade_id,
371 ts_event,
372 ts_init,
373 );
374 return Ok((None, Some(trade)));
375 }
376
377 return Ok((None, None));
378 }
379
380 let action = parse_book_action(msg.action)?;
381 let price = if msg.price == i64::MAX {
382 Price::from_raw(PRICE_UNDEF, 0)
383 } else {
384 Price::from_raw(decode_raw_price_i64(msg.price), price_precision)
385 };
386 let size = Quantity::from(msg.size);
387 let order = BookOrder::new(side, price, size, msg.order_id);
388
389 let ts_event = msg.ts_recv.into();
390 let ts_init = ts_init.unwrap_or(ts_event);
391
392 let delta = OrderBookDelta::new(
393 instrument_id,
394 action,
395 order,
396 msg.flags.raw(),
397 msg.sequence.into(),
398 ts_event,
399 ts_init,
400 );
401
402 Ok((Some(delta), None))
403}
404
405pub fn decode_trade_msg(
411 msg: &dbn::TradeMsg,
412 instrument_id: InstrumentId,
413 price_precision: u8,
414 ts_init: Option<UnixNanos>,
415) -> anyhow::Result<TradeTick> {
416 let ts_event = msg.ts_recv.into();
417 let ts_init = ts_init.unwrap_or(ts_event);
418
419 let trade = TradeTick::new(
420 instrument_id,
421 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
422 Quantity::from(msg.size),
423 parse_aggressor_side(msg.side),
424 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
425 ts_event,
426 ts_init,
427 );
428
429 Ok(trade)
430}
431
432pub fn decode_tbbo_msg(
438 msg: &dbn::TbboMsg,
439 instrument_id: InstrumentId,
440 price_precision: u8,
441 ts_init: Option<UnixNanos>,
442) -> anyhow::Result<(QuoteTick, TradeTick)> {
443 let top_level = &msg.levels[0];
444 let ts_event = msg.ts_recv.into();
445 let ts_init = ts_init.unwrap_or(ts_event);
446
447 let quote = QuoteTick::new(
448 instrument_id,
449 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
450 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
451 Quantity::from(top_level.bid_sz),
452 Quantity::from(top_level.ask_sz),
453 ts_event,
454 ts_init,
455 );
456
457 let trade = TradeTick::new(
458 instrument_id,
459 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
460 Quantity::from(msg.size),
461 parse_aggressor_side(msg.side),
462 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
463 ts_event,
464 ts_init,
465 );
466
467 Ok((quote, trade))
468}
469
470pub fn decode_mbp1_msg(
476 msg: &dbn::Mbp1Msg,
477 instrument_id: InstrumentId,
478 price_precision: u8,
479 ts_init: Option<UnixNanos>,
480 include_trades: bool,
481) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
482 let top_level = &msg.levels[0];
483 let ts_event = msg.ts_recv.into();
484 let ts_init = ts_init.unwrap_or(ts_event);
485
486 let quote = QuoteTick::new(
487 instrument_id,
488 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
489 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
490 Quantity::from(top_level.bid_sz),
491 Quantity::from(top_level.ask_sz),
492 ts_event,
493 ts_init,
494 );
495
496 let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
497 Some(TradeTick::new(
498 instrument_id,
499 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
500 Quantity::from(msg.size),
501 parse_aggressor_side(msg.side),
502 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
503 ts_event,
504 ts_init,
505 ))
506 } else {
507 None
508 };
509
510 Ok((quote, maybe_trade))
511}
512
513pub fn decode_bbo_msg(
519 msg: &dbn::BboMsg,
520 instrument_id: InstrumentId,
521 price_precision: u8,
522 ts_init: Option<UnixNanos>,
523) -> anyhow::Result<QuoteTick> {
524 let top_level = &msg.levels[0];
525 let ts_event = msg.ts_recv.into();
526 let ts_init = ts_init.unwrap_or(ts_event);
527
528 let quote = QuoteTick::new(
529 instrument_id,
530 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
531 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
532 Quantity::from(top_level.bid_sz),
533 Quantity::from(top_level.ask_sz),
534 ts_event,
535 ts_init,
536 );
537
538 Ok(quote)
539}
540
541pub fn decode_mbp10_msg(
547 msg: &dbn::Mbp10Msg,
548 instrument_id: InstrumentId,
549 price_precision: u8,
550 ts_init: Option<UnixNanos>,
551) -> anyhow::Result<OrderBookDepth10> {
552 let mut bids = Vec::with_capacity(DEPTH10_LEN);
553 let mut asks = Vec::with_capacity(DEPTH10_LEN);
554 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
555 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
556
557 for level in &msg.levels {
558 let bid_order = BookOrder::new(
559 OrderSide::Buy,
560 Price::from_raw(decode_raw_price_i64(level.bid_px), price_precision),
561 Quantity::from(level.bid_sz),
562 0,
563 );
564
565 let ask_order = BookOrder::new(
566 OrderSide::Sell,
567 Price::from_raw(decode_raw_price_i64(level.ask_px), price_precision),
568 Quantity::from(level.ask_sz),
569 0,
570 );
571
572 bids.push(bid_order);
573 asks.push(ask_order);
574 bid_counts.push(level.bid_ct);
575 ask_counts.push(level.ask_ct);
576 }
577
578 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
579 anyhow::anyhow!(
580 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
581 v.len()
582 )
583 })?;
584
585 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
586 anyhow::anyhow!(
587 "Expected exactly {DEPTH10_LEN} ask levels, received {}",
588 v.len()
589 )
590 })?;
591
592 let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
593 anyhow::anyhow!(
594 "Expected exactly {DEPTH10_LEN} bid counts, received {}",
595 v.len()
596 )
597 })?;
598
599 let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
600 anyhow::anyhow!(
601 "Expected exactly {DEPTH10_LEN} ask counts, received {}",
602 v.len()
603 )
604 })?;
605
606 let ts_event = msg.ts_recv.into();
607 let ts_init = ts_init.unwrap_or(ts_event);
608
609 let depth = OrderBookDepth10::new(
610 instrument_id,
611 bids,
612 asks,
613 bid_counts,
614 ask_counts,
615 msg.flags.raw(),
616 msg.sequence.into(),
617 ts_event,
618 ts_init,
619 );
620
621 Ok(depth)
622}
623
624pub fn decode_cmbp1_msg(
632 msg: &dbn::Cmbp1Msg,
633 instrument_id: InstrumentId,
634 price_precision: u8,
635 ts_init: Option<UnixNanos>,
636 include_trades: bool,
637) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
638 let top_level = &msg.levels[0];
639 let ts_event = msg.ts_recv.into();
640 let ts_init = ts_init.unwrap_or(ts_event);
641
642 let quote = QuoteTick::new(
643 instrument_id,
644 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
645 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
646 Quantity::from(top_level.bid_sz),
647 Quantity::from(top_level.ask_sz),
648 ts_event,
649 ts_init,
650 );
651
652 let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
653 Some(TradeTick::new(
655 instrument_id,
656 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
657 Quantity::from(msg.size),
658 parse_aggressor_side(msg.side),
659 TradeId::new(UUID4::new().to_string()),
660 ts_event,
661 ts_init,
662 ))
663 } else {
664 None
665 };
666
667 Ok((quote, maybe_trade))
668}
669
670pub fn decode_cbbo_msg(
678 msg: &dbn::CbboMsg,
679 instrument_id: InstrumentId,
680 price_precision: u8,
681 ts_init: Option<UnixNanos>,
682) -> anyhow::Result<QuoteTick> {
683 let top_level = &msg.levels[0];
684 let ts_event = msg.ts_recv.into();
685 let ts_init = ts_init.unwrap_or(ts_event);
686
687 let quote = QuoteTick::new(
688 instrument_id,
689 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
690 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
691 Quantity::from(top_level.bid_sz),
692 Quantity::from(top_level.ask_sz),
693 ts_event,
694 ts_init,
695 );
696
697 Ok(quote)
698}
699
700pub fn decode_tcbbo_msg(
708 msg: &dbn::CbboMsg,
709 instrument_id: InstrumentId,
710 price_precision: u8,
711 ts_init: Option<UnixNanos>,
712) -> anyhow::Result<(QuoteTick, TradeTick)> {
713 let top_level = &msg.levels[0];
714 let ts_event = msg.ts_recv.into();
715 let ts_init = ts_init.unwrap_or(ts_event);
716
717 let quote = QuoteTick::new(
718 instrument_id,
719 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
720 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
721 Quantity::from(top_level.bid_sz),
722 Quantity::from(top_level.ask_sz),
723 ts_event,
724 ts_init,
725 );
726
727 let trade = TradeTick::new(
729 instrument_id,
730 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
731 Quantity::from(msg.size),
732 parse_aggressor_side(msg.side),
733 TradeId::new(UUID4::new().to_string()),
734 ts_event,
735 ts_init,
736 );
737
738 Ok((quote, trade))
739}
740
741pub fn decode_bar_type(
745 msg: &dbn::OhlcvMsg,
746 instrument_id: InstrumentId,
747) -> anyhow::Result<BarType> {
748 let bar_type = match msg.hd.rtype {
749 32 => {
750 BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
752 }
753 33 => {
754 BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
756 }
757 34 => {
758 BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
760 }
761 35 => {
762 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
764 }
765 36 => {
766 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
768 }
769 _ => anyhow::bail!(
770 "`rtype` is not a supported bar aggregation, was {}",
771 msg.hd.rtype
772 ),
773 };
774
775 Ok(bar_type)
776}
777
778pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
782 let adjustment = match msg.hd.rtype {
783 32 => {
784 BAR_CLOSE_ADJUSTMENT_1S
786 }
787 33 => {
788 BAR_CLOSE_ADJUSTMENT_1M
790 }
791 34 => {
792 BAR_CLOSE_ADJUSTMENT_1H
794 }
795 35 | 36 => {
796 BAR_CLOSE_ADJUSTMENT_1D
798 }
799 _ => anyhow::bail!(
800 "`rtype` is not a supported bar aggregation, was {}",
801 msg.hd.rtype
802 ),
803 };
804
805 Ok(adjustment.into())
806}
807
808pub fn decode_ohlcv_msg(
812 msg: &dbn::OhlcvMsg,
813 instrument_id: InstrumentId,
814 price_precision: u8,
815 ts_init: Option<UnixNanos>,
816 timestamp_on_close: bool,
817) -> anyhow::Result<Bar> {
818 let bar_type = decode_bar_type(msg, instrument_id)?;
819 let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
820
821 let ts_event_raw = msg.hd.ts_event.into();
822 let ts_close = ts_event_raw + ts_event_adjustment;
823 let ts_init = ts_init.unwrap_or(ts_close); let ts_event = if timestamp_on_close {
826 ts_close
827 } else {
828 ts_event_raw
829 };
830
831 let bar = Bar::new(
832 bar_type,
833 Price::from_raw(decode_raw_price_i64(msg.open), price_precision),
834 Price::from_raw(decode_raw_price_i64(msg.high), price_precision),
835 Price::from_raw(decode_raw_price_i64(msg.low), price_precision),
836 Price::from_raw(decode_raw_price_i64(msg.close), price_precision),
837 Quantity::from(msg.volume),
838 ts_event,
839 ts_init,
840 );
841
842 Ok(bar)
843}
844
845pub fn decode_status_msg(
851 msg: &dbn::StatusMsg,
852 instrument_id: InstrumentId,
853 ts_init: Option<UnixNanos>,
854) -> anyhow::Result<InstrumentStatus> {
855 let ts_event = msg.hd.ts_event.into();
856 let ts_init = ts_init.unwrap_or(ts_event);
857
858 let action = MarketStatusAction::from_u16(msg.action)
859 .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
860
861 let status = InstrumentStatus::new(
862 instrument_id,
863 action,
864 ts_event,
865 ts_init,
866 parse_status_reason(msg.reason)?,
867 parse_status_trading_event(msg.trading_event)?,
868 parse_optional_bool(msg.is_trading),
869 parse_optional_bool(msg.is_quoting),
870 parse_optional_bool(msg.is_short_sell_restricted),
871 );
872
873 Ok(status)
874}
875
876pub fn decode_record(
880 record: &dbn::RecordRef,
881 instrument_id: InstrumentId,
882 price_precision: u8,
883 ts_init: Option<UnixNanos>,
884 include_trades: bool,
885 bars_timestamp_on_close: bool,
886) -> anyhow::Result<(Option<Data>, Option<Data>)> {
887 let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
891 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
892 let result = decode_mbo_msg(
893 msg,
894 instrument_id,
895 price_precision,
896 Some(ts_init),
897 include_trades,
898 )?;
899 match result {
900 (Some(delta), None) => (Some(Data::Delta(delta)), None),
901 (None, Some(trade)) => (Some(Data::Trade(trade)), None),
902 (None, None) => (None, None),
903 _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
904 }
905 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
906 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
907 let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
908 (Some(Data::Trade(trade)), None)
909 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
910 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
911 let result = decode_mbp1_msg(
912 msg,
913 instrument_id,
914 price_precision,
915 Some(ts_init),
916 include_trades,
917 )?;
918 match result {
919 (quote, None) => (Some(Data::Quote(quote)), None),
920 (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
921 }
922 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
923 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
924 let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
925 (Some(Data::Quote(quote)), None)
926 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
927 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
928 let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
929 (Some(Data::Quote(quote)), None)
930 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
931 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
932 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
933 (Some(Data::from(depth)), None)
934 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
935 let bar = decode_ohlcv_msg(
938 msg,
939 instrument_id,
940 price_precision,
941 ts_init,
942 bars_timestamp_on_close,
943 )?;
944 (Some(Data::Bar(bar)), None)
945 } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
946 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
947 let result = decode_cmbp1_msg(
948 msg,
949 instrument_id,
950 price_precision,
951 Some(ts_init),
952 include_trades,
953 )?;
954 match result {
955 (quote, None) => (Some(Data::Quote(quote)), None),
956 (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
957 }
958 } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
959 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
961 let (quote, trade) = decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
962 (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
963 } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
964 if msg.price != i64::MAX && msg.size > 0 {
966 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
968 let (quote, trade) =
969 decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
970 (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
971 } else {
972 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
974 let quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
975 (Some(Data::Quote(quote)), None)
976 }
977 } else {
978 anyhow::bail!("DBN message type is not currently supported")
979 };
980
981 Ok(result)
982}
983
984const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
985 match ts_init {
986 Some(ts_init) => ts_init,
987 None => msg_timestamp,
988 }
989}
990
991pub fn decode_instrument_def_msg(
995 msg: &dbn::InstrumentDefMsg,
996 instrument_id: InstrumentId,
997 ts_init: Option<UnixNanos>,
998) -> anyhow::Result<InstrumentAny> {
999 match msg.instrument_class as u8 as char {
1000 'K' => Ok(InstrumentAny::Equity(decode_equity(
1001 msg,
1002 instrument_id,
1003 ts_init,
1004 )?)),
1005 'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1006 msg,
1007 instrument_id,
1008 ts_init,
1009 )?)),
1010 'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1011 msg,
1012 instrument_id,
1013 ts_init,
1014 )?)),
1015 'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1016 msg,
1017 instrument_id,
1018 ts_init,
1019 )?)),
1020 'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1021 msg,
1022 instrument_id,
1023 ts_init,
1024 )?)),
1025 'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1026 'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1027 _ => anyhow::bail!(
1028 "Unsupported `instrument_class` '{}'",
1029 msg.instrument_class as u8 as char
1030 ),
1031 }
1032}
1033
1034pub fn decode_equity(
1040 msg: &dbn::InstrumentDefMsg,
1041 instrument_id: InstrumentId,
1042 ts_init: Option<UnixNanos>,
1043) -> anyhow::Result<Equity> {
1044 let currency = parse_currency_or_usd_default(msg.currency());
1045 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1046 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1047 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1049
1050 Ok(Equity::new(
1051 instrument_id,
1052 instrument_id.symbol,
1053 None, currency,
1055 price_increment.precision,
1056 price_increment,
1057 Some(lot_size),
1058 None, None, None, None, None, None, None, None, ts_event,
1067 ts_init,
1068 ))
1069}
1070
1071pub fn decode_futures_contract(
1077 msg: &dbn::InstrumentDefMsg,
1078 instrument_id: InstrumentId,
1079 ts_init: Option<UnixNanos>,
1080) -> anyhow::Result<FuturesContract> {
1081 let currency = parse_currency_or_usd_default(msg.currency());
1082 let exchange = Ustr::from(msg.exchange()?);
1083 let underlying = Ustr::from(msg.asset()?);
1084 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1085 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1086 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1087 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1088 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1090
1091 FuturesContract::new_checked(
1092 instrument_id,
1093 instrument_id.symbol,
1094 asset_class.unwrap_or(AssetClass::Commodity),
1095 Some(exchange),
1096 underlying,
1097 msg.activation.into(),
1098 msg.expiration.into(),
1099 currency,
1100 price_increment.precision,
1101 price_increment,
1102 multiplier,
1103 lot_size,
1104 None, None, None, None, None, None, None, None, ts_event,
1113 ts_init,
1114 )
1115}
1116
1117pub fn decode_futures_spread(
1123 msg: &dbn::InstrumentDefMsg,
1124 instrument_id: InstrumentId,
1125 ts_init: Option<UnixNanos>,
1126) -> anyhow::Result<FuturesSpread> {
1127 let exchange = Ustr::from(msg.exchange()?);
1128 let underlying = Ustr::from(msg.asset()?);
1129 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1130 let strategy_type = Ustr::from(msg.secsubtype()?);
1131 let currency = parse_currency_or_usd_default(msg.currency());
1132 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1133 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1134 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1135 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1137
1138 FuturesSpread::new_checked(
1139 instrument_id,
1140 instrument_id.symbol,
1141 asset_class.unwrap_or(AssetClass::Commodity),
1142 Some(exchange),
1143 underlying,
1144 strategy_type,
1145 msg.activation.into(),
1146 msg.expiration.into(),
1147 currency,
1148 price_increment.precision,
1149 price_increment,
1150 multiplier,
1151 lot_size,
1152 None, None, None, None, None, None, None, None, ts_event,
1161 ts_init,
1162 )
1163}
1164
1165pub fn decode_option_contract(
1171 msg: &dbn::InstrumentDefMsg,
1172 instrument_id: InstrumentId,
1173 ts_init: Option<UnixNanos>,
1174) -> anyhow::Result<OptionContract> {
1175 let currency = parse_currency_or_usd_default(msg.currency());
1176 let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1177 let exchange = Ustr::from(msg.exchange()?);
1178 let underlying = Ustr::from(msg.underlying()?);
1179 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1180 Some(AssetClass::Equity)
1181 } else {
1182 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1183 asset_class
1184 };
1185 let option_kind = parse_option_kind(msg.instrument_class)?;
1186 let strike_price = Price::from_raw(
1187 decode_raw_price_i64(msg.strike_price),
1188 strike_price_currency.precision,
1189 );
1190 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1191 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1192 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1193 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1195
1196 OptionContract::new_checked(
1197 instrument_id,
1198 instrument_id.symbol,
1199 asset_class_opt.unwrap_or(AssetClass::Commodity),
1200 Some(exchange),
1201 underlying,
1202 option_kind,
1203 strike_price,
1204 currency,
1205 msg.activation.into(),
1206 msg.expiration.into(),
1207 price_increment.precision,
1208 price_increment,
1209 multiplier,
1210 lot_size,
1211 None, None, None, None, None, None, None, None, ts_event,
1220 ts_init,
1221 )
1222}
1223
1224pub fn decode_option_spread(
1230 msg: &dbn::InstrumentDefMsg,
1231 instrument_id: InstrumentId,
1232 ts_init: Option<UnixNanos>,
1233) -> anyhow::Result<OptionSpread> {
1234 let exchange = Ustr::from(msg.exchange()?);
1235 let underlying = Ustr::from(msg.underlying()?);
1236 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1237 Some(AssetClass::Equity)
1238 } else {
1239 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1240 asset_class
1241 };
1242 let strategy_type = Ustr::from(msg.secsubtype()?);
1243 let currency = parse_currency_or_usd_default(msg.currency());
1244 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1245 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1246 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1247 let ts_event = msg.ts_recv.into(); let ts_init = ts_init.unwrap_or(ts_event);
1249
1250 OptionSpread::new_checked(
1251 instrument_id,
1252 instrument_id.symbol,
1253 asset_class_opt.unwrap_or(AssetClass::Commodity),
1254 Some(exchange),
1255 underlying,
1256 strategy_type,
1257 msg.activation.into(),
1258 msg.expiration.into(),
1259 currency,
1260 price_increment.precision,
1261 price_increment,
1262 multiplier,
1263 lot_size,
1264 None, None, None, None, None, None, None, None, ts_event,
1273 ts_init,
1274 )
1275}
1276
1277pub fn decode_imbalance_msg(
1283 msg: &dbn::ImbalanceMsg,
1284 instrument_id: InstrumentId,
1285 price_precision: u8,
1286 ts_init: Option<UnixNanos>,
1287) -> anyhow::Result<DatabentoImbalance> {
1288 let ts_event = msg.ts_recv.into();
1289 let ts_init = ts_init.unwrap_or(ts_event);
1290
1291 Ok(DatabentoImbalance::new(
1292 instrument_id,
1293 Price::from_raw(decode_raw_price_i64(msg.ref_price), price_precision),
1294 Price::from_raw(
1295 decode_raw_price_i64(msg.cont_book_clr_price),
1296 price_precision,
1297 ),
1298 Price::from_raw(
1299 decode_raw_price_i64(msg.auct_interest_clr_price),
1300 price_precision,
1301 ),
1302 Quantity::new(f64::from(msg.paired_qty), 0),
1303 Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1304 parse_order_side(msg.side),
1305 msg.significant_imbalance as c_char,
1306 msg.hd.ts_event.into(),
1307 ts_event,
1308 ts_init,
1309 ))
1310}
1311
1312pub fn decode_statistics_msg(
1319 msg: &dbn::StatMsg,
1320 instrument_id: InstrumentId,
1321 price_precision: u8,
1322 ts_init: Option<UnixNanos>,
1323) -> anyhow::Result<DatabentoStatistics> {
1324 let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1325 .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1326 let update_action =
1327 DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1328 anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1329 })?;
1330 let ts_event = msg.ts_recv.into();
1331 let ts_init = ts_init.unwrap_or(ts_event);
1332
1333 Ok(DatabentoStatistics::new(
1334 instrument_id,
1335 stat_type,
1336 update_action,
1337 decode_optional_price(msg.price, price_precision),
1338 decode_optional_quantity(msg.quantity),
1339 msg.channel_id,
1340 msg.stat_flags,
1341 msg.sequence,
1342 msg.ts_ref.into(),
1343 msg.ts_in_delta,
1344 msg.hd.ts_event.into(),
1345 ts_event,
1346 ts_init,
1347 ))
1348}
1349
1350#[cfg(test)]
1354mod tests {
1355 use std::path::{Path, PathBuf};
1356
1357 use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1358 use fallible_streaming_iterator::FallibleStreamingIterator;
1359 use nautilus_model::instruments::Instrument;
1360 use rstest::*;
1361
1362 use super::*;
1363
1364 fn test_data_path() -> PathBuf {
1365 Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1366 }
1367
1368 #[rstest]
1369 #[case('Y' as c_char, Some(true))]
1370 #[case('N' as c_char, Some(false))]
1371 #[case('X' as c_char, None)]
1372 fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1373 assert_eq!(parse_optional_bool(input), expected);
1374 }
1375
1376 #[rstest]
1377 #[case('A' as c_char, OrderSide::Sell)]
1378 #[case('B' as c_char, OrderSide::Buy)]
1379 #[case('X' as c_char, OrderSide::NoOrderSide)]
1380 fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1381 assert_eq!(parse_order_side(input), expected);
1382 }
1383
1384 #[rstest]
1385 #[case('A' as c_char, AggressorSide::Seller)]
1386 #[case('B' as c_char, AggressorSide::Buyer)]
1387 #[case('X' as c_char, AggressorSide::NoAggressor)]
1388 fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1389 assert_eq!(parse_aggressor_side(input), expected);
1390 }
1391
1392 #[rstest]
1393 #[case('T' as c_char, true)]
1394 #[case('A' as c_char, false)]
1395 #[case('C' as c_char, false)]
1396 #[case('F' as c_char, false)]
1397 #[case('M' as c_char, false)]
1398 #[case('R' as c_char, false)]
1399 fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1400 assert_eq!(is_trade_msg(action), expected);
1401 }
1402
1403 #[rstest]
1404 #[case('A' as c_char, Ok(BookAction::Add))]
1405 #[case('C' as c_char, Ok(BookAction::Delete))]
1406 #[case('F' as c_char, Ok(BookAction::Update))]
1407 #[case('M' as c_char, Ok(BookAction::Update))]
1408 #[case('R' as c_char, Ok(BookAction::Clear))]
1409 #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1410 fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1411 match parse_book_action(input) {
1412 Ok(action) => assert_eq!(Ok(action), expected),
1413 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1414 }
1415 }
1416
1417 #[rstest]
1418 #[case('C' as c_char, Ok(OptionKind::Call))]
1419 #[case('P' as c_char, Ok(OptionKind::Put))]
1420 #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1421 fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1422 match parse_option_kind(input) {
1423 Ok(kind) => assert_eq!(Ok(kind), expected),
1424 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1425 }
1426 }
1427
1428 #[rstest]
1429 #[case(Ok("USD"), Currency::USD())]
1430 #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1431 #[case(Ok(""), Currency::USD())]
1432 #[case(Err("Error"), Currency::USD())]
1433 fn test_parse_currency_or_usd_default(
1434 #[case] input: Result<&str, &'static str>, #[case] expected: Currency,
1436 ) {
1437 let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1438 assert_eq!(actual, expected);
1439 }
1440
1441 #[rstest]
1442 #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1443 #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1444 #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1445 #[case("XXX", Ok((None, None)))]
1446 #[case("D", Err("Value string is too short"))]
1447 fn test_parse_cfi_iso10926(
1448 #[case] input: &str,
1449 #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1450 ) {
1451 match parse_cfi_iso10926(input) {
1452 Ok(result) => assert_eq!(Ok(result), expected),
1453 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1454 }
1455 }
1456
1457 #[rstest]
1458 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1462 let actual = decode_price_increment(value, precision);
1463 assert_eq!(actual, expected);
1464 }
1465
1466 #[rstest]
1467 #[case(i64::MAX, 2, None)] #[case(0, 2, Some(Price::from_raw(0, 2)))] #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] fn test_decode_optional_price(
1471 #[case] value: i64,
1472 #[case] precision: u8,
1473 #[case] expected: Option<Price>,
1474 ) {
1475 let actual = decode_optional_price(value, precision);
1476 assert_eq!(actual, expected);
1477 }
1478
1479 #[rstest]
1480 #[case(i64::MAX, None)] #[case(0, Some(Quantity::new(0.0, 0)))] #[case(10, Some(Quantity::new(10.0, 0)))] fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1484 let actual = decode_optional_quantity(value);
1485 assert_eq!(actual, expected);
1486 }
1487
1488 #[rstest]
1489 #[case(0, Quantity::from(1))] #[case(i64::MAX, Quantity::from(1))] #[case(50_000_000_000, Quantity::from("50"))] #[case(12_500_000_000, Quantity::from("12.5"))] #[case(1_000_000_000, Quantity::from("1"))] #[case(1, Quantity::from("0.000000001"))] #[case(1_000_000_001, Quantity::from("1.000000001"))] #[case(999_999_999, Quantity::from("0.999999999"))] #[case(123_456_789_000, Quantity::from("123.456789"))] fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1499 assert_eq!(decode_multiplier(raw).unwrap(), expected);
1500 }
1501
1502 #[rstest]
1503 #[case(-1_500_000_000)] #[case(-1)] #[case(-999_999_999)] fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1507 let result = decode_multiplier(raw);
1508 assert!(result.is_err());
1509 assert!(
1510 result
1511 .unwrap_err()
1512 .to_string()
1513 .contains("Invalid negative multiplier")
1514 );
1515 }
1516
1517 #[rstest]
1518 #[case(100, Quantity::from(100))]
1519 #[case(1000, Quantity::from(1000))]
1520 #[case(5, Quantity::from(5))]
1521 fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1522 assert_eq!(decode_quantity(value), expected);
1523 }
1524
1525 #[rstest]
1526 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] fn test_decode_price_increment(
1530 #[case] value: i64,
1531 #[case] precision: u8,
1532 #[case] expected: Price,
1533 ) {
1534 assert_eq!(decode_price_increment(value, precision), expected);
1535 }
1536
1537 #[rstest]
1538 #[case(0, Quantity::from(1))] #[case(i32::MAX, Quantity::from(1))] #[case(100, Quantity::from(100))]
1541 #[case(1, Quantity::from(1))]
1542 #[case(1000, Quantity::from(1000))]
1543 fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1544 assert_eq!(decode_lot_size(value), expected);
1545 }
1546
1547 #[rstest]
1548 #[case(0, None)] #[case(1, Some(Ustr::from("Scheduled")))]
1550 #[case(2, Some(Ustr::from("Surveillance intervention")))]
1551 #[case(3, Some(Ustr::from("Market event")))]
1552 #[case(10, Some(Ustr::from("Regulatory")))]
1553 #[case(30, Some(Ustr::from("News pending")))]
1554 #[case(40, Some(Ustr::from("Order imbalance")))]
1555 #[case(50, Some(Ustr::from("LULD pause")))]
1556 #[case(60, Some(Ustr::from("Operational")))]
1557 #[case(100, Some(Ustr::from("Corporate action")))]
1558 #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1559 fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1560 assert_eq!(parse_status_reason(value).unwrap(), expected);
1561 }
1562
1563 #[rstest]
1564 #[case(999)] fn test_parse_status_reason_invalid(#[case] value: u16) {
1566 assert!(parse_status_reason(value).is_err());
1567 }
1568
1569 #[rstest]
1570 #[case(0, None)] #[case(1, Some(Ustr::from("No cancel")))]
1572 #[case(2, Some(Ustr::from("Change trading session")))]
1573 #[case(3, Some(Ustr::from("Implied matching on")))]
1574 #[case(4, Some(Ustr::from("Implied matching off")))]
1575 fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1576 assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1577 }
1578
1579 #[rstest]
1580 #[case(5)] #[case(100)] fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1583 assert!(parse_status_trading_event(value).is_err());
1584 }
1585
1586 #[rstest]
1587 fn test_decode_mbo_msg() {
1588 let path = test_data_path().join("test_data.mbo.dbn.zst");
1589 let mut dbn_stream = Decoder::from_zstd_file(path)
1590 .unwrap()
1591 .decode_stream::<dbn::MboMsg>();
1592 let msg = dbn_stream.next().unwrap().unwrap();
1593
1594 let instrument_id = InstrumentId::from("ESM4.GLBX");
1595 let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1596 let delta = delta.unwrap();
1597
1598 assert_eq!(delta.instrument_id, instrument_id);
1599 assert_eq!(delta.action, BookAction::Delete);
1600 assert_eq!(delta.order.side, OrderSide::Sell);
1601 assert_eq!(delta.order.price, Price::from("3722.75"));
1602 assert_eq!(delta.order.size, Quantity::from("1"));
1603 assert_eq!(delta.order.order_id, 647_784_973_705);
1604 assert_eq!(delta.flags, 128);
1605 assert_eq!(delta.sequence, 1_170_352);
1606 assert_eq!(delta.ts_event, msg.ts_recv);
1607 assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1608 assert_eq!(delta.ts_init, 0);
1609 }
1610
1611 #[rstest]
1612 fn test_decode_mbo_msg_clear_action() {
1613 let ts_recv = 1_609_160_400_000_000_000;
1615 let msg = dbn::MboMsg {
1616 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1617 order_id: 0,
1618 price: i64::MAX,
1619 size: 0,
1620 flags: dbn::FlagSet::empty(),
1621 channel_id: 0,
1622 action: 'R' as c_char,
1623 side: 'N' as c_char, ts_recv,
1625 ts_in_delta: 0,
1626 sequence: 1_000_000,
1627 };
1628
1629 let instrument_id = InstrumentId::from("ESM4.GLBX");
1630 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1631
1632 assert!(trade.is_none());
1634 let delta = delta.expect("Clear action should produce OrderBookDelta");
1635
1636 assert_eq!(delta.instrument_id, instrument_id);
1637 assert_eq!(delta.action, BookAction::Clear);
1638 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1639 assert_eq!(delta.order.size, Quantity::from("0"));
1640 assert_eq!(delta.order.order_id, 0);
1641 assert_eq!(delta.sequence, 1_000_000);
1642 assert_eq!(delta.ts_event, ts_recv);
1643 assert_eq!(delta.ts_init, 0);
1644 }
1645
1646 #[rstest]
1647 fn test_decode_mbo_msg_no_order_side_update() {
1648 let ts_recv = 1_609_160_400_000_000_000;
1651 let msg = dbn::MboMsg {
1652 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1653 order_id: 123_456_789,
1654 price: 4_800_250_000_000, size: 1,
1656 flags: dbn::FlagSet::empty(),
1657 channel_id: 1,
1658 action: 'M' as c_char, side: 'N' as c_char, ts_recv,
1661 ts_in_delta: 0,
1662 sequence: 1_000_000,
1663 };
1664
1665 let instrument_id = InstrumentId::from("ESM4.GLBX");
1666 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1667
1668 assert!(delta.is_some());
1670 assert!(trade.is_none());
1671 let delta = delta.unwrap();
1672 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1673 assert_eq!(delta.order.order_id, 123_456_789);
1674 assert_eq!(delta.action, BookAction::Update);
1675 }
1676
1677 #[rstest]
1678 fn test_decode_mbp1_msg() {
1679 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1680 let mut dbn_stream = Decoder::from_zstd_file(path)
1681 .unwrap()
1682 .decode_stream::<dbn::Mbp1Msg>();
1683 let msg = dbn_stream.next().unwrap().unwrap();
1684
1685 let instrument_id = InstrumentId::from("ESM4.GLBX");
1686 let (quote, _) = decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1687
1688 assert_eq!(quote.instrument_id, instrument_id);
1689 assert_eq!(quote.bid_price, Price::from("3720.25"));
1690 assert_eq!(quote.ask_price, Price::from("3720.50"));
1691 assert_eq!(quote.bid_size, Quantity::from("24"));
1692 assert_eq!(quote.ask_size, Quantity::from("11"));
1693 assert_eq!(quote.ts_event, msg.ts_recv);
1694 assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1695 assert_eq!(quote.ts_init, 0);
1696 }
1697
1698 #[rstest]
1699 fn test_decode_bbo_1s_msg() {
1700 let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1701 let mut dbn_stream = Decoder::from_zstd_file(path)
1702 .unwrap()
1703 .decode_stream::<dbn::BboMsg>();
1704 let msg = dbn_stream.next().unwrap().unwrap();
1705
1706 let instrument_id = InstrumentId::from("ESM4.GLBX");
1707 let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1708
1709 assert_eq!(quote.instrument_id, instrument_id);
1710 assert_eq!(quote.bid_price, Price::from("3702.25"));
1711 assert_eq!(quote.ask_price, Price::from("3702.75"));
1712 assert_eq!(quote.bid_size, Quantity::from("18"));
1713 assert_eq!(quote.ask_size, Quantity::from("13"));
1714 assert_eq!(quote.ts_event, msg.ts_recv);
1715 assert_eq!(quote.ts_event, 1609113600000000000);
1716 assert_eq!(quote.ts_init, 0);
1717 }
1718
1719 #[rstest]
1720 fn test_decode_bbo_1m_msg() {
1721 let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1722 let mut dbn_stream = Decoder::from_zstd_file(path)
1723 .unwrap()
1724 .decode_stream::<dbn::BboMsg>();
1725 let msg = dbn_stream.next().unwrap().unwrap();
1726
1727 let instrument_id = InstrumentId::from("ESM4.GLBX");
1728 let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1729
1730 assert_eq!(quote.instrument_id, instrument_id);
1731 assert_eq!(quote.bid_price, Price::from("3702.25"));
1732 assert_eq!(quote.ask_price, Price::from("3702.75"));
1733 assert_eq!(quote.bid_size, Quantity::from("18"));
1734 assert_eq!(quote.ask_size, Quantity::from("13"));
1735 assert_eq!(quote.ts_event, msg.ts_recv);
1736 assert_eq!(quote.ts_event, 1609113600000000000);
1737 assert_eq!(quote.ts_init, 0);
1738 }
1739
1740 #[rstest]
1741 fn test_decode_mbp10_msg() {
1742 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1743 let mut dbn_stream = Decoder::from_zstd_file(path)
1744 .unwrap()
1745 .decode_stream::<dbn::Mbp10Msg>();
1746 let msg = dbn_stream.next().unwrap().unwrap();
1747
1748 let instrument_id = InstrumentId::from("ESM4.GLBX");
1749 let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1750
1751 assert_eq!(depth10.instrument_id, instrument_id);
1752 assert_eq!(depth10.bids.len(), 10);
1753 assert_eq!(depth10.asks.len(), 10);
1754 assert_eq!(depth10.bid_counts.len(), 10);
1755 assert_eq!(depth10.ask_counts.len(), 10);
1756 assert_eq!(depth10.flags, 128);
1757 assert_eq!(depth10.sequence, 1_170_352);
1758 assert_eq!(depth10.ts_event, msg.ts_recv);
1759 assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1760 assert_eq!(depth10.ts_init, 0);
1761 }
1762
1763 #[rstest]
1764 fn test_decode_trade_msg() {
1765 let path = test_data_path().join("test_data.trades.dbn.zst");
1766 let mut dbn_stream = Decoder::from_zstd_file(path)
1767 .unwrap()
1768 .decode_stream::<dbn::TradeMsg>();
1769 let msg = dbn_stream.next().unwrap().unwrap();
1770
1771 let instrument_id = InstrumentId::from("ESM4.GLBX");
1772 let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1773
1774 assert_eq!(trade.instrument_id, instrument_id);
1775 assert_eq!(trade.price, Price::from("3720.25"));
1776 assert_eq!(trade.size, Quantity::from("5"));
1777 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1778 assert_eq!(trade.trade_id.to_string(), "1170380");
1779 assert_eq!(trade.ts_event, msg.ts_recv);
1780 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1781 assert_eq!(trade.ts_init, 0);
1782 }
1783
1784 #[rstest]
1785 fn test_decode_tbbo_msg() {
1786 let path = test_data_path().join("test_data.tbbo.dbn.zst");
1787 let mut dbn_stream = Decoder::from_zstd_file(path)
1788 .unwrap()
1789 .decode_stream::<dbn::Mbp1Msg>();
1790 let msg = dbn_stream.next().unwrap().unwrap();
1791
1792 let instrument_id = InstrumentId::from("ESM4.GLBX");
1793 let (quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1794
1795 assert_eq!(quote.instrument_id, instrument_id);
1796 assert_eq!(quote.bid_price, Price::from("3720.25"));
1797 assert_eq!(quote.ask_price, Price::from("3720.50"));
1798 assert_eq!(quote.bid_size, Quantity::from("26"));
1799 assert_eq!(quote.ask_size, Quantity::from("7"));
1800 assert_eq!(quote.ts_event, msg.ts_recv);
1801 assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
1802 assert_eq!(quote.ts_init, 0);
1803
1804 assert_eq!(trade.instrument_id, instrument_id);
1805 assert_eq!(trade.price, Price::from("3720.25"));
1806 assert_eq!(trade.size, Quantity::from("5"));
1807 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1808 assert_eq!(trade.trade_id.to_string(), "1170380");
1809 assert_eq!(trade.ts_event, msg.ts_recv);
1810 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1811 assert_eq!(trade.ts_init, 0);
1812 }
1813
1814 #[rstest]
1815 fn test_decode_ohlcv_msg() {
1816 let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
1817 let mut dbn_stream = Decoder::from_zstd_file(path)
1818 .unwrap()
1819 .decode_stream::<dbn::OhlcvMsg>();
1820 let msg = dbn_stream.next().unwrap().unwrap();
1821
1822 let instrument_id = InstrumentId::from("ESM4.GLBX");
1823 let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1824
1825 assert_eq!(
1826 bar.bar_type,
1827 BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
1828 );
1829 assert_eq!(bar.open, Price::from("372025.00"));
1830 assert_eq!(bar.high, Price::from("372050.00"));
1831 assert_eq!(bar.low, Price::from("372025.00"));
1832 assert_eq!(bar.close, Price::from("372050.00"));
1833 assert_eq!(bar.volume, Quantity::from("57"));
1834 assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); assert_eq!(bar.ts_init, 0); }
1837
1838 #[rstest]
1839 fn test_decode_definition_msg() {
1840 let path = test_data_path().join("test_data.definition.dbn.zst");
1841 let mut dbn_stream = Decoder::from_zstd_file(path)
1842 .unwrap()
1843 .decode_stream::<dbn::InstrumentDefMsg>();
1844 let msg = dbn_stream.next().unwrap().unwrap();
1845
1846 let instrument_id = InstrumentId::from("ESM4.GLBX");
1847 let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
1848
1849 assert!(result.is_ok());
1850 assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
1851 }
1852
1853 #[rstest]
1854 fn test_decode_status_msg() {
1855 let path = test_data_path().join("test_data.status.dbn.zst");
1856 let mut dbn_stream = Decoder::from_zstd_file(path)
1857 .unwrap()
1858 .decode_stream::<dbn::StatusMsg>();
1859 let msg = dbn_stream.next().unwrap().unwrap();
1860
1861 let instrument_id = InstrumentId::from("ESM4.GLBX");
1862 let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
1863
1864 assert_eq!(status.instrument_id, instrument_id);
1865 assert_eq!(status.action, MarketStatusAction::Trading);
1866 assert_eq!(status.ts_event, msg.hd.ts_event);
1867 assert_eq!(status.ts_init, 0);
1868 assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
1869 assert_eq!(status.trading_event, None);
1870 assert_eq!(status.is_trading, Some(true));
1871 assert_eq!(status.is_quoting, Some(true));
1872 assert_eq!(status.is_short_sell_restricted, None);
1873 }
1874
1875 #[rstest]
1876 fn test_decode_imbalance_msg() {
1877 let path = test_data_path().join("test_data.imbalance.dbn.zst");
1878 let mut dbn_stream = Decoder::from_zstd_file(path)
1879 .unwrap()
1880 .decode_stream::<dbn::ImbalanceMsg>();
1881 let msg = dbn_stream.next().unwrap().unwrap();
1882
1883 let instrument_id = InstrumentId::from("ESM4.GLBX");
1884 let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1885
1886 assert_eq!(imbalance.instrument_id, instrument_id);
1887 assert_eq!(imbalance.ref_price, Price::from("229.43"));
1888 assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
1889 assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
1890 assert_eq!(imbalance.paired_qty, Quantity::from("0"));
1891 assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
1892 assert_eq!(imbalance.side, OrderSide::Buy);
1893 assert_eq!(imbalance.significant_imbalance, 126);
1894 assert_eq!(imbalance.ts_event, msg.hd.ts_event);
1895 assert_eq!(imbalance.ts_recv, msg.ts_recv);
1896 assert_eq!(imbalance.ts_init, 0);
1897 }
1898
1899 #[rstest]
1900 fn test_decode_statistics_msg() {
1901 let path = test_data_path().join("test_data.statistics.dbn.zst");
1902 let mut dbn_stream = Decoder::from_zstd_file(path)
1903 .unwrap()
1904 .decode_stream::<dbn::StatMsg>();
1905 let msg = dbn_stream.next().unwrap().unwrap();
1906
1907 let instrument_id = InstrumentId::from("ESM4.GLBX");
1908 let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1909
1910 assert_eq!(statistics.instrument_id, instrument_id);
1911 assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
1912 assert_eq!(
1913 statistics.update_action,
1914 DatabentoStatisticUpdateAction::Added
1915 );
1916 assert_eq!(statistics.price, Some(Price::from("100.00")));
1917 assert_eq!(statistics.quantity, None);
1918 assert_eq!(statistics.channel_id, 13);
1919 assert_eq!(statistics.stat_flags, 255);
1920 assert_eq!(statistics.sequence, 2);
1921 assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
1922 assert_eq!(statistics.ts_in_delta, 26961);
1923 assert_eq!(statistics.ts_event, msg.hd.ts_event);
1924 assert_eq!(statistics.ts_recv, msg.ts_recv);
1925 assert_eq!(statistics.ts_init, 0);
1926 }
1927
1928 #[rstest]
1929 fn test_decode_cmbp1_msg() {
1930 let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
1931 let mut dbn_stream = Decoder::from_zstd_file(path)
1932 .unwrap()
1933 .decode_stream::<dbn::Cmbp1Msg>();
1934 let msg = dbn_stream.next().unwrap().unwrap();
1935
1936 let instrument_id = InstrumentId::from("ESM4.GLBX");
1937 let (quote, trade) = decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1938
1939 assert_eq!(quote.instrument_id, instrument_id);
1940 assert!(quote.bid_price.raw > 0);
1941 assert!(quote.ask_price.raw > 0);
1942 assert!(quote.bid_size.raw > 0);
1943 assert!(quote.ask_size.raw > 0);
1944 assert_eq!(quote.ts_event, msg.ts_recv);
1945 assert_eq!(quote.ts_init, 0);
1946
1947 if msg.action as u8 as char == 'T' {
1949 assert!(trade.is_some());
1950 let trade = trade.unwrap();
1951 assert_eq!(trade.instrument_id, instrument_id);
1952 } else {
1953 assert!(trade.is_none());
1954 }
1955 }
1956
1957 #[rstest]
1958 fn test_decode_cbbo_1s_msg() {
1959 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
1960 let mut dbn_stream = Decoder::from_zstd_file(path)
1961 .unwrap()
1962 .decode_stream::<dbn::CbboMsg>();
1963 let msg = dbn_stream.next().unwrap().unwrap();
1964
1965 let instrument_id = InstrumentId::from("ESM4.GLBX");
1966 let quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1967
1968 assert_eq!(quote.instrument_id, instrument_id);
1969 assert!(quote.bid_price.raw > 0);
1970 assert!(quote.ask_price.raw > 0);
1971 assert!(quote.bid_size.raw > 0);
1972 assert!(quote.ask_size.raw > 0);
1973 assert_eq!(quote.ts_event, msg.ts_recv);
1974 assert_eq!(quote.ts_init, 0);
1975 }
1976
1977 #[rstest]
1978 fn test_decode_mbp10_msg_with_all_levels() {
1979 let mut msg = dbn::Mbp10Msg::default();
1980 for i in 0..10 {
1981 msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
1982 msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
1983 msg.levels[i].bid_sz = 10 + i as u32;
1984 msg.levels[i].ask_sz = 10 + i as u32;
1985 msg.levels[i].bid_ct = 1 + i as u32;
1986 msg.levels[i].ask_ct = 1 + i as u32;
1987 }
1988 msg.ts_recv = 1_609_160_400_000_704_060;
1989
1990 let instrument_id = InstrumentId::from("TEST.VENUE");
1991 let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
1992
1993 assert!(result.is_ok());
1994 let depth = result.unwrap();
1995 assert_eq!(depth.bids.len(), 10);
1996 assert_eq!(depth.asks.len(), 10);
1997 assert_eq!(depth.bid_counts.len(), 10);
1998 assert_eq!(depth.ask_counts.len(), 10);
1999 }
2000
2001 #[rstest]
2002 fn test_array_conversion_error_handling() {
2003 let mut bids = Vec::new();
2004 let mut asks = Vec::new();
2005
2006 for i in 0..5 {
2008 bids.push(BookOrder::new(
2009 OrderSide::Buy,
2010 Price::from(format!("{}.00", 100 - i)),
2011 Quantity::from(10),
2012 i as u64,
2013 ));
2014 asks.push(BookOrder::new(
2015 OrderSide::Sell,
2016 Price::from(format!("{}.00", 101 + i)),
2017 Quantity::from(10),
2018 i as u64,
2019 ));
2020 }
2021
2022 let result: Result<[BookOrder; DEPTH10_LEN], _> =
2023 bids.try_into().map_err(|v: Vec<BookOrder>| {
2024 anyhow::anyhow!(
2025 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2026 v.len()
2027 )
2028 });
2029 assert!(result.is_err());
2030 assert!(
2031 result
2032 .unwrap_err()
2033 .to_string()
2034 .contains("Expected exactly 10 bid levels, received 5")
2035 );
2036 }
2037
2038 #[rstest]
2039 fn test_decode_tcbbo_msg() {
2040 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2042 let mut dbn_stream = Decoder::from_zstd_file(path)
2043 .unwrap()
2044 .decode_stream::<dbn::CbboMsg>();
2045 let msg = dbn_stream.next().unwrap().unwrap();
2046
2047 let mut tcbbo_msg = msg.clone();
2049 tcbbo_msg.price = 3702500000000;
2050 tcbbo_msg.size = 10;
2051
2052 let instrument_id = InstrumentId::from("ESM4.GLBX");
2053 let (quote, trade) =
2054 decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2055
2056 assert_eq!(quote.instrument_id, instrument_id);
2057 assert!(quote.bid_price.raw > 0);
2058 assert!(quote.ask_price.raw > 0);
2059 assert!(quote.bid_size.raw > 0);
2060 assert!(quote.ask_size.raw > 0);
2061 assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2062 assert_eq!(quote.ts_init, 0);
2063
2064 assert_eq!(trade.instrument_id, instrument_id);
2065 assert_eq!(trade.price, Price::from("3702.50"));
2066 assert_eq!(trade.size, Quantity::from(10));
2067 assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2068 assert_eq!(trade.ts_init, 0);
2069 }
2070
2071 #[rstest]
2072 fn test_decode_bar_type() {
2073 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2074 let instrument_id = InstrumentId::from("ESM4.GLBX");
2075
2076 msg.hd.rtype = 32;
2078 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2079 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2080
2081 msg.hd.rtype = 33;
2083 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2084 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2085
2086 msg.hd.rtype = 34;
2088 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2089 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2090
2091 msg.hd.rtype = 35;
2093 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2094 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2095
2096 msg.hd.rtype = 99;
2098 let result = decode_bar_type(&msg, instrument_id);
2099 assert!(result.is_err());
2100 }
2101
2102 #[rstest]
2103 fn test_decode_ts_event_adjustment() {
2104 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2105
2106 msg.hd.rtype = 32;
2108 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2109 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2110
2111 msg.hd.rtype = 33;
2113 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2114 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2115
2116 msg.hd.rtype = 34;
2118 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2119 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2120
2121 msg.hd.rtype = 35;
2123 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2124 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2125
2126 msg.hd.rtype = 36;
2128 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2129 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2130
2131 msg.hd.rtype = 99;
2133 let result = decode_ts_event_adjustment(&msg);
2134 assert!(result.is_err());
2135 }
2136
2137 #[rstest]
2138 fn test_decode_record() {
2139 let path = test_data_path().join("test_data.mbo.dbn.zst");
2141 let decoder = Decoder::from_zstd_file(path).unwrap();
2142 let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2143 let msg = dbn_stream.next().unwrap().unwrap();
2144
2145 let record_ref = dbn::RecordRef::from(msg);
2146 let instrument_id = InstrumentId::from("ESM4.GLBX");
2147
2148 let (data1, data2) =
2149 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2150
2151 assert!(data1.is_some());
2152 assert!(data2.is_none());
2153
2154 let path = test_data_path().join("test_data.trades.dbn.zst");
2156 let decoder = Decoder::from_zstd_file(path).unwrap();
2157 let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2158 let msg = dbn_stream.next().unwrap().unwrap();
2159
2160 let record_ref = dbn::RecordRef::from(msg);
2161
2162 let (data1, data2) =
2163 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2164
2165 assert!(data1.is_some());
2166 assert!(data2.is_none());
2167 assert!(matches!(data1.unwrap(), Data::Trade(_)));
2168 }
2169}