nautilus_databento/
decode.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
20use nautilus_model::{
21    data::{
22        Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24    },
25    enums::{
26        AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27        InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28    },
29    identifiers::{InstrumentId, TradeId},
30    instruments::{
31        Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32    },
33    types::{Currency, Price, Quantity, price::decode_raw_price_i64},
34};
35use ustr::Ustr;
36
37use super::{
38    enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
39    types::{DatabentoImbalance, DatabentoStatistics},
40};
41
// A bar step of exactly one. `NonZeroUsize::MIN` is the value 1, so no fallible
// construction (and no `unwrap`) is needed.
const STEP_ONE: NonZeroUsize = NonZeroUsize::MIN;
44
45const BAR_SPEC_1S: BarSpecification = BarSpecification {
46    step: STEP_ONE,
47    aggregation: BarAggregation::Second,
48    price_type: PriceType::Last,
49};
50const BAR_SPEC_1M: BarSpecification = BarSpecification {
51    step: STEP_ONE,
52    aggregation: BarAggregation::Minute,
53    price_type: PriceType::Last,
54};
55const BAR_SPEC_1H: BarSpecification = BarSpecification {
56    step: STEP_ONE,
57    aggregation: BarAggregation::Hour,
58    price_type: PriceType::Last,
59};
60const BAR_SPEC_1D: BarSpecification = BarSpecification {
61    step: STEP_ONE,
62    aggregation: BarAggregation::Day,
63    price_type: PriceType::Last,
64};
65
66const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
67const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
68const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
69const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
70
/// Interprets a single flag character as an optional boolean.
///
/// `'Y'` yields `Some(true)`, `'N'` yields `Some(false)`, and every other
/// character yields `None`.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    const YES: u8 = b'Y';
    const NO: u8 = b'N';

    match c as u8 {
        YES => Some(true),
        NO => Some(false),
        _ => None,
    }
}
79
80#[must_use]
81pub const fn parse_order_side(c: c_char) -> OrderSide {
82    match c as u8 as char {
83        'A' => OrderSide::Sell,
84        'B' => OrderSide::Buy,
85        _ => OrderSide::NoOrderSide,
86    }
87}
88
89#[must_use]
90pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
91    match c as u8 as char {
92        'A' => AggressorSide::Seller,
93        'B' => AggressorSide::Buyer,
94        _ => AggressorSide::NoAggressor,
95    }
96}
97
98/// Parses a Databento book action character into a `BookAction` enum.
99///
100/// # Errors
101///
102/// Returns an error if `c` is not a valid `BookAction` character.
103pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
104    match c as u8 as char {
105        'A' => Ok(BookAction::Add),
106        'C' => Ok(BookAction::Delete),
107        'F' => Ok(BookAction::Update),
108        'M' => Ok(BookAction::Update),
109        'R' => Ok(BookAction::Clear),
110        invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
111    }
112}
113
114/// Parses a Databento option kind character into an `OptionKind` enum.
115///
116/// # Errors
117///
118/// Returns an error if `c` is not a valid `OptionKind` character.
119pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
120    match c as u8 as char {
121        'C' => Ok(OptionKind::Call),
122        'P' => Ok(OptionKind::Put),
123        invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
124    }
125}
126
127fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
128    match value {
129        Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
130            tracing::warn!("Unknown currency code '{value}', defaulting to USD");
131            Currency::USD()
132        }),
133        Ok(_) => Currency::USD(),
134        Err(e) => {
135            tracing::error!("Error parsing currency: {e}");
136            Currency::USD()
137        }
138    }
139}
140
141/// Parses a CFI (Classification of Financial Instruments) code to extract asset and instrument classes.
142///
143/// # Errors
144///
145/// Returns an error if `value` has fewer than 3 characters.
146pub fn parse_cfi_iso10926(
147    value: &str,
148) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
149    let chars: Vec<char> = value.chars().collect();
150    if chars.len() < 3 {
151        anyhow::bail!("Value string is too short");
152    }
153
154    // TODO: A proper CFI parser would be useful: https://en.wikipedia.org/wiki/ISO_10962
155    let cfi_category = chars[0];
156    let cfi_group = chars[1];
157    let cfi_attribute1 = chars[2];
158    // let cfi_attribute2 = value[3];
159    // let cfi_attribute3 = value[4];
160    // let cfi_attribute4 = value[5];
161
162    let mut asset_class = match cfi_category {
163        'D' => Some(AssetClass::Debt),
164        'E' => Some(AssetClass::Equity),
165        'S' => None,
166        _ => None,
167    };
168
169    let instrument_class = match cfi_group {
170        'I' => Some(InstrumentClass::Future),
171        _ => None,
172    };
173
174    if cfi_attribute1 == 'I' {
175        asset_class = Some(AssetClass::Index);
176    }
177
178    Ok((asset_class, instrument_class))
179}
180
181/// Parses a Databento status reason code into a human-readable string.
182///
183/// See: <https://databento.com/docs/schemas-and-data-formats/status#types-of-status-reasons>
184///
185/// # Errors
186///
187/// Returns an error if `value` is an invalid status reason code.
188pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
189    let value_str = match value {
190        0 => return Ok(None),
191        1 => "Scheduled",
192        2 => "Surveillance intervention",
193        3 => "Market event",
194        4 => "Instrument activation",
195        5 => "Instrument expiration",
196        6 => "Recovery in process",
197        10 => "Regulatory",
198        11 => "Administrative",
199        12 => "Non-compliance",
200        13 => "Filings not current",
201        14 => "SEC trading suspension",
202        15 => "New issue",
203        16 => "Issue available",
204        17 => "Issues reviewed",
205        18 => "Filing requirements satisfied",
206        30 => "News pending",
207        31 => "News released",
208        32 => "News and resumption times",
209        33 => "News not forthcoming",
210        40 => "Order imbalance",
211        50 => "LULD pause",
212        60 => "Operational",
213        70 => "Additional information requested",
214        80 => "Merger effective",
215        90 => "ETF",
216        100 => "Corporate action",
217        110 => "New Security offering",
218        120 => "Market wide halt level 1",
219        121 => "Market wide halt level 2",
220        122 => "Market wide halt level 3",
221        123 => "Market wide halt carryover",
222        124 => "Market wide halt resumption",
223        130 => "Quotation not available",
224        invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
225    };
226
227    Ok(Some(Ustr::from(value_str)))
228}
229
230/// Parses a Databento status trading event code into a human-readable string.
231///
232/// # Errors
233///
234/// Returns an error if `value` is an invalid status trading event code.
235pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
236    let value_str = match value {
237        0 => return Ok(None),
238        1 => "No cancel",
239        2 => "Change trading session",
240        3 => "Implied matching on",
241        4 => "Implied matching off",
242        _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
243    };
244
245    Ok(Some(Ustr::from(value_str)))
246}
247
248/// Decodes a price from the given value, expressed in units of 1e-9.
249#[must_use]
250pub fn decode_price(value: i64, precision: u8) -> Price {
251    Price::from_raw(decode_raw_price_i64(value), precision)
252}
253
254/// Decodes a quantity from the given value, expressed in standard whole-number units.
255#[must_use]
256pub fn decode_quantity(value: u64) -> Quantity {
257    Quantity::from(value)
258}
259
260/// Decodes a minimum price increment from the given value, expressed in units of 1e-9.
261#[must_use]
262pub fn decode_price_increment(value: i64, precision: u8) -> Price {
263    match value {
264        0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
265        _ => decode_price(value, precision),
266    }
267}
268
269/// Decodes a price from the given optional value, expressed in units of 1e-9.
270#[must_use]
271pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
272    match value {
273        i64::MAX => None,
274        _ => Some(decode_price(value, precision)),
275    }
276}
277
278/// Decodes a quantity from the given optional value, where `i64::MAX` indicates missing data.
279#[must_use]
280pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
281    match value {
282        i64::MAX => None,
283        _ => Some(Quantity::from(value)),
284    }
285}
286
287/// Decodes a multiplier from the given value, expressed in units of 1e-9.
288/// Uses exact integer arithmetic to avoid precision loss in financial calculations.
289///
290/// # Errors
291///
292/// Returns an error if value is negative (invalid multiplier).
293pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
294    match value {
295        0 | i64::MAX => Ok(Quantity::from(1)),
296        v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
297        v => {
298            // Work in integers: v is fixed-point with 9 fractional digits.
299            // Build a canonical decimal string without floating-point.
300            let abs = v as u128;
301
302            const SCALE: u128 = 1_000_000_000;
303            let int_part = abs / SCALE;
304            let frac_part = abs % SCALE;
305
306            // Format fractional part with exactly 9 digits, then trim trailing zeros
307            // to keep a canonical representation.
308            if frac_part == 0 {
309                // Pure integer
310                Ok(Quantity::from(int_part as u64))
311            } else {
312                let mut frac_str = format!("{frac_part:09}");
313                while frac_str.ends_with('0') {
314                    frac_str.pop();
315                }
316                let s = format!("{int_part}.{frac_str}");
317                Ok(Quantity::from(s))
318            }
319        }
320    }
321}
322
323/// Decodes a lot size from the given value, expressed in standard whole-number units.
324#[must_use]
325pub fn decode_lot_size(value: i32) -> Quantity {
326    match value {
327        0 | i32::MAX => Quantity::from(1),
328        value => Quantity::from(value),
329    }
330}
331
332#[must_use]
333fn is_trade_msg(order_side: OrderSide, action: c_char) -> bool {
334    order_side == OrderSide::NoOrderSide || action as u8 as char == 'T'
335}
336
337/// Decodes a Databento MBO (Market by Order) message into an order book delta or trade.
338///
339/// Returns a tuple containing either an `OrderBookDelta` or a `TradeTick`, depending on
340/// whether the message represents an order book update or a trade execution.
341///
342/// # Errors
343///
344/// Returns an error if decoding the MBO message fails.
345pub fn decode_mbo_msg(
346    msg: &dbn::MboMsg,
347    instrument_id: InstrumentId,
348    price_precision: u8,
349    ts_init: Option<UnixNanos>,
350    include_trades: bool,
351) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
352    let side = parse_order_side(msg.side);
353    if is_trade_msg(side, msg.action) {
354        if include_trades && msg.size > 0 {
355            let ts_event = msg.ts_recv.into();
356            let ts_init = ts_init.unwrap_or(ts_event);
357
358            let trade = TradeTick::new(
359                instrument_id,
360                Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
361                Quantity::from(msg.size),
362                parse_aggressor_side(msg.side),
363                TradeId::new(itoa::Buffer::new().format(msg.sequence)),
364                ts_event,
365                ts_init,
366            );
367            return Ok((None, Some(trade)));
368        }
369
370        return Ok((None, None));
371    }
372
373    let order = BookOrder::new(
374        side,
375        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
376        Quantity::from(msg.size),
377        msg.order_id,
378    );
379    let ts_event = msg.ts_recv.into();
380    let ts_init = ts_init.unwrap_or(ts_event);
381
382    let delta = OrderBookDelta::new(
383        instrument_id,
384        parse_book_action(msg.action)?,
385        order,
386        msg.flags.raw(),
387        msg.sequence.into(),
388        ts_event,
389        ts_init,
390    );
391
392    Ok((Some(delta), None))
393}
394
395/// Decodes a Databento Trade message into a `TradeTick`.
396///
397/// # Errors
398///
399/// Returns an error if decoding the Trade message fails.
400pub fn decode_trade_msg(
401    msg: &dbn::TradeMsg,
402    instrument_id: InstrumentId,
403    price_precision: u8,
404    ts_init: Option<UnixNanos>,
405) -> anyhow::Result<TradeTick> {
406    let ts_event = msg.ts_recv.into();
407    let ts_init = ts_init.unwrap_or(ts_event);
408
409    let trade = TradeTick::new(
410        instrument_id,
411        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
412        Quantity::from(msg.size),
413        parse_aggressor_side(msg.side),
414        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
415        ts_event,
416        ts_init,
417    );
418
419    Ok(trade)
420}
421
422/// Decodes a Databento TBBO (Top of Book with Trade) message into quote and trade ticks.
423///
424/// # Errors
425///
426/// Returns an error if decoding the TBBO message fails.
427pub fn decode_tbbo_msg(
428    msg: &dbn::TbboMsg,
429    instrument_id: InstrumentId,
430    price_precision: u8,
431    ts_init: Option<UnixNanos>,
432) -> anyhow::Result<(QuoteTick, TradeTick)> {
433    let top_level = &msg.levels[0];
434    let ts_event = msg.ts_recv.into();
435    let ts_init = ts_init.unwrap_or(ts_event);
436
437    let quote = QuoteTick::new(
438        instrument_id,
439        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
440        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
441        Quantity::from(top_level.bid_sz),
442        Quantity::from(top_level.ask_sz),
443        ts_event,
444        ts_init,
445    );
446
447    let trade = TradeTick::new(
448        instrument_id,
449        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
450        Quantity::from(msg.size),
451        parse_aggressor_side(msg.side),
452        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
453        ts_event,
454        ts_init,
455    );
456
457    Ok((quote, trade))
458}
459
460/// Decodes a Databento MBP1 (Market by Price Level 1) message into quote and optional trade ticks.
461///
462/// # Errors
463///
464/// Returns an error if decoding the MBP1 message fails.
465pub fn decode_mbp1_msg(
466    msg: &dbn::Mbp1Msg,
467    instrument_id: InstrumentId,
468    price_precision: u8,
469    ts_init: Option<UnixNanos>,
470    include_trades: bool,
471) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
472    let top_level = &msg.levels[0];
473    let ts_event = msg.ts_recv.into();
474    let ts_init = ts_init.unwrap_or(ts_event);
475
476    let quote = QuoteTick::new(
477        instrument_id,
478        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
479        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
480        Quantity::from(top_level.bid_sz),
481        Quantity::from(top_level.ask_sz),
482        ts_event,
483        ts_init,
484    );
485
486    let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
487        Some(TradeTick::new(
488            instrument_id,
489            Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
490            Quantity::from(msg.size),
491            parse_aggressor_side(msg.side),
492            TradeId::new(itoa::Buffer::new().format(msg.sequence)),
493            ts_event,
494            ts_init,
495        ))
496    } else {
497        None
498    };
499
500    Ok((quote, maybe_trade))
501}
502
503/// Decodes a Databento BBO (Best Bid and Offer) message into a `QuoteTick`.
504///
505/// # Errors
506///
507/// Returns an error if decoding the BBO message fails.
508pub fn decode_bbo_msg(
509    msg: &dbn::BboMsg,
510    instrument_id: InstrumentId,
511    price_precision: u8,
512    ts_init: Option<UnixNanos>,
513) -> anyhow::Result<QuoteTick> {
514    let top_level = &msg.levels[0];
515    let ts_event = msg.ts_recv.into();
516    let ts_init = ts_init.unwrap_or(ts_event);
517
518    let quote = QuoteTick::new(
519        instrument_id,
520        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
521        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
522        Quantity::from(top_level.bid_sz),
523        Quantity::from(top_level.ask_sz),
524        ts_event,
525        ts_init,
526    );
527
528    Ok(quote)
529}
530
531/// Decodes a Databento MBP10 (Market by Price 10 levels) message into an `OrderBookDepth10`.
532///
533/// # Errors
534///
535/// Returns an error if the number of levels in `msg.levels` is not exactly `DEPTH10_LEN`.
536pub fn decode_mbp10_msg(
537    msg: &dbn::Mbp10Msg,
538    instrument_id: InstrumentId,
539    price_precision: u8,
540    ts_init: Option<UnixNanos>,
541) -> anyhow::Result<OrderBookDepth10> {
542    let mut bids = Vec::with_capacity(DEPTH10_LEN);
543    let mut asks = Vec::with_capacity(DEPTH10_LEN);
544    let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
545    let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
546
547    for level in &msg.levels {
548        let bid_order = BookOrder::new(
549            OrderSide::Buy,
550            Price::from_raw(decode_raw_price_i64(level.bid_px), price_precision),
551            Quantity::from(level.bid_sz),
552            0,
553        );
554
555        let ask_order = BookOrder::new(
556            OrderSide::Sell,
557            Price::from_raw(decode_raw_price_i64(level.ask_px), price_precision),
558            Quantity::from(level.ask_sz),
559            0,
560        );
561
562        bids.push(bid_order);
563        asks.push(ask_order);
564        bid_counts.push(level.bid_ct);
565        ask_counts.push(level.ask_ct);
566    }
567
568    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
569        anyhow::anyhow!(
570            "Expected exactly {DEPTH10_LEN} bid levels, received {}",
571            v.len()
572        )
573    })?;
574
575    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
576        anyhow::anyhow!(
577            "Expected exactly {DEPTH10_LEN} ask levels, received {}",
578            v.len()
579        )
580    })?;
581
582    let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
583        anyhow::anyhow!(
584            "Expected exactly {DEPTH10_LEN} bid counts, received {}",
585            v.len()
586        )
587    })?;
588
589    let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
590        anyhow::anyhow!(
591            "Expected exactly {DEPTH10_LEN} ask counts, received {}",
592            v.len()
593        )
594    })?;
595
596    let ts_event = msg.ts_recv.into();
597    let ts_init = ts_init.unwrap_or(ts_event);
598
599    let depth = OrderBookDepth10::new(
600        instrument_id,
601        bids,
602        asks,
603        bid_counts,
604        ask_counts,
605        msg.flags.raw(),
606        msg.sequence.into(),
607        ts_event,
608        ts_init,
609    );
610
611    Ok(depth)
612}
613
614/// Decodes a Databento CMBP1 (Consolidated Market by Price Level 1) message.
615///
616/// Returns a tuple containing a `QuoteTick` and an optional `TradeTick` based on the message content.
617///
618/// # Errors
619///
620/// Returns an error if decoding the CMBP1 message fails.
621pub fn decode_cmbp1_msg(
622    msg: &dbn::Cmbp1Msg,
623    instrument_id: InstrumentId,
624    price_precision: u8,
625    ts_init: Option<UnixNanos>,
626    include_trades: bool,
627) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
628    let top_level = &msg.levels[0];
629    let ts_event = msg.ts_recv.into();
630    let ts_init = ts_init.unwrap_or(ts_event);
631
632    let quote = QuoteTick::new(
633        instrument_id,
634        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
635        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
636        Quantity::from(top_level.bid_sz),
637        Quantity::from(top_level.ask_sz),
638        ts_event,
639        ts_init,
640    );
641
642    let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
643        // Use UUID4 for trade ID as CMBP1 doesn't have a sequence field
644        Some(TradeTick::new(
645            instrument_id,
646            Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
647            Quantity::from(msg.size),
648            parse_aggressor_side(msg.side),
649            TradeId::new(UUID4::new().to_string()),
650            ts_event,
651            ts_init,
652        ))
653    } else {
654        None
655    };
656
657    Ok((quote, maybe_trade))
658}
659
660/// Decodes a Databento CBBO (Consolidated Best Bid and Offer) message.
661///
662/// Returns a `QuoteTick` representing the consolidated best bid and offer.
663///
664/// # Errors
665///
666/// Returns an error if decoding the CBBO message fails.
667pub fn decode_cbbo_msg(
668    msg: &dbn::CbboMsg,
669    instrument_id: InstrumentId,
670    price_precision: u8,
671    ts_init: Option<UnixNanos>,
672) -> anyhow::Result<QuoteTick> {
673    let top_level = &msg.levels[0];
674    let ts_event = msg.ts_recv.into();
675    let ts_init = ts_init.unwrap_or(ts_event);
676
677    let quote = QuoteTick::new(
678        instrument_id,
679        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
680        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
681        Quantity::from(top_level.bid_sz),
682        Quantity::from(top_level.ask_sz),
683        ts_event,
684        ts_init,
685    );
686
687    Ok(quote)
688}
689
690/// Decodes a Databento TCBBO (Consolidated Top of Book with Trade) message.
691///
692/// Returns a tuple containing both a `QuoteTick` and a `TradeTick`.
693///
694/// # Errors
695///
696/// Returns an error if decoding the TCBBO message fails.
697pub fn decode_tcbbo_msg(
698    msg: &dbn::CbboMsg,
699    instrument_id: InstrumentId,
700    price_precision: u8,
701    ts_init: Option<UnixNanos>,
702) -> anyhow::Result<(QuoteTick, TradeTick)> {
703    let top_level = &msg.levels[0];
704    let ts_event = msg.ts_recv.into();
705    let ts_init = ts_init.unwrap_or(ts_event);
706
707    let quote = QuoteTick::new(
708        instrument_id,
709        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
710        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
711        Quantity::from(top_level.bid_sz),
712        Quantity::from(top_level.ask_sz),
713        ts_event,
714        ts_init,
715    );
716
717    // Use UUID4 for trade ID as TCBBO doesn't have a sequence field
718    let trade = TradeTick::new(
719        instrument_id,
720        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
721        Quantity::from(msg.size),
722        parse_aggressor_side(msg.side),
723        TradeId::new(UUID4::new().to_string()),
724        ts_event,
725        ts_init,
726    );
727
728    Ok((quote, trade))
729}
730
731/// # Errors
732///
733/// Returns an error if `rtype` is not a supported bar aggregation.
734pub fn decode_bar_type(
735    msg: &dbn::OhlcvMsg,
736    instrument_id: InstrumentId,
737) -> anyhow::Result<BarType> {
738    let bar_type = match msg.hd.rtype {
739        32 => {
740            // ohlcv-1s
741            BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
742        }
743        33 => {
744            // ohlcv-1m
745            BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
746        }
747        34 => {
748            // ohlcv-1h
749            BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
750        }
751        35 => {
752            // ohlcv-1d
753            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
754        }
755        36 => {
756            // ohlcv-eod
757            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
758        }
759        _ => anyhow::bail!(
760            "`rtype` is not a supported bar aggregation, was {}",
761            msg.hd.rtype
762        ),
763    };
764
765    Ok(bar_type)
766}
767
768/// # Errors
769///
770/// Returns an error if `rtype` is not a supported bar aggregation.
771pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
772    let adjustment = match msg.hd.rtype {
773        32 => {
774            // ohlcv-1s
775            BAR_CLOSE_ADJUSTMENT_1S
776        }
777        33 => {
778            // ohlcv-1m
779            BAR_CLOSE_ADJUSTMENT_1M
780        }
781        34 => {
782            // ohlcv-1h
783            BAR_CLOSE_ADJUSTMENT_1H
784        }
785        35 | 36 => {
786            // ohlcv-1d and ohlcv-eod
787            BAR_CLOSE_ADJUSTMENT_1D
788        }
789        _ => anyhow::bail!(
790            "`rtype` is not a supported bar aggregation, was {}",
791            msg.hd.rtype
792        ),
793    };
794
795    Ok(adjustment.into())
796}
797
798/// # Errors
799///
800/// Returns an error if decoding the OHLCV message fails.
801pub fn decode_ohlcv_msg(
802    msg: &dbn::OhlcvMsg,
803    instrument_id: InstrumentId,
804    price_precision: u8,
805    ts_init: Option<UnixNanos>,
806    timestamp_on_close: bool,
807) -> anyhow::Result<Bar> {
808    let bar_type = decode_bar_type(msg, instrument_id)?;
809    let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
810
811    let ts_event_raw = msg.hd.ts_event.into();
812    let ts_close = ts_event_raw + ts_event_adjustment;
813    let ts_init = ts_init.unwrap_or(ts_close); // received time or close time
814
815    let ts_event = if timestamp_on_close {
816        ts_close
817    } else {
818        ts_event_raw
819    };
820
821    let bar = Bar::new(
822        bar_type,
823        Price::from_raw(decode_raw_price_i64(msg.open), price_precision),
824        Price::from_raw(decode_raw_price_i64(msg.high), price_precision),
825        Price::from_raw(decode_raw_price_i64(msg.low), price_precision),
826        Price::from_raw(decode_raw_price_i64(msg.close), price_precision),
827        Quantity::from(msg.volume),
828        ts_event,
829        ts_init,
830    );
831
832    Ok(bar)
833}
834
835/// Decodes a Databento status message into an `InstrumentStatus` event.
836///
837/// # Errors
838///
839/// Returns an error if decoding the status message fails or if `msg.action` is not a valid `MarketStatusAction`.
840pub fn decode_status_msg(
841    msg: &dbn::StatusMsg,
842    instrument_id: InstrumentId,
843    ts_init: Option<UnixNanos>,
844) -> anyhow::Result<InstrumentStatus> {
845    let ts_event = msg.hd.ts_event.into();
846    let ts_init = ts_init.unwrap_or(ts_event);
847
848    let action = MarketStatusAction::from_u16(msg.action)
849        .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
850
851    let status = InstrumentStatus::new(
852        instrument_id,
853        action,
854        ts_event,
855        ts_init,
856        parse_status_reason(msg.reason)?,
857        parse_status_trading_event(msg.trading_event)?,
858        parse_optional_bool(msg.is_trading),
859        parse_optional_bool(msg.is_quoting),
860        parse_optional_bool(msg.is_short_sell_restricted),
861    );
862
863    Ok(status)
864}
865
866/// # Errors
867///
868/// Returns an error if decoding the record type fails or encounters unsupported message.
869pub fn decode_record(
870    record: &dbn::RecordRef,
871    instrument_id: InstrumentId,
872    price_precision: u8,
873    ts_init: Option<UnixNanos>,
874    include_trades: bool,
875    bars_timestamp_on_close: bool,
876) -> anyhow::Result<(Option<Data>, Option<Data>)> {
877    // Note: TBBO and TCBBO messages provide both quotes and trades.
878    // TBBO is handled explicitly below, while TCBBO is handled by
879    // the CbboMsg branch based on whether it has trade data.
880    let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
881        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
882        let result = decode_mbo_msg(
883            msg,
884            instrument_id,
885            price_precision,
886            Some(ts_init),
887            include_trades,
888        )?;
889        match result {
890            (Some(delta), None) => (Some(Data::Delta(delta)), None),
891            (None, Some(trade)) => (Some(Data::Trade(trade)), None),
892            (None, None) => (None, None),
893            _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
894        }
895    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
896        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
897        let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
898        (Some(Data::Trade(trade)), None)
899    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
900        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
901        let result = decode_mbp1_msg(
902            msg,
903            instrument_id,
904            price_precision,
905            Some(ts_init),
906            include_trades,
907        )?;
908        match result {
909            (quote, None) => (Some(Data::Quote(quote)), None),
910            (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
911        }
912    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
913        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
914        let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
915        (Some(Data::Quote(quote)), None)
916    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
917        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
918        let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
919        (Some(Data::Quote(quote)), None)
920    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
921        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
922        let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
923        (Some(Data::from(depth)), None)
924    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
925        // if ts_init is None (like with historical data) we don't want it to be equal to ts_event
926        // it will be set correctly in decode_ohlcv_msg instead
927        let bar = decode_ohlcv_msg(
928            msg,
929            instrument_id,
930            price_precision,
931            ts_init,
932            bars_timestamp_on_close,
933        )?;
934        (Some(Data::Bar(bar)), None)
935    } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
936        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
937        let result = decode_cmbp1_msg(
938            msg,
939            instrument_id,
940            price_precision,
941            Some(ts_init),
942            include_trades,
943        )?;
944        match result {
945            (quote, None) => (Some(Data::Quote(quote)), None),
946            (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
947        }
948    } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
949        // TBBO always has both quote and trade
950        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
951        let (quote, trade) = decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
952        (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
953    } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
954        // Check if this is a TCBBO or regular CBBO based on whether it has trade data
955        if msg.price != i64::MAX && msg.size > 0 {
956            // TCBBO - has both quote and trade
957            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
958            let (quote, trade) =
959                decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
960            (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
961        } else {
962            // Regular CBBO - quote only
963            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
964            let quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
965            (Some(Data::Quote(quote)), None)
966        }
967    } else {
968        anyhow::bail!("DBN message type is not currently supported")
969    };
970
971    Ok(result)
972}
973
974const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
975    match ts_init {
976        Some(ts_init) => ts_init,
977        None => msg_timestamp,
978    }
979}
980
981/// # Errors
982///
983/// Returns an error if decoding the `InstrumentDefMsg` fails.
984pub fn decode_instrument_def_msg(
985    msg: &dbn::InstrumentDefMsg,
986    instrument_id: InstrumentId,
987    ts_init: Option<UnixNanos>,
988) -> anyhow::Result<InstrumentAny> {
989    match msg.instrument_class as u8 as char {
990        'K' => Ok(InstrumentAny::Equity(decode_equity(
991            msg,
992            instrument_id,
993            ts_init,
994        )?)),
995        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
996            msg,
997            instrument_id,
998            ts_init,
999        )?)),
1000        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1001            msg,
1002            instrument_id,
1003            ts_init,
1004        )?)),
1005        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1006            msg,
1007            instrument_id,
1008            ts_init,
1009        )?)),
1010        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1011            msg,
1012            instrument_id,
1013            ts_init,
1014        )?)),
1015        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1016        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1017        _ => anyhow::bail!(
1018            "Unsupported `instrument_class` '{}'",
1019            msg.instrument_class as u8 as char
1020        ),
1021    }
1022}
1023
1024/// Decodes a Databento instrument definition message into an `Equity` instrument.
1025///
1026/// # Errors
1027///
1028/// Returns an error if parsing or constructing `Equity` fails.
1029pub fn decode_equity(
1030    msg: &dbn::InstrumentDefMsg,
1031    instrument_id: InstrumentId,
1032    ts_init: Option<UnixNanos>,
1033) -> anyhow::Result<Equity> {
1034    let currency = parse_currency_or_usd_default(msg.currency());
1035    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1036    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1037    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1038    let ts_init = ts_init.unwrap_or(ts_event);
1039
1040    Ok(Equity::new(
1041        instrument_id,
1042        instrument_id.symbol,
1043        None, // No ISIN available yet
1044        currency,
1045        price_increment.precision,
1046        price_increment,
1047        Some(lot_size),
1048        None, // TBD
1049        None, // TBD
1050        None, // TBD
1051        None, // TBD
1052        None, // TBD
1053        None, // TBD
1054        None, // TBD
1055        None, // TBD
1056        ts_event,
1057        ts_init,
1058    ))
1059}
1060
1061/// Decodes a Databento instrument definition message into a `FuturesContract` instrument.
1062///
1063/// # Errors
1064///
1065/// Returns an error if parsing or constructing `FuturesContract` fails.
1066pub fn decode_futures_contract(
1067    msg: &dbn::InstrumentDefMsg,
1068    instrument_id: InstrumentId,
1069    ts_init: Option<UnixNanos>,
1070) -> anyhow::Result<FuturesContract> {
1071    let currency = parse_currency_or_usd_default(msg.currency());
1072    let exchange = Ustr::from(msg.exchange()?);
1073    let underlying = Ustr::from(msg.asset()?);
1074    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1075    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1076    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1077    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1078    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1079    let ts_init = ts_init.unwrap_or(ts_event);
1080
1081    FuturesContract::new_checked(
1082        instrument_id,
1083        instrument_id.symbol,
1084        asset_class.unwrap_or(AssetClass::Commodity),
1085        Some(exchange),
1086        underlying,
1087        msg.activation.into(),
1088        msg.expiration.into(),
1089        currency,
1090        price_increment.precision,
1091        price_increment,
1092        multiplier,
1093        lot_size,
1094        None, // TBD
1095        None, // TBD
1096        None, // TBD
1097        None, // TBD
1098        None, // TBD
1099        None, // TBD
1100        None, // TBD
1101        None, // TBD
1102        ts_event,
1103        ts_init,
1104    )
1105}
1106
1107/// Decodes a Databento instrument definition message into a `FuturesSpread` instrument.
1108///
1109/// # Errors
1110///
1111/// Returns an error if parsing or constructing `FuturesSpread` fails.
1112pub fn decode_futures_spread(
1113    msg: &dbn::InstrumentDefMsg,
1114    instrument_id: InstrumentId,
1115    ts_init: Option<UnixNanos>,
1116) -> anyhow::Result<FuturesSpread> {
1117    let exchange = Ustr::from(msg.exchange()?);
1118    let underlying = Ustr::from(msg.asset()?);
1119    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1120    let strategy_type = Ustr::from(msg.secsubtype()?);
1121    let currency = parse_currency_or_usd_default(msg.currency());
1122    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1123    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1124    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1125    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1126    let ts_init = ts_init.unwrap_or(ts_event);
1127
1128    FuturesSpread::new_checked(
1129        instrument_id,
1130        instrument_id.symbol,
1131        asset_class.unwrap_or(AssetClass::Commodity),
1132        Some(exchange),
1133        underlying,
1134        strategy_type,
1135        msg.activation.into(),
1136        msg.expiration.into(),
1137        currency,
1138        price_increment.precision,
1139        price_increment,
1140        multiplier,
1141        lot_size,
1142        None, // TBD
1143        None, // TBD
1144        None, // TBD
1145        None, // TBD
1146        None, // TBD
1147        None, // TBD
1148        None, // TBD
1149        None, // TBD
1150        ts_event,
1151        ts_init,
1152    )
1153}
1154
1155/// Decodes a Databento instrument definition message into an `OptionContract` instrument.
1156///
1157/// # Errors
1158///
1159/// Returns an error if parsing or constructing `OptionContract` fails.
1160pub fn decode_option_contract(
1161    msg: &dbn::InstrumentDefMsg,
1162    instrument_id: InstrumentId,
1163    ts_init: Option<UnixNanos>,
1164) -> anyhow::Result<OptionContract> {
1165    let currency = parse_currency_or_usd_default(msg.currency());
1166    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1167    let exchange = Ustr::from(msg.exchange()?);
1168    let underlying = Ustr::from(msg.underlying()?);
1169    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1170        Some(AssetClass::Equity)
1171    } else {
1172        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1173        asset_class
1174    };
1175    let option_kind = parse_option_kind(msg.instrument_class)?;
1176    let strike_price = Price::from_raw(
1177        decode_raw_price_i64(msg.strike_price),
1178        strike_price_currency.precision,
1179    );
1180    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1181    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1182    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1183    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1184    let ts_init = ts_init.unwrap_or(ts_event);
1185
1186    OptionContract::new_checked(
1187        instrument_id,
1188        instrument_id.symbol,
1189        asset_class_opt.unwrap_or(AssetClass::Commodity),
1190        Some(exchange),
1191        underlying,
1192        option_kind,
1193        strike_price,
1194        currency,
1195        msg.activation.into(),
1196        msg.expiration.into(),
1197        price_increment.precision,
1198        price_increment,
1199        multiplier,
1200        lot_size,
1201        None, // TBD
1202        None, // TBD
1203        None, // TBD
1204        None, // TBD
1205        None, // TBD
1206        None, // TBD
1207        None, // TBD
1208        None, // TBD
1209        ts_event,
1210        ts_init,
1211    )
1212}
1213
1214/// Decodes a Databento instrument definition message into an `OptionSpread` instrument.
1215///
1216/// # Errors
1217///
1218/// Returns an error if parsing or constructing `OptionSpread` fails.
1219pub fn decode_option_spread(
1220    msg: &dbn::InstrumentDefMsg,
1221    instrument_id: InstrumentId,
1222    ts_init: Option<UnixNanos>,
1223) -> anyhow::Result<OptionSpread> {
1224    let exchange = Ustr::from(msg.exchange()?);
1225    let underlying = Ustr::from(msg.underlying()?);
1226    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1227        Some(AssetClass::Equity)
1228    } else {
1229        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1230        asset_class
1231    };
1232    let strategy_type = Ustr::from(msg.secsubtype()?);
1233    let currency = parse_currency_or_usd_default(msg.currency());
1234    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1235    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1236    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1237    let ts_event = msg.ts_recv.into(); // More accurate and reliable timestamp
1238    let ts_init = ts_init.unwrap_or(ts_event);
1239
1240    OptionSpread::new_checked(
1241        instrument_id,
1242        instrument_id.symbol,
1243        asset_class_opt.unwrap_or(AssetClass::Commodity),
1244        Some(exchange),
1245        underlying,
1246        strategy_type,
1247        msg.activation.into(),
1248        msg.expiration.into(),
1249        currency,
1250        price_increment.precision,
1251        price_increment,
1252        multiplier,
1253        lot_size,
1254        None, // TBD
1255        None, // TBD
1256        None, // TBD
1257        None, // TBD
1258        None, // TBD
1259        None, // TBD
1260        None, // TBD
1261        None, // TBD
1262        ts_event,
1263        ts_init,
1264    )
1265}
1266
1267/// Decodes a Databento imbalance message into a `DatabentoImbalance` event.
1268///
1269/// # Errors
1270///
1271/// Returns an error if constructing `DatabentoImbalance` fails.
1272pub fn decode_imbalance_msg(
1273    msg: &dbn::ImbalanceMsg,
1274    instrument_id: InstrumentId,
1275    price_precision: u8,
1276    ts_init: Option<UnixNanos>,
1277) -> anyhow::Result<DatabentoImbalance> {
1278    let ts_event = msg.ts_recv.into();
1279    let ts_init = ts_init.unwrap_or(ts_event);
1280
1281    Ok(DatabentoImbalance::new(
1282        instrument_id,
1283        Price::from_raw(decode_raw_price_i64(msg.ref_price), price_precision),
1284        Price::from_raw(
1285            decode_raw_price_i64(msg.cont_book_clr_price),
1286            price_precision,
1287        ),
1288        Price::from_raw(
1289            decode_raw_price_i64(msg.auct_interest_clr_price),
1290            price_precision,
1291        ),
1292        Quantity::new(f64::from(msg.paired_qty), 0),
1293        Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1294        parse_order_side(msg.side),
1295        msg.significant_imbalance as c_char,
1296        msg.hd.ts_event.into(),
1297        ts_event,
1298        ts_init,
1299    ))
1300}
1301
1302/// Decodes a Databento statistics message into a `DatabentoStatistics` event.
1303///
1304/// # Errors
1305///
1306/// Returns an error if constructing `DatabentoStatistics` fails or if `msg.stat_type` or
1307/// `msg.update_action` is not a valid enum variant.
1308pub fn decode_statistics_msg(
1309    msg: &dbn::StatMsg,
1310    instrument_id: InstrumentId,
1311    price_precision: u8,
1312    ts_init: Option<UnixNanos>,
1313) -> anyhow::Result<DatabentoStatistics> {
1314    let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1315        .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1316    let update_action =
1317        DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1318            anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1319        })?;
1320    let ts_event = msg.ts_recv.into();
1321    let ts_init = ts_init.unwrap_or(ts_event);
1322
1323    Ok(DatabentoStatistics::new(
1324        instrument_id,
1325        stat_type,
1326        update_action,
1327        decode_optional_price(msg.price, price_precision),
1328        decode_optional_quantity(msg.quantity),
1329        msg.channel_id,
1330        msg.stat_flags,
1331        msg.sequence,
1332        msg.ts_ref.into(),
1333        msg.ts_in_delta,
1334        msg.hd.ts_event.into(),
1335        ts_event,
1336        ts_init,
1337    ))
1338}
1339
1340////////////////////////////////////////////////////////////////////////////////
1341// Tests
1342////////////////////////////////////////////////////////////////////////////////
1343#[cfg(test)]
1344mod tests {
1345    use std::path::{Path, PathBuf};
1346
1347    use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1348    use fallible_streaming_iterator::FallibleStreamingIterator;
1349    use nautilus_model::instruments::Instrument;
1350    use rstest::*;
1351
1352    use super::*;
1353
1354    fn test_data_path() -> PathBuf {
1355        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1356    }
1357
1358    #[rstest]
1359    #[case('Y' as c_char, Some(true))]
1360    #[case('N' as c_char, Some(false))]
1361    #[case('X' as c_char, None)]
1362    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1363        assert_eq!(parse_optional_bool(input), expected);
1364    }
1365
1366    #[rstest]
1367    #[case('A' as c_char, OrderSide::Sell)]
1368    #[case('B' as c_char, OrderSide::Buy)]
1369    #[case('X' as c_char, OrderSide::NoOrderSide)]
1370    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1371        assert_eq!(parse_order_side(input), expected);
1372    }
1373
1374    #[rstest]
1375    #[case('A' as c_char, AggressorSide::Seller)]
1376    #[case('B' as c_char, AggressorSide::Buyer)]
1377    #[case('X' as c_char, AggressorSide::NoAggressor)]
1378    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1379        assert_eq!(parse_aggressor_side(input), expected);
1380    }
1381
1382    #[rstest]
1383    #[case('A' as c_char, Ok(BookAction::Add))]
1384    #[case('C' as c_char, Ok(BookAction::Delete))]
1385    #[case('F' as c_char, Ok(BookAction::Update))]
1386    #[case('M' as c_char, Ok(BookAction::Update))]
1387    #[case('R' as c_char, Ok(BookAction::Clear))]
1388    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1389    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1390        match parse_book_action(input) {
1391            Ok(action) => assert_eq!(Ok(action), expected),
1392            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1393        }
1394    }
1395
1396    #[rstest]
1397    #[case('C' as c_char, Ok(OptionKind::Call))]
1398    #[case('P' as c_char, Ok(OptionKind::Put))]
1399    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1400    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1401        match parse_option_kind(input) {
1402            Ok(kind) => assert_eq!(Ok(kind), expected),
1403            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1404        }
1405    }
1406
1407    #[rstest]
1408    #[case(Ok("USD"), Currency::USD())]
1409    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1410    #[case(Ok(""), Currency::USD())]
1411    #[case(Err("Error"), Currency::USD())]
1412    fn test_parse_currency_or_usd_default(
1413        #[case] input: Result<&str, &'static str>, // Using `&'static str` for errors
1414        #[case] expected: Currency,
1415    ) {
1416        let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1417        assert_eq!(actual, expected);
1418    }
1419
1420    #[rstest]
1421    #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1422    #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1423    #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1424    #[case("XXX", Ok((None, None)))]
1425    #[case("D", Err("Value string is too short"))]
1426    fn test_parse_cfi_iso10926(
1427        #[case] input: &str,
1428        #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1429    ) {
1430        match parse_cfi_iso10926(input) {
1431            Ok(result) => assert_eq!(Ok(result), expected),
1432            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1433        }
1434    }
1435
1436    #[rstest]
1437    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1438    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1439    #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] // Arbitrary valid price
1440    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1441        let actual = decode_price_increment(value, precision);
1442        assert_eq!(actual, expected);
1443    }
1444
1445    #[rstest]
1446    #[case(i64::MAX, 2, None)] // None for i64::MAX
1447    #[case(0, 2, Some(Price::from_raw(0, 2)))] // 0 is valid here
1448    #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] // Arbitrary valid price
1449    fn test_decode_optional_price(
1450        #[case] value: i64,
1451        #[case] precision: u8,
1452        #[case] expected: Option<Price>,
1453    ) {
1454        let actual = decode_optional_price(value, precision);
1455        assert_eq!(actual, expected);
1456    }
1457
1458    #[rstest]
1459    #[case(i64::MAX, None)] // None for i32::MAX
1460    #[case(0, Some(Quantity::new(0.0, 0)))] // 0 is valid quantity
1461    #[case(10, Some(Quantity::new(10.0, 0)))] // Arbitrary valid quantity
1462    fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1463        let actual = decode_optional_quantity(value);
1464        assert_eq!(actual, expected);
1465    }
1466
1467    #[rstest]
1468    #[case(0, Quantity::from(1))] // Default fallback for 0
1469    #[case(i64::MAX, Quantity::from(1))] // Default fallback for i64::MAX
1470    #[case(50_000_000_000, Quantity::from("50"))] // 50.0 exactly
1471    #[case(12_500_000_000, Quantity::from("12.5"))] // 12.5 exactly
1472    #[case(1_000_000_000, Quantity::from("1"))] // 1.0 exactly
1473    #[case(1, Quantity::from("0.000000001"))] // Smallest positive value
1474    #[case(1_000_000_001, Quantity::from("1.000000001"))] // Just over 1.0
1475    #[case(999_999_999, Quantity::from("0.999999999"))] // Just under 1.0
1476    #[case(123_456_789_000, Quantity::from("123.456789"))] // Trailing zeros trimmed
1477    fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1478        assert_eq!(decode_multiplier(raw).unwrap(), expected);
1479    }
1480
1481    #[rstest]
1482    #[case(-1_500_000_000)] // Large negative value
1483    #[case(-1)] // Small negative value
1484    #[case(-999_999_999)] // Another negative value
1485    fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1486        let result = decode_multiplier(raw);
1487        assert!(result.is_err());
1488        assert!(
1489            result
1490                .unwrap_err()
1491                .to_string()
1492                .contains("Invalid negative multiplier")
1493        );
1494    }
1495
1496    #[rstest]
1497    fn test_decode_mbo_msg() {
1498        let path = test_data_path().join("test_data.mbo.dbn.zst");
1499        let mut dbn_stream = Decoder::from_zstd_file(path)
1500            .unwrap()
1501            .decode_stream::<dbn::MboMsg>();
1502        let msg = dbn_stream.next().unwrap().unwrap();
1503
1504        let instrument_id = InstrumentId::from("ESM4.GLBX");
1505        let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1506        let delta = delta.unwrap();
1507
1508        assert_eq!(delta.instrument_id, instrument_id);
1509        assert_eq!(delta.action, BookAction::Delete);
1510        assert_eq!(delta.order.side, OrderSide::Sell);
1511        assert_eq!(delta.order.price, Price::from("3722.75"));
1512        assert_eq!(delta.order.size, Quantity::from("1"));
1513        assert_eq!(delta.order.order_id, 647_784_973_705);
1514        assert_eq!(delta.flags, 128);
1515        assert_eq!(delta.sequence, 1_170_352);
1516        assert_eq!(delta.ts_event, msg.ts_recv);
1517        assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1518        assert_eq!(delta.ts_init, 0);
1519    }
1520
1521    #[rstest]
1522    fn test_decode_mbp1_msg() {
1523        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1524        let mut dbn_stream = Decoder::from_zstd_file(path)
1525            .unwrap()
1526            .decode_stream::<dbn::Mbp1Msg>();
1527        let msg = dbn_stream.next().unwrap().unwrap();
1528
1529        let instrument_id = InstrumentId::from("ESM4.GLBX");
1530        let (quote, _) = decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1531
1532        assert_eq!(quote.instrument_id, instrument_id);
1533        assert_eq!(quote.bid_price, Price::from("3720.25"));
1534        assert_eq!(quote.ask_price, Price::from("3720.50"));
1535        assert_eq!(quote.bid_size, Quantity::from("24"));
1536        assert_eq!(quote.ask_size, Quantity::from("11"));
1537        assert_eq!(quote.ts_event, msg.ts_recv);
1538        assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1539        assert_eq!(quote.ts_init, 0);
1540    }
1541
1542    #[rstest]
1543    fn test_decode_bbo_1s_msg() {
1544        let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1545        let mut dbn_stream = Decoder::from_zstd_file(path)
1546            .unwrap()
1547            .decode_stream::<dbn::BboMsg>();
1548        let msg = dbn_stream.next().unwrap().unwrap();
1549
1550        let instrument_id = InstrumentId::from("ESM4.GLBX");
1551        let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1552
1553        assert_eq!(quote.instrument_id, instrument_id);
1554        assert_eq!(quote.bid_price, Price::from("5199.50"));
1555        assert_eq!(quote.ask_price, Price::from("5199.75"));
1556        assert_eq!(quote.bid_size, Quantity::from("26"));
1557        assert_eq!(quote.ask_size, Quantity::from("23"));
1558        assert_eq!(quote.ts_event, msg.ts_recv);
1559        assert_eq!(quote.ts_event, 1715248801000000000);
1560        assert_eq!(quote.ts_init, 0);
1561    }
1562
1563    #[rstest]
1564    fn test_decode_bbo_1m_msg() {
1565        let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1566        let mut dbn_stream = Decoder::from_zstd_file(path)
1567            .unwrap()
1568            .decode_stream::<dbn::BboMsg>();
1569        let msg = dbn_stream.next().unwrap().unwrap();
1570
1571        let instrument_id = InstrumentId::from("ESM4.GLBX");
1572        let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1573
1574        assert_eq!(quote.instrument_id, instrument_id);
1575        assert_eq!(quote.bid_price, Price::from("5199.50"));
1576        assert_eq!(quote.ask_price, Price::from("5199.75"));
1577        assert_eq!(quote.bid_size, Quantity::from("33"));
1578        assert_eq!(quote.ask_size, Quantity::from("17"));
1579        assert_eq!(quote.ts_event, msg.ts_recv);
1580        assert_eq!(quote.ts_event, 1715248800000000000);
1581        assert_eq!(quote.ts_init, 0);
1582    }
1583
1584    #[rstest]
1585    fn test_decode_mbp10_msg() {
1586        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1587        let mut dbn_stream = Decoder::from_zstd_file(path)
1588            .unwrap()
1589            .decode_stream::<dbn::Mbp10Msg>();
1590        let msg = dbn_stream.next().unwrap().unwrap();
1591
1592        let instrument_id = InstrumentId::from("ESM4.GLBX");
1593        let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1594
1595        assert_eq!(depth10.instrument_id, instrument_id);
1596        assert_eq!(depth10.bids.len(), 10);
1597        assert_eq!(depth10.asks.len(), 10);
1598        assert_eq!(depth10.bid_counts.len(), 10);
1599        assert_eq!(depth10.ask_counts.len(), 10);
1600        assert_eq!(depth10.flags, 128);
1601        assert_eq!(depth10.sequence, 1_170_352);
1602        assert_eq!(depth10.ts_event, msg.ts_recv);
1603        assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1604        assert_eq!(depth10.ts_init, 0);
1605    }
1606
1607    #[rstest]
1608    fn test_decode_trade_msg() {
1609        let path = test_data_path().join("test_data.trades.dbn.zst");
1610        let mut dbn_stream = Decoder::from_zstd_file(path)
1611            .unwrap()
1612            .decode_stream::<dbn::TradeMsg>();
1613        let msg = dbn_stream.next().unwrap().unwrap();
1614
1615        let instrument_id = InstrumentId::from("ESM4.GLBX");
1616        let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1617
1618        assert_eq!(trade.instrument_id, instrument_id);
1619        assert_eq!(trade.price, Price::from("3720.25"));
1620        assert_eq!(trade.size, Quantity::from("5"));
1621        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1622        assert_eq!(trade.trade_id.to_string(), "1170380");
1623        assert_eq!(trade.ts_event, msg.ts_recv);
1624        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1625        assert_eq!(trade.ts_init, 0);
1626    }
1627
1628    #[rstest]
1629    fn test_decode_tbbo_msg() {
1630        let path = test_data_path().join("test_data.tbbo.dbn.zst");
1631        let mut dbn_stream = Decoder::from_zstd_file(path)
1632            .unwrap()
1633            .decode_stream::<dbn::Mbp1Msg>();
1634        let msg = dbn_stream.next().unwrap().unwrap();
1635
1636        let instrument_id = InstrumentId::from("ESM4.GLBX");
1637        let (quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1638
1639        assert_eq!(quote.instrument_id, instrument_id);
1640        assert_eq!(quote.bid_price, Price::from("3720.25"));
1641        assert_eq!(quote.ask_price, Price::from("3720.50"));
1642        assert_eq!(quote.bid_size, Quantity::from("26"));
1643        assert_eq!(quote.ask_size, Quantity::from("7"));
1644        assert_eq!(quote.ts_event, msg.ts_recv);
1645        assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
1646        assert_eq!(quote.ts_init, 0);
1647
1648        assert_eq!(trade.instrument_id, instrument_id);
1649        assert_eq!(trade.price, Price::from("3720.25"));
1650        assert_eq!(trade.size, Quantity::from("5"));
1651        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1652        assert_eq!(trade.trade_id.to_string(), "1170380");
1653        assert_eq!(trade.ts_event, msg.ts_recv);
1654        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1655        assert_eq!(trade.ts_init, 0);
1656    }
1657
1658    #[ignore = "Requires updated test data"]
1659    #[rstest]
1660    fn test_decode_ohlcv_msg() {
1661        let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
1662        let mut dbn_stream = Decoder::from_zstd_file(path)
1663            .unwrap()
1664            .decode_stream::<dbn::OhlcvMsg>();
1665        let msg = dbn_stream.next().unwrap().unwrap();
1666
1667        let instrument_id = InstrumentId::from("ESM4.GLBX");
1668        let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1669
1670        assert_eq!(
1671            bar.bar_type,
1672            BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
1673        );
1674        assert_eq!(bar.open, Price::from("3720.25"));
1675        assert_eq!(bar.high, Price::from("3720.50"));
1676        assert_eq!(bar.low, Price::from("3720.25"));
1677        assert_eq!(bar.close, Price::from("3720.50"));
1678        assert_eq!(bar.ts_event, 1_609_160_400_000_000_000);
1679        assert_eq!(bar.ts_init, 1_609_160_401_000_000_000); // Adjusted to open + interval
1680    }
1681
1682    #[rstest]
1683    fn test_decode_definition_msg() {
1684        let path = test_data_path().join("test_data.definition.dbn.zst");
1685        let mut dbn_stream = Decoder::from_zstd_file(path)
1686            .unwrap()
1687            .decode_stream::<dbn::InstrumentDefMsg>();
1688        let msg = dbn_stream.next().unwrap().unwrap();
1689
1690        let instrument_id = InstrumentId::from("ESM4.GLBX");
1691        let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
1692
1693        assert!(result.is_ok());
1694        assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
1695    }
1696
1697    #[rstest]
1698    fn test_decode_status_msg() {
1699        let path = test_data_path().join("test_data.status.dbn.zst");
1700        let mut dbn_stream = Decoder::from_zstd_file(path)
1701            .unwrap()
1702            .decode_stream::<dbn::StatusMsg>();
1703        let msg = dbn_stream.next().unwrap().unwrap();
1704
1705        let instrument_id = InstrumentId::from("ESM4.GLBX");
1706        let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
1707
1708        assert_eq!(status.instrument_id, instrument_id);
1709        assert_eq!(status.action, MarketStatusAction::Trading);
1710        assert_eq!(status.ts_event, msg.hd.ts_event);
1711        assert_eq!(status.ts_init, 0);
1712        assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
1713        assert_eq!(status.trading_event, None);
1714        assert_eq!(status.is_trading, Some(true));
1715        assert_eq!(status.is_quoting, Some(true));
1716        assert_eq!(status.is_short_sell_restricted, None);
1717    }
1718
1719    #[rstest]
1720    fn test_decode_imbalance_msg() {
1721        let path = test_data_path().join("test_data.imbalance.dbn.zst");
1722        let mut dbn_stream = Decoder::from_zstd_file(path)
1723            .unwrap()
1724            .decode_stream::<dbn::ImbalanceMsg>();
1725        let msg = dbn_stream.next().unwrap().unwrap();
1726
1727        let instrument_id = InstrumentId::from("ESM4.GLBX");
1728        let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1729
1730        assert_eq!(imbalance.instrument_id, instrument_id);
1731        assert_eq!(imbalance.ref_price, Price::from("229.43"));
1732        assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
1733        assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
1734        assert_eq!(imbalance.paired_qty, Quantity::from("0"));
1735        assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
1736        assert_eq!(imbalance.side, OrderSide::Buy);
1737        assert_eq!(imbalance.significant_imbalance, 126);
1738        assert_eq!(imbalance.ts_event, msg.hd.ts_event);
1739        assert_eq!(imbalance.ts_recv, msg.ts_recv);
1740        assert_eq!(imbalance.ts_init, 0);
1741    }
1742
1743    #[rstest]
1744    fn test_decode_statistics_msg() {
1745        let path = test_data_path().join("test_data.statistics.dbn.zst");
1746        let mut dbn_stream = Decoder::from_zstd_file(path)
1747            .unwrap()
1748            .decode_stream::<dbn::StatMsg>();
1749        let msg = dbn_stream.next().unwrap().unwrap();
1750
1751        let instrument_id = InstrumentId::from("ESM4.GLBX");
1752        let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1753
1754        assert_eq!(statistics.instrument_id, instrument_id);
1755        assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
1756        assert_eq!(
1757            statistics.update_action,
1758            DatabentoStatisticUpdateAction::Added
1759        );
1760        assert_eq!(statistics.price, Some(Price::from("100.00")));
1761        assert_eq!(statistics.quantity, None);
1762        assert_eq!(statistics.channel_id, 13);
1763        assert_eq!(statistics.stat_flags, 255);
1764        assert_eq!(statistics.sequence, 2);
1765        assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
1766        assert_eq!(statistics.ts_in_delta, 26961);
1767        assert_eq!(statistics.ts_event, msg.hd.ts_event);
1768        assert_eq!(statistics.ts_recv, msg.ts_recv);
1769        assert_eq!(statistics.ts_init, 0);
1770    }
1771
1772    #[rstest]
1773    fn test_decode_cmbp1_msg() {
1774        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
1775        let mut dbn_stream = Decoder::from_zstd_file(path)
1776            .unwrap()
1777            .decode_stream::<dbn::Cmbp1Msg>();
1778        let msg = dbn_stream.next().unwrap().unwrap();
1779
1780        let instrument_id = InstrumentId::from("ESM4.GLBX");
1781        let (quote, trade) = decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1782
1783        assert_eq!(quote.instrument_id, instrument_id);
1784        assert!(quote.bid_price.raw > 0);
1785        assert!(quote.ask_price.raw > 0);
1786        assert!(quote.bid_size.raw > 0);
1787        assert!(quote.ask_size.raw > 0);
1788        assert_eq!(quote.ts_event, msg.ts_recv);
1789        assert_eq!(quote.ts_init, 0);
1790
1791        // Check if trade is present based on action
1792        if msg.action as u8 as char == 'T' {
1793            assert!(trade.is_some());
1794            let trade = trade.unwrap();
1795            assert_eq!(trade.instrument_id, instrument_id);
1796        } else {
1797            assert!(trade.is_none());
1798        }
1799    }
1800
1801    // TODO: Re-enable these tests once proper CBBO test data is available
1802    // The current test_data.cbbo.dbn.zst files contain invalid/placeholder data
1803    #[rstest]
1804    #[ignore]
1805    fn test_decode_cbbo_msg() {
1806        let path = test_data_path().join("test_data.cbbo.dbn.zst");
1807        let mut dbn_stream = Decoder::from_zstd_file(path)
1808            .unwrap()
1809            .decode_stream::<dbn::CbboMsg>();
1810        let msg = dbn_stream.next().unwrap().unwrap();
1811
1812        let instrument_id = InstrumentId::from("ESM4.GLBX");
1813        let quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1814
1815        assert_eq!(quote.instrument_id, instrument_id);
1816        assert!(quote.bid_price.raw > 0);
1817        assert!(quote.ask_price.raw > 0);
1818        assert!(quote.bid_size.raw > 0);
1819        assert!(quote.ask_size.raw > 0);
1820        assert_eq!(quote.ts_event, msg.ts_recv);
1821        assert_eq!(quote.ts_init, 0);
1822    }
1823
1824    #[rstest]
1825    #[ignore]
1826    fn test_decode_cbbo_1s_msg() {
1827        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
1828        let mut dbn_stream = Decoder::from_zstd_file(path)
1829            .unwrap()
1830            .decode_stream::<dbn::CbboMsg>();
1831        let msg = dbn_stream.next().unwrap().unwrap();
1832
1833        let instrument_id = InstrumentId::from("ESM4.GLBX");
1834        let quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1835
1836        assert_eq!(quote.instrument_id, instrument_id);
1837        assert!(quote.bid_price.raw > 0);
1838        assert!(quote.ask_price.raw > 0);
1839        assert!(quote.bid_size.raw > 0);
1840        assert!(quote.ask_size.raw > 0);
1841        assert_eq!(quote.ts_event, msg.ts_recv);
1842        assert_eq!(quote.ts_init, 0);
1843    }
1844
    // Uses a synthetic in-memory MBP-10 message, so no external test data file is required
1846    #[rstest]
1847    fn test_decode_mbp10_msg_with_all_levels() {
1848        let mut msg = dbn::Mbp10Msg::default();
1849        for i in 0..10 {
1850            msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
1851            msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
1852            msg.levels[i].bid_sz = 10 + i as u32;
1853            msg.levels[i].ask_sz = 10 + i as u32;
1854            msg.levels[i].bid_ct = 1 + i as u32;
1855            msg.levels[i].ask_ct = 1 + i as u32;
1856        }
1857        msg.ts_recv = 1_609_160_400_000_704_060;
1858
1859        let instrument_id = InstrumentId::from("TEST.VENUE");
1860        let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
1861
1862        assert!(result.is_ok());
1863        let depth = result.unwrap();
1864        assert_eq!(depth.bids.len(), 10);
1865        assert_eq!(depth.asks.len(), 10);
1866        assert_eq!(depth.bid_counts.len(), 10);
1867        assert_eq!(depth.ask_counts.len(), 10);
1868    }
1869
1870    #[rstest]
1871    fn test_array_conversion_error_handling() {
1872        use nautilus_model::{data::BookOrder, enums::OrderSide};
1873
1874        let mut bids = Vec::new();
1875        let mut asks = Vec::new();
1876
1877        // Intentionally create fewer than DEPTH10_LEN elements
1878        for i in 0..5 {
1879            bids.push(BookOrder::new(
1880                OrderSide::Buy,
1881                Price::from(format!("{}.00", 100 - i)),
1882                Quantity::from(10),
1883                i as u64,
1884            ));
1885            asks.push(BookOrder::new(
1886                OrderSide::Sell,
1887                Price::from(format!("{}.00", 101 + i)),
1888                Quantity::from(10),
1889                i as u64,
1890            ));
1891        }
1892
1893        let result: Result<[BookOrder; DEPTH10_LEN], _> =
1894            bids.try_into().map_err(|v: Vec<BookOrder>| {
1895                anyhow::anyhow!(
1896                    "Expected exactly {DEPTH10_LEN} bid levels, received {}",
1897                    v.len()
1898                )
1899            });
1900        assert!(result.is_err());
1901        assert!(
1902            result
1903                .unwrap_err()
1904                .to_string()
1905                .contains("Expected exactly 10 bid levels, received 5")
1906        );
1907    }
1908
1909    #[rstest]
1910    #[ignore]
1911    fn test_decode_tcbbo_msg() {
1912        let path = test_data_path().join("test_data.cbbo.dbn.zst");
1913        let mut dbn_stream = Decoder::from_zstd_file(path)
1914            .unwrap()
1915            .decode_stream::<dbn::CbboMsg>();
1916        let msg = dbn_stream.next().unwrap().unwrap();
1917
1918        // Simulate TCBBO by adding trade data
1919        let mut tcbbo_msg = msg.clone();
1920        tcbbo_msg.price = 372025;
1921        tcbbo_msg.size = 10;
1922
1923        let instrument_id = InstrumentId::from("ESM4.GLBX");
1924        let (quote, trade) =
1925            decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
1926
1927        assert_eq!(quote.instrument_id, instrument_id);
1928        assert!(quote.bid_price.raw > 0);
1929        assert!(quote.ask_price.raw > 0);
1930        assert!(quote.bid_size.raw > 0);
1931        assert!(quote.ask_size.raw > 0);
1932        assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
1933        assert_eq!(quote.ts_init, 0);
1934
1935        assert_eq!(trade.instrument_id, instrument_id);
1936        assert_eq!(trade.price, Price::from_raw(372025, 2));
1937        assert_eq!(trade.size, Quantity::from(10));
1938        assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
1939        assert_eq!(trade.ts_init, 0);
1940    }
1941}