1use std::{cmp, ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND};
20use nautilus_model::{
21 data::{
22 Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24 },
25 enums::{
26 AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27 InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28 },
29 identifiers::{InstrumentId, TradeId},
30 instruments::{
31 Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32 },
33 types::{Currency, Price, Quantity, price::decode_raw_price_i64},
34};
35use ustr::Ustr;
36
37use super::{
38 enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
39 types::{DatabentoImbalance, DatabentoStatistics},
40};
41
/// Divisor used to turn raw Databento fixed-point values into decimals (1e9).
const DATABENTO_FIXED_SCALAR: f64 = 1e9;

/// Bar step of one, shared by every bar specification in this module.
const STEP_ONE: NonZeroUsize = NonZeroUsize::MIN;
46
47const BAR_SPEC_1S: BarSpecification = BarSpecification {
48 step: STEP_ONE,
49 aggregation: BarAggregation::Second,
50 price_type: PriceType::Last,
51};
52const BAR_SPEC_1M: BarSpecification = BarSpecification {
53 step: STEP_ONE,
54 aggregation: BarAggregation::Minute,
55 price_type: PriceType::Last,
56};
57const BAR_SPEC_1H: BarSpecification = BarSpecification {
58 step: STEP_ONE,
59 aggregation: BarAggregation::Hour,
60 price_type: PriceType::Last,
61};
62const BAR_SPEC_1D: BarSpecification = BarSpecification {
63 step: STEP_ONE,
64 aggregation: BarAggregation::Day,
65 price_type: PriceType::Last,
66};
67
68const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
69const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
70const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
71const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
72
/// Parses a single-character flag into an optional boolean.
///
/// Returns `Some(true)` for `'Y'`, `Some(false)` for `'N'` and `None` for any
/// other character.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    match c as u8 {
        b'Y' => Some(true),
        b'N' => Some(false),
        _ => None,
    }
}
81
82#[must_use]
83pub const fn parse_order_side(c: c_char) -> OrderSide {
84 match c as u8 as char {
85 'A' => OrderSide::Sell,
86 'B' => OrderSide::Buy,
87 _ => OrderSide::NoOrderSide,
88 }
89}
90
91#[must_use]
92pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
93 match c as u8 as char {
94 'A' => AggressorSide::Seller,
95 'B' => AggressorSide::Buyer,
96 _ => AggressorSide::NoAggressor,
97 }
98}
99
100pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
104 match c as u8 as char {
105 'A' => Ok(BookAction::Add),
106 'C' => Ok(BookAction::Delete),
107 'F' => Ok(BookAction::Update),
108 'M' => Ok(BookAction::Update),
109 'R' => Ok(BookAction::Clear),
110 invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
111 }
112}
113
114pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
118 match c as u8 as char {
119 'C' => Ok(OptionKind::Call),
120 'P' => Ok(OptionKind::Put),
121 invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
122 }
123}
124
125fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
126 match value {
127 Ok(value) if !value.is_empty() => {
128 Currency::try_from_str(value).unwrap_or_else(Currency::USD)
129 }
130 Ok(_) => Currency::USD(),
131 Err(e) => {
132 log::error!("Error parsing currency: {e}");
133 Currency::USD()
134 }
135 }
136}
137
138pub fn parse_cfi_iso10926(
142 value: &str,
143) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
144 let chars: Vec<char> = value.chars().collect();
145 if chars.len() < 3 {
146 anyhow::bail!("Value string is too short");
147 }
148
149 let cfi_category = chars[0];
151 let cfi_group = chars[1];
152 let cfi_attribute1 = chars[2];
153 let mut asset_class = match cfi_category {
158 'D' => Some(AssetClass::Debt),
159 'E' => Some(AssetClass::Equity),
160 'S' => None,
161 _ => None,
162 };
163
164 let instrument_class = match cfi_group {
165 'I' => Some(InstrumentClass::Future),
166 _ => None,
167 };
168
169 if cfi_attribute1 == 'I' {
170 asset_class = Some(AssetClass::Index);
171 }
172
173 Ok((asset_class, instrument_class))
174}
175
176pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
181 let value_str = match value {
182 0 => return Ok(None),
183 1 => "Scheduled",
184 2 => "Surveillance intervention",
185 3 => "Market event",
186 4 => "Instrument activation",
187 5 => "Instrument expiration",
188 6 => "Recovery in process",
189 10 => "Regulatory",
190 11 => "Administrative",
191 12 => "Non-compliance",
192 13 => "Filings not current",
193 14 => "SEC trading suspension",
194 15 => "New issue",
195 16 => "Issue available",
196 17 => "Issues reviewed",
197 18 => "Filing requirements satisfied",
198 30 => "News pending",
199 31 => "News released",
200 32 => "News and resumption times",
201 33 => "News not forthcoming",
202 40 => "Order imbalance",
203 50 => "LULD pause",
204 60 => "Operational",
205 70 => "Additional information requested",
206 80 => "Merger effective",
207 90 => "ETF",
208 100 => "Corporate action",
209 110 => "New Security offering",
210 120 => "Market wide halt level 1",
211 121 => "Market wide halt level 2",
212 122 => "Market wide halt level 3",
213 123 => "Market wide halt carryover",
214 124 => "Market wide halt resumption",
215 130 => "Quotation not available",
216 invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
217 };
218
219 Ok(Some(Ustr::from(value_str)))
220}
221
222pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
226 let value_str = match value {
227 0 => return Ok(None),
228 1 => "No cancel",
229 2 => "Change trading session",
230 3 => "Implied matching on",
231 4 => "Implied matching off",
232 _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
233 };
234
235 Ok(Some(Ustr::from(value_str)))
236}
237
238#[must_use]
240pub fn decode_price(value: i64, precision: u8) -> Price {
241 Price::from_raw(decode_raw_price_i64(value), precision)
242}
243
244#[must_use]
246pub fn decode_quantity(value: u64) -> Quantity {
247 Quantity::from(value)
248}
249
250#[must_use]
252pub fn decode_price_increment(value: i64, precision: u8) -> Price {
253 match value {
254 0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
255 _ => decode_price(value, precision),
256 }
257}
258
259#[must_use]
261pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
262 match value {
263 i64::MAX => None,
264 _ => Some(decode_price(value, precision)),
265 }
266}
267
268#[must_use]
270pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
271 match value {
272 i64::MAX => None,
273 _ => Some(Quantity::from(value)),
274 }
275}
276
277#[must_use]
279pub fn decode_multiplier(value: i64) -> Quantity {
280 match value {
281 0 | i64::MAX => Quantity::from(1),
282 _ => Quantity::from(format!("{}", value as f64 / DATABENTO_FIXED_SCALAR)),
283 }
284}
285
286#[must_use]
288pub fn decode_lot_size(value: i32) -> Quantity {
289 match value {
290 0 | i32::MAX => Quantity::from(1),
291 value => Quantity::from(value),
292 }
293}
294
295#[must_use]
296fn is_trade_msg(order_side: OrderSide, action: c_char) -> bool {
297 order_side == OrderSide::NoOrderSide || action as u8 as char == 'T'
298}
299
300pub fn decode_mbo_msg(
304 msg: &dbn::MboMsg,
305 instrument_id: InstrumentId,
306 price_precision: u8,
307 ts_init: Option<UnixNanos>,
308 include_trades: bool,
309) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
310 let side = parse_order_side(msg.side);
311 if is_trade_msg(side, msg.action) {
312 if include_trades {
313 let ts_event = msg.ts_recv.into();
314 let ts_init = ts_init.unwrap_or(ts_event);
315
316 let trade = TradeTick::new(
317 instrument_id,
318 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
319 Quantity::from(msg.size),
320 parse_aggressor_side(msg.side),
321 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
322 ts_event,
323 ts_init,
324 );
325 return Ok((None, Some(trade)));
326 }
327
328 return Ok((None, None));
329 }
330
331 let order = BookOrder::new(
332 side,
333 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
334 Quantity::from(msg.size),
335 msg.order_id,
336 );
337 let ts_event = msg.ts_recv.into();
338 let ts_init = ts_init.unwrap_or(ts_event);
339
340 let delta = OrderBookDelta::new(
341 instrument_id,
342 parse_book_action(msg.action)?,
343 order,
344 msg.flags.raw(),
345 msg.sequence.into(),
346 ts_event,
347 ts_init,
348 );
349
350 Ok((Some(delta), None))
351}
352
353pub fn decode_trade_msg(
357 msg: &dbn::TradeMsg,
358 instrument_id: InstrumentId,
359 price_precision: u8,
360 ts_init: Option<UnixNanos>,
361) -> anyhow::Result<TradeTick> {
362 let ts_event = msg.ts_recv.into();
363 let ts_init = ts_init.unwrap_or(ts_event);
364
365 let trade = TradeTick::new(
366 instrument_id,
367 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
368 Quantity::from(msg.size),
369 parse_aggressor_side(msg.side),
370 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
371 ts_event,
372 ts_init,
373 );
374
375 Ok(trade)
376}
377
378pub fn decode_tbbo_msg(
382 msg: &dbn::TbboMsg,
383 instrument_id: InstrumentId,
384 price_precision: u8,
385 ts_init: Option<UnixNanos>,
386) -> anyhow::Result<(QuoteTick, TradeTick)> {
387 let top_level = &msg.levels[0];
388 let ts_event = msg.ts_recv.into();
389 let ts_init = ts_init.unwrap_or(ts_event);
390
391 let quote = QuoteTick::new(
392 instrument_id,
393 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
394 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
395 Quantity::from(top_level.bid_sz),
396 Quantity::from(top_level.ask_sz),
397 ts_event,
398 ts_init,
399 );
400
401 let trade = TradeTick::new(
402 instrument_id,
403 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
404 Quantity::from(msg.size),
405 parse_aggressor_side(msg.side),
406 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
407 ts_event,
408 ts_init,
409 );
410
411 Ok((quote, trade))
412}
413
414pub fn decode_mbp1_msg(
418 msg: &dbn::Mbp1Msg,
419 instrument_id: InstrumentId,
420 price_precision: u8,
421 ts_init: Option<UnixNanos>,
422 include_trades: bool,
423) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
424 let top_level = &msg.levels[0];
425 let ts_event = msg.ts_recv.into();
426 let ts_init = ts_init.unwrap_or(ts_event);
427
428 let quote = QuoteTick::new(
429 instrument_id,
430 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
431 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
432 Quantity::from(top_level.bid_sz),
433 Quantity::from(top_level.ask_sz),
434 ts_event,
435 ts_init,
436 );
437
438 let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
439 Some(TradeTick::new(
440 instrument_id,
441 Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
442 Quantity::from(msg.size),
443 parse_aggressor_side(msg.side),
444 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
445 ts_event,
446 ts_init,
447 ))
448 } else {
449 None
450 };
451
452 Ok((quote, maybe_trade))
453}
454
455pub fn decode_bbo_msg(
459 msg: &dbn::BboMsg,
460 instrument_id: InstrumentId,
461 price_precision: u8,
462 ts_init: Option<UnixNanos>,
463) -> anyhow::Result<QuoteTick> {
464 let top_level = &msg.levels[0];
465 let ts_event = msg.ts_recv.into();
466 let ts_init = ts_init.unwrap_or(ts_event);
467
468 let quote = QuoteTick::new(
469 instrument_id,
470 Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
471 Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
472 Quantity::from(top_level.bid_sz),
473 Quantity::from(top_level.ask_sz),
474 ts_event,
475 ts_init,
476 );
477
478 Ok(quote)
479}
480
481pub fn decode_mbp10_msg(
489 msg: &dbn::Mbp10Msg,
490 instrument_id: InstrumentId,
491 price_precision: u8,
492 ts_init: Option<UnixNanos>,
493) -> anyhow::Result<OrderBookDepth10> {
494 let mut bids = Vec::with_capacity(DEPTH10_LEN);
495 let mut asks = Vec::with_capacity(DEPTH10_LEN);
496 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
497 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
498
499 for level in &msg.levels {
500 let bid_order = BookOrder::new(
501 OrderSide::Buy,
502 Price::from_raw(decode_raw_price_i64(level.bid_px), price_precision),
503 Quantity::from(level.bid_sz),
504 0,
505 );
506
507 let ask_order = BookOrder::new(
508 OrderSide::Sell,
509 Price::from_raw(decode_raw_price_i64(level.ask_px), price_precision),
510 Quantity::from(level.ask_sz),
511 0,
512 );
513
514 bids.push(bid_order);
515 asks.push(ask_order);
516 bid_counts.push(level.bid_ct);
517 ask_counts.push(level.ask_ct);
518 }
519
520 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().expect("`bids` length != 10");
521 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().expect("`asks` length != 10");
522 let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().expect("`bid_counts` length != 10");
523 let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().expect("`ask_counts` length != 10");
524 let ts_event = msg.ts_recv.into();
525 let ts_init = ts_init.unwrap_or(ts_event);
526
527 let depth = OrderBookDepth10::new(
528 instrument_id,
529 bids,
530 asks,
531 bid_counts,
532 ask_counts,
533 msg.flags.raw(),
534 msg.sequence.into(),
535 ts_event,
536 ts_init,
537 );
538
539 Ok(depth)
540}
541
542pub fn decode_bar_type(
546 msg: &dbn::OhlcvMsg,
547 instrument_id: InstrumentId,
548) -> anyhow::Result<BarType> {
549 let bar_type = match msg.hd.rtype {
550 32 => {
551 BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
553 }
554 33 => {
555 BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
557 }
558 34 => {
559 BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
561 }
562 35 => {
563 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
565 }
566 _ => anyhow::bail!(
567 "`rtype` is not a supported bar aggregation, was {}",
568 msg.hd.rtype
569 ),
570 };
571
572 Ok(bar_type)
573}
574
575pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
579 let adjustment = match msg.hd.rtype {
580 32 => {
581 BAR_CLOSE_ADJUSTMENT_1S
583 }
584 33 => {
585 BAR_CLOSE_ADJUSTMENT_1M
587 }
588 34 => {
589 BAR_CLOSE_ADJUSTMENT_1H
591 }
592 35 => {
593 BAR_CLOSE_ADJUSTMENT_1D
595 }
596 _ => anyhow::bail!(
597 "`rtype` is not a supported bar aggregation, was {}",
598 msg.hd.rtype
599 ),
600 };
601
602 Ok(adjustment.into())
603}
604
605pub fn decode_ohlcv_msg(
609 msg: &dbn::OhlcvMsg,
610 instrument_id: InstrumentId,
611 price_precision: u8,
612 ts_init: Option<UnixNanos>,
613 timestamp_on_close: bool,
614) -> anyhow::Result<Bar> {
615 let bar_type = decode_bar_type(msg, instrument_id)?;
616 let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
617
618 let ts_event_raw = msg.hd.ts_event.into();
619 let ts_init_raw = ts_init.unwrap_or(ts_event_raw);
620
621 let (ts_event, ts_init) = if timestamp_on_close {
622 let ts_close = cmp::max(ts_init_raw, ts_event_raw) + ts_event_adjustment;
624 (ts_close, ts_close)
625 } else {
626 (ts_event_raw, ts_event_raw)
628 };
629
630 let bar = Bar::new(
631 bar_type,
632 Price::from_raw(decode_raw_price_i64(msg.open), price_precision),
633 Price::from_raw(decode_raw_price_i64(msg.high), price_precision),
634 Price::from_raw(decode_raw_price_i64(msg.low), price_precision),
635 Price::from_raw(decode_raw_price_i64(msg.close), price_precision),
636 Quantity::from(msg.volume),
637 ts_event,
638 ts_init,
639 );
640
641 Ok(bar)
642}
643
644pub fn decode_status_msg(
652 msg: &dbn::StatusMsg,
653 instrument_id: InstrumentId,
654 ts_init: Option<UnixNanos>,
655) -> anyhow::Result<InstrumentStatus> {
656 let ts_event = msg.hd.ts_event.into();
657 let ts_init = ts_init.unwrap_or(ts_event);
658
659 let status = InstrumentStatus::new(
660 instrument_id,
661 MarketStatusAction::from_u16(msg.action).expect("Invalid `MarketStatusAction`"),
662 ts_event,
663 ts_init,
664 parse_status_reason(msg.reason)?,
665 parse_status_trading_event(msg.trading_event)?,
666 parse_optional_bool(msg.is_trading),
667 parse_optional_bool(msg.is_quoting),
668 parse_optional_bool(msg.is_short_sell_restricted),
669 );
670
671 Ok(status)
672}
673
674pub fn decode_record(
678 record: &dbn::RecordRef,
679 instrument_id: InstrumentId,
680 price_precision: u8,
681 ts_init: Option<UnixNanos>,
682 include_trades: bool,
683 bars_timestamp_on_close: bool,
684) -> anyhow::Result<(Option<Data>, Option<Data>)> {
685 let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
689 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
690 let result = decode_mbo_msg(
691 msg,
692 instrument_id,
693 price_precision,
694 Some(ts_init),
695 include_trades,
696 )?;
697 match result {
698 (Some(delta), None) => (Some(Data::Delta(delta)), None),
699 (None, Some(trade)) => (Some(Data::Trade(trade)), None),
700 (None, None) => (None, None),
701 _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
702 }
703 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
704 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
705 let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
706 (Some(Data::Trade(trade)), None)
707 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
708 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
709 let result = decode_mbp1_msg(
710 msg,
711 instrument_id,
712 price_precision,
713 Some(ts_init),
714 include_trades,
715 )?;
716 match result {
717 (quote, None) => (Some(Data::Quote(quote)), None),
718 (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
719 }
720 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
721 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
722 let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
723 (Some(Data::Quote(quote)), None)
724 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
725 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
726 let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
727 (Some(Data::Quote(quote)), None)
728 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
729 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
730 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
731 (Some(Data::from(depth)), None)
732 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
733 let ts_init = determine_timestamp(ts_init, msg.hd.ts_event.into());
734 let bar = decode_ohlcv_msg(
735 msg,
736 instrument_id,
737 price_precision,
738 Some(ts_init),
739 bars_timestamp_on_close,
740 )?;
741 (Some(Data::Bar(bar)), None)
742 } else {
743 anyhow::bail!("DBN message type is not currently supported")
744 };
745
746 Ok(result)
747}
748
749const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
750 match ts_init {
751 Some(ts_init) => ts_init,
752 None => msg_timestamp,
753 }
754}
755
756pub fn decode_instrument_def_msg(
760 msg: &dbn::InstrumentDefMsg,
761 instrument_id: InstrumentId,
762 ts_init: Option<UnixNanos>,
763) -> anyhow::Result<InstrumentAny> {
764 match msg.instrument_class as u8 as char {
765 'K' => Ok(InstrumentAny::Equity(decode_equity(
766 msg,
767 instrument_id,
768 ts_init,
769 )?)),
770 'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
771 msg,
772 instrument_id,
773 ts_init,
774 )?)),
775 'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
776 msg,
777 instrument_id,
778 ts_init,
779 )?)),
780 'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
781 msg,
782 instrument_id,
783 ts_init,
784 )?)),
785 'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
786 msg,
787 instrument_id,
788 ts_init,
789 )?)),
790 'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
791 'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
792 _ => anyhow::bail!(
793 "Unsupported `instrument_class` '{}'",
794 msg.instrument_class as u8 as char
795 ),
796 }
797}
798
799pub fn decode_equity(
803 msg: &dbn::InstrumentDefMsg,
804 instrument_id: InstrumentId,
805 ts_init: Option<UnixNanos>,
806) -> anyhow::Result<Equity> {
807 let currency = parse_currency_or_usd_default(msg.currency());
808 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
809 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
810 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
812
813 Ok(Equity::new(
814 instrument_id,
815 instrument_id.symbol,
816 None, currency,
818 price_increment.precision,
819 price_increment,
820 Some(lot_size),
821 None, None, None, None, None, None, None, None, ts_event,
830 ts_init,
831 ))
832}
833
834pub fn decode_futures_contract(
838 msg: &dbn::InstrumentDefMsg,
839 instrument_id: InstrumentId,
840 ts_init: Option<UnixNanos>,
841) -> anyhow::Result<FuturesContract> {
842 let currency = parse_currency_or_usd_default(msg.currency());
843 let exchange = Ustr::from(msg.exchange()?);
844 let underlying = Ustr::from(msg.asset()?);
845 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
846 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
847 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
848 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
849 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
851
852 FuturesContract::new_checked(
853 instrument_id,
854 instrument_id.symbol,
855 asset_class.unwrap_or(AssetClass::Commodity),
856 Some(exchange),
857 underlying,
858 msg.activation.into(),
859 msg.expiration.into(),
860 currency,
861 price_increment.precision,
862 price_increment,
863 multiplier,
864 lot_size,
865 None, None, None, None, None, None, None, None, ts_event,
874 ts_init,
875 )
876}
877
878pub fn decode_futures_spread(
882 msg: &dbn::InstrumentDefMsg,
883 instrument_id: InstrumentId,
884 ts_init: Option<UnixNanos>,
885) -> anyhow::Result<FuturesSpread> {
886 let exchange = Ustr::from(msg.exchange()?);
887 let underlying = Ustr::from(msg.asset()?);
888 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
889 let strategy_type = Ustr::from(msg.secsubtype()?);
890 let currency = parse_currency_or_usd_default(msg.currency());
891 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
892 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
893 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
894 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
896
897 FuturesSpread::new_checked(
898 instrument_id,
899 instrument_id.symbol,
900 asset_class.unwrap_or(AssetClass::Commodity),
901 Some(exchange),
902 underlying,
903 strategy_type,
904 msg.activation.into(),
905 msg.expiration.into(),
906 currency,
907 price_increment.precision,
908 price_increment,
909 multiplier,
910 lot_size,
911 None, None, None, None, None, None, None, None, ts_event,
920 ts_init,
921 )
922}
923
924pub fn decode_option_contract(
928 msg: &dbn::InstrumentDefMsg,
929 instrument_id: InstrumentId,
930 ts_init: Option<UnixNanos>,
931) -> anyhow::Result<OptionContract> {
932 let currency = parse_currency_or_usd_default(msg.currency());
933 let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
934 let exchange = Ustr::from(msg.exchange()?);
935 let underlying = Ustr::from(msg.underlying()?);
936 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
937 Some(AssetClass::Equity)
938 } else {
939 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
940 asset_class
941 };
942 let option_kind = parse_option_kind(msg.instrument_class)?;
943 let strike_price = Price::from_raw(
944 decode_raw_price_i64(msg.strike_price),
945 strike_price_currency.precision,
946 );
947 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
948 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
949 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
950 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
952
953 OptionContract::new_checked(
954 instrument_id,
955 instrument_id.symbol,
956 asset_class_opt.unwrap_or(AssetClass::Commodity),
957 Some(exchange),
958 underlying,
959 option_kind,
960 strike_price,
961 currency,
962 msg.activation.into(),
963 msg.expiration.into(),
964 price_increment.precision,
965 price_increment,
966 multiplier,
967 lot_size,
968 None, None, None, None, None, None, None, None, ts_event,
977 ts_init,
978 )
979}
980
981pub fn decode_option_spread(
985 msg: &dbn::InstrumentDefMsg,
986 instrument_id: InstrumentId,
987 ts_init: Option<UnixNanos>,
988) -> anyhow::Result<OptionSpread> {
989 let exchange = Ustr::from(msg.exchange()?);
990 let underlying = Ustr::from(msg.underlying()?);
991 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
992 Some(AssetClass::Equity)
993 } else {
994 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
995 asset_class
996 };
997 let strategy_type = Ustr::from(msg.secsubtype()?);
998 let currency = parse_currency_or_usd_default(msg.currency());
999 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1000 let multiplier = decode_multiplier(msg.unit_of_measure_qty);
1001 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1002 let ts_event = msg.ts_recv.into(); let ts_init = ts_init.unwrap_or(ts_event);
1004
1005 OptionSpread::new_checked(
1006 instrument_id,
1007 instrument_id.symbol,
1008 asset_class_opt.unwrap_or(AssetClass::Commodity),
1009 Some(exchange),
1010 underlying,
1011 strategy_type,
1012 msg.activation.into(),
1013 msg.expiration.into(),
1014 currency,
1015 price_increment.precision,
1016 price_increment,
1017 multiplier,
1018 lot_size,
1019 None, None, None, None, None, None, None, None, ts_event,
1028 ts_init,
1029 )
1030}
1031
1032pub fn decode_imbalance_msg(
1036 msg: &dbn::ImbalanceMsg,
1037 instrument_id: InstrumentId,
1038 price_precision: u8,
1039 ts_init: Option<UnixNanos>,
1040) -> anyhow::Result<DatabentoImbalance> {
1041 let ts_event = msg.ts_recv.into();
1042 let ts_init = ts_init.unwrap_or(ts_event);
1043
1044 DatabentoImbalance::new(
1045 instrument_id,
1046 Price::from_raw(decode_raw_price_i64(msg.ref_price), price_precision),
1047 Price::from_raw(
1048 decode_raw_price_i64(msg.cont_book_clr_price),
1049 price_precision,
1050 ),
1051 Price::from_raw(
1052 decode_raw_price_i64(msg.auct_interest_clr_price),
1053 price_precision,
1054 ),
1055 Quantity::new(f64::from(msg.paired_qty), 0),
1056 Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1057 parse_order_side(msg.side),
1058 msg.significant_imbalance as c_char,
1059 msg.hd.ts_event.into(),
1060 ts_event,
1061 ts_init,
1062 )
1063}
1064
1065pub fn decode_statistics_msg(
1073 msg: &dbn::StatMsg,
1074 instrument_id: InstrumentId,
1075 price_precision: u8,
1076 ts_init: Option<UnixNanos>,
1077) -> anyhow::Result<DatabentoStatistics> {
1078 let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1079 .expect("Invalid value for `stat_type`");
1080 let update_action = DatabentoStatisticUpdateAction::from_u8(msg.update_action)
1081 .expect("Invalid value for `update_action`");
1082 let ts_event = msg.ts_recv.into();
1083 let ts_init = ts_init.unwrap_or(ts_event);
1084
1085 DatabentoStatistics::new(
1086 instrument_id,
1087 stat_type,
1088 update_action,
1089 decode_optional_price(msg.price, price_precision),
1090 decode_optional_quantity(msg.quantity),
1091 msg.channel_id,
1092 msg.stat_flags,
1093 msg.sequence,
1094 msg.ts_ref.into(),
1095 msg.ts_in_delta,
1096 msg.hd.ts_event.into(),
1097 ts_event,
1098 ts_init,
1099 )
1100}
1101
1102#[cfg(test)]
1106mod tests {
1107 use std::path::{Path, PathBuf};
1108
1109 use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1110 use fallible_streaming_iterator::FallibleStreamingIterator;
1111 use nautilus_model::instruments::Instrument;
1112 use rstest::*;
1113
1114 use super::*;
1115
    /// Returns the `test_data` directory located under the crate's manifest directory.
    fn test_data_path() -> PathBuf {
        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
    }
1119
    /// Checks that 'Y' and 'N' map to booleans and any other character to `None`.
    #[rstest]
    #[case('Y' as c_char, Some(true))]
    #[case('N' as c_char, Some(false))]
    #[case('X' as c_char, None)]
    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
        assert_eq!(parse_optional_bool(input), expected);
    }
1127
    /// Checks the 'A'/'B' side mapping and the `NoOrderSide` fallback.
    #[rstest]
    #[case('A' as c_char, OrderSide::Sell)]
    #[case('B' as c_char, OrderSide::Buy)]
    #[case('X' as c_char, OrderSide::NoOrderSide)]
    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
        assert_eq!(parse_order_side(input), expected);
    }
1135
    /// Checks the 'A'/'B' aggressor mapping and the `NoAggressor` fallback.
    #[rstest]
    #[case('A' as c_char, AggressorSide::Seller)]
    #[case('B' as c_char, AggressorSide::Buyer)]
    #[case('X' as c_char, AggressorSide::NoAggressor)]
    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
        assert_eq!(parse_aggressor_side(input), expected);
    }
1143
    /// Checks every supported action character and the error message for an
    /// unsupported one.
    #[rstest]
    #[case('A' as c_char, Ok(BookAction::Add))]
    #[case('C' as c_char, Ok(BookAction::Delete))]
    #[case('F' as c_char, Ok(BookAction::Update))]
    #[case('M' as c_char, Ok(BookAction::Update))]
    #[case('R' as c_char, Ok(BookAction::Clear))]
    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
        // Errors are compared by their rendered message
        match parse_book_action(input) {
            Ok(action) => assert_eq!(Ok(action), expected),
            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
        }
    }
1157
    /// Checks the call/put mapping and the error message for an unsupported character.
    #[rstest]
    #[case('C' as c_char, Ok(OptionKind::Call))]
    #[case('P' as c_char, Ok(OptionKind::Put))]
    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
        // Errors are compared by their rendered message
        match parse_option_kind(input) {
            Ok(kind) => assert_eq!(Ok(kind), expected),
            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
        }
    }
1168
    /// Checks that valid codes resolve to their currency and that an empty code
    /// or an error falls back to USD.
    #[rstest]
    #[case(Ok("USD"), Currency::USD())]
    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
    #[case(Ok(""), Currency::USD())]
    #[case(Err("Error"), Currency::USD())]
    fn test_parse_currency_or_usd_default(
        #[case] input: Result<&str, &'static str>,
        #[case] expected: Currency,
    ) {
        // The string error is wrapped in an `io::Error` to satisfy the `std::error::Error` bound
        let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
        assert_eq!(actual, expected);
    }
1181
    /// Checks CFI parsing: the index attribute override, the equity category,
    /// unknown codes, and the too-short error.
    #[rstest]
    #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
    #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
    #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
    #[case("XXX", Ok((None, None)))]
    #[case("D", Err("Value string is too short"))]
    fn test_parse_cfi_iso10926(
        #[case] input: &str,
        #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
    ) {
        // Errors are compared by their rendered message
        match parse_cfi_iso10926(input) {
            Ok(result) => assert_eq!(Ok(result), expected),
            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
        }
    }
1197
    /// Checks the fallback increment for `0` and `i64::MAX` and the normal
    /// decoding path.
    ///
    /// NOTE: despite its name, this test exercises `decode_price_increment`.
    #[rstest]
    #[case(0, 2, Price::new(0.01, 2))]
    #[case(i64::MAX, 2, Price::new(0.01, 2))]
    #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))]
    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
        let actual = decode_price_increment(value, precision);
        assert_eq!(actual, expected);
    }
1206
1207 #[rstest]
1208 #[case(i64::MAX, 2, None)] #[case(0, 2, Some(Price::from_raw(0, 2)))] #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] fn test_decode_optional_price(
1212 #[case] value: i64,
1213 #[case] precision: u8,
1214 #[case] expected: Option<Price>,
1215 ) {
1216 let actual = decode_optional_price(value, precision);
1217 assert_eq!(actual, expected);
1218 }
1219
1220 #[rstest]
1221 #[case(i64::MAX, None)] #[case(0, Some(Quantity::new(0.0, 0)))] #[case(10, Some(Quantity::new(10.0, 0)))] fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1225 let actual = decode_optional_quantity(value);
1226 assert_eq!(actual, expected);
1227 }
1228
1229 #[rstest]
1230 fn test_decode_mbo_msg() {
1231 let path = test_data_path().join("test_data.mbo.dbn.zst");
1232 let mut dbn_stream = Decoder::from_zstd_file(path)
1233 .unwrap()
1234 .decode_stream::<dbn::MboMsg>();
1235 let msg = dbn_stream.next().unwrap().unwrap();
1236
1237 let instrument_id = InstrumentId::from("ESM4.GLBX");
1238 let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1239 let delta = delta.unwrap();
1240
1241 assert_eq!(delta.instrument_id, instrument_id);
1242 assert_eq!(delta.action, BookAction::Delete);
1243 assert_eq!(delta.order.side, OrderSide::Sell);
1244 assert_eq!(delta.order.price, Price::from("3722.75"));
1245 assert_eq!(delta.order.size, Quantity::from("1"));
1246 assert_eq!(delta.order.order_id, 647_784_973_705);
1247 assert_eq!(delta.flags, 128);
1248 assert_eq!(delta.sequence, 1_170_352);
1249 assert_eq!(delta.ts_event, msg.ts_recv);
1250 assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1251 assert_eq!(delta.ts_init, 0);
1252 }
1253
1254 #[rstest]
1255 fn test_decode_mbp1_msg() {
1256 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1257 let mut dbn_stream = Decoder::from_zstd_file(path)
1258 .unwrap()
1259 .decode_stream::<dbn::Mbp1Msg>();
1260 let msg = dbn_stream.next().unwrap().unwrap();
1261
1262 let instrument_id = InstrumentId::from("ESM4.GLBX");
1263 let (quote, _) = decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1264
1265 assert_eq!(quote.instrument_id, instrument_id);
1266 assert_eq!(quote.bid_price, Price::from("3720.25"));
1267 assert_eq!(quote.ask_price, Price::from("3720.50"));
1268 assert_eq!(quote.bid_size, Quantity::from("24"));
1269 assert_eq!(quote.ask_size, Quantity::from("11"));
1270 assert_eq!(quote.ts_event, msg.ts_recv);
1271 assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1272 assert_eq!(quote.ts_init, 0);
1273 }
1274
1275 #[rstest]
1276 fn test_decode_bbo_1s_msg() {
1277 let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1278 let mut dbn_stream = Decoder::from_zstd_file(path)
1279 .unwrap()
1280 .decode_stream::<dbn::BboMsg>();
1281 let msg = dbn_stream.next().unwrap().unwrap();
1282
1283 let instrument_id = InstrumentId::from("ESM4.GLBX");
1284 let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1285
1286 assert_eq!(quote.instrument_id, instrument_id);
1287 assert_eq!(quote.bid_price, Price::from("5199.50"));
1288 assert_eq!(quote.ask_price, Price::from("5199.75"));
1289 assert_eq!(quote.bid_size, Quantity::from("26"));
1290 assert_eq!(quote.ask_size, Quantity::from("23"));
1291 assert_eq!(quote.ts_event, msg.ts_recv);
1292 assert_eq!(quote.ts_event, 1715248801000000000);
1293 assert_eq!(quote.ts_init, 0);
1294 }
1295
1296 #[rstest]
1297 fn test_decode_bbo_1m_msg() {
1298 let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1299 let mut dbn_stream = Decoder::from_zstd_file(path)
1300 .unwrap()
1301 .decode_stream::<dbn::BboMsg>();
1302 let msg = dbn_stream.next().unwrap().unwrap();
1303
1304 let instrument_id = InstrumentId::from("ESM4.GLBX");
1305 let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1306
1307 assert_eq!(quote.instrument_id, instrument_id);
1308 assert_eq!(quote.bid_price, Price::from("5199.50"));
1309 assert_eq!(quote.ask_price, Price::from("5199.75"));
1310 assert_eq!(quote.bid_size, Quantity::from("33"));
1311 assert_eq!(quote.ask_size, Quantity::from("17"));
1312 assert_eq!(quote.ts_event, msg.ts_recv);
1313 assert_eq!(quote.ts_event, 1715248800000000000);
1314 assert_eq!(quote.ts_init, 0);
1315 }
1316
1317 #[rstest]
1318 fn test_decode_mbp10_msg() {
1319 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1320 let mut dbn_stream = Decoder::from_zstd_file(path)
1321 .unwrap()
1322 .decode_stream::<dbn::Mbp10Msg>();
1323 let msg = dbn_stream.next().unwrap().unwrap();
1324
1325 let instrument_id = InstrumentId::from("ESM4.GLBX");
1326 let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1327
1328 assert_eq!(depth10.instrument_id, instrument_id);
1329 assert_eq!(depth10.bids.len(), 10);
1330 assert_eq!(depth10.asks.len(), 10);
1331 assert_eq!(depth10.bid_counts.len(), 10);
1332 assert_eq!(depth10.ask_counts.len(), 10);
1333 assert_eq!(depth10.flags, 128);
1334 assert_eq!(depth10.sequence, 1_170_352);
1335 assert_eq!(depth10.ts_event, msg.ts_recv);
1336 assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1337 assert_eq!(depth10.ts_init, 0);
1338 }
1339
1340 #[rstest]
1341 fn test_decode_trade_msg() {
1342 let path = test_data_path().join("test_data.trades.dbn.zst");
1343 let mut dbn_stream = Decoder::from_zstd_file(path)
1344 .unwrap()
1345 .decode_stream::<dbn::TradeMsg>();
1346 let msg = dbn_stream.next().unwrap().unwrap();
1347
1348 let instrument_id = InstrumentId::from("ESM4.GLBX");
1349 let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1350
1351 assert_eq!(trade.instrument_id, instrument_id);
1352 assert_eq!(trade.price, Price::from("3720.25"));
1353 assert_eq!(trade.size, Quantity::from("5"));
1354 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1355 assert_eq!(trade.trade_id.to_string(), "1170380");
1356 assert_eq!(trade.ts_event, msg.ts_recv);
1357 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1358 assert_eq!(trade.ts_init, 0);
1359 }
1360
1361 #[rstest]
1362 fn test_decode_tbbo_msg() {
1363 let path = test_data_path().join("test_data.tbbo.dbn.zst");
1364 let mut dbn_stream = Decoder::from_zstd_file(path)
1365 .unwrap()
1366 .decode_stream::<dbn::Mbp1Msg>();
1367 let msg = dbn_stream.next().unwrap().unwrap();
1368
1369 let instrument_id = InstrumentId::from("ESM4.GLBX");
1370 let (quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1371
1372 assert_eq!(quote.instrument_id, instrument_id);
1373 assert_eq!(quote.bid_price, Price::from("3720.25"));
1374 assert_eq!(quote.ask_price, Price::from("3720.50"));
1375 assert_eq!(quote.bid_size, Quantity::from("26"));
1376 assert_eq!(quote.ask_size, Quantity::from("7"));
1377 assert_eq!(quote.ts_event, msg.ts_recv);
1378 assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
1379 assert_eq!(quote.ts_init, 0);
1380
1381 assert_eq!(trade.instrument_id, instrument_id);
1382 assert_eq!(trade.price, Price::from("3720.25"));
1383 assert_eq!(trade.size, Quantity::from("5"));
1384 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1385 assert_eq!(trade.trade_id.to_string(), "1170380");
1386 assert_eq!(trade.ts_event, msg.ts_recv);
1387 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1388 assert_eq!(trade.ts_init, 0);
1389 }
1390
1391 #[ignore = "Requires updated test data"]
1392 #[rstest]
1393 fn test_decode_ohlcv_msg() {
1394 let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
1395 let mut dbn_stream = Decoder::from_zstd_file(path)
1396 .unwrap()
1397 .decode_stream::<dbn::OhlcvMsg>();
1398 let msg = dbn_stream.next().unwrap().unwrap();
1399
1400 let instrument_id = InstrumentId::from("ESM4.GLBX");
1401 let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1402
1403 assert_eq!(
1404 bar.bar_type,
1405 BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
1406 );
1407 assert_eq!(bar.open, Price::from("3720.25"));
1408 assert_eq!(bar.high, Price::from("3720.50"));
1409 assert_eq!(bar.low, Price::from("3720.25"));
1410 assert_eq!(bar.close, Price::from("3720.50"));
1411 assert_eq!(bar.ts_event, 1_609_160_400_000_000_000);
1412 assert_eq!(bar.ts_init, 1_609_160_401_000_000_000); }
1414
1415 #[rstest]
1416 fn test_decode_definition_msg() {
1417 let path = test_data_path().join("test_data.definition.dbn.zst");
1418 let mut dbn_stream = Decoder::from_zstd_file(path)
1419 .unwrap()
1420 .decode_stream::<dbn::InstrumentDefMsg>();
1421 let msg = dbn_stream.next().unwrap().unwrap();
1422
1423 let instrument_id = InstrumentId::from("ESM4.GLBX");
1424 let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
1425
1426 assert!(result.is_ok());
1427 assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
1428 }
1429
1430 #[rstest]
1431 fn test_decode_status_msg() {
1432 let path = test_data_path().join("test_data.status.dbn.zst");
1433 let mut dbn_stream = Decoder::from_zstd_file(path)
1434 .unwrap()
1435 .decode_stream::<dbn::StatusMsg>();
1436 let msg = dbn_stream.next().unwrap().unwrap();
1437
1438 let instrument_id = InstrumentId::from("ESM4.GLBX");
1439 let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
1440
1441 assert_eq!(status.instrument_id, instrument_id);
1442 assert_eq!(status.action, MarketStatusAction::Trading);
1443 assert_eq!(status.ts_event, msg.hd.ts_event);
1444 assert_eq!(status.ts_init, 0);
1445 assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
1446 assert_eq!(status.trading_event, None);
1447 assert_eq!(status.is_trading, Some(true));
1448 assert_eq!(status.is_quoting, Some(true));
1449 assert_eq!(status.is_short_sell_restricted, None);
1450 }
1451
1452 #[rstest]
1453 fn test_decode_imbalance_msg() {
1454 let path = test_data_path().join("test_data.imbalance.dbn.zst");
1455 let mut dbn_stream = Decoder::from_zstd_file(path)
1456 .unwrap()
1457 .decode_stream::<dbn::ImbalanceMsg>();
1458 let msg = dbn_stream.next().unwrap().unwrap();
1459
1460 let instrument_id = InstrumentId::from("ESM4.GLBX");
1461 let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1462
1463 assert_eq!(imbalance.instrument_id, instrument_id);
1464 assert_eq!(imbalance.ref_price, Price::from("229.43"));
1465 assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
1466 assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
1467 assert_eq!(imbalance.paired_qty, Quantity::from("0"));
1468 assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
1469 assert_eq!(imbalance.side, OrderSide::Buy);
1470 assert_eq!(imbalance.significant_imbalance, 126);
1471 assert_eq!(imbalance.ts_event, msg.hd.ts_event);
1472 assert_eq!(imbalance.ts_recv, msg.ts_recv);
1473 assert_eq!(imbalance.ts_init, 0);
1474 }
1475
1476 #[rstest]
1477 fn test_decode_statistics_msg() {
1478 let path = test_data_path().join("test_data.statistics.dbn.zst");
1479 let mut dbn_stream = Decoder::from_zstd_file(path)
1480 .unwrap()
1481 .decode_stream::<dbn::StatMsg>();
1482 let msg = dbn_stream.next().unwrap().unwrap();
1483
1484 let instrument_id = InstrumentId::from("ESM4.GLBX");
1485 let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1486
1487 assert_eq!(statistics.instrument_id, instrument_id);
1488 assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
1489 assert_eq!(
1490 statistics.update_action,
1491 DatabentoStatisticUpdateAction::Added
1492 );
1493 assert_eq!(statistics.price, Some(Price::from("100.00")));
1494 assert_eq!(statistics.quantity, None);
1495 assert_eq!(statistics.channel_id, 13);
1496 assert_eq!(statistics.stat_flags, 255);
1497 assert_eq!(statistics.sequence, 2);
1498 assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
1499 assert_eq!(statistics.ts_in_delta, 26961);
1500 assert_eq!(statistics.ts_event, msg.hd.ts_event);
1501 assert_eq!(statistics.ts_recv, msg.ts_recv);
1502 assert_eq!(statistics.ts_init, 0);
1503 }
1504}