nautilus_databento/
decode.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cmp, ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND};
20use nautilus_model::{
21    data::{
22        Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24    },
25    enums::{
26        AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27        InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28    },
29    identifiers::{InstrumentId, TradeId},
30    instruments::{
31        Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32    },
33    types::{Currency, Price, Quantity, price::decode_raw_price_i64},
34};
35use ustr::Ustr;
36
37use super::{
38    enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
39    types::{DatabentoImbalance, DatabentoStatistics},
40};
41
/// Divisor (1e9) applied to Databento fixed-point integers to obtain their decimal value.
const DATABENTO_FIXED_SCALAR: f64 = 1e9;
43
// Bar step of exactly one. `NonZeroUsize::MIN` is the value 1, so no fallible
// construction is required.
const STEP_ONE: NonZeroUsize = NonZeroUsize::MIN;
46
47const BAR_SPEC_1S: BarSpecification = BarSpecification {
48    step: STEP_ONE,
49    aggregation: BarAggregation::Second,
50    price_type: PriceType::Last,
51};
52const BAR_SPEC_1M: BarSpecification = BarSpecification {
53    step: STEP_ONE,
54    aggregation: BarAggregation::Minute,
55    price_type: PriceType::Last,
56};
57const BAR_SPEC_1H: BarSpecification = BarSpecification {
58    step: STEP_ONE,
59    aggregation: BarAggregation::Hour,
60    price_type: PriceType::Last,
61};
62const BAR_SPEC_1D: BarSpecification = BarSpecification {
63    step: STEP_ONE,
64    aggregation: BarAggregation::Day,
65    price_type: PriceType::Last,
66};
67
68const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
69const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
70const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
71const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
72
/// Interprets a single character flag as an optional boolean.
///
/// `'Y'` yields `Some(true)`, `'N'` yields `Some(false)`, and any other
/// character yields `None`.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    match c as u8 {
        b'Y' => Some(true),
        b'N' => Some(false),
        _ => None,
    }
}
81
82#[must_use]
83pub const fn parse_order_side(c: c_char) -> OrderSide {
84    match c as u8 as char {
85        'A' => OrderSide::Sell,
86        'B' => OrderSide::Buy,
87        _ => OrderSide::NoOrderSide,
88    }
89}
90
91#[must_use]
92pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
93    match c as u8 as char {
94        'A' => AggressorSide::Seller,
95        'B' => AggressorSide::Buyer,
96        _ => AggressorSide::NoAggressor,
97    }
98}
99
100/// # Errors
101///
102/// Returns an error if `c` is not a valid `BookAction` character.
103pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
104    match c as u8 as char {
105        'A' => Ok(BookAction::Add),
106        'C' => Ok(BookAction::Delete),
107        'F' => Ok(BookAction::Update),
108        'M' => Ok(BookAction::Update),
109        'R' => Ok(BookAction::Clear),
110        invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
111    }
112}
113
114/// # Errors
115///
116/// Returns an error if `c` is not a valid `OptionKind` character.
117pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
118    match c as u8 as char {
119        'C' => Ok(OptionKind::Call),
120        'P' => Ok(OptionKind::Put),
121        invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
122    }
123}
124
125fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
126    match value {
127        Ok(value) if !value.is_empty() => {
128            Currency::try_from_str(value).unwrap_or_else(Currency::USD)
129        }
130        Ok(_) => Currency::USD(),
131        Err(e) => {
132            log::error!("Error parsing currency: {e}");
133            Currency::USD()
134        }
135    }
136}
137
138/// # Errors
139///
140/// Returns an error if `value` has fewer than 3 characters.
141pub fn parse_cfi_iso10926(
142    value: &str,
143) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
144    let chars: Vec<char> = value.chars().collect();
145    if chars.len() < 3 {
146        anyhow::bail!("Value string is too short");
147    }
148
149    // TODO: A proper CFI parser would be useful: https://en.wikipedia.org/wiki/ISO_10962
150    let cfi_category = chars[0];
151    let cfi_group = chars[1];
152    let cfi_attribute1 = chars[2];
153    // let cfi_attribute2 = value[3];
154    // let cfi_attribute3 = value[4];
155    // let cfi_attribute4 = value[5];
156
157    let mut asset_class = match cfi_category {
158        'D' => Some(AssetClass::Debt),
159        'E' => Some(AssetClass::Equity),
160        'S' => None,
161        _ => None,
162    };
163
164    let instrument_class = match cfi_group {
165        'I' => Some(InstrumentClass::Future),
166        _ => None,
167    };
168
169    if cfi_attribute1 == 'I' {
170        asset_class = Some(AssetClass::Index);
171    }
172
173    Ok((asset_class, instrument_class))
174}
175
176// https://databento.com/docs/schemas-and-data-formats/status#types-of-status-reasons
177/// # Errors
178///
179/// Returns an error if `value` is an invalid status reason code.
180pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
181    let value_str = match value {
182        0 => return Ok(None),
183        1 => "Scheduled",
184        2 => "Surveillance intervention",
185        3 => "Market event",
186        4 => "Instrument activation",
187        5 => "Instrument expiration",
188        6 => "Recovery in process",
189        10 => "Regulatory",
190        11 => "Administrative",
191        12 => "Non-compliance",
192        13 => "Filings not current",
193        14 => "SEC trading suspension",
194        15 => "New issue",
195        16 => "Issue available",
196        17 => "Issues reviewed",
197        18 => "Filing requirements satisfied",
198        30 => "News pending",
199        31 => "News released",
200        32 => "News and resumption times",
201        33 => "News not forthcoming",
202        40 => "Order imbalance",
203        50 => "LULD pause",
204        60 => "Operational",
205        70 => "Additional information requested",
206        80 => "Merger effective",
207        90 => "ETF",
208        100 => "Corporate action",
209        110 => "New Security offering",
210        120 => "Market wide halt level 1",
211        121 => "Market wide halt level 2",
212        122 => "Market wide halt level 3",
213        123 => "Market wide halt carryover",
214        124 => "Market wide halt resumption",
215        130 => "Quotation not available",
216        invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
217    };
218
219    Ok(Some(Ustr::from(value_str)))
220}
221
222/// # Errors
223///
224/// Returns an error if `value` is an invalid status trading event code.
225pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
226    let value_str = match value {
227        0 => return Ok(None),
228        1 => "No cancel",
229        2 => "Change trading session",
230        3 => "Implied matching on",
231        4 => "Implied matching off",
232        _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
233    };
234
235    Ok(Some(Ustr::from(value_str)))
236}
237
238/// Decodes a price from the given value, expressed in units of 1e-9.
239#[must_use]
240pub fn decode_price(value: i64, precision: u8) -> Price {
241    Price::from_raw(decode_raw_price_i64(value), precision)
242}
243
244/// Decodes a quantity from the given value, expressed in standard whole-number units.
245#[must_use]
246pub fn decode_quantity(value: u64) -> Quantity {
247    Quantity::from(value)
248}
249
250/// Decodes a minimum price increment from the given value, expressed in units of 1e-9.
251#[must_use]
252pub fn decode_price_increment(value: i64, precision: u8) -> Price {
253    match value {
254        0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
255        _ => decode_price(value, precision),
256    }
257}
258
259/// Decodes a price from the given optional value, expressed in units of 1e-9.
260#[must_use]
261pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
262    match value {
263        i64::MAX => None,
264        _ => Some(decode_price(value, precision)),
265    }
266}
267
268/// Decodes a quantity from the given optional value, expressed in standard whole-number units.
269#[must_use]
270pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
271    match value {
272        i64::MAX => None,
273        _ => Some(Quantity::from(value)),
274    }
275}
276
277/// Decodes a multiplier from the given value, expressed in units of 1e-9.
278#[must_use]
279pub fn decode_multiplier(value: i64) -> Quantity {
280    match value {
281        0 | i64::MAX => Quantity::from(1),
282        _ => Quantity::from(format!("{}", value as f64 / DATABENTO_FIXED_SCALAR)),
283    }
284}
285
286/// Decodes a lot size from the given value, expressed in standard whole-number units.
287#[must_use]
288pub fn decode_lot_size(value: i32) -> Quantity {
289    match value {
290        0 | i32::MAX => Quantity::from(1),
291        value => Quantity::from(value),
292    }
293}
294
295#[must_use]
296fn is_trade_msg(order_side: OrderSide, action: c_char) -> bool {
297    order_side == OrderSide::NoOrderSide || action as u8 as char == 'T'
298}
299
300/// # Errors
301///
302/// Returns an error if decoding the MBO message fails.
303pub fn decode_mbo_msg(
304    msg: &dbn::MboMsg,
305    instrument_id: InstrumentId,
306    price_precision: u8,
307    ts_init: Option<UnixNanos>,
308    include_trades: bool,
309) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
310    let side = parse_order_side(msg.side);
311    if is_trade_msg(side, msg.action) {
312        if include_trades {
313            let ts_event = msg.ts_recv.into();
314            let ts_init = ts_init.unwrap_or(ts_event);
315
316            let trade = TradeTick::new(
317                instrument_id,
318                Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
319                Quantity::from(msg.size),
320                parse_aggressor_side(msg.side),
321                TradeId::new(itoa::Buffer::new().format(msg.sequence)),
322                ts_event,
323                ts_init,
324            );
325            return Ok((None, Some(trade)));
326        }
327
328        return Ok((None, None));
329    }
330
331    let order = BookOrder::new(
332        side,
333        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
334        Quantity::from(msg.size),
335        msg.order_id,
336    );
337    let ts_event = msg.ts_recv.into();
338    let ts_init = ts_init.unwrap_or(ts_event);
339
340    let delta = OrderBookDelta::new(
341        instrument_id,
342        parse_book_action(msg.action)?,
343        order,
344        msg.flags.raw(),
345        msg.sequence.into(),
346        ts_event,
347        ts_init,
348    );
349
350    Ok((Some(delta), None))
351}
352
353/// # Errors
354///
355/// Returns an error if decoding the Trade message fails.
356pub fn decode_trade_msg(
357    msg: &dbn::TradeMsg,
358    instrument_id: InstrumentId,
359    price_precision: u8,
360    ts_init: Option<UnixNanos>,
361) -> anyhow::Result<TradeTick> {
362    let ts_event = msg.ts_recv.into();
363    let ts_init = ts_init.unwrap_or(ts_event);
364
365    let trade = TradeTick::new(
366        instrument_id,
367        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
368        Quantity::from(msg.size),
369        parse_aggressor_side(msg.side),
370        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
371        ts_event,
372        ts_init,
373    );
374
375    Ok(trade)
376}
377
378/// # Errors
379///
380/// Returns an error if decoding the TBBO message fails.
381pub fn decode_tbbo_msg(
382    msg: &dbn::TbboMsg,
383    instrument_id: InstrumentId,
384    price_precision: u8,
385    ts_init: Option<UnixNanos>,
386) -> anyhow::Result<(QuoteTick, TradeTick)> {
387    let top_level = &msg.levels[0];
388    let ts_event = msg.ts_recv.into();
389    let ts_init = ts_init.unwrap_or(ts_event);
390
391    let quote = QuoteTick::new(
392        instrument_id,
393        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
394        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
395        Quantity::from(top_level.bid_sz),
396        Quantity::from(top_level.ask_sz),
397        ts_event,
398        ts_init,
399    );
400
401    let trade = TradeTick::new(
402        instrument_id,
403        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
404        Quantity::from(msg.size),
405        parse_aggressor_side(msg.side),
406        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
407        ts_event,
408        ts_init,
409    );
410
411    Ok((quote, trade))
412}
413
414/// # Errors
415///
416/// Returns an error if decoding the MBP1 message fails.
417pub fn decode_mbp1_msg(
418    msg: &dbn::Mbp1Msg,
419    instrument_id: InstrumentId,
420    price_precision: u8,
421    ts_init: Option<UnixNanos>,
422    include_trades: bool,
423) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
424    let top_level = &msg.levels[0];
425    let ts_event = msg.ts_recv.into();
426    let ts_init = ts_init.unwrap_or(ts_event);
427
428    let quote = QuoteTick::new(
429        instrument_id,
430        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
431        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
432        Quantity::from(top_level.bid_sz),
433        Quantity::from(top_level.ask_sz),
434        ts_event,
435        ts_init,
436    );
437
438    let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
439        Some(TradeTick::new(
440            instrument_id,
441            Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
442            Quantity::from(msg.size),
443            parse_aggressor_side(msg.side),
444            TradeId::new(itoa::Buffer::new().format(msg.sequence)),
445            ts_event,
446            ts_init,
447        ))
448    } else {
449        None
450    };
451
452    Ok((quote, maybe_trade))
453}
454
455/// # Errors
456///
457/// Returns an error if decoding the BBO message fails.
458pub fn decode_bbo_msg(
459    msg: &dbn::BboMsg,
460    instrument_id: InstrumentId,
461    price_precision: u8,
462    ts_init: Option<UnixNanos>,
463) -> anyhow::Result<QuoteTick> {
464    let top_level = &msg.levels[0];
465    let ts_event = msg.ts_recv.into();
466    let ts_init = ts_init.unwrap_or(ts_event);
467
468    let quote = QuoteTick::new(
469        instrument_id,
470        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
471        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
472        Quantity::from(top_level.bid_sz),
473        Quantity::from(top_level.ask_sz),
474        ts_event,
475        ts_init,
476    );
477
478    Ok(quote)
479}
480
481/// # Errors
482///
483/// Returns an error if decoding the MBP10 message fails.
484///
485/// # Panics
486///
487/// Panics if the number of levels in `msg.levels` is not exactly `DEPTH10_LEN`.
488pub fn decode_mbp10_msg(
489    msg: &dbn::Mbp10Msg,
490    instrument_id: InstrumentId,
491    price_precision: u8,
492    ts_init: Option<UnixNanos>,
493) -> anyhow::Result<OrderBookDepth10> {
494    let mut bids = Vec::with_capacity(DEPTH10_LEN);
495    let mut asks = Vec::with_capacity(DEPTH10_LEN);
496    let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
497    let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
498
499    for level in &msg.levels {
500        let bid_order = BookOrder::new(
501            OrderSide::Buy,
502            Price::from_raw(decode_raw_price_i64(level.bid_px), price_precision),
503            Quantity::from(level.bid_sz),
504            0,
505        );
506
507        let ask_order = BookOrder::new(
508            OrderSide::Sell,
509            Price::from_raw(decode_raw_price_i64(level.ask_px), price_precision),
510            Quantity::from(level.ask_sz),
511            0,
512        );
513
514        bids.push(bid_order);
515        asks.push(ask_order);
516        bid_counts.push(level.bid_ct);
517        ask_counts.push(level.ask_ct);
518    }
519
520    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().expect("`bids` length != 10");
521    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().expect("`asks` length != 10");
522    let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().expect("`bid_counts` length != 10");
523    let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().expect("`ask_counts` length != 10");
524    let ts_event = msg.ts_recv.into();
525    let ts_init = ts_init.unwrap_or(ts_event);
526
527    let depth = OrderBookDepth10::new(
528        instrument_id,
529        bids,
530        asks,
531        bid_counts,
532        ask_counts,
533        msg.flags.raw(),
534        msg.sequence.into(),
535        ts_event,
536        ts_init,
537    );
538
539    Ok(depth)
540}
541
542/// # Errors
543///
544/// Returns an error if `rtype` is not a supported bar aggregation.
545pub fn decode_bar_type(
546    msg: &dbn::OhlcvMsg,
547    instrument_id: InstrumentId,
548) -> anyhow::Result<BarType> {
549    let bar_type = match msg.hd.rtype {
550        32 => {
551            // ohlcv-1s
552            BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
553        }
554        33 => {
555            //  ohlcv-1m
556            BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
557        }
558        34 => {
559            // ohlcv-1h
560            BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
561        }
562        35 => {
563            // ohlcv-1d
564            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
565        }
566        _ => anyhow::bail!(
567            "`rtype` is not a supported bar aggregation, was {}",
568            msg.hd.rtype
569        ),
570    };
571
572    Ok(bar_type)
573}
574
575/// # Errors
576///
577/// Returns an error if `rtype` is not a supported bar aggregation.
578pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
579    let adjustment = match msg.hd.rtype {
580        32 => {
581            // ohlcv-1s
582            BAR_CLOSE_ADJUSTMENT_1S
583        }
584        33 => {
585            //  ohlcv-1m
586            BAR_CLOSE_ADJUSTMENT_1M
587        }
588        34 => {
589            //  ohlcv-1h
590            BAR_CLOSE_ADJUSTMENT_1H
591        }
592        35 => {
593            // ohlcv-1d
594            BAR_CLOSE_ADJUSTMENT_1D
595        }
596        _ => anyhow::bail!(
597            "`rtype` is not a supported bar aggregation, was {}",
598            msg.hd.rtype
599        ),
600    };
601
602    Ok(adjustment.into())
603}
604
605/// # Errors
606///
607/// Returns an error if decoding the OHLCV message fails.
608pub fn decode_ohlcv_msg(
609    msg: &dbn::OhlcvMsg,
610    instrument_id: InstrumentId,
611    price_precision: u8,
612    ts_init: Option<UnixNanos>,
613    timestamp_on_close: bool,
614) -> anyhow::Result<Bar> {
615    let bar_type = decode_bar_type(msg, instrument_id)?;
616    let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
617
618    let ts_event_raw = msg.hd.ts_event.into();
619    let ts_init_raw = ts_init.unwrap_or(ts_event_raw);
620
621    let (ts_event, ts_init) = if timestamp_on_close {
622        // Both ts_event and ts_init are set to close time
623        let ts_close = cmp::max(ts_init_raw, ts_event_raw) + ts_event_adjustment;
624        (ts_close, ts_close)
625    } else {
626        // Both ts_event and ts_init are set to open time
627        (ts_event_raw, ts_event_raw)
628    };
629
630    let bar = Bar::new(
631        bar_type,
632        Price::from_raw(decode_raw_price_i64(msg.open), price_precision),
633        Price::from_raw(decode_raw_price_i64(msg.high), price_precision),
634        Price::from_raw(decode_raw_price_i64(msg.low), price_precision),
635        Price::from_raw(decode_raw_price_i64(msg.close), price_precision),
636        Quantity::from(msg.volume),
637        ts_event,
638        ts_init,
639    );
640
641    Ok(bar)
642}
643
644/// # Errors
645///
646/// Returns an error if decoding the status message fails.
647///
648/// # Panics
649///
650/// Panics if `msg.action` is not a valid `MarketStatusAction`.
651pub fn decode_status_msg(
652    msg: &dbn::StatusMsg,
653    instrument_id: InstrumentId,
654    ts_init: Option<UnixNanos>,
655) -> anyhow::Result<InstrumentStatus> {
656    let ts_event = msg.hd.ts_event.into();
657    let ts_init = ts_init.unwrap_or(ts_event);
658
659    let status = InstrumentStatus::new(
660        instrument_id,
661        MarketStatusAction::from_u16(msg.action).expect("Invalid `MarketStatusAction`"),
662        ts_event,
663        ts_init,
664        parse_status_reason(msg.reason)?,
665        parse_status_trading_event(msg.trading_event)?,
666        parse_optional_bool(msg.is_trading),
667        parse_optional_bool(msg.is_quoting),
668        parse_optional_bool(msg.is_short_sell_restricted),
669    );
670
671    Ok(status)
672}
673
674/// # Errors
675///
676/// Returns an error if decoding the record type fails or encounters unsupported message.
677pub fn decode_record(
678    record: &dbn::RecordRef,
679    instrument_id: InstrumentId,
680    price_precision: u8,
681    ts_init: Option<UnixNanos>,
682    include_trades: bool,
683    bars_timestamp_on_close: bool,
684) -> anyhow::Result<(Option<Data>, Option<Data>)> {
685    // We don't handle `TbboMsg` here as Nautilus separates this schema
686    // into quotes and trades when loading, and the live client will
687    // never subscribe to `tbbo`.
688    let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
689        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
690        let result = decode_mbo_msg(
691            msg,
692            instrument_id,
693            price_precision,
694            Some(ts_init),
695            include_trades,
696        )?;
697        match result {
698            (Some(delta), None) => (Some(Data::Delta(delta)), None),
699            (None, Some(trade)) => (Some(Data::Trade(trade)), None),
700            (None, None) => (None, None),
701            _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
702        }
703    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
704        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
705        let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
706        (Some(Data::Trade(trade)), None)
707    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
708        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
709        let result = decode_mbp1_msg(
710            msg,
711            instrument_id,
712            price_precision,
713            Some(ts_init),
714            include_trades,
715        )?;
716        match result {
717            (quote, None) => (Some(Data::Quote(quote)), None),
718            (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
719        }
720    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
721        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
722        let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
723        (Some(Data::Quote(quote)), None)
724    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
725        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
726        let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
727        (Some(Data::Quote(quote)), None)
728    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
729        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
730        let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
731        (Some(Data::from(depth)), None)
732    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
733        let ts_init = determine_timestamp(ts_init, msg.hd.ts_event.into());
734        let bar = decode_ohlcv_msg(
735            msg,
736            instrument_id,
737            price_precision,
738            Some(ts_init),
739            bars_timestamp_on_close,
740        )?;
741        (Some(Data::Bar(bar)), None)
742    } else {
743        anyhow::bail!("DBN message type is not currently supported")
744    };
745
746    Ok(result)
747}
748
749const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
750    match ts_init {
751        Some(ts_init) => ts_init,
752        None => msg_timestamp,
753    }
754}
755
756/// # Errors
757///
758/// Returns an error if decoding the `InstrumentDefMsg` fails.
759pub fn decode_instrument_def_msg(
760    msg: &dbn::InstrumentDefMsg,
761    instrument_id: InstrumentId,
762    ts_init: Option<UnixNanos>,
763) -> anyhow::Result<InstrumentAny> {
764    match msg.instrument_class as u8 as char {
765        'K' => Ok(InstrumentAny::Equity(decode_equity(
766            msg,
767            instrument_id,
768            ts_init,
769        )?)),
770        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
771            msg,
772            instrument_id,
773            ts_init,
774        )?)),
775        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
776            msg,
777            instrument_id,
778            ts_init,
779        )?)),
780        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
781            msg,
782            instrument_id,
783            ts_init,
784        )?)),
785        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
786            msg,
787            instrument_id,
788            ts_init,
789        )?)),
790        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
791        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
792        _ => anyhow::bail!(
793            "Unsupported `instrument_class` '{}'",
794            msg.instrument_class as u8 as char
795        ),
796    }
797}
798
799/// # Errors
800///
801/// Returns an error if parsing or constructing `Equity` fails.
802pub fn decode_equity(
803    msg: &dbn::InstrumentDefMsg,
804    instrument_id: InstrumentId,
805    ts_init: Option<UnixNanos>,
806) -> anyhow::Result<Equity> {
807    let currency = parse_currency_or_usd_default(msg.currency());
808    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
809    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
810    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
811    let ts_init = ts_init.unwrap_or(ts_event);
812
813    Ok(Equity::new(
814        instrument_id,
815        instrument_id.symbol,
816        None, // No ISIN available yet
817        currency,
818        price_increment.precision,
819        price_increment,
820        Some(lot_size),
821        None, // TBD
822        None, // TBD
823        None, // TBD
824        None, // TBD
825        None, // TBD
826        None, // TBD
827        None, // TBD
828        None, // TBD
829        ts_event,
830        ts_init,
831    ))
832}
833
834/// # Errors
835///
836/// Returns an error if parsing or constructing `FuturesContract` fails.
837pub fn decode_futures_contract(
838    msg: &dbn::InstrumentDefMsg,
839    instrument_id: InstrumentId,
840    ts_init: Option<UnixNanos>,
841) -> anyhow::Result<FuturesContract> {
842    let currency = parse_currency_or_usd_default(msg.currency());
843    let exchange = Ustr::from(msg.exchange()?);
844    let underlying = Ustr::from(msg.asset()?);
845    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
846    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
847    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
848    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
849    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
850    let ts_init = ts_init.unwrap_or(ts_event);
851
852    FuturesContract::new_checked(
853        instrument_id,
854        instrument_id.symbol,
855        asset_class.unwrap_or(AssetClass::Commodity),
856        Some(exchange),
857        underlying,
858        msg.activation.into(),
859        msg.expiration.into(),
860        currency,
861        price_increment.precision,
862        price_increment,
863        multiplier,
864        lot_size,
865        None, // TBD
866        None, // TBD
867        None, // TBD
868        None, // TBD
869        None, // TBD
870        None, // TBD
871        None, // TBD
872        None, // TBD
873        ts_event,
874        ts_init,
875    )
876}
877
878/// # Errors
879///
880/// Returns an error if parsing or constructing `FuturesSpread` fails.
881pub fn decode_futures_spread(
882    msg: &dbn::InstrumentDefMsg,
883    instrument_id: InstrumentId,
884    ts_init: Option<UnixNanos>,
885) -> anyhow::Result<FuturesSpread> {
886    let exchange = Ustr::from(msg.exchange()?);
887    let underlying = Ustr::from(msg.asset()?);
888    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
889    let strategy_type = Ustr::from(msg.secsubtype()?);
890    let currency = parse_currency_or_usd_default(msg.currency());
891    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
892    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
893    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
894    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
895    let ts_init = ts_init.unwrap_or(ts_event);
896
897    FuturesSpread::new_checked(
898        instrument_id,
899        instrument_id.symbol,
900        asset_class.unwrap_or(AssetClass::Commodity),
901        Some(exchange),
902        underlying,
903        strategy_type,
904        msg.activation.into(),
905        msg.expiration.into(),
906        currency,
907        price_increment.precision,
908        price_increment,
909        multiplier,
910        lot_size,
911        None, // TBD
912        None, // TBD
913        None, // TBD
914        None, // TBD
915        None, // TBD
916        None, // TBD
917        None, // TBD
918        None, // TBD
919        ts_event,
920        ts_init,
921    )
922}
923
924/// # Errors
925///
926/// Returns an error if parsing or constructing `OptionContract` fails.
927pub fn decode_option_contract(
928    msg: &dbn::InstrumentDefMsg,
929    instrument_id: InstrumentId,
930    ts_init: Option<UnixNanos>,
931) -> anyhow::Result<OptionContract> {
932    let currency = parse_currency_or_usd_default(msg.currency());
933    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
934    let exchange = Ustr::from(msg.exchange()?);
935    let underlying = Ustr::from(msg.underlying()?);
936    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
937        Some(AssetClass::Equity)
938    } else {
939        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
940        asset_class
941    };
942    let option_kind = parse_option_kind(msg.instrument_class)?;
943    let strike_price = Price::from_raw(
944        decode_raw_price_i64(msg.strike_price),
945        strike_price_currency.precision,
946    );
947    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
948    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
949    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
950    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
951    let ts_init = ts_init.unwrap_or(ts_event);
952
953    OptionContract::new_checked(
954        instrument_id,
955        instrument_id.symbol,
956        asset_class_opt.unwrap_or(AssetClass::Commodity),
957        Some(exchange),
958        underlying,
959        option_kind,
960        strike_price,
961        currency,
962        msg.activation.into(),
963        msg.expiration.into(),
964        price_increment.precision,
965        price_increment,
966        multiplier,
967        lot_size,
968        None, // TBD
969        None, // TBD
970        None, // TBD
971        None, // TBD
972        None, // TBD
973        None, // TBD
974        None, // TBD
975        None, // TBD
976        ts_event,
977        ts_init,
978    )
979}
980
981/// # Errors
982///
983/// Returns an error if parsing or constructing `OptionSpread` fails.
984pub fn decode_option_spread(
985    msg: &dbn::InstrumentDefMsg,
986    instrument_id: InstrumentId,
987    ts_init: Option<UnixNanos>,
988) -> anyhow::Result<OptionSpread> {
989    let exchange = Ustr::from(msg.exchange()?);
990    let underlying = Ustr::from(msg.underlying()?);
991    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
992        Some(AssetClass::Equity)
993    } else {
994        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
995        asset_class
996    };
997    let strategy_type = Ustr::from(msg.secsubtype()?);
998    let currency = parse_currency_or_usd_default(msg.currency());
999    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1000    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
1001    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1002    let ts_event = msg.ts_recv.into(); // More accurate and reliable timestamp
1003    let ts_init = ts_init.unwrap_or(ts_event);
1004
1005    OptionSpread::new_checked(
1006        instrument_id,
1007        instrument_id.symbol,
1008        asset_class_opt.unwrap_or(AssetClass::Commodity),
1009        Some(exchange),
1010        underlying,
1011        strategy_type,
1012        msg.activation.into(),
1013        msg.expiration.into(),
1014        currency,
1015        price_increment.precision,
1016        price_increment,
1017        multiplier,
1018        lot_size,
1019        None, // TBD
1020        None, // TBD
1021        None, // TBD
1022        None, // TBD
1023        None, // TBD
1024        None, // TBD
1025        None, // TBD
1026        None, // TBD
1027        ts_event,
1028        ts_init,
1029    )
1030}
1031
1032/// # Errors
1033///
1034/// Returns an error if constructing `DatabentoImbalance` fails.
1035pub fn decode_imbalance_msg(
1036    msg: &dbn::ImbalanceMsg,
1037    instrument_id: InstrumentId,
1038    price_precision: u8,
1039    ts_init: Option<UnixNanos>,
1040) -> anyhow::Result<DatabentoImbalance> {
1041    let ts_event = msg.ts_recv.into();
1042    let ts_init = ts_init.unwrap_or(ts_event);
1043
1044    DatabentoImbalance::new(
1045        instrument_id,
1046        Price::from_raw(decode_raw_price_i64(msg.ref_price), price_precision),
1047        Price::from_raw(
1048            decode_raw_price_i64(msg.cont_book_clr_price),
1049            price_precision,
1050        ),
1051        Price::from_raw(
1052            decode_raw_price_i64(msg.auct_interest_clr_price),
1053            price_precision,
1054        ),
1055        Quantity::new(f64::from(msg.paired_qty), 0),
1056        Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1057        parse_order_side(msg.side),
1058        msg.significant_imbalance as c_char,
1059        msg.hd.ts_event.into(),
1060        ts_event,
1061        ts_init,
1062    )
1063}
1064
1065/// # Errors
1066///
1067/// Returns an error if constructing `DatabentoStatistics` fails.
1068///
1069/// # Panics
1070///
1071/// Panics if `msg.stat_type` or `msg.update_action` is not a valid enum variant.
1072pub fn decode_statistics_msg(
1073    msg: &dbn::StatMsg,
1074    instrument_id: InstrumentId,
1075    price_precision: u8,
1076    ts_init: Option<UnixNanos>,
1077) -> anyhow::Result<DatabentoStatistics> {
1078    let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1079        .expect("Invalid value for `stat_type`");
1080    let update_action = DatabentoStatisticUpdateAction::from_u8(msg.update_action)
1081        .expect("Invalid value for `update_action`");
1082    let ts_event = msg.ts_recv.into();
1083    let ts_init = ts_init.unwrap_or(ts_event);
1084
1085    DatabentoStatistics::new(
1086        instrument_id,
1087        stat_type,
1088        update_action,
1089        decode_optional_price(msg.price, price_precision),
1090        decode_optional_quantity(msg.quantity),
1091        msg.channel_id,
1092        msg.stat_flags,
1093        msg.sequence,
1094        msg.ts_ref.into(),
1095        msg.ts_in_delta,
1096        msg.hd.ts_event.into(),
1097        ts_event,
1098        ts_init,
1099    )
1100}
1101
1102////////////////////////////////////////////////////////////////////////////////
1103// Tests
1104////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use databento::dbn::decode::{DecodeStream, dbn::Decoder};
    use fallible_streaming_iterator::FallibleStreamingIterator;
    use nautilus_model::instruments::Instrument;
    use rstest::*;

    use super::*;

    /// Returns the directory holding the DBN test data files of this crate.
    fn test_data_path() -> PathBuf {
        let mut dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        dir.push("test_data");
        dir
    }

    /// Returns the full path of the named file inside the test data directory.
    fn data_file(name: &str) -> PathBuf {
        test_data_path().join(name)
    }

    #[rstest]
    #[case('Y' as c_char, Some(true))]
    #[case('N' as c_char, Some(false))]
    #[case('X' as c_char, None)]
    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
        let actual = parse_optional_bool(input);
        assert_eq!(actual, expected);
    }

    #[rstest]
    #[case('A' as c_char, OrderSide::Sell)]
    #[case('B' as c_char, OrderSide::Buy)]
    #[case('X' as c_char, OrderSide::NoOrderSide)]
    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
        let actual = parse_order_side(input);
        assert_eq!(actual, expected);
    }

    #[rstest]
    #[case('A' as c_char, AggressorSide::Seller)]
    #[case('B' as c_char, AggressorSide::Buyer)]
    #[case('X' as c_char, AggressorSide::NoAggressor)]
    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
        let actual = parse_aggressor_side(input);
        assert_eq!(actual, expected);
    }

    #[rstest]
    #[case('A' as c_char, Ok(BookAction::Add))]
    #[case('C' as c_char, Ok(BookAction::Delete))]
    #[case('F' as c_char, Ok(BookAction::Update))]
    #[case('M' as c_char, Ok(BookAction::Update))]
    #[case('R' as c_char, Ok(BookAction::Clear))]
    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
        // Errors are compared through their rendered message
        let actual = parse_book_action(input).map_err(|e| e.to_string());
        assert_eq!(actual, expected.map_err(str::to_string));
    }

    #[rstest]
    #[case('C' as c_char, Ok(OptionKind::Call))]
    #[case('P' as c_char, Ok(OptionKind::Put))]
    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
        // Errors are compared through their rendered message
        let actual = parse_option_kind(input).map_err(|e| e.to_string());
        assert_eq!(actual, expected.map_err(str::to_string));
    }

    #[rstest]
    #[case(Ok("USD"), Currency::USD())]
    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
    #[case(Ok(""), Currency::USD())]
    #[case(Err("Error"), Currency::USD())]
    fn test_parse_currency_or_usd_default(
        #[case] input: Result<&str, &'static str>, // Using `&'static str` for errors
        #[case] expected: Currency,
    ) {
        let wrapped = input.map_err(std::io::Error::other);
        let actual = parse_currency_or_usd_default(wrapped);
        assert_eq!(actual, expected);
    }

    #[rstest]
    #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
    #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
    #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
    #[case("XXX", Ok((None, None)))]
    #[case("D", Err("Value string is too short"))]
    fn test_parse_cfi_iso10926(
        #[case] input: &str,
        #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
    ) {
        // Errors are compared through their rendered message
        let actual = parse_cfi_iso10926(input).map_err(|e| e.to_string());
        assert_eq!(actual, expected.map_err(str::to_string));
    }

    /// Exercises `decode_price_increment`, including its default for unset values.
    #[rstest]
    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
    #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] // Arbitrary valid price
    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
        assert_eq!(decode_price_increment(value, precision), expected);
    }

    #[rstest]
    #[case(i64::MAX, 2, None)] // None for i64::MAX
    #[case(0, 2, Some(Price::from_raw(0, 2)))] // 0 is valid here
    #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] // Arbitrary valid price
    fn test_decode_optional_price(
        #[case] value: i64,
        #[case] precision: u8,
        #[case] expected: Option<Price>,
    ) {
        assert_eq!(decode_optional_price(value, precision), expected);
    }

    #[rstest]
    #[case(i64::MAX, None)] // None for i64::MAX
    #[case(0, Some(Quantity::new(0.0, 0)))] // 0 is valid quantity
    #[case(10, Some(Quantity::new(10.0, 0)))] // Arbitrary valid quantity
    fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
        assert_eq!(decode_optional_quantity(value), expected);
    }

    #[rstest]
    fn test_decode_mbo_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.mbo.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::MboMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let (maybe_delta, _) =
            decode_mbo_msg(record, instrument_id, 2, Some(0.into()), false).unwrap();
        let delta = maybe_delta.unwrap();

        assert_eq!(delta.instrument_id, instrument_id);
        assert_eq!(delta.action, BookAction::Delete);
        assert_eq!(delta.order.side, OrderSide::Sell);
        assert_eq!(delta.order.price, Price::from("3722.75"));
        assert_eq!(delta.order.size, Quantity::from("1"));
        assert_eq!(delta.order.order_id, 647_784_973_705);
        assert_eq!(delta.flags, 128);
        assert_eq!(delta.sequence, 1_170_352);
        assert_eq!(delta.ts_event, record.ts_recv);
        assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
        assert_eq!(delta.ts_init, 0);
    }

    #[rstest]
    fn test_decode_mbp1_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.mbp-1.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::Mbp1Msg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let (quote, _) =
            decode_mbp1_msg(record, instrument_id, 2, Some(0.into()), false).unwrap();

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("3720.25"));
        assert_eq!(quote.ask_price, Price::from("3720.50"));
        assert_eq!(quote.bid_size, Quantity::from("24"));
        assert_eq!(quote.ask_size, Quantity::from("11"));
        assert_eq!(quote.ts_event, record.ts_recv);
        assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
        assert_eq!(quote.ts_init, 0);
    }

    #[rstest]
    fn test_decode_bbo_1s_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.bbo-1s.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::BboMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let quote = decode_bbo_msg(record, instrument_id, 2, Some(0.into())).unwrap();

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("5199.50"));
        assert_eq!(quote.ask_price, Price::from("5199.75"));
        assert_eq!(quote.bid_size, Quantity::from("26"));
        assert_eq!(quote.ask_size, Quantity::from("23"));
        assert_eq!(quote.ts_event, record.ts_recv);
        assert_eq!(quote.ts_event, 1_715_248_801_000_000_000);
        assert_eq!(quote.ts_init, 0);
    }

    #[rstest]
    fn test_decode_bbo_1m_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.bbo-1m.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::BboMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let quote = decode_bbo_msg(record, instrument_id, 2, Some(0.into())).unwrap();

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("5199.50"));
        assert_eq!(quote.ask_price, Price::from("5199.75"));
        assert_eq!(quote.bid_size, Quantity::from("33"));
        assert_eq!(quote.ask_size, Quantity::from("17"));
        assert_eq!(quote.ts_event, record.ts_recv);
        assert_eq!(quote.ts_event, 1_715_248_800_000_000_000);
        assert_eq!(quote.ts_init, 0);
    }

    #[rstest]
    fn test_decode_mbp10_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.mbp-10.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::Mbp10Msg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let depth10 = decode_mbp10_msg(record, instrument_id, 2, Some(0.into())).unwrap();

        assert_eq!(depth10.instrument_id, instrument_id);
        assert_eq!(depth10.bids.len(), 10);
        assert_eq!(depth10.asks.len(), 10);
        assert_eq!(depth10.bid_counts.len(), 10);
        assert_eq!(depth10.ask_counts.len(), 10);
        assert_eq!(depth10.flags, 128);
        assert_eq!(depth10.sequence, 1_170_352);
        assert_eq!(depth10.ts_event, record.ts_recv);
        assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
        assert_eq!(depth10.ts_init, 0);
    }

    #[rstest]
    fn test_decode_trade_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.trades.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::TradeMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let trade = decode_trade_msg(record, instrument_id, 2, Some(0.into())).unwrap();

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("3720.25"));
        assert_eq!(trade.size, Quantity::from("5"));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.trade_id.to_string(), "1170380");
        assert_eq!(trade.ts_event, record.ts_recv);
        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
        assert_eq!(trade.ts_init, 0);
    }

    #[rstest]
    fn test_decode_tbbo_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.tbbo.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::Mbp1Msg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let (quote, trade) = decode_tbbo_msg(record, instrument_id, 2, Some(0.into())).unwrap();

        // Quote half of the message
        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("3720.25"));
        assert_eq!(quote.ask_price, Price::from("3720.50"));
        assert_eq!(quote.bid_size, Quantity::from("26"));
        assert_eq!(quote.ask_size, Quantity::from("7"));
        assert_eq!(quote.ts_event, record.ts_recv);
        assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
        assert_eq!(quote.ts_init, 0);

        // Trade half of the message
        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("3720.25"));
        assert_eq!(trade.size, Quantity::from("5"));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.trade_id.to_string(), "1170380");
        assert_eq!(trade.ts_event, record.ts_recv);
        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
        assert_eq!(trade.ts_init, 0);
    }

    #[ignore = "Requires updated test data"]
    #[rstest]
    fn test_decode_ohlcv_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.ohlcv-1s.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::OhlcvMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bar = decode_ohlcv_msg(record, instrument_id, 2, Some(0.into()), true).unwrap();

        let expected_bar_type = BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL");
        assert_eq!(bar.bar_type, expected_bar_type);
        assert_eq!(bar.open, Price::from("3720.25"));
        assert_eq!(bar.high, Price::from("3720.50"));
        assert_eq!(bar.low, Price::from("3720.25"));
        assert_eq!(bar.close, Price::from("3720.50"));
        assert_eq!(bar.ts_event, 1_609_160_400_000_000_000);
        assert_eq!(bar.ts_init, 1_609_160_401_000_000_000); // Adjusted to open + interval
    }

    #[rstest]
    fn test_decode_definition_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.definition.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::InstrumentDefMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let decoded = decode_instrument_def_msg(record, instrument_id, Some(0.into()));

        assert!(decoded.is_ok());
        let instrument = decoded.unwrap();
        assert_eq!(instrument.multiplier(), Quantity::from(1));
    }

    #[rstest]
    fn test_decode_status_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.status.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::StatusMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let status = decode_status_msg(record, instrument_id, Some(0.into())).unwrap();

        assert_eq!(status.instrument_id, instrument_id);
        assert_eq!(status.action, MarketStatusAction::Trading);
        assert_eq!(status.ts_event, record.hd.ts_event);
        assert_eq!(status.ts_init, 0);
        assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
        assert_eq!(status.trading_event, None);
        assert_eq!(status.is_trading, Some(true));
        assert_eq!(status.is_quoting, Some(true));
        assert_eq!(status.is_short_sell_restricted, None);
    }

    #[rstest]
    fn test_decode_imbalance_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.imbalance.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::ImbalanceMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let imbalance = decode_imbalance_msg(record, instrument_id, 2, Some(0.into())).unwrap();

        assert_eq!(imbalance.instrument_id, instrument_id);
        assert_eq!(imbalance.ref_price, Price::from("229.43"));
        assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
        assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
        assert_eq!(imbalance.paired_qty, Quantity::from("0"));
        assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
        assert_eq!(imbalance.side, OrderSide::Buy);
        assert_eq!(imbalance.significant_imbalance, 126);
        assert_eq!(imbalance.ts_event, record.hd.ts_event);
        assert_eq!(imbalance.ts_recv, record.ts_recv);
        assert_eq!(imbalance.ts_init, 0);
    }

    #[rstest]
    fn test_decode_statistics_msg() {
        let mut records = Decoder::from_zstd_file(data_file("test_data.statistics.dbn.zst"))
            .unwrap()
            .decode_stream::<dbn::StatMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let statistics = decode_statistics_msg(record, instrument_id, 2, Some(0.into())).unwrap();

        assert_eq!(statistics.instrument_id, instrument_id);
        assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
        assert_eq!(
            statistics.update_action,
            DatabentoStatisticUpdateAction::Added
        );
        assert_eq!(statistics.price, Some(Price::from("100.00")));
        assert_eq!(statistics.quantity, None);
        assert_eq!(statistics.channel_id, 13);
        assert_eq!(statistics.stat_flags, 255);
        assert_eq!(statistics.sequence, 2);
        assert_eq!(statistics.ts_ref, u64::MAX);
        assert_eq!(statistics.ts_in_delta, 26_961);
        assert_eq!(statistics.ts_event, record.hd.ts_event);
        assert_eq!(statistics.ts_recv, record.ts_recv);
        assert_eq!(statistics.ts_init, 0);
    }
}