1use std::{cmp, ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND};
20use nautilus_model::{
21 data::{
22 Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24 },
25 enums::{
26 AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27 InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28 },
29 identifiers::{InstrumentId, TradeId},
30 instruments::{
31 Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32 },
33 types::{Currency, Price, Quantity, price::decode_raw_price_i64},
34};
35use ustr::Ustr;
36
37use super::{
38 enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
39 types::{DatabentoImbalance, DatabentoStatistics},
40};
41
42const DATABENTO_FIXED_SCALAR: f64 = 1_000_000_000.0;
43
44const STEP_ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
46
47const BAR_SPEC_1S: BarSpecification = BarSpecification {
48 step: STEP_ONE,
49 aggregation: BarAggregation::Second,
50 price_type: PriceType::Last,
51};
52const BAR_SPEC_1M: BarSpecification = BarSpecification {
53 step: STEP_ONE,
54 aggregation: BarAggregation::Minute,
55 price_type: PriceType::Last,
56};
57const BAR_SPEC_1H: BarSpecification = BarSpecification {
58 step: STEP_ONE,
59 aggregation: BarAggregation::Hour,
60 price_type: PriceType::Last,
61};
62const BAR_SPEC_1D: BarSpecification = BarSpecification {
63 step: STEP_ONE,
64 aggregation: BarAggregation::Day,
65 price_type: PriceType::Last,
66};
67
68const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
69const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
70const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
71const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
72
73#[must_use]
74pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
75 match c as u8 as char {
76 'Y' => Some(true),
77 'N' => Some(false),
78 _ => None,
79 }
80}
81
82#[must_use]
83pub const fn parse_order_side(c: c_char) -> OrderSide {
84 match c as u8 as char {
85 'A' => OrderSide::Sell,
86 'B' => OrderSide::Buy,
87 _ => OrderSide::NoOrderSide,
88 }
89}
90
91#[must_use]
92pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
93 match c as u8 as char {
94 'A' => AggressorSide::Seller,
95 'B' => AggressorSide::Buyer,
96 _ => AggressorSide::NoAggressor,
97 }
98}
99
100pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
101 match c as u8 as char {
102 'A' => Ok(BookAction::Add),
103 'C' => Ok(BookAction::Delete),
104 'F' => Ok(BookAction::Update),
105 'M' => Ok(BookAction::Update),
106 'R' => Ok(BookAction::Clear),
107 invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
108 }
109}
110
111pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
112 match c as u8 as char {
113 'C' => Ok(OptionKind::Call),
114 'P' => Ok(OptionKind::Put),
115 invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
116 }
117}
118
119fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
120 match value {
121 Ok(value) if !value.is_empty() => {
122 Currency::try_from_str(value).unwrap_or_else(Currency::USD)
123 }
124 Ok(_) => Currency::USD(),
125 Err(e) => {
126 log::error!("Error parsing currency: {e}");
127 Currency::USD()
128 }
129 }
130}
131
132pub fn parse_cfi_iso10926(
133 value: &str,
134) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
135 let chars: Vec<char> = value.chars().collect();
136 if chars.len() < 3 {
137 anyhow::bail!("Value string is too short");
138 }
139
140 let cfi_category = chars[0];
142 let cfi_group = chars[1];
143 let cfi_attribute1 = chars[2];
144 let mut asset_class = match cfi_category {
149 'D' => Some(AssetClass::Debt),
150 'E' => Some(AssetClass::Equity),
151 'S' => None,
152 _ => None,
153 };
154
155 let instrument_class = match cfi_group {
156 'I' => Some(InstrumentClass::Future),
157 _ => None,
158 };
159
160 if cfi_attribute1 == 'I' {
161 asset_class = Some(AssetClass::Index);
162 }
163
164 Ok((asset_class, instrument_class))
165}
166
167pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
169 let value_str = match value {
170 0 => return Ok(None),
171 1 => "Scheduled",
172 2 => "Surveillance intervention",
173 3 => "Market event",
174 4 => "Instrument activation",
175 5 => "Instrument expiration",
176 6 => "Recovery in process",
177 10 => "Regulatory",
178 11 => "Administrative",
179 12 => "Non-compliance",
180 13 => "Filings not current",
181 14 => "SEC trading suspension",
182 15 => "New issue",
183 16 => "Issue available",
184 17 => "Issues reviewed",
185 18 => "Filing requirements satisfied",
186 30 => "News pending",
187 31 => "News released",
188 32 => "News and resumption times",
189 33 => "News not forthcoming",
190 40 => "Order imbalance",
191 50 => "LULD pause",
192 60 => "Operational",
193 70 => "Additional information requested",
194 80 => "Merger effective",
195 90 => "ETF",
196 100 => "Corporate action",
197 110 => "New Security offering",
198 120 => "Market wide halt level 1",
199 121 => "Market wide halt level 2",
200 122 => "Market wide halt level 3",
201 123 => "Market wide halt carryover",
202 124 => "Market wide halt resumption",
203 130 => "Quotation not available",
204 invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
205 };
206
207 Ok(Some(Ustr::from(value_str)))
208}
209
210pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
211 let value_str = match value {
212 0 => return Ok(None),
213 1 => "No cancel",
214 2 => "Change trading session",
215 3 => "Implied matching on",
216 4 => "Implied matching off",
217 _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
218 };
219
220 Ok(Some(Ustr::from(value_str)))
221}
222
223#[must_use]
225pub fn decode_price(value: i64, precision: u8) -> Price {
226 Price::from_raw(decode_raw_price_i64(value), precision)
227}
228
229#[must_use]
231pub fn decode_quantity(value: u64) -> Quantity {
232 Quantity::from(value)
233}
234
235#[must_use]
237pub fn decode_price_increment(value: i64, precision: u8) -> Price {
238 match value {
239 0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
240 _ => decode_price(value, precision),
241 }
242}
243
244#[must_use]
246pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
247 match value {
248 i64::MAX => None,
249 _ => Some(decode_price(value, precision)),
250 }
251}
252
253#[must_use]
255pub fn decode_optional_quantity(value: i32) -> Option<Quantity> {
256 match value {
257 i32::MAX => None,
258 _ => Some(Quantity::from(value)),
259 }
260}
261
262#[must_use]
264pub fn decode_multiplier(value: i64) -> Quantity {
265 match value {
266 0 | i64::MAX => Quantity::from(1),
267 _ => Quantity::from(format!("{}", value as f64 / DATABENTO_FIXED_SCALAR)),
268 }
269}
270
271#[must_use]
273pub fn decode_lot_size(value: i32) -> Quantity {
274 match value {
275 0 | i32::MAX => Quantity::from(1),
276 value => Quantity::from(value),
277 }
278}
279
280pub fn decode_equity_v1(
281 msg: &dbn::compat::InstrumentDefMsgV1,
282 instrument_id: InstrumentId,
283 ts_init: UnixNanos,
284) -> anyhow::Result<Equity> {
285 let currency = parse_currency_or_usd_default(msg.currency());
286 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
287 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
288 let ts_event = UnixNanos::from(msg.ts_recv); Equity::new_checked(
291 instrument_id,
292 instrument_id.symbol,
293 None, currency,
295 price_increment.precision,
296 price_increment,
297 Some(lot_size),
298 None, None, None, None, None, None, None, None, ts_event,
307 ts_init,
308 )
309}
310
311pub fn decode_futures_contract_v1(
312 msg: &dbn::compat::InstrumentDefMsgV1,
313 instrument_id: InstrumentId,
314 ts_init: UnixNanos,
315) -> anyhow::Result<FuturesContract> {
316 let currency = parse_currency_or_usd_default(msg.currency());
317 let exchange = Ustr::from(msg.exchange()?);
318 let underlying = Ustr::from(msg.asset()?);
319 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
320 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
321 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
322 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
323 let ts_event = UnixNanos::from(msg.ts_recv); FuturesContract::new_checked(
326 instrument_id,
327 instrument_id.symbol,
328 asset_class.unwrap_or(AssetClass::Commodity),
329 Some(exchange),
330 underlying,
331 msg.activation.into(),
332 msg.expiration.into(),
333 currency,
334 price_increment.precision,
335 price_increment,
336 multiplier,
337 lot_size,
338 None, None, None, None, None, None, None, None, ts_event,
347 ts_init,
348 )
349}
350
351pub fn decode_futures_spread_v1(
352 msg: &dbn::compat::InstrumentDefMsgV1,
353 instrument_id: InstrumentId,
354 ts_init: UnixNanos,
355) -> anyhow::Result<FuturesSpread> {
356 let exchange = Ustr::from(msg.exchange()?);
357 let underlying = Ustr::from(msg.asset()?);
358 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
359 let strategy_type = Ustr::from(msg.secsubtype()?);
360 let currency = parse_currency_or_usd_default(msg.currency());
361 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
362 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
363 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
364 let ts_event = UnixNanos::from(msg.ts_recv); FuturesSpread::new_checked(
367 instrument_id,
368 instrument_id.symbol,
369 asset_class.unwrap_or(AssetClass::Commodity),
370 Some(exchange),
371 underlying,
372 strategy_type,
373 msg.activation.into(),
374 msg.expiration.into(),
375 currency,
376 price_increment.precision,
377 price_increment,
378 multiplier,
379 lot_size,
380 None, None, None, None, None, None, None, None, ts_event,
389 ts_init,
390 )
391}
392
393pub fn decode_option_contract_v1(
394 msg: &dbn::compat::InstrumentDefMsgV1,
395 instrument_id: InstrumentId,
396 ts_init: UnixNanos,
397) -> anyhow::Result<OptionContract> {
398 let currency = parse_currency_or_usd_default(msg.currency());
399 let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
400 let exchange = Ustr::from(msg.exchange()?);
401 let underlying = Ustr::from(msg.underlying()?);
402 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
403 Some(AssetClass::Equity)
404 } else {
405 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
406 asset_class
407 };
408 let option_kind = parse_option_kind(msg.instrument_class)?;
409 let strike_price = Price::from_raw(
410 decode_raw_price_i64(msg.strike_price),
411 strike_price_currency.precision,
412 );
413 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
414 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
415 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
416 let ts_event = UnixNanos::from(msg.ts_recv); OptionContract::new_checked(
419 instrument_id,
420 instrument_id.symbol,
421 asset_class_opt.unwrap_or(AssetClass::Commodity),
422 Some(exchange),
423 underlying,
424 option_kind,
425 strike_price,
426 currency,
427 msg.activation.into(),
428 msg.expiration.into(),
429 price_increment.precision,
430 price_increment,
431 multiplier,
432 lot_size,
433 None, None, None, None, None, None, None, None, ts_event,
442 ts_init,
443 )
444}
445
446pub fn decode_option_spread_v1(
447 msg: &dbn::compat::InstrumentDefMsgV1,
448 instrument_id: InstrumentId,
449 ts_init: UnixNanos,
450) -> anyhow::Result<OptionSpread> {
451 let currency = parse_currency_or_usd_default(msg.currency());
452 let exchange = Ustr::from(msg.exchange()?);
453 let underlying = Ustr::from(msg.underlying()?);
454 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
455 Some(AssetClass::Equity)
456 } else {
457 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
458 asset_class
459 };
460 let strategy_type = Ustr::from(msg.secsubtype()?);
461 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
462 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
463 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
464 let ts_event = UnixNanos::from(msg.ts_recv); OptionSpread::new_checked(
467 instrument_id,
468 instrument_id.symbol,
469 asset_class_opt.unwrap_or(AssetClass::Commodity),
470 Some(exchange),
471 underlying,
472 strategy_type,
473 msg.activation.into(),
474 msg.expiration.into(),
475 currency,
476 price_increment.precision,
477 price_increment,
478 multiplier,
479 lot_size,
480 None, None, None, None, None, None, None, None, ts_event,
489 ts_init,
490 )
491}
492
493#[must_use]
494fn is_trade_msg(order_side: OrderSide, action: c_char) -> bool {
495 order_side == OrderSide::NoOrderSide || action as u8 as char == 'T'
496}
497
498pub fn decode_mbo_msg(
499 msg: &dbn::MboMsg,
500 instrument_id: InstrumentId,
501 price_precision: u8,
502 ts_init: UnixNanos,
503 include_trades: bool,
504) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
505 let side = parse_order_side(msg.side);
506 if is_trade_msg(side, msg.action) {
507 if include_trades {
508 let trade = TradeTick::new(
509 instrument_id,
510 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
511 Quantity::from(msg.size),
512 parse_aggressor_side(msg.side),
513 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
514 msg.ts_recv.into(),
515 ts_init,
516 );
517 return Ok((None, Some(trade)));
518 }
519
520 return Ok((None, None));
521 }
522
523 let order = BookOrder::new(
524 side,
525 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
526 Quantity::from(msg.size),
527 msg.order_id,
528 );
529
530 let delta = OrderBookDelta::new(
531 instrument_id,
532 parse_book_action(msg.action)?,
533 order,
534 msg.flags.raw(),
535 msg.sequence.into(),
536 msg.ts_recv.into(),
537 ts_init,
538 );
539
540 Ok((Some(delta), None))
541}
542
543pub fn decode_trade_msg(
544 msg: &dbn::TradeMsg,
545 instrument_id: InstrumentId,
546 price_precision: u8,
547 ts_init: UnixNanos,
548) -> anyhow::Result<TradeTick> {
549 let trade = TradeTick::new(
550 instrument_id,
551 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
552 Quantity::from(msg.size),
553 parse_aggressor_side(msg.side),
554 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
555 msg.ts_recv.into(),
556 ts_init,
557 );
558
559 Ok(trade)
560}
561
562pub fn decode_tbbo_msg(
563 msg: &dbn::TbboMsg,
564 instrument_id: InstrumentId,
565 price_precision: u8,
566 ts_init: UnixNanos,
567) -> anyhow::Result<(QuoteTick, TradeTick)> {
568 let top_level = &msg.levels[0];
569 let quote = QuoteTick::new(
570 instrument_id,
571 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
572 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
573 Quantity::from(top_level.bid_sz),
574 Quantity::from(top_level.ask_sz),
575 msg.ts_recv.into(),
576 ts_init,
577 );
578
579 let trade = TradeTick::new(
580 instrument_id,
581 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
582 Quantity::from(msg.size),
583 parse_aggressor_side(msg.side),
584 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
585 msg.ts_recv.into(),
586 ts_init,
587 );
588
589 Ok((quote, trade))
590}
591
592pub fn decode_mbp1_msg(
593 msg: &dbn::Mbp1Msg,
594 instrument_id: InstrumentId,
595 price_precision: u8,
596 ts_init: UnixNanos,
597 include_trades: bool,
598) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
599 let top_level = &msg.levels[0];
600 let quote = QuoteTick::new(
601 instrument_id,
602 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
603 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
604 Quantity::from(top_level.bid_sz),
605 Quantity::from(top_level.ask_sz),
606 msg.ts_recv.into(),
607 ts_init,
608 );
609
610 let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
611 Some(TradeTick::new(
612 instrument_id,
613 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
614 Quantity::from(msg.size),
615 parse_aggressor_side(msg.side),
616 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
617 msg.ts_recv.into(),
618 ts_init,
619 ))
620 } else {
621 None
622 };
623
624 Ok((quote, maybe_trade))
625}
626
627pub fn decode_bbo_msg(
628 msg: &dbn::BboMsg,
629 instrument_id: InstrumentId,
630 price_precision: u8,
631 ts_init: UnixNanos,
632) -> anyhow::Result<QuoteTick> {
633 let top_level = &msg.levels[0];
634 let quote = QuoteTick::new(
635 instrument_id,
636 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
637 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
638 Quantity::from(top_level.bid_sz),
639 Quantity::from(top_level.ask_sz),
640 msg.ts_recv.into(),
641 ts_init,
642 );
643
644 Ok(quote)
645}
646
647pub fn decode_mbp10_msg(
648 msg: &dbn::Mbp10Msg,
649 instrument_id: InstrumentId,
650 price_precision: u8,
651 ts_init: UnixNanos,
652) -> anyhow::Result<OrderBookDepth10> {
653 let mut bids = Vec::with_capacity(DEPTH10_LEN);
654 let mut asks = Vec::with_capacity(DEPTH10_LEN);
655 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
656 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
657
658 for level in &msg.levels {
659 let bid_order = BookOrder::new(
660 OrderSide::Buy,
661 Price::from_raw(decode_raw_price_i64(level.bid_px), price_precision),
662 Quantity::from(level.bid_sz),
663 0,
664 );
665
666 let ask_order = BookOrder::new(
667 OrderSide::Sell,
668 Price::from_raw(decode_raw_price_i64(level.ask_px), price_precision),
669 Quantity::from(level.ask_sz),
670 0,
671 );
672
673 bids.push(bid_order);
674 asks.push(ask_order);
675 bid_counts.push(level.bid_ct);
676 ask_counts.push(level.ask_ct);
677 }
678
679 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().expect("`bids` length != 10");
680 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().expect("`asks` length != 10");
681 let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().expect("`bid_counts` length != 10");
682 let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().expect("`ask_counts` length != 10");
683
684 let depth = OrderBookDepth10::new(
685 instrument_id,
686 bids,
687 asks,
688 bid_counts,
689 ask_counts,
690 msg.flags.raw(),
691 msg.sequence.into(),
692 msg.ts_recv.into(),
693 ts_init,
694 );
695
696 Ok(depth)
697}
698
699pub fn decode_bar_type(
700 msg: &dbn::OhlcvMsg,
701 instrument_id: InstrumentId,
702) -> anyhow::Result<BarType> {
703 let bar_type = match msg.hd.rtype {
704 32 => {
705 BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
707 }
708 33 => {
709 BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
711 }
712 34 => {
713 BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
715 }
716 35 => {
717 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
719 }
720 _ => anyhow::bail!(
721 "`rtype` is not a supported bar aggregation, was {}",
722 msg.hd.rtype
723 ),
724 };
725
726 Ok(bar_type)
727}
728
729pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
730 let adjustment = match msg.hd.rtype {
731 32 => {
732 BAR_CLOSE_ADJUSTMENT_1S
734 }
735 33 => {
736 BAR_CLOSE_ADJUSTMENT_1M
738 }
739 34 => {
740 BAR_CLOSE_ADJUSTMENT_1H
742 }
743 35 => {
744 BAR_CLOSE_ADJUSTMENT_1D
746 }
747 _ => anyhow::bail!(
748 "`rtype` is not a supported bar aggregation, was {}",
749 msg.hd.rtype
750 ),
751 };
752
753 Ok(adjustment.into())
754}
755
756pub fn decode_ohlcv_msg(
757 msg: &dbn::OhlcvMsg,
758 instrument_id: InstrumentId,
759 price_precision: u8,
760 ts_init: UnixNanos,
761) -> anyhow::Result<Bar> {
762 let bar_type = decode_bar_type(msg, instrument_id)?;
763 let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
764
765 let ts_event = UnixNanos::from(msg.hd.ts_event);
767 let ts_init = cmp::max(ts_init, ts_event) + ts_event_adjustment;
768
769 let bar = Bar::new(
770 bar_type,
771 Price::from_raw(decode_raw_price_i64(msg.open), price_precision),
772 Price::from_raw(decode_raw_price_i64(msg.high), price_precision),
773 Price::from_raw(decode_raw_price_i64(msg.low), price_precision),
774 Price::from_raw(decode_raw_price_i64(msg.close), price_precision),
775 Quantity::from(msg.volume),
776 ts_event,
777 ts_init,
778 );
779
780 Ok(bar)
781}
782
783pub fn decode_status_msg(
784 msg: &dbn::StatusMsg,
785 instrument_id: InstrumentId,
786 ts_init: UnixNanos,
787) -> anyhow::Result<InstrumentStatus> {
788 let status = InstrumentStatus::new(
789 instrument_id,
790 MarketStatusAction::from_u16(msg.action).expect("Invalid `MarketStatusAction`"),
791 msg.hd.ts_event.into(),
792 ts_init,
793 parse_status_reason(msg.reason)?,
794 parse_status_trading_event(msg.trading_event)?,
795 parse_optional_bool(msg.is_trading),
796 parse_optional_bool(msg.is_quoting),
797 parse_optional_bool(msg.is_short_sell_restricted),
798 );
799
800 Ok(status)
801}
802
803pub fn decode_record(
804 record: &dbn::RecordRef,
805 instrument_id: InstrumentId,
806 price_precision: u8,
807 ts_init: Option<UnixNanos>,
808 include_trades: bool,
809) -> anyhow::Result<(Option<Data>, Option<Data>)> {
810 let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
814 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
815 let result = decode_mbo_msg(msg, instrument_id, price_precision, ts_init, include_trades)?;
816 match result {
817 (Some(delta), None) => (Some(Data::Delta(delta)), None),
818 (None, Some(trade)) => (Some(Data::Trade(trade)), None),
819 (None, None) => (None, None),
820 _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
821 }
822 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
823 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
824 let trade = decode_trade_msg(msg, instrument_id, price_precision, ts_init)?;
825 (Some(Data::Trade(trade)), None)
826 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
827 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
828 let result = decode_mbp1_msg(msg, instrument_id, price_precision, ts_init, include_trades)?;
829 match result {
830 (quote, None) => (Some(Data::Quote(quote)), None),
831 (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
832 }
833 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
834 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
835 let quote = decode_bbo_msg(msg, instrument_id, price_precision, ts_init)?;
836 (Some(Data::Quote(quote)), None)
837 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
838 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
839 let quote = decode_bbo_msg(msg, instrument_id, price_precision, ts_init)?;
840 (Some(Data::Quote(quote)), None)
841 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
842 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
843 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, ts_init)?;
844 (Some(Data::from(depth)), None)
845 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
846 let ts_init = determine_timestamp(ts_init, msg.hd.ts_event.into());
847 let bar = decode_ohlcv_msg(msg, instrument_id, price_precision, ts_init)?;
848 (Some(Data::Bar(bar)), None)
849 } else {
850 anyhow::bail!("DBN message type is not currently supported")
851 };
852
853 Ok(result)
854}
855
856const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
857 match ts_init {
858 Some(ts_init) => ts_init,
859 None => msg_timestamp,
860 }
861}
862
863pub fn decode_instrument_def_msg_v1(
864 msg: &dbn::compat::InstrumentDefMsgV1,
865 instrument_id: InstrumentId,
866 ts_init: UnixNanos,
867) -> anyhow::Result<InstrumentAny> {
868 match msg.instrument_class as u8 as char {
869 'K' => Ok(InstrumentAny::Equity(decode_equity_v1(
870 msg,
871 instrument_id,
872 ts_init,
873 )?)),
874 'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract_v1(
875 msg,
876 instrument_id,
877 ts_init,
878 )?)),
879 'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread_v1(
880 msg,
881 instrument_id,
882 ts_init,
883 )?)),
884 'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract_v1(
885 msg,
886 instrument_id,
887 ts_init,
888 )?)),
889 'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread_v1(
890 msg,
891 instrument_id,
892 ts_init,
893 )?)),
894 'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
895 'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
896 _ => anyhow::bail!(
897 "Unsupported `instrument_class` '{}'",
898 msg.instrument_class as u8 as char
899 ),
900 }
901}
902
903pub fn decode_instrument_def_msg(
904 msg: &dbn::InstrumentDefMsg,
905 instrument_id: InstrumentId,
906 ts_init: UnixNanos,
907) -> anyhow::Result<InstrumentAny> {
908 match msg.instrument_class as u8 as char {
909 'K' => Ok(InstrumentAny::Equity(decode_equity(
910 msg,
911 instrument_id,
912 ts_init,
913 )?)),
914 'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
915 msg,
916 instrument_id,
917 ts_init,
918 )?)),
919 'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
920 msg,
921 instrument_id,
922 ts_init,
923 )?)),
924 'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
925 msg,
926 instrument_id,
927 ts_init,
928 )?)),
929 'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
930 msg,
931 instrument_id,
932 ts_init,
933 )?)),
934 'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
935 'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
936 _ => anyhow::bail!(
937 "Unsupported `instrument_class` '{}'",
938 msg.instrument_class as u8 as char
939 ),
940 }
941}
942
943pub fn decode_equity(
944 msg: &dbn::InstrumentDefMsg,
945 instrument_id: InstrumentId,
946 ts_init: UnixNanos,
947) -> anyhow::Result<Equity> {
948 let currency = parse_currency_or_usd_default(msg.currency());
949 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
950 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
951 let ts_event = UnixNanos::from(msg.ts_recv); Ok(Equity::new(
954 instrument_id,
955 instrument_id.symbol,
956 None, currency,
958 price_increment.precision,
959 price_increment,
960 Some(lot_size),
961 None, None, None, None, None, None, None, None, ts_event,
970 ts_init,
971 ))
972}
973
974pub fn decode_futures_contract(
975 msg: &dbn::InstrumentDefMsg,
976 instrument_id: InstrumentId,
977 ts_init: UnixNanos,
978) -> anyhow::Result<FuturesContract> {
979 let currency = parse_currency_or_usd_default(msg.currency());
980 let exchange = Ustr::from(msg.exchange()?);
981 let underlying = Ustr::from(msg.asset()?);
982 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
983 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
984 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
985 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
986 let ts_event = UnixNanos::from(msg.ts_recv); FuturesContract::new_checked(
989 instrument_id,
990 instrument_id.symbol,
991 asset_class.unwrap_or(AssetClass::Commodity),
992 Some(exchange),
993 underlying,
994 msg.activation.into(),
995 msg.expiration.into(),
996 currency,
997 price_increment.precision,
998 price_increment,
999 multiplier,
1000 lot_size,
1001 None, None, None, None, None, None, None, None, ts_event,
1010 ts_init,
1011 )
1012}
1013
1014pub fn decode_futures_spread(
1015 msg: &dbn::InstrumentDefMsg,
1016 instrument_id: InstrumentId,
1017 ts_init: UnixNanos,
1018) -> anyhow::Result<FuturesSpread> {
1019 let exchange = Ustr::from(msg.exchange()?);
1020 let underlying = Ustr::from(msg.asset()?);
1021 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1022 let strategy_type = Ustr::from(msg.secsubtype()?);
1023 let currency = parse_currency_or_usd_default(msg.currency());
1024 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1025 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
1026 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1027 let ts_event = UnixNanos::from(msg.ts_recv); FuturesSpread::new_checked(
1030 instrument_id,
1031 instrument_id.symbol,
1032 asset_class.unwrap_or(AssetClass::Commodity),
1033 Some(exchange),
1034 underlying,
1035 strategy_type,
1036 msg.activation.into(),
1037 msg.expiration.into(),
1038 currency,
1039 price_increment.precision,
1040 price_increment,
1041 multiplier,
1042 lot_size,
1043 None, None, None, None, None, None, None, None, ts_event,
1052 ts_init,
1053 )
1054}
1055
1056pub fn decode_option_contract(
1057 msg: &dbn::InstrumentDefMsg,
1058 instrument_id: InstrumentId,
1059 ts_init: UnixNanos,
1060) -> anyhow::Result<OptionContract> {
1061 let currency = parse_currency_or_usd_default(msg.currency());
1062 let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1063 let exchange = Ustr::from(msg.exchange()?);
1064 let underlying = Ustr::from(msg.underlying()?);
1065 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1066 Some(AssetClass::Equity)
1067 } else {
1068 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1069 asset_class
1070 };
1071 let option_kind = parse_option_kind(msg.instrument_class)?;
1072 let strike_price = Price::from_raw(
1073 decode_raw_price_i64(msg.strike_price),
1074 strike_price_currency.precision,
1075 );
1076 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1077 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
1078 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1079 let ts_event = UnixNanos::from(msg.ts_recv); OptionContract::new_checked(
1082 instrument_id,
1083 instrument_id.symbol,
1084 asset_class_opt.unwrap_or(AssetClass::Commodity),
1085 Some(exchange),
1086 underlying,
1087 option_kind,
1088 strike_price,
1089 currency,
1090 msg.activation.into(),
1091 msg.expiration.into(),
1092 price_increment.precision,
1093 price_increment,
1094 multiplier,
1095 lot_size,
1096 None, None, None, None, None, None, None, None, ts_event,
1105 ts_init,
1106 )
1107}
1108
1109pub fn decode_option_spread(
1110 msg: &dbn::InstrumentDefMsg,
1111 instrument_id: InstrumentId,
1112 ts_init: UnixNanos,
1113) -> anyhow::Result<OptionSpread> {
1114 let exchange = Ustr::from(msg.exchange()?);
1115 let underlying = Ustr::from(msg.underlying()?);
1116 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1117 Some(AssetClass::Equity)
1118 } else {
1119 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1120 asset_class
1121 };
1122 let strategy_type = Ustr::from(msg.secsubtype()?);
1123 let currency = parse_currency_or_usd_default(msg.currency());
1124 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1125 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
1126 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1127 let ts_event = UnixNanos::from(msg.ts_recv); OptionSpread::new_checked(
1130 instrument_id,
1131 instrument_id.symbol,
1132 asset_class_opt.unwrap_or(AssetClass::Commodity),
1133 Some(exchange),
1134 underlying,
1135 strategy_type,
1136 msg.activation.into(),
1137 msg.expiration.into(),
1138 currency,
1139 price_increment.precision,
1140 price_increment,
1141 multiplier,
1142 lot_size,
1143 None, None, None, None, None, None, None, None, ts_event,
1152 ts_init,
1153 )
1154}
1155
1156pub fn decode_imbalance_msg(
1157 msg: &dbn::ImbalanceMsg,
1158 instrument_id: InstrumentId,
1159 price_precision: u8,
1160 ts_init: UnixNanos,
1161) -> anyhow::Result<DatabentoImbalance> {
1162 DatabentoImbalance::new(
1163 instrument_id,
1164 Price::from_raw(decode_raw_price_i64(msg.ref_price), price_precision),
1165 Price::from_raw(
1166 decode_raw_price_i64(msg.cont_book_clr_price),
1167 price_precision,
1168 ),
1169 Price::from_raw(
1170 decode_raw_price_i64(msg.auct_interest_clr_price),
1171 price_precision,
1172 ),
1173 Quantity::new(f64::from(msg.paired_qty), 0),
1174 Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1175 parse_order_side(msg.side),
1176 msg.significant_imbalance as c_char,
1177 msg.hd.ts_event.into(),
1178 msg.ts_recv.into(),
1179 ts_init,
1180 )
1181}
1182
1183pub fn decode_statistics_msg(
1184 msg: &dbn::StatMsg,
1185 instrument_id: InstrumentId,
1186 price_precision: u8,
1187 ts_init: UnixNanos,
1188) -> anyhow::Result<DatabentoStatistics> {
1189 let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1190 .expect("Invalid value for `stat_type`");
1191 let update_action = DatabentoStatisticUpdateAction::from_u8(msg.update_action)
1192 .expect("Invalid value for `update_action`");
1193
1194 DatabentoStatistics::new(
1195 instrument_id,
1196 stat_type,
1197 update_action,
1198 decode_optional_price(msg.price, price_precision),
1199 decode_optional_quantity(msg.quantity),
1200 msg.channel_id,
1201 msg.stat_flags,
1202 msg.sequence,
1203 msg.ts_ref.into(),
1204 msg.ts_in_delta,
1205 msg.hd.ts_event.into(),
1206 msg.ts_recv.into(),
1207 ts_init,
1208 )
1209}
1210
1211#[cfg(test)]
1215mod tests {
1216 use std::path::{Path, PathBuf};
1217
1218 use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1219 use fallible_streaming_iterator::FallibleStreamingIterator;
1220 use rstest::*;
1221
1222 use super::*;
1223
1224 fn test_data_path() -> PathBuf {
1225 Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1226 }
1227
1228 #[rstest]
1229 #[case('Y' as c_char, Some(true))]
1230 #[case('N' as c_char, Some(false))]
1231 #[case('X' as c_char, None)]
1232 fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1233 assert_eq!(parse_optional_bool(input), expected);
1234 }
1235
1236 #[rstest]
1237 #[case('A' as c_char, OrderSide::Sell)]
1238 #[case('B' as c_char, OrderSide::Buy)]
1239 #[case('X' as c_char, OrderSide::NoOrderSide)]
1240 fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1241 assert_eq!(parse_order_side(input), expected);
1242 }
1243
1244 #[rstest]
1245 #[case('A' as c_char, AggressorSide::Seller)]
1246 #[case('B' as c_char, AggressorSide::Buyer)]
1247 #[case('X' as c_char, AggressorSide::NoAggressor)]
1248 fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1249 assert_eq!(parse_aggressor_side(input), expected);
1250 }
1251
1252 #[rstest]
1253 #[case('A' as c_char, Ok(BookAction::Add))]
1254 #[case('C' as c_char, Ok(BookAction::Delete))]
1255 #[case('F' as c_char, Ok(BookAction::Update))]
1256 #[case('M' as c_char, Ok(BookAction::Update))]
1257 #[case('R' as c_char, Ok(BookAction::Clear))]
1258 #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1259 fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1260 match parse_book_action(input) {
1261 Ok(action) => assert_eq!(Ok(action), expected),
1262 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1263 }
1264 }
1265
1266 #[rstest]
1267 #[case('C' as c_char, Ok(OptionKind::Call))]
1268 #[case('P' as c_char, Ok(OptionKind::Put))]
1269 #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1270 fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1271 match parse_option_kind(input) {
1272 Ok(kind) => assert_eq!(Ok(kind), expected),
1273 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1274 }
1275 }
1276
1277 #[rstest]
1278 #[case(Ok("USD"), Currency::USD())]
1279 #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1280 #[case(Ok(""), Currency::USD())]
1281 #[case(Err("Error"), Currency::USD())]
1282 fn test_parse_currency_or_usd_default(
1283 #[case] input: Result<&str, &'static str>, #[case] expected: Currency,
1285 ) {
1286 let actual = parse_currency_or_usd_default(
1287 input.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
1288 );
1289 assert_eq!(actual, expected);
1290 }
1291
1292 #[rstest]
1293 #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1294 #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1295 #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1296 #[case("XXX", Ok((None, None)))]
1297 #[case("D", Err("Value string is too short"))]
1298 fn test_parse_cfi_iso10926(
1299 #[case] input: &str,
1300 #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1301 ) {
1302 match parse_cfi_iso10926(input) {
1303 Ok(result) => assert_eq!(Ok(result), expected),
1304 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1305 }
1306 }
1307
1308 #[rstest]
1309 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1313 let actual = decode_price_increment(value, precision);
1314 assert_eq!(actual, expected);
1315 }
1316
1317 #[rstest]
1318 #[case(i64::MAX, 2, None)] #[case(0, 2, Some(Price::from_raw(0, 2)))] #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] fn test_decode_optional_price(
1322 #[case] value: i64,
1323 #[case] precision: u8,
1324 #[case] expected: Option<Price>,
1325 ) {
1326 let actual = decode_optional_price(value, precision);
1327 assert_eq!(actual, expected);
1328 }
1329
1330 #[rstest]
1331 #[case(i32::MAX, None)] #[case(0, Some(Quantity::new(0.0, 0)))] #[case(10, Some(Quantity::new(10.0, 0)))] fn test_decode_optional_quantity(#[case] value: i32, #[case] expected: Option<Quantity>) {
1335 let actual = decode_optional_quantity(value);
1336 assert_eq!(actual, expected);
1337 }
1338
1339 #[rstest]
1340 fn test_decode_mbo_msg() {
1341 let path = test_data_path().join("test_data.mbo.dbn.zst");
1342 let mut dbn_stream = Decoder::from_zstd_file(path)
1343 .unwrap()
1344 .decode_stream::<dbn::MboMsg>();
1345 let msg = dbn_stream.next().unwrap().unwrap();
1346
1347 let instrument_id = InstrumentId::from("ESM4.GLBX");
1348 let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, 0.into(), false).unwrap();
1349 let delta = delta.unwrap();
1350
1351 assert_eq!(delta.instrument_id, instrument_id);
1352 assert_eq!(delta.action, BookAction::Delete);
1353 assert_eq!(delta.order.side, OrderSide::Sell);
1354 assert_eq!(delta.order.price, Price::from("3722.75"));
1355 assert_eq!(delta.order.size, Quantity::from("1"));
1356 assert_eq!(delta.order.order_id, 647_784_973_705);
1357 assert_eq!(delta.flags, 128);
1358 assert_eq!(delta.sequence, 1_170_352);
1359 assert_eq!(delta.ts_event, msg.ts_recv);
1360 assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1361 assert_eq!(delta.ts_init, 0);
1362 }
1363
1364 #[rstest]
1365 fn test_decode_mbp1_msg() {
1366 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1367 let mut dbn_stream = Decoder::from_zstd_file(path)
1368 .unwrap()
1369 .decode_stream::<dbn::Mbp1Msg>();
1370 let msg = dbn_stream.next().unwrap().unwrap();
1371
1372 let instrument_id = InstrumentId::from("ESM4.GLBX");
1373 let (quote, _) = decode_mbp1_msg(msg, instrument_id, 2, 0.into(), false).unwrap();
1374
1375 assert_eq!(quote.instrument_id, instrument_id);
1376 assert_eq!(quote.bid_price, Price::from("3720.25"));
1377 assert_eq!(quote.ask_price, Price::from("3720.50"));
1378 assert_eq!(quote.bid_size, Quantity::from("24"));
1379 assert_eq!(quote.ask_size, Quantity::from("11"));
1380 assert_eq!(quote.ts_event, msg.ts_recv);
1381 assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1382 assert_eq!(quote.ts_init, 0);
1383 }
1384
1385 #[rstest]
1386 fn test_decode_bbo_1s_msg() {
1387 let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1388 let mut dbn_stream = Decoder::from_zstd_file(path)
1389 .unwrap()
1390 .decode_stream::<dbn::BboMsg>();
1391 let msg = dbn_stream.next().unwrap().unwrap();
1392
1393 let instrument_id = InstrumentId::from("ESM4.GLBX");
1394 let quote = decode_bbo_msg(msg, instrument_id, 2, 0.into()).unwrap();
1395
1396 assert_eq!(quote.instrument_id, instrument_id);
1397 assert_eq!(quote.bid_price, Price::from("5199.50"));
1398 assert_eq!(quote.ask_price, Price::from("5199.75"));
1399 assert_eq!(quote.bid_size, Quantity::from("26"));
1400 assert_eq!(quote.ask_size, Quantity::from("23"));
1401 assert_eq!(quote.ts_event, msg.ts_recv);
1402 assert_eq!(quote.ts_event, 1715248801000000000);
1403 assert_eq!(quote.ts_init, 0);
1404 }
1405
1406 #[rstest]
1407 fn test_decode_bbo_1m_msg() {
1408 let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1409 let mut dbn_stream = Decoder::from_zstd_file(path)
1410 .unwrap()
1411 .decode_stream::<dbn::BboMsg>();
1412 let msg = dbn_stream.next().unwrap().unwrap();
1413
1414 let instrument_id = InstrumentId::from("ESM4.GLBX");
1415 let quote = decode_bbo_msg(msg, instrument_id, 2, 0.into()).unwrap();
1416
1417 assert_eq!(quote.instrument_id, instrument_id);
1418 assert_eq!(quote.bid_price, Price::from("5199.50"));
1419 assert_eq!(quote.ask_price, Price::from("5199.75"));
1420 assert_eq!(quote.bid_size, Quantity::from("33"));
1421 assert_eq!(quote.ask_size, Quantity::from("17"));
1422 assert_eq!(quote.ts_event, msg.ts_recv);
1423 assert_eq!(quote.ts_event, 1715248800000000000);
1424 assert_eq!(quote.ts_init, 0);
1425 }
1426
1427 #[rstest]
1428 fn test_decode_mbp10_msg() {
1429 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1430 let mut dbn_stream = Decoder::from_zstd_file(path)
1431 .unwrap()
1432 .decode_stream::<dbn::Mbp10Msg>();
1433 let msg = dbn_stream.next().unwrap().unwrap();
1434
1435 let instrument_id = InstrumentId::from("ESM4.GLBX");
1436 let depth10 = decode_mbp10_msg(msg, instrument_id, 2, 0.into()).unwrap();
1437
1438 assert_eq!(depth10.instrument_id, instrument_id);
1439 assert_eq!(depth10.bids.len(), 10);
1440 assert_eq!(depth10.asks.len(), 10);
1441 assert_eq!(depth10.bid_counts.len(), 10);
1442 assert_eq!(depth10.ask_counts.len(), 10);
1443 assert_eq!(depth10.flags, 128);
1444 assert_eq!(depth10.sequence, 1_170_352);
1445 assert_eq!(depth10.ts_event, msg.ts_recv);
1446 assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1447 assert_eq!(depth10.ts_init, 0);
1448 }
1449
1450 #[rstest]
1451 fn test_decode_trade_msg() {
1452 let path = test_data_path().join("test_data.trades.dbn.zst");
1453 let mut dbn_stream = Decoder::from_zstd_file(path)
1454 .unwrap()
1455 .decode_stream::<dbn::TradeMsg>();
1456 let msg = dbn_stream.next().unwrap().unwrap();
1457
1458 let instrument_id = InstrumentId::from("ESM4.GLBX");
1459 let trade = decode_trade_msg(msg, instrument_id, 2, 0.into()).unwrap();
1460
1461 assert_eq!(trade.instrument_id, instrument_id);
1462 assert_eq!(trade.price, Price::from("3720.25"));
1463 assert_eq!(trade.size, Quantity::from("5"));
1464 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1465 assert_eq!(trade.trade_id.to_string(), "1170380");
1466 assert_eq!(trade.ts_event, msg.ts_recv);
1467 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1468 assert_eq!(trade.ts_init, 0);
1469 }
1470
1471 #[rstest]
1472 fn test_decode_tbbo_msg() {
1473 let path = test_data_path().join("test_data.tbbo.dbn.zst");
1474 let mut dbn_stream = Decoder::from_zstd_file(path)
1475 .unwrap()
1476 .decode_stream::<dbn::Mbp1Msg>();
1477 let msg = dbn_stream.next().unwrap().unwrap();
1478
1479 let instrument_id = InstrumentId::from("ESM4.GLBX");
1480 let (quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, 0.into()).unwrap();
1481
1482 assert_eq!(quote.instrument_id, instrument_id);
1483 assert_eq!(quote.bid_price, Price::from("3720.25"));
1484 assert_eq!(quote.ask_price, Price::from("3720.50"));
1485 assert_eq!(quote.bid_size, Quantity::from("26"));
1486 assert_eq!(quote.ask_size, Quantity::from("7"));
1487 assert_eq!(quote.ts_event, msg.ts_recv);
1488 assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
1489 assert_eq!(quote.ts_init, 0);
1490
1491 assert_eq!(trade.instrument_id, instrument_id);
1492 assert_eq!(trade.price, Price::from("3720.25"));
1493 assert_eq!(trade.size, Quantity::from("5"));
1494 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1495 assert_eq!(trade.trade_id.to_string(), "1170380");
1496 assert_eq!(trade.ts_event, msg.ts_recv);
1497 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1498 assert_eq!(trade.ts_init, 0);
1499 }
1500
1501 #[ignore] #[rstest]
1503 fn test_decode_ohlcv_msg() {
1504 let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
1505 let mut dbn_stream = Decoder::from_zstd_file(path)
1506 .unwrap()
1507 .decode_stream::<dbn::OhlcvMsg>();
1508 let msg = dbn_stream.next().unwrap().unwrap();
1509
1510 let instrument_id = InstrumentId::from("ESM4.GLBX");
1511 let bar = decode_ohlcv_msg(msg, instrument_id, 2, 0.into()).unwrap();
1512
1513 assert_eq!(
1514 bar.bar_type,
1515 BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
1516 );
1517 assert_eq!(bar.open, Price::from("3720.25"));
1518 assert_eq!(bar.high, Price::from("3720.50"));
1519 assert_eq!(bar.low, Price::from("3720.25"));
1520 assert_eq!(bar.close, Price::from("3720.50"));
1521 assert_eq!(bar.ts_event, 1_609_160_400_000_000_000);
1522 assert_eq!(bar.ts_init, 1_609_160_401_000_000_000); }
1524
1525 #[rstest]
1526 fn test_decode_definition_msg() {
1527 let path = test_data_path().join("test_data.definition.dbn.zst");
1528 let mut dbn_stream = Decoder::from_zstd_file(path)
1529 .unwrap()
1530 .decode_stream::<dbn::InstrumentDefMsg>();
1531 let msg = dbn_stream.next().unwrap().unwrap();
1532
1533 let instrument_id = InstrumentId::from("ESM4.GLBX");
1534 let result = decode_instrument_def_msg(msg, instrument_id, 0.into());
1535
1536 assert!(result.is_ok());
1537 assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
1538 }
1539
1540 #[rstest]
1541 fn test_decode_definition_v1_msg() {
1542 let path = test_data_path().join("test_data.definition.v1.dbn.zst");
1543 let mut dbn_stream = Decoder::from_zstd_file(path)
1544 .unwrap()
1545 .decode_stream::<dbn::compat::InstrumentDefMsgV1>();
1546 let msg = dbn_stream.next().unwrap().unwrap();
1547
1548 let instrument_id = InstrumentId::from("ESM4.GLBX");
1549 let result = decode_instrument_def_msg_v1(msg, instrument_id, 0.into());
1550
1551 assert!(result.is_ok());
1552 assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
1553 }
1554
1555 #[rstest]
1556 fn test_decode_status_msg() {
1557 let path = test_data_path().join("test_data.status.dbn.zst");
1558 let mut dbn_stream = Decoder::from_zstd_file(path)
1559 .unwrap()
1560 .decode_stream::<dbn::StatusMsg>();
1561 let msg = dbn_stream.next().unwrap().unwrap();
1562
1563 let instrument_id = InstrumentId::from("ESM4.GLBX");
1564 let status = decode_status_msg(msg, instrument_id, 0.into()).unwrap();
1565
1566 assert_eq!(status.instrument_id, instrument_id);
1567 assert_eq!(status.action, MarketStatusAction::Trading);
1568 assert_eq!(status.ts_event, msg.hd.ts_event);
1569 assert_eq!(status.ts_init, 0);
1570 assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
1571 assert_eq!(status.trading_event, None);
1572 assert_eq!(status.is_trading, Some(true));
1573 assert_eq!(status.is_quoting, Some(true));
1574 assert_eq!(status.is_short_sell_restricted, None);
1575 }
1576
1577 #[rstest]
1578 fn test_decode_imbalance_msg() {
1579 let path = test_data_path().join("test_data.imbalance.dbn.zst");
1580 let mut dbn_stream = Decoder::from_zstd_file(path)
1581 .unwrap()
1582 .decode_stream::<dbn::ImbalanceMsg>();
1583 let msg = dbn_stream.next().unwrap().unwrap();
1584
1585 let instrument_id = InstrumentId::from("ESM4.GLBX");
1586 let imbalance = decode_imbalance_msg(msg, instrument_id, 2, 0.into()).unwrap();
1587
1588 assert_eq!(imbalance.instrument_id, instrument_id);
1589 assert_eq!(imbalance.ref_price, Price::from("229.43"));
1590 assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
1591 assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
1592 assert_eq!(imbalance.paired_qty, Quantity::from("0"));
1593 assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
1594 assert_eq!(imbalance.side, OrderSide::Buy);
1595 assert_eq!(imbalance.significant_imbalance, 126);
1596 assert_eq!(imbalance.ts_event, msg.hd.ts_event);
1597 assert_eq!(imbalance.ts_recv, msg.ts_recv);
1598 assert_eq!(imbalance.ts_init, 0);
1599 }
1600
1601 #[rstest]
1602 fn test_decode_statistics_msg() {
1603 let path = test_data_path().join("test_data.statistics.dbn.zst");
1604 let mut dbn_stream = Decoder::from_zstd_file(path)
1605 .unwrap()
1606 .decode_stream::<dbn::StatMsg>();
1607 let msg = dbn_stream.next().unwrap().unwrap();
1608
1609 let instrument_id = InstrumentId::from("ESM4.GLBX");
1610 let statistics = decode_statistics_msg(msg, instrument_id, 2, 0.into()).unwrap();
1611
1612 assert_eq!(statistics.instrument_id, instrument_id);
1613 assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
1614 assert_eq!(
1615 statistics.update_action,
1616 DatabentoStatisticUpdateAction::Added
1617 );
1618 assert_eq!(statistics.price, Some(Price::from("100.00")));
1619 assert_eq!(statistics.quantity, None);
1620 assert_eq!(statistics.channel_id, 13);
1621 assert_eq!(statistics.stat_flags, 255);
1622 assert_eq!(statistics.sequence, 2);
1623 assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
1624 assert_eq!(statistics.ts_in_delta, 26961);
1625 assert_eq!(statistics.ts_event, msg.hd.ts_event);
1626 assert_eq!(statistics.ts_recv, msg.ts_recv);
1627 assert_eq!(statistics.ts_init, 0);
1628 }
1629}