nautilus_databento/
decode.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
20use nautilus_model::{
21    data::{
22        Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24    },
25    enums::{
26        AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27        InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28    },
29    identifiers::{InstrumentId, TradeId},
30    instruments::{
31        Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32    },
33    types::{Currency, Price, Quantity, price::decode_raw_price_i64},
34};
35use ustr::Ustr;
36
37use super::{
38    enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
39    types::{DatabentoImbalance, DatabentoStatistics},
40};
41
42// SAFETY: Known valid value
43const STEP_ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
44
45const BAR_SPEC_1S: BarSpecification = BarSpecification {
46    step: STEP_ONE,
47    aggregation: BarAggregation::Second,
48    price_type: PriceType::Last,
49};
50const BAR_SPEC_1M: BarSpecification = BarSpecification {
51    step: STEP_ONE,
52    aggregation: BarAggregation::Minute,
53    price_type: PriceType::Last,
54};
55const BAR_SPEC_1H: BarSpecification = BarSpecification {
56    step: STEP_ONE,
57    aggregation: BarAggregation::Hour,
58    price_type: PriceType::Last,
59};
60const BAR_SPEC_1D: BarSpecification = BarSpecification {
61    step: STEP_ONE,
62    aggregation: BarAggregation::Day,
63    price_type: PriceType::Last,
64};
65
66const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
67const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
68const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
69const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
70
/// Interprets a `Y`/`N` flag character as an optional boolean.
///
/// Returns `Some(true)` for `'Y'`, `Some(false)` for `'N'`, and `None` for any other value.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    let byte = c as u8;
    if byte == b'Y' {
        Some(true)
    } else if byte == b'N' {
        Some(false)
    } else {
        None
    }
}
79
80#[must_use]
81pub const fn parse_order_side(c: c_char) -> OrderSide {
82    match c as u8 as char {
83        'A' => OrderSide::Sell,
84        'B' => OrderSide::Buy,
85        _ => OrderSide::NoOrderSide,
86    }
87}
88
89#[must_use]
90pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
91    match c as u8 as char {
92        'A' => AggressorSide::Seller,
93        'B' => AggressorSide::Buyer,
94        _ => AggressorSide::NoAggressor,
95    }
96}
97
98/// Parses a Databento book action character into a `BookAction` enum.
99///
100/// # Errors
101///
102/// Returns an error if `c` is not a valid `BookAction` character.
103pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
104    match c as u8 as char {
105        'A' => Ok(BookAction::Add),
106        'C' => Ok(BookAction::Delete),
107        'F' => Ok(BookAction::Update),
108        'M' => Ok(BookAction::Update),
109        'R' => Ok(BookAction::Clear),
110        invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
111    }
112}
113
114/// Parses a Databento option kind character into an `OptionKind` enum.
115///
116/// # Errors
117///
118/// Returns an error if `c` is not a valid `OptionKind` character.
119pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
120    match c as u8 as char {
121        'C' => Ok(OptionKind::Call),
122        'P' => Ok(OptionKind::Put),
123        invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
124    }
125}
126
127fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
128    match value {
129        Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
130            tracing::warn!("Unknown currency code '{value}', defaulting to USD");
131            Currency::USD()
132        }),
133        Ok(_) => Currency::USD(),
134        Err(e) => {
135            tracing::error!("Error parsing currency: {e}");
136            Currency::USD()
137        }
138    }
139}
140
141/// Parses a CFI (Classification of Financial Instruments) code to extract asset and instrument classes.
142///
143/// # Errors
144///
145/// Returns an error if `value` has fewer than 3 characters.
146pub fn parse_cfi_iso10926(
147    value: &str,
148) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
149    let chars: Vec<char> = value.chars().collect();
150    if chars.len() < 3 {
151        anyhow::bail!("Value string is too short");
152    }
153
154    // TODO: A proper CFI parser would be useful: https://en.wikipedia.org/wiki/ISO_10962
155    let cfi_category = chars[0];
156    let cfi_group = chars[1];
157    let cfi_attribute1 = chars[2];
158    // let cfi_attribute2 = value[3];
159    // let cfi_attribute3 = value[4];
160    // let cfi_attribute4 = value[5];
161
162    let mut asset_class = match cfi_category {
163        'D' => Some(AssetClass::Debt),
164        'E' => Some(AssetClass::Equity),
165        'S' => None,
166        _ => None,
167    };
168
169    let instrument_class = match cfi_group {
170        'I' => Some(InstrumentClass::Future),
171        _ => None,
172    };
173
174    if cfi_attribute1 == 'I' {
175        asset_class = Some(AssetClass::Index);
176    }
177
178    Ok((asset_class, instrument_class))
179}
180
181/// Parses a Databento status reason code into a human-readable string.
182///
183/// See: <https://databento.com/docs/schemas-and-data-formats/status#types-of-status-reasons>
184///
185/// # Errors
186///
187/// Returns an error if `value` is an invalid status reason code.
188pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
189    let value_str = match value {
190        0 => return Ok(None),
191        1 => "Scheduled",
192        2 => "Surveillance intervention",
193        3 => "Market event",
194        4 => "Instrument activation",
195        5 => "Instrument expiration",
196        6 => "Recovery in process",
197        10 => "Regulatory",
198        11 => "Administrative",
199        12 => "Non-compliance",
200        13 => "Filings not current",
201        14 => "SEC trading suspension",
202        15 => "New issue",
203        16 => "Issue available",
204        17 => "Issues reviewed",
205        18 => "Filing requirements satisfied",
206        30 => "News pending",
207        31 => "News released",
208        32 => "News and resumption times",
209        33 => "News not forthcoming",
210        40 => "Order imbalance",
211        50 => "LULD pause",
212        60 => "Operational",
213        70 => "Additional information requested",
214        80 => "Merger effective",
215        90 => "ETF",
216        100 => "Corporate action",
217        110 => "New Security offering",
218        120 => "Market wide halt level 1",
219        121 => "Market wide halt level 2",
220        122 => "Market wide halt level 3",
221        123 => "Market wide halt carryover",
222        124 => "Market wide halt resumption",
223        130 => "Quotation not available",
224        invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
225    };
226
227    Ok(Some(Ustr::from(value_str)))
228}
229
230/// Parses a Databento status trading event code into a human-readable string.
231///
232/// # Errors
233///
234/// Returns an error if `value` is an invalid status trading event code.
235pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
236    let value_str = match value {
237        0 => return Ok(None),
238        1 => "No cancel",
239        2 => "Change trading session",
240        3 => "Implied matching on",
241        4 => "Implied matching off",
242        _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
243    };
244
245    Ok(Some(Ustr::from(value_str)))
246}
247
248/// Decodes a price from the given value, expressed in units of 1e-9.
249#[must_use]
250pub fn decode_price(value: i64, precision: u8) -> Price {
251    Price::from_raw(decode_raw_price_i64(value), precision)
252}
253
254/// Decodes a quantity from the given value, expressed in standard whole-number units.
255#[must_use]
256pub fn decode_quantity(value: u64) -> Quantity {
257    Quantity::from(value)
258}
259
260/// Decodes a minimum price increment from the given value, expressed in units of 1e-9.
261#[must_use]
262pub fn decode_price_increment(value: i64, precision: u8) -> Price {
263    match value {
264        0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
265        _ => decode_price(value, precision),
266    }
267}
268
269/// Decodes a price from the given optional value, expressed in units of 1e-9.
270#[must_use]
271pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
272    match value {
273        i64::MAX => None,
274        _ => Some(decode_price(value, precision)),
275    }
276}
277
278/// Decodes a quantity from the given optional value, where `i64::MAX` indicates missing data.
279#[must_use]
280pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
281    match value {
282        i64::MAX => None,
283        _ => Some(Quantity::from(value)),
284    }
285}
286
287/// Decodes a multiplier from the given value, expressed in units of 1e-9.
288/// Uses exact integer arithmetic to avoid precision loss in financial calculations.
289///
290/// # Errors
291///
292/// Returns an error if value is negative (invalid multiplier).
293pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
294    match value {
295        0 | i64::MAX => Ok(Quantity::from(1)),
296        v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
297        v => {
298            // Work in integers: v is fixed-point with 9 fractional digits.
299            // Build a canonical decimal string without floating-point.
300            let abs = v as u128;
301
302            const SCALE: u128 = 1_000_000_000;
303            let int_part = abs / SCALE;
304            let frac_part = abs % SCALE;
305
306            // Format fractional part with exactly 9 digits, then trim trailing zeros
307            // to keep a canonical representation.
308            if frac_part == 0 {
309                // Pure integer
310                Ok(Quantity::from(int_part as u64))
311            } else {
312                let mut frac_str = format!("{frac_part:09}");
313                while frac_str.ends_with('0') {
314                    frac_str.pop();
315                }
316                let s = format!("{int_part}.{frac_str}");
317                Ok(Quantity::from(s))
318            }
319        }
320    }
321}
322
323/// Decodes a lot size from the given value, expressed in standard whole-number units.
324#[must_use]
325pub fn decode_lot_size(value: i32) -> Quantity {
326    match value {
327        0 | i32::MAX => Quantity::from(1),
328        value => Quantity::from(value),
329    }
330}
331
332#[must_use]
333fn is_trade_msg(order_side: OrderSide, action: c_char) -> bool {
334    order_side == OrderSide::NoOrderSide || action as u8 as char == 'T'
335}
336
337/// Decodes a Databento MBO (Market by Order) message into an order book delta or trade.
338///
339/// Returns a tuple containing either an `OrderBookDelta` or a `TradeTick`, depending on
340/// whether the message represents an order book update or a trade execution.
341///
342/// # Errors
343///
344/// Returns an error if decoding the MBO message fails.
345pub fn decode_mbo_msg(
346    msg: &dbn::MboMsg,
347    instrument_id: InstrumentId,
348    price_precision: u8,
349    ts_init: Option<UnixNanos>,
350    include_trades: bool,
351) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
352    let side = parse_order_side(msg.side);
353    if is_trade_msg(side, msg.action) {
354        if include_trades && msg.size > 0 {
355            let ts_event = msg.ts_recv.into();
356            let ts_init = ts_init.unwrap_or(ts_event);
357
358            let trade = TradeTick::new(
359                instrument_id,
360                Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
361                Quantity::from(msg.size),
362                parse_aggressor_side(msg.side),
363                TradeId::new(itoa::Buffer::new().format(msg.sequence)),
364                ts_event,
365                ts_init,
366            );
367            return Ok((None, Some(trade)));
368        }
369
370        return Ok((None, None));
371    }
372
373    let order = BookOrder::new(
374        side,
375        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
376        Quantity::from(msg.size),
377        msg.order_id,
378    );
379    let ts_event = msg.ts_recv.into();
380    let ts_init = ts_init.unwrap_or(ts_event);
381
382    let delta = OrderBookDelta::new(
383        instrument_id,
384        parse_book_action(msg.action)?,
385        order,
386        msg.flags.raw(),
387        msg.sequence.into(),
388        ts_event,
389        ts_init,
390    );
391
392    Ok((Some(delta), None))
393}
394
395/// Decodes a Databento Trade message into a `TradeTick`.
396///
397/// # Errors
398///
399/// Returns an error if decoding the Trade message fails.
400pub fn decode_trade_msg(
401    msg: &dbn::TradeMsg,
402    instrument_id: InstrumentId,
403    price_precision: u8,
404    ts_init: Option<UnixNanos>,
405) -> anyhow::Result<TradeTick> {
406    let ts_event = msg.ts_recv.into();
407    let ts_init = ts_init.unwrap_or(ts_event);
408
409    let trade = TradeTick::new(
410        instrument_id,
411        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
412        Quantity::from(msg.size),
413        parse_aggressor_side(msg.side),
414        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
415        ts_event,
416        ts_init,
417    );
418
419    Ok(trade)
420}
421
422/// Decodes a Databento TBBO (Top of Book with Trade) message into quote and trade ticks.
423///
424/// # Errors
425///
426/// Returns an error if decoding the TBBO message fails.
427pub fn decode_tbbo_msg(
428    msg: &dbn::TbboMsg,
429    instrument_id: InstrumentId,
430    price_precision: u8,
431    ts_init: Option<UnixNanos>,
432) -> anyhow::Result<(QuoteTick, TradeTick)> {
433    let top_level = &msg.levels[0];
434    let ts_event = msg.ts_recv.into();
435    let ts_init = ts_init.unwrap_or(ts_event);
436
437    let quote = QuoteTick::new(
438        instrument_id,
439        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
440        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
441        Quantity::from(top_level.bid_sz),
442        Quantity::from(top_level.ask_sz),
443        ts_event,
444        ts_init,
445    );
446
447    let trade = TradeTick::new(
448        instrument_id,
449        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
450        Quantity::from(msg.size),
451        parse_aggressor_side(msg.side),
452        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
453        ts_event,
454        ts_init,
455    );
456
457    Ok((quote, trade))
458}
459
460/// Decodes a Databento MBP1 (Market by Price Level 1) message into quote and optional trade ticks.
461///
462/// # Errors
463///
464/// Returns an error if decoding the MBP1 message fails.
465pub fn decode_mbp1_msg(
466    msg: &dbn::Mbp1Msg,
467    instrument_id: InstrumentId,
468    price_precision: u8,
469    ts_init: Option<UnixNanos>,
470    include_trades: bool,
471) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
472    let top_level = &msg.levels[0];
473    let ts_event = msg.ts_recv.into();
474    let ts_init = ts_init.unwrap_or(ts_event);
475
476    let quote = QuoteTick::new(
477        instrument_id,
478        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
479        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
480        Quantity::from(top_level.bid_sz),
481        Quantity::from(top_level.ask_sz),
482        ts_event,
483        ts_init,
484    );
485
486    let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
487        Some(TradeTick::new(
488            instrument_id,
489            Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
490            Quantity::from(msg.size),
491            parse_aggressor_side(msg.side),
492            TradeId::new(itoa::Buffer::new().format(msg.sequence)),
493            ts_event,
494            ts_init,
495        ))
496    } else {
497        None
498    };
499
500    Ok((quote, maybe_trade))
501}
502
503/// Decodes a Databento BBO (Best Bid and Offer) message into a `QuoteTick`.
504///
505/// # Errors
506///
507/// Returns an error if decoding the BBO message fails.
508pub fn decode_bbo_msg(
509    msg: &dbn::BboMsg,
510    instrument_id: InstrumentId,
511    price_precision: u8,
512    ts_init: Option<UnixNanos>,
513) -> anyhow::Result<QuoteTick> {
514    let top_level = &msg.levels[0];
515    let ts_event = msg.ts_recv.into();
516    let ts_init = ts_init.unwrap_or(ts_event);
517
518    let quote = QuoteTick::new(
519        instrument_id,
520        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
521        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
522        Quantity::from(top_level.bid_sz),
523        Quantity::from(top_level.ask_sz),
524        ts_event,
525        ts_init,
526    );
527
528    Ok(quote)
529}
530
531/// Decodes a Databento MBP10 (Market by Price 10 levels) message into an `OrderBookDepth10`.
532///
533/// # Errors
534///
535/// Returns an error if the number of levels in `msg.levels` is not exactly `DEPTH10_LEN`.
536pub fn decode_mbp10_msg(
537    msg: &dbn::Mbp10Msg,
538    instrument_id: InstrumentId,
539    price_precision: u8,
540    ts_init: Option<UnixNanos>,
541) -> anyhow::Result<OrderBookDepth10> {
542    let mut bids = Vec::with_capacity(DEPTH10_LEN);
543    let mut asks = Vec::with_capacity(DEPTH10_LEN);
544    let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
545    let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
546
547    for level in &msg.levels {
548        let bid_order = BookOrder::new(
549            OrderSide::Buy,
550            Price::from_raw(decode_raw_price_i64(level.bid_px), price_precision),
551            Quantity::from(level.bid_sz),
552            0,
553        );
554
555        let ask_order = BookOrder::new(
556            OrderSide::Sell,
557            Price::from_raw(decode_raw_price_i64(level.ask_px), price_precision),
558            Quantity::from(level.ask_sz),
559            0,
560        );
561
562        bids.push(bid_order);
563        asks.push(ask_order);
564        bid_counts.push(level.bid_ct);
565        ask_counts.push(level.ask_ct);
566    }
567
568    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
569        anyhow::anyhow!(
570            "Expected exactly {DEPTH10_LEN} bid levels, received {}",
571            v.len()
572        )
573    })?;
574
575    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
576        anyhow::anyhow!(
577            "Expected exactly {DEPTH10_LEN} ask levels, received {}",
578            v.len()
579        )
580    })?;
581
582    let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
583        anyhow::anyhow!(
584            "Expected exactly {DEPTH10_LEN} bid counts, received {}",
585            v.len()
586        )
587    })?;
588
589    let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
590        anyhow::anyhow!(
591            "Expected exactly {DEPTH10_LEN} ask counts, received {}",
592            v.len()
593        )
594    })?;
595
596    let ts_event = msg.ts_recv.into();
597    let ts_init = ts_init.unwrap_or(ts_event);
598
599    let depth = OrderBookDepth10::new(
600        instrument_id,
601        bids,
602        asks,
603        bid_counts,
604        ask_counts,
605        msg.flags.raw(),
606        msg.sequence.into(),
607        ts_event,
608        ts_init,
609    );
610
611    Ok(depth)
612}
613
614/// Decodes a Databento CMBP1 (Consolidated Market by Price Level 1) message.
615///
616/// Returns a tuple containing a `QuoteTick` and an optional `TradeTick` based on the message content.
617///
618/// # Errors
619///
620/// Returns an error if decoding the CMBP1 message fails.
621pub fn decode_cmbp1_msg(
622    msg: &dbn::Cmbp1Msg,
623    instrument_id: InstrumentId,
624    price_precision: u8,
625    ts_init: Option<UnixNanos>,
626    include_trades: bool,
627) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
628    let top_level = &msg.levels[0];
629    let ts_event = msg.ts_recv.into();
630    let ts_init = ts_init.unwrap_or(ts_event);
631
632    let quote = QuoteTick::new(
633        instrument_id,
634        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
635        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
636        Quantity::from(top_level.bid_sz),
637        Quantity::from(top_level.ask_sz),
638        ts_event,
639        ts_init,
640    );
641
642    let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
643        // Use UUID4 for trade ID as CMBP1 doesn't have a sequence field
644        Some(TradeTick::new(
645            instrument_id,
646            Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
647            Quantity::from(msg.size),
648            parse_aggressor_side(msg.side),
649            TradeId::new(UUID4::new().to_string()),
650            ts_event,
651            ts_init,
652        ))
653    } else {
654        None
655    };
656
657    Ok((quote, maybe_trade))
658}
659
660/// Decodes a Databento CBBO (Consolidated Best Bid and Offer) message.
661///
662/// Returns a `QuoteTick` representing the consolidated best bid and offer.
663///
664/// # Errors
665///
666/// Returns an error if decoding the CBBO message fails.
667pub fn decode_cbbo_msg(
668    msg: &dbn::CbboMsg,
669    instrument_id: InstrumentId,
670    price_precision: u8,
671    ts_init: Option<UnixNanos>,
672) -> anyhow::Result<QuoteTick> {
673    let top_level = &msg.levels[0];
674    let ts_event = msg.ts_recv.into();
675    let ts_init = ts_init.unwrap_or(ts_event);
676
677    let quote = QuoteTick::new(
678        instrument_id,
679        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
680        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
681        Quantity::from(top_level.bid_sz),
682        Quantity::from(top_level.ask_sz),
683        ts_event,
684        ts_init,
685    );
686
687    Ok(quote)
688}
689
690/// Decodes a Databento TCBBO (Consolidated Top of Book with Trade) message.
691///
692/// Returns a tuple containing both a `QuoteTick` and a `TradeTick`.
693///
694/// # Errors
695///
696/// Returns an error if decoding the TCBBO message fails.
697pub fn decode_tcbbo_msg(
698    msg: &dbn::CbboMsg,
699    instrument_id: InstrumentId,
700    price_precision: u8,
701    ts_init: Option<UnixNanos>,
702) -> anyhow::Result<(QuoteTick, TradeTick)> {
703    let top_level = &msg.levels[0];
704    let ts_event = msg.ts_recv.into();
705    let ts_init = ts_init.unwrap_or(ts_event);
706
707    let quote = QuoteTick::new(
708        instrument_id,
709        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
710        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
711        Quantity::from(top_level.bid_sz),
712        Quantity::from(top_level.ask_sz),
713        ts_event,
714        ts_init,
715    );
716
717    // Use UUID4 for trade ID as TCBBO doesn't have a sequence field
718    let trade = TradeTick::new(
719        instrument_id,
720        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
721        Quantity::from(msg.size),
722        parse_aggressor_side(msg.side),
723        TradeId::new(UUID4::new().to_string()),
724        ts_event,
725        ts_init,
726    );
727
728    Ok((quote, trade))
729}
730
731/// # Errors
732///
733/// Returns an error if `rtype` is not a supported bar aggregation.
734pub fn decode_bar_type(
735    msg: &dbn::OhlcvMsg,
736    instrument_id: InstrumentId,
737) -> anyhow::Result<BarType> {
738    let bar_type = match msg.hd.rtype {
739        32 => {
740            // ohlcv-1s
741            BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
742        }
743        33 => {
744            // ohlcv-1m
745            BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
746        }
747        34 => {
748            // ohlcv-1h
749            BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
750        }
751        35 => {
752            // ohlcv-1d
753            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
754        }
755        36 => {
756            // ohlcv-eod
757            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
758        }
759        _ => anyhow::bail!(
760            "`rtype` is not a supported bar aggregation, was {}",
761            msg.hd.rtype
762        ),
763    };
764
765    Ok(bar_type)
766}
767
768/// # Errors
769///
770/// Returns an error if `rtype` is not a supported bar aggregation.
771pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
772    let adjustment = match msg.hd.rtype {
773        32 => {
774            // ohlcv-1s
775            BAR_CLOSE_ADJUSTMENT_1S
776        }
777        33 => {
778            // ohlcv-1m
779            BAR_CLOSE_ADJUSTMENT_1M
780        }
781        34 => {
782            // ohlcv-1h
783            BAR_CLOSE_ADJUSTMENT_1H
784        }
785        35 | 36 => {
786            // ohlcv-1d and ohlcv-eod
787            BAR_CLOSE_ADJUSTMENT_1D
788        }
789        _ => anyhow::bail!(
790            "`rtype` is not a supported bar aggregation, was {}",
791            msg.hd.rtype
792        ),
793    };
794
795    Ok(adjustment.into())
796}
797
798/// # Errors
799///
800/// Returns an error if decoding the OHLCV message fails.
801pub fn decode_ohlcv_msg(
802    msg: &dbn::OhlcvMsg,
803    instrument_id: InstrumentId,
804    price_precision: u8,
805    ts_init: Option<UnixNanos>,
806    timestamp_on_close: bool,
807) -> anyhow::Result<Bar> {
808    let bar_type = decode_bar_type(msg, instrument_id)?;
809    let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
810
811    let ts_event_raw = msg.hd.ts_event.into();
812    let ts_close = ts_event_raw + ts_event_adjustment;
813    let ts_init = ts_init.unwrap_or(ts_close); // received time or close time
814
815    let ts_event = if timestamp_on_close {
816        ts_close
817    } else {
818        ts_event_raw
819    };
820
821    let bar = Bar::new(
822        bar_type,
823        Price::from_raw(decode_raw_price_i64(msg.open), price_precision),
824        Price::from_raw(decode_raw_price_i64(msg.high), price_precision),
825        Price::from_raw(decode_raw_price_i64(msg.low), price_precision),
826        Price::from_raw(decode_raw_price_i64(msg.close), price_precision),
827        Quantity::from(msg.volume),
828        ts_event,
829        ts_init,
830    );
831
832    Ok(bar)
833}
834
835/// Decodes a Databento status message into an `InstrumentStatus` event.
836///
837/// # Errors
838///
839/// Returns an error if decoding the status message fails or if `msg.action` is not a valid `MarketStatusAction`.
840pub fn decode_status_msg(
841    msg: &dbn::StatusMsg,
842    instrument_id: InstrumentId,
843    ts_init: Option<UnixNanos>,
844) -> anyhow::Result<InstrumentStatus> {
845    let ts_event = msg.hd.ts_event.into();
846    let ts_init = ts_init.unwrap_or(ts_event);
847
848    let action = MarketStatusAction::from_u16(msg.action)
849        .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
850
851    let status = InstrumentStatus::new(
852        instrument_id,
853        action,
854        ts_event,
855        ts_init,
856        parse_status_reason(msg.reason)?,
857        parse_status_trading_event(msg.trading_event)?,
858        parse_optional_bool(msg.is_trading),
859        parse_optional_bool(msg.is_quoting),
860        parse_optional_bool(msg.is_short_sell_restricted),
861    );
862
863    Ok(status)
864}
865
866/// # Errors
867///
868/// Returns an error if decoding the record type fails or encounters unsupported message.
869pub fn decode_record(
870    record: &dbn::RecordRef,
871    instrument_id: InstrumentId,
872    price_precision: u8,
873    ts_init: Option<UnixNanos>,
874    include_trades: bool,
875    bars_timestamp_on_close: bool,
876) -> anyhow::Result<(Option<Data>, Option<Data>)> {
877    // Note: TBBO and TCBBO messages provide both quotes and trades.
878    // TBBO is handled explicitly below, while TCBBO is handled by
879    // the CbboMsg branch based on whether it has trade data.
880    let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
881        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
882        let result = decode_mbo_msg(
883            msg,
884            instrument_id,
885            price_precision,
886            Some(ts_init),
887            include_trades,
888        )?;
889        match result {
890            (Some(delta), None) => (Some(Data::Delta(delta)), None),
891            (None, Some(trade)) => (Some(Data::Trade(trade)), None),
892            (None, None) => (None, None),
893            _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
894        }
895    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
896        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
897        let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
898        (Some(Data::Trade(trade)), None)
899    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
900        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
901        let result = decode_mbp1_msg(
902            msg,
903            instrument_id,
904            price_precision,
905            Some(ts_init),
906            include_trades,
907        )?;
908        match result {
909            (quote, None) => (Some(Data::Quote(quote)), None),
910            (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
911        }
912    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
913        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
914        let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
915        (Some(Data::Quote(quote)), None)
916    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
917        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
918        let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
919        (Some(Data::Quote(quote)), None)
920    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
921        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
922        let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
923        (Some(Data::from(depth)), None)
924    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
925        // if ts_init is None (like with historical data) we don't want it to be equal to ts_event
926        // it will be set correctly in decode_ohlcv_msg instead
927        let bar = decode_ohlcv_msg(
928            msg,
929            instrument_id,
930            price_precision,
931            ts_init,
932            bars_timestamp_on_close,
933        )?;
934        (Some(Data::Bar(bar)), None)
935    } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
936        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
937        let result = decode_cmbp1_msg(
938            msg,
939            instrument_id,
940            price_precision,
941            Some(ts_init),
942            include_trades,
943        )?;
944        match result {
945            (quote, None) => (Some(Data::Quote(quote)), None),
946            (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
947        }
948    } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
949        // TBBO always has both quote and trade
950        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
951        let (quote, trade) = decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
952        (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
953    } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
954        // Check if this is a TCBBO or regular CBBO based on whether it has trade data
955        if msg.price != i64::MAX && msg.size > 0 {
956            // TCBBO - has both quote and trade
957            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
958            let (quote, trade) =
959                decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
960            (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
961        } else {
962            // Regular CBBO - quote only
963            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
964            let quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
965            (Some(Data::Quote(quote)), None)
966        }
967    } else {
968        anyhow::bail!("DBN message type is not currently supported")
969    };
970
971    Ok(result)
972}
973
974const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
975    match ts_init {
976        Some(ts_init) => ts_init,
977        None => msg_timestamp,
978    }
979}
980
981/// # Errors
982///
983/// Returns an error if decoding the `InstrumentDefMsg` fails.
984pub fn decode_instrument_def_msg(
985    msg: &dbn::InstrumentDefMsg,
986    instrument_id: InstrumentId,
987    ts_init: Option<UnixNanos>,
988) -> anyhow::Result<InstrumentAny> {
989    match msg.instrument_class as u8 as char {
990        'K' => Ok(InstrumentAny::Equity(decode_equity(
991            msg,
992            instrument_id,
993            ts_init,
994        )?)),
995        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
996            msg,
997            instrument_id,
998            ts_init,
999        )?)),
1000        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1001            msg,
1002            instrument_id,
1003            ts_init,
1004        )?)),
1005        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1006            msg,
1007            instrument_id,
1008            ts_init,
1009        )?)),
1010        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1011            msg,
1012            instrument_id,
1013            ts_init,
1014        )?)),
1015        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1016        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1017        _ => anyhow::bail!(
1018            "Unsupported `instrument_class` '{}'",
1019            msg.instrument_class as u8 as char
1020        ),
1021    }
1022}
1023
1024/// Decodes a Databento instrument definition message into an `Equity` instrument.
1025///
1026/// # Errors
1027///
1028/// Returns an error if parsing or constructing `Equity` fails.
1029pub fn decode_equity(
1030    msg: &dbn::InstrumentDefMsg,
1031    instrument_id: InstrumentId,
1032    ts_init: Option<UnixNanos>,
1033) -> anyhow::Result<Equity> {
1034    let currency = parse_currency_or_usd_default(msg.currency());
1035    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1036    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1037    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1038    let ts_init = ts_init.unwrap_or(ts_event);
1039
1040    Ok(Equity::new(
1041        instrument_id,
1042        instrument_id.symbol,
1043        None, // No ISIN available yet
1044        currency,
1045        price_increment.precision,
1046        price_increment,
1047        Some(lot_size),
1048        None, // TBD
1049        None, // TBD
1050        None, // TBD
1051        None, // TBD
1052        None, // TBD
1053        None, // TBD
1054        None, // TBD
1055        None, // TBD
1056        ts_event,
1057        ts_init,
1058    ))
1059}
1060
1061/// Decodes a Databento instrument definition message into a `FuturesContract` instrument.
1062///
1063/// # Errors
1064///
1065/// Returns an error if parsing or constructing `FuturesContract` fails.
1066pub fn decode_futures_contract(
1067    msg: &dbn::InstrumentDefMsg,
1068    instrument_id: InstrumentId,
1069    ts_init: Option<UnixNanos>,
1070) -> anyhow::Result<FuturesContract> {
1071    let currency = parse_currency_or_usd_default(msg.currency());
1072    let exchange = Ustr::from(msg.exchange()?);
1073    let underlying = Ustr::from(msg.asset()?);
1074    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1075    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1076    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1077    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1078    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1079    let ts_init = ts_init.unwrap_or(ts_event);
1080
1081    FuturesContract::new_checked(
1082        instrument_id,
1083        instrument_id.symbol,
1084        asset_class.unwrap_or(AssetClass::Commodity),
1085        Some(exchange),
1086        underlying,
1087        msg.activation.into(),
1088        msg.expiration.into(),
1089        currency,
1090        price_increment.precision,
1091        price_increment,
1092        multiplier,
1093        lot_size,
1094        None, // TBD
1095        None, // TBD
1096        None, // TBD
1097        None, // TBD
1098        None, // TBD
1099        None, // TBD
1100        None, // TBD
1101        None, // TBD
1102        ts_event,
1103        ts_init,
1104    )
1105}
1106
1107/// Decodes a Databento instrument definition message into a `FuturesSpread` instrument.
1108///
1109/// # Errors
1110///
1111/// Returns an error if parsing or constructing `FuturesSpread` fails.
1112pub fn decode_futures_spread(
1113    msg: &dbn::InstrumentDefMsg,
1114    instrument_id: InstrumentId,
1115    ts_init: Option<UnixNanos>,
1116) -> anyhow::Result<FuturesSpread> {
1117    let exchange = Ustr::from(msg.exchange()?);
1118    let underlying = Ustr::from(msg.asset()?);
1119    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1120    let strategy_type = Ustr::from(msg.secsubtype()?);
1121    let currency = parse_currency_or_usd_default(msg.currency());
1122    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1123    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1124    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1125    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1126    let ts_init = ts_init.unwrap_or(ts_event);
1127
1128    FuturesSpread::new_checked(
1129        instrument_id,
1130        instrument_id.symbol,
1131        asset_class.unwrap_or(AssetClass::Commodity),
1132        Some(exchange),
1133        underlying,
1134        strategy_type,
1135        msg.activation.into(),
1136        msg.expiration.into(),
1137        currency,
1138        price_increment.precision,
1139        price_increment,
1140        multiplier,
1141        lot_size,
1142        None, // TBD
1143        None, // TBD
1144        None, // TBD
1145        None, // TBD
1146        None, // TBD
1147        None, // TBD
1148        None, // TBD
1149        None, // TBD
1150        ts_event,
1151        ts_init,
1152    )
1153}
1154
1155/// Decodes a Databento instrument definition message into an `OptionContract` instrument.
1156///
1157/// # Errors
1158///
1159/// Returns an error if parsing or constructing `OptionContract` fails.
1160pub fn decode_option_contract(
1161    msg: &dbn::InstrumentDefMsg,
1162    instrument_id: InstrumentId,
1163    ts_init: Option<UnixNanos>,
1164) -> anyhow::Result<OptionContract> {
1165    let currency = parse_currency_or_usd_default(msg.currency());
1166    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1167    let exchange = Ustr::from(msg.exchange()?);
1168    let underlying = Ustr::from(msg.underlying()?);
1169    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1170        Some(AssetClass::Equity)
1171    } else {
1172        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1173        asset_class
1174    };
1175    let option_kind = parse_option_kind(msg.instrument_class)?;
1176    let strike_price = Price::from_raw(
1177        decode_raw_price_i64(msg.strike_price),
1178        strike_price_currency.precision,
1179    );
1180    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1181    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1182    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1183    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1184    let ts_init = ts_init.unwrap_or(ts_event);
1185
1186    OptionContract::new_checked(
1187        instrument_id,
1188        instrument_id.symbol,
1189        asset_class_opt.unwrap_or(AssetClass::Commodity),
1190        Some(exchange),
1191        underlying,
1192        option_kind,
1193        strike_price,
1194        currency,
1195        msg.activation.into(),
1196        msg.expiration.into(),
1197        price_increment.precision,
1198        price_increment,
1199        multiplier,
1200        lot_size,
1201        None, // TBD
1202        None, // TBD
1203        None, // TBD
1204        None, // TBD
1205        None, // TBD
1206        None, // TBD
1207        None, // TBD
1208        None, // TBD
1209        ts_event,
1210        ts_init,
1211    )
1212}
1213
1214/// Decodes a Databento instrument definition message into an `OptionSpread` instrument.
1215///
1216/// # Errors
1217///
1218/// Returns an error if parsing or constructing `OptionSpread` fails.
1219pub fn decode_option_spread(
1220    msg: &dbn::InstrumentDefMsg,
1221    instrument_id: InstrumentId,
1222    ts_init: Option<UnixNanos>,
1223) -> anyhow::Result<OptionSpread> {
1224    let exchange = Ustr::from(msg.exchange()?);
1225    let underlying = Ustr::from(msg.underlying()?);
1226    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1227        Some(AssetClass::Equity)
1228    } else {
1229        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1230        asset_class
1231    };
1232    let strategy_type = Ustr::from(msg.secsubtype()?);
1233    let currency = parse_currency_or_usd_default(msg.currency());
1234    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1235    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1236    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1237    let ts_event = msg.ts_recv.into(); // More accurate and reliable timestamp
1238    let ts_init = ts_init.unwrap_or(ts_event);
1239
1240    OptionSpread::new_checked(
1241        instrument_id,
1242        instrument_id.symbol,
1243        asset_class_opt.unwrap_or(AssetClass::Commodity),
1244        Some(exchange),
1245        underlying,
1246        strategy_type,
1247        msg.activation.into(),
1248        msg.expiration.into(),
1249        currency,
1250        price_increment.precision,
1251        price_increment,
1252        multiplier,
1253        lot_size,
1254        None, // TBD
1255        None, // TBD
1256        None, // TBD
1257        None, // TBD
1258        None, // TBD
1259        None, // TBD
1260        None, // TBD
1261        None, // TBD
1262        ts_event,
1263        ts_init,
1264    )
1265}
1266
1267/// Decodes a Databento imbalance message into a `DatabentoImbalance` event.
1268///
1269/// # Errors
1270///
1271/// Returns an error if constructing `DatabentoImbalance` fails.
1272pub fn decode_imbalance_msg(
1273    msg: &dbn::ImbalanceMsg,
1274    instrument_id: InstrumentId,
1275    price_precision: u8,
1276    ts_init: Option<UnixNanos>,
1277) -> anyhow::Result<DatabentoImbalance> {
1278    let ts_event = msg.ts_recv.into();
1279    let ts_init = ts_init.unwrap_or(ts_event);
1280
1281    Ok(DatabentoImbalance::new(
1282        instrument_id,
1283        Price::from_raw(decode_raw_price_i64(msg.ref_price), price_precision),
1284        Price::from_raw(
1285            decode_raw_price_i64(msg.cont_book_clr_price),
1286            price_precision,
1287        ),
1288        Price::from_raw(
1289            decode_raw_price_i64(msg.auct_interest_clr_price),
1290            price_precision,
1291        ),
1292        Quantity::new(f64::from(msg.paired_qty), 0),
1293        Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1294        parse_order_side(msg.side),
1295        msg.significant_imbalance as c_char,
1296        msg.hd.ts_event.into(),
1297        ts_event,
1298        ts_init,
1299    ))
1300}
1301
1302/// Decodes a Databento statistics message into a `DatabentoStatistics` event.
1303///
1304/// # Errors
1305///
1306/// Returns an error if constructing `DatabentoStatistics` fails or if `msg.stat_type` or
1307/// `msg.update_action` is not a valid enum variant.
1308pub fn decode_statistics_msg(
1309    msg: &dbn::StatMsg,
1310    instrument_id: InstrumentId,
1311    price_precision: u8,
1312    ts_init: Option<UnixNanos>,
1313) -> anyhow::Result<DatabentoStatistics> {
1314    let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1315        .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1316    let update_action =
1317        DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1318            anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1319        })?;
1320    let ts_event = msg.ts_recv.into();
1321    let ts_init = ts_init.unwrap_or(ts_event);
1322
1323    Ok(DatabentoStatistics::new(
1324        instrument_id,
1325        stat_type,
1326        update_action,
1327        decode_optional_price(msg.price, price_precision),
1328        decode_optional_quantity(msg.quantity),
1329        msg.channel_id,
1330        msg.stat_flags,
1331        msg.sequence,
1332        msg.ts_ref.into(),
1333        msg.ts_in_delta,
1334        msg.hd.ts_event.into(),
1335        ts_event,
1336        ts_init,
1337    ))
1338}
1339
1340////////////////////////////////////////////////////////////////////////////////
1341// Tests
1342////////////////////////////////////////////////////////////////////////////////
1343#[cfg(test)]
1344mod tests {
1345    use std::path::{Path, PathBuf};
1346
1347    use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1348    use fallible_streaming_iterator::FallibleStreamingIterator;
1349    use nautilus_model::instruments::Instrument;
1350    use rstest::*;
1351
1352    use super::*;
1353
1354    fn test_data_path() -> PathBuf {
1355        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1356    }
1357
1358    #[rstest]
1359    #[case('Y' as c_char, Some(true))]
1360    #[case('N' as c_char, Some(false))]
1361    #[case('X' as c_char, None)]
1362    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1363        assert_eq!(parse_optional_bool(input), expected);
1364    }
1365
1366    #[rstest]
1367    #[case('A' as c_char, OrderSide::Sell)]
1368    #[case('B' as c_char, OrderSide::Buy)]
1369    #[case('X' as c_char, OrderSide::NoOrderSide)]
1370    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1371        assert_eq!(parse_order_side(input), expected);
1372    }
1373
1374    #[rstest]
1375    #[case('A' as c_char, AggressorSide::Seller)]
1376    #[case('B' as c_char, AggressorSide::Buyer)]
1377    #[case('X' as c_char, AggressorSide::NoAggressor)]
1378    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1379        assert_eq!(parse_aggressor_side(input), expected);
1380    }
1381
1382    #[rstest]
1383    #[case('A' as c_char, Ok(BookAction::Add))]
1384    #[case('C' as c_char, Ok(BookAction::Delete))]
1385    #[case('F' as c_char, Ok(BookAction::Update))]
1386    #[case('M' as c_char, Ok(BookAction::Update))]
1387    #[case('R' as c_char, Ok(BookAction::Clear))]
1388    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1389    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1390        match parse_book_action(input) {
1391            Ok(action) => assert_eq!(Ok(action), expected),
1392            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1393        }
1394    }
1395
1396    #[rstest]
1397    #[case('C' as c_char, Ok(OptionKind::Call))]
1398    #[case('P' as c_char, Ok(OptionKind::Put))]
1399    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1400    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1401        match parse_option_kind(input) {
1402            Ok(kind) => assert_eq!(Ok(kind), expected),
1403            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1404        }
1405    }
1406
1407    #[rstest]
1408    #[case(Ok("USD"), Currency::USD())]
1409    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1410    #[case(Ok(""), Currency::USD())]
1411    #[case(Err("Error"), Currency::USD())]
1412    fn test_parse_currency_or_usd_default(
1413        #[case] input: Result<&str, &'static str>, // Using `&'static str` for errors
1414        #[case] expected: Currency,
1415    ) {
1416        let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1417        assert_eq!(actual, expected);
1418    }
1419
1420    #[rstest]
1421    #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1422    #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1423    #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1424    #[case("XXX", Ok((None, None)))]
1425    #[case("D", Err("Value string is too short"))]
1426    fn test_parse_cfi_iso10926(
1427        #[case] input: &str,
1428        #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1429    ) {
1430        match parse_cfi_iso10926(input) {
1431            Ok(result) => assert_eq!(Ok(result), expected),
1432            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1433        }
1434    }
1435
1436    #[rstest]
1437    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1438    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1439    #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] // Arbitrary valid price
1440    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1441        let actual = decode_price_increment(value, precision);
1442        assert_eq!(actual, expected);
1443    }
1444
1445    #[rstest]
1446    #[case(i64::MAX, 2, None)] // None for i64::MAX
1447    #[case(0, 2, Some(Price::from_raw(0, 2)))] // 0 is valid here
1448    #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] // Arbitrary valid price
1449    fn test_decode_optional_price(
1450        #[case] value: i64,
1451        #[case] precision: u8,
1452        #[case] expected: Option<Price>,
1453    ) {
1454        let actual = decode_optional_price(value, precision);
1455        assert_eq!(actual, expected);
1456    }
1457
1458    #[rstest]
1459    #[case(i64::MAX, None)] // None for i32::MAX
1460    #[case(0, Some(Quantity::new(0.0, 0)))] // 0 is valid quantity
1461    #[case(10, Some(Quantity::new(10.0, 0)))] // Arbitrary valid quantity
1462    fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1463        let actual = decode_optional_quantity(value);
1464        assert_eq!(actual, expected);
1465    }
1466
1467    #[rstest]
1468    #[case(0, Quantity::from(1))] // Default fallback for 0
1469    #[case(i64::MAX, Quantity::from(1))] // Default fallback for i64::MAX
1470    #[case(50_000_000_000, Quantity::from("50"))] // 50.0 exactly
1471    #[case(12_500_000_000, Quantity::from("12.5"))] // 12.5 exactly
1472    #[case(1_000_000_000, Quantity::from("1"))] // 1.0 exactly
1473    #[case(1, Quantity::from("0.000000001"))] // Smallest positive value
1474    #[case(1_000_000_001, Quantity::from("1.000000001"))] // Just over 1.0
1475    #[case(999_999_999, Quantity::from("0.999999999"))] // Just under 1.0
1476    #[case(123_456_789_000, Quantity::from("123.456789"))] // Trailing zeros trimmed
1477    fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1478        assert_eq!(decode_multiplier(raw).unwrap(), expected);
1479    }
1480
1481    #[rstest]
1482    #[case(-1_500_000_000)] // Large negative value
1483    #[case(-1)] // Small negative value
1484    #[case(-999_999_999)] // Another negative value
1485    fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1486        let result = decode_multiplier(raw);
1487        assert!(result.is_err());
1488        assert!(
1489            result
1490                .unwrap_err()
1491                .to_string()
1492                .contains("Invalid negative multiplier")
1493        );
1494    }
1495
1496    #[rstest]
1497    #[case(100, Quantity::from(100))]
1498    #[case(1000, Quantity::from(1000))]
1499    #[case(5, Quantity::from(5))]
1500    fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1501        assert_eq!(decode_quantity(value), expected);
1502    }
1503
1504    #[rstest]
1505    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1506    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1507    #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] // Arbitrary valid price
1508    fn test_decode_price_increment(
1509        #[case] value: i64,
1510        #[case] precision: u8,
1511        #[case] expected: Price,
1512    ) {
1513        assert_eq!(decode_price_increment(value, precision), expected);
1514    }
1515
1516    #[rstest]
1517    #[case(0, Quantity::from(1))] // Default for 0
1518    #[case(i32::MAX, Quantity::from(1))] // Default for MAX
1519    #[case(100, Quantity::from(100))]
1520    #[case(1, Quantity::from(1))]
1521    #[case(1000, Quantity::from(1000))]
1522    fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1523        assert_eq!(decode_lot_size(value), expected);
1524    }
1525
1526    #[rstest]
1527    #[case(0, None)] // None for 0
1528    #[case(1, Some(Ustr::from("Scheduled")))]
1529    #[case(2, Some(Ustr::from("Surveillance intervention")))]
1530    #[case(3, Some(Ustr::from("Market event")))]
1531    #[case(10, Some(Ustr::from("Regulatory")))]
1532    #[case(30, Some(Ustr::from("News pending")))]
1533    #[case(40, Some(Ustr::from("Order imbalance")))]
1534    #[case(50, Some(Ustr::from("LULD pause")))]
1535    #[case(60, Some(Ustr::from("Operational")))]
1536    #[case(100, Some(Ustr::from("Corporate action")))]
1537    #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1538    fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1539        assert_eq!(parse_status_reason(value).unwrap(), expected);
1540    }
1541
1542    #[rstest]
1543    #[case(999)] // Invalid code
1544    fn test_parse_status_reason_invalid(#[case] value: u16) {
1545        assert!(parse_status_reason(value).is_err());
1546    }
1547
1548    #[rstest]
1549    #[case(0, None)] // None for 0
1550    #[case(1, Some(Ustr::from("No cancel")))]
1551    #[case(2, Some(Ustr::from("Change trading session")))]
1552    #[case(3, Some(Ustr::from("Implied matching on")))]
1553    #[case(4, Some(Ustr::from("Implied matching off")))]
1554    fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1555        assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1556    }
1557
1558    #[rstest]
1559    #[case(5)] // Invalid code
1560    #[case(100)] // Invalid code
1561    fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1562        assert!(parse_status_trading_event(value).is_err());
1563    }
1564
1565    #[rstest]
1566    fn test_decode_mbo_msg() {
1567        let path = test_data_path().join("test_data.mbo.dbn.zst");
1568        let mut dbn_stream = Decoder::from_zstd_file(path)
1569            .unwrap()
1570            .decode_stream::<dbn::MboMsg>();
1571        let msg = dbn_stream.next().unwrap().unwrap();
1572
1573        let instrument_id = InstrumentId::from("ESM4.GLBX");
1574        let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1575        let delta = delta.unwrap();
1576
1577        assert_eq!(delta.instrument_id, instrument_id);
1578        assert_eq!(delta.action, BookAction::Delete);
1579        assert_eq!(delta.order.side, OrderSide::Sell);
1580        assert_eq!(delta.order.price, Price::from("3722.75"));
1581        assert_eq!(delta.order.size, Quantity::from("1"));
1582        assert_eq!(delta.order.order_id, 647_784_973_705);
1583        assert_eq!(delta.flags, 128);
1584        assert_eq!(delta.sequence, 1_170_352);
1585        assert_eq!(delta.ts_event, msg.ts_recv);
1586        assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1587        assert_eq!(delta.ts_init, 0);
1588    }
1589
1590    #[rstest]
1591    fn test_decode_mbp1_msg() {
1592        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1593        let mut dbn_stream = Decoder::from_zstd_file(path)
1594            .unwrap()
1595            .decode_stream::<dbn::Mbp1Msg>();
1596        let msg = dbn_stream.next().unwrap().unwrap();
1597
1598        let instrument_id = InstrumentId::from("ESM4.GLBX");
1599        let (quote, _) = decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1600
1601        assert_eq!(quote.instrument_id, instrument_id);
1602        assert_eq!(quote.bid_price, Price::from("3720.25"));
1603        assert_eq!(quote.ask_price, Price::from("3720.50"));
1604        assert_eq!(quote.bid_size, Quantity::from("24"));
1605        assert_eq!(quote.ask_size, Quantity::from("11"));
1606        assert_eq!(quote.ts_event, msg.ts_recv);
1607        assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1608        assert_eq!(quote.ts_init, 0);
1609    }
1610
1611    #[rstest]
1612    fn test_decode_bbo_1s_msg() {
1613        let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1614        let mut dbn_stream = Decoder::from_zstd_file(path)
1615            .unwrap()
1616            .decode_stream::<dbn::BboMsg>();
1617        let msg = dbn_stream.next().unwrap().unwrap();
1618
1619        let instrument_id = InstrumentId::from("ESM4.GLBX");
1620        let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1621
1622        assert_eq!(quote.instrument_id, instrument_id);
1623        assert_eq!(quote.bid_price, Price::from("3702.25"));
1624        assert_eq!(quote.ask_price, Price::from("3702.75"));
1625        assert_eq!(quote.bid_size, Quantity::from("18"));
1626        assert_eq!(quote.ask_size, Quantity::from("13"));
1627        assert_eq!(quote.ts_event, msg.ts_recv);
1628        assert_eq!(quote.ts_event, 1609113600000000000);
1629        assert_eq!(quote.ts_init, 0);
1630    }
1631
1632    #[rstest]
1633    fn test_decode_bbo_1m_msg() {
1634        let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1635        let mut dbn_stream = Decoder::from_zstd_file(path)
1636            .unwrap()
1637            .decode_stream::<dbn::BboMsg>();
1638        let msg = dbn_stream.next().unwrap().unwrap();
1639
1640        let instrument_id = InstrumentId::from("ESM4.GLBX");
1641        let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1642
1643        assert_eq!(quote.instrument_id, instrument_id);
1644        assert_eq!(quote.bid_price, Price::from("3702.25"));
1645        assert_eq!(quote.ask_price, Price::from("3702.75"));
1646        assert_eq!(quote.bid_size, Quantity::from("18"));
1647        assert_eq!(quote.ask_size, Quantity::from("13"));
1648        assert_eq!(quote.ts_event, msg.ts_recv);
1649        assert_eq!(quote.ts_event, 1609113600000000000);
1650        assert_eq!(quote.ts_init, 0);
1651    }
1652
1653    #[rstest]
1654    fn test_decode_mbp10_msg() {
1655        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1656        let mut dbn_stream = Decoder::from_zstd_file(path)
1657            .unwrap()
1658            .decode_stream::<dbn::Mbp10Msg>();
1659        let msg = dbn_stream.next().unwrap().unwrap();
1660
1661        let instrument_id = InstrumentId::from("ESM4.GLBX");
1662        let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1663
1664        assert_eq!(depth10.instrument_id, instrument_id);
1665        assert_eq!(depth10.bids.len(), 10);
1666        assert_eq!(depth10.asks.len(), 10);
1667        assert_eq!(depth10.bid_counts.len(), 10);
1668        assert_eq!(depth10.ask_counts.len(), 10);
1669        assert_eq!(depth10.flags, 128);
1670        assert_eq!(depth10.sequence, 1_170_352);
1671        assert_eq!(depth10.ts_event, msg.ts_recv);
1672        assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1673        assert_eq!(depth10.ts_init, 0);
1674    }
1675
1676    #[rstest]
1677    fn test_decode_trade_msg() {
1678        let path = test_data_path().join("test_data.trades.dbn.zst");
1679        let mut dbn_stream = Decoder::from_zstd_file(path)
1680            .unwrap()
1681            .decode_stream::<dbn::TradeMsg>();
1682        let msg = dbn_stream.next().unwrap().unwrap();
1683
1684        let instrument_id = InstrumentId::from("ESM4.GLBX");
1685        let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1686
1687        assert_eq!(trade.instrument_id, instrument_id);
1688        assert_eq!(trade.price, Price::from("3720.25"));
1689        assert_eq!(trade.size, Quantity::from("5"));
1690        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1691        assert_eq!(trade.trade_id.to_string(), "1170380");
1692        assert_eq!(trade.ts_event, msg.ts_recv);
1693        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1694        assert_eq!(trade.ts_init, 0);
1695    }
1696
1697    #[rstest]
1698    fn test_decode_tbbo_msg() {
1699        let path = test_data_path().join("test_data.tbbo.dbn.zst");
1700        let mut dbn_stream = Decoder::from_zstd_file(path)
1701            .unwrap()
1702            .decode_stream::<dbn::Mbp1Msg>();
1703        let msg = dbn_stream.next().unwrap().unwrap();
1704
1705        let instrument_id = InstrumentId::from("ESM4.GLBX");
1706        let (quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1707
1708        assert_eq!(quote.instrument_id, instrument_id);
1709        assert_eq!(quote.bid_price, Price::from("3720.25"));
1710        assert_eq!(quote.ask_price, Price::from("3720.50"));
1711        assert_eq!(quote.bid_size, Quantity::from("26"));
1712        assert_eq!(quote.ask_size, Quantity::from("7"));
1713        assert_eq!(quote.ts_event, msg.ts_recv);
1714        assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
1715        assert_eq!(quote.ts_init, 0);
1716
1717        assert_eq!(trade.instrument_id, instrument_id);
1718        assert_eq!(trade.price, Price::from("3720.25"));
1719        assert_eq!(trade.size, Quantity::from("5"));
1720        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1721        assert_eq!(trade.trade_id.to_string(), "1170380");
1722        assert_eq!(trade.ts_event, msg.ts_recv);
1723        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1724        assert_eq!(trade.ts_init, 0);
1725    }
1726
1727    #[rstest]
1728    fn test_decode_ohlcv_msg() {
1729        let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
1730        let mut dbn_stream = Decoder::from_zstd_file(path)
1731            .unwrap()
1732            .decode_stream::<dbn::OhlcvMsg>();
1733        let msg = dbn_stream.next().unwrap().unwrap();
1734
1735        let instrument_id = InstrumentId::from("ESM4.GLBX");
1736        let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1737
1738        assert_eq!(
1739            bar.bar_type,
1740            BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
1741        );
1742        assert_eq!(bar.open, Price::from("372025.00"));
1743        assert_eq!(bar.high, Price::from("372050.00"));
1744        assert_eq!(bar.low, Price::from("372025.00"));
1745        assert_eq!(bar.close, Price::from("372050.00"));
1746        assert_eq!(bar.volume, Quantity::from("57"));
1747        assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); // timestamp_on_close=true
1748        assert_eq!(bar.ts_init, 0); // ts_init was Some(0)
1749    }
1750
1751    #[rstest]
1752    fn test_decode_definition_msg() {
1753        let path = test_data_path().join("test_data.definition.dbn.zst");
1754        let mut dbn_stream = Decoder::from_zstd_file(path)
1755            .unwrap()
1756            .decode_stream::<dbn::InstrumentDefMsg>();
1757        let msg = dbn_stream.next().unwrap().unwrap();
1758
1759        let instrument_id = InstrumentId::from("ESM4.GLBX");
1760        let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
1761
1762        assert!(result.is_ok());
1763        assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
1764    }
1765
1766    #[rstest]
1767    fn test_decode_status_msg() {
1768        let path = test_data_path().join("test_data.status.dbn.zst");
1769        let mut dbn_stream = Decoder::from_zstd_file(path)
1770            .unwrap()
1771            .decode_stream::<dbn::StatusMsg>();
1772        let msg = dbn_stream.next().unwrap().unwrap();
1773
1774        let instrument_id = InstrumentId::from("ESM4.GLBX");
1775        let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
1776
1777        assert_eq!(status.instrument_id, instrument_id);
1778        assert_eq!(status.action, MarketStatusAction::Trading);
1779        assert_eq!(status.ts_event, msg.hd.ts_event);
1780        assert_eq!(status.ts_init, 0);
1781        assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
1782        assert_eq!(status.trading_event, None);
1783        assert_eq!(status.is_trading, Some(true));
1784        assert_eq!(status.is_quoting, Some(true));
1785        assert_eq!(status.is_short_sell_restricted, None);
1786    }
1787
1788    #[rstest]
1789    fn test_decode_imbalance_msg() {
1790        let path = test_data_path().join("test_data.imbalance.dbn.zst");
1791        let mut dbn_stream = Decoder::from_zstd_file(path)
1792            .unwrap()
1793            .decode_stream::<dbn::ImbalanceMsg>();
1794        let msg = dbn_stream.next().unwrap().unwrap();
1795
1796        let instrument_id = InstrumentId::from("ESM4.GLBX");
1797        let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1798
1799        assert_eq!(imbalance.instrument_id, instrument_id);
1800        assert_eq!(imbalance.ref_price, Price::from("229.43"));
1801        assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
1802        assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
1803        assert_eq!(imbalance.paired_qty, Quantity::from("0"));
1804        assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
1805        assert_eq!(imbalance.side, OrderSide::Buy);
1806        assert_eq!(imbalance.significant_imbalance, 126);
1807        assert_eq!(imbalance.ts_event, msg.hd.ts_event);
1808        assert_eq!(imbalance.ts_recv, msg.ts_recv);
1809        assert_eq!(imbalance.ts_init, 0);
1810    }
1811
1812    #[rstest]
1813    fn test_decode_statistics_msg() {
1814        let path = test_data_path().join("test_data.statistics.dbn.zst");
1815        let mut dbn_stream = Decoder::from_zstd_file(path)
1816            .unwrap()
1817            .decode_stream::<dbn::StatMsg>();
1818        let msg = dbn_stream.next().unwrap().unwrap();
1819
1820        let instrument_id = InstrumentId::from("ESM4.GLBX");
1821        let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1822
1823        assert_eq!(statistics.instrument_id, instrument_id);
1824        assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
1825        assert_eq!(
1826            statistics.update_action,
1827            DatabentoStatisticUpdateAction::Added
1828        );
1829        assert_eq!(statistics.price, Some(Price::from("100.00")));
1830        assert_eq!(statistics.quantity, None);
1831        assert_eq!(statistics.channel_id, 13);
1832        assert_eq!(statistics.stat_flags, 255);
1833        assert_eq!(statistics.sequence, 2);
1834        assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
1835        assert_eq!(statistics.ts_in_delta, 26961);
1836        assert_eq!(statistics.ts_event, msg.hd.ts_event);
1837        assert_eq!(statistics.ts_recv, msg.ts_recv);
1838        assert_eq!(statistics.ts_init, 0);
1839    }
1840
1841    #[rstest]
1842    fn test_decode_cmbp1_msg() {
1843        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
1844        let mut dbn_stream = Decoder::from_zstd_file(path)
1845            .unwrap()
1846            .decode_stream::<dbn::Cmbp1Msg>();
1847        let msg = dbn_stream.next().unwrap().unwrap();
1848
1849        let instrument_id = InstrumentId::from("ESM4.GLBX");
1850        let (quote, trade) = decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1851
1852        assert_eq!(quote.instrument_id, instrument_id);
1853        assert!(quote.bid_price.raw > 0);
1854        assert!(quote.ask_price.raw > 0);
1855        assert!(quote.bid_size.raw > 0);
1856        assert!(quote.ask_size.raw > 0);
1857        assert_eq!(quote.ts_event, msg.ts_recv);
1858        assert_eq!(quote.ts_init, 0);
1859
1860        // Check if trade is present based on action
1861        if msg.action as u8 as char == 'T' {
1862            assert!(trade.is_some());
1863            let trade = trade.unwrap();
1864            assert_eq!(trade.instrument_id, instrument_id);
1865        } else {
1866            assert!(trade.is_none());
1867        }
1868    }
1869
1870    #[rstest]
1871    fn test_decode_cbbo_1s_msg() {
1872        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
1873        let mut dbn_stream = Decoder::from_zstd_file(path)
1874            .unwrap()
1875            .decode_stream::<dbn::CbboMsg>();
1876        let msg = dbn_stream.next().unwrap().unwrap();
1877
1878        let instrument_id = InstrumentId::from("ESM4.GLBX");
1879        let quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1880
1881        assert_eq!(quote.instrument_id, instrument_id);
1882        assert!(quote.bid_price.raw > 0);
1883        assert!(quote.ask_price.raw > 0);
1884        assert!(quote.bid_size.raw > 0);
1885        assert!(quote.ask_size.raw > 0);
1886        assert_eq!(quote.ts_event, msg.ts_recv);
1887        assert_eq!(quote.ts_init, 0);
1888    }
1889
1890    #[rstest]
1891    fn test_decode_mbp10_msg_with_all_levels() {
1892        let mut msg = dbn::Mbp10Msg::default();
1893        for i in 0..10 {
1894            msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
1895            msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
1896            msg.levels[i].bid_sz = 10 + i as u32;
1897            msg.levels[i].ask_sz = 10 + i as u32;
1898            msg.levels[i].bid_ct = 1 + i as u32;
1899            msg.levels[i].ask_ct = 1 + i as u32;
1900        }
1901        msg.ts_recv = 1_609_160_400_000_704_060;
1902
1903        let instrument_id = InstrumentId::from("TEST.VENUE");
1904        let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
1905
1906        assert!(result.is_ok());
1907        let depth = result.unwrap();
1908        assert_eq!(depth.bids.len(), 10);
1909        assert_eq!(depth.asks.len(), 10);
1910        assert_eq!(depth.bid_counts.len(), 10);
1911        assert_eq!(depth.ask_counts.len(), 10);
1912    }
1913
1914    #[rstest]
1915    fn test_array_conversion_error_handling() {
1916        use nautilus_model::{data::BookOrder, enums::OrderSide};
1917
1918        let mut bids = Vec::new();
1919        let mut asks = Vec::new();
1920
1921        // Intentionally create fewer than DEPTH10_LEN elements
1922        for i in 0..5 {
1923            bids.push(BookOrder::new(
1924                OrderSide::Buy,
1925                Price::from(format!("{}.00", 100 - i)),
1926                Quantity::from(10),
1927                i as u64,
1928            ));
1929            asks.push(BookOrder::new(
1930                OrderSide::Sell,
1931                Price::from(format!("{}.00", 101 + i)),
1932                Quantity::from(10),
1933                i as u64,
1934            ));
1935        }
1936
1937        let result: Result<[BookOrder; DEPTH10_LEN], _> =
1938            bids.try_into().map_err(|v: Vec<BookOrder>| {
1939                anyhow::anyhow!(
1940                    "Expected exactly {DEPTH10_LEN} bid levels, received {}",
1941                    v.len()
1942                )
1943            });
1944        assert!(result.is_err());
1945        assert!(
1946            result
1947                .unwrap_err()
1948                .to_string()
1949                .contains("Expected exactly 10 bid levels, received 5")
1950        );
1951    }
1952
1953    #[rstest]
1954    fn test_decode_tcbbo_msg() {
1955        // Use cbbo-1s as base since cbbo.dbn.zst was invalid
1956        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
1957        let mut dbn_stream = Decoder::from_zstd_file(path)
1958            .unwrap()
1959            .decode_stream::<dbn::CbboMsg>();
1960        let msg = dbn_stream.next().unwrap().unwrap();
1961
1962        // Simulate TCBBO by adding trade data
1963        let mut tcbbo_msg = msg.clone();
1964        tcbbo_msg.price = 3702500000000;
1965        tcbbo_msg.size = 10;
1966
1967        let instrument_id = InstrumentId::from("ESM4.GLBX");
1968        let (quote, trade) =
1969            decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
1970
1971        assert_eq!(quote.instrument_id, instrument_id);
1972        assert!(quote.bid_price.raw > 0);
1973        assert!(quote.ask_price.raw > 0);
1974        assert!(quote.bid_size.raw > 0);
1975        assert!(quote.ask_size.raw > 0);
1976        assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
1977        assert_eq!(quote.ts_init, 0);
1978
1979        assert_eq!(trade.instrument_id, instrument_id);
1980        assert_eq!(trade.price, Price::from("3702.50"));
1981        assert_eq!(trade.size, Quantity::from(10));
1982        assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
1983        assert_eq!(trade.ts_init, 0);
1984    }
1985
1986    #[rstest]
1987    fn test_decode_bar_type() {
1988        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
1989        let instrument_id = InstrumentId::from("ESM4.GLBX");
1990
1991        // Test 1-second bar
1992        msg.hd.rtype = 32;
1993        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
1994        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
1995
1996        // Test 1-minute bar
1997        msg.hd.rtype = 33;
1998        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
1999        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2000
2001        // Test 1-hour bar
2002        msg.hd.rtype = 34;
2003        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2004        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2005
2006        // Test 1-day bar
2007        msg.hd.rtype = 35;
2008        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2009        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2010
2011        // Test unsupported rtype
2012        msg.hd.rtype = 99;
2013        let result = decode_bar_type(&msg, instrument_id);
2014        assert!(result.is_err());
2015    }
2016
2017    #[rstest]
2018    fn test_decode_ts_event_adjustment() {
2019        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2020
2021        // Test 1-second bar adjustment
2022        msg.hd.rtype = 32;
2023        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2024        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2025
2026        // Test 1-minute bar adjustment
2027        msg.hd.rtype = 33;
2028        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2029        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2030
2031        // Test 1-hour bar adjustment
2032        msg.hd.rtype = 34;
2033        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2034        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2035
2036        // Test 1-day bar adjustment
2037        msg.hd.rtype = 35;
2038        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2039        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2040
2041        // Test eod bar adjustment (same as 1d)
2042        msg.hd.rtype = 36;
2043        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2044        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2045
2046        // Test unsupported rtype
2047        msg.hd.rtype = 99;
2048        let result = decode_ts_event_adjustment(&msg);
2049        assert!(result.is_err());
2050    }
2051
2052    #[rstest]
2053    fn test_decode_record() {
2054        // Test with MBO message
2055        let path = test_data_path().join("test_data.mbo.dbn.zst");
2056        let decoder = Decoder::from_zstd_file(path).unwrap();
2057        let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2058        let msg = dbn_stream.next().unwrap().unwrap();
2059
2060        let record_ref = dbn::RecordRef::from(msg);
2061        let instrument_id = InstrumentId::from("ESM4.GLBX");
2062
2063        let (data1, data2) =
2064            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2065
2066        assert!(data1.is_some());
2067        assert!(data2.is_none());
2068
2069        // Test with Trade message
2070        let path = test_data_path().join("test_data.trades.dbn.zst");
2071        let decoder = Decoder::from_zstd_file(path).unwrap();
2072        let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2073        let msg = dbn_stream.next().unwrap().unwrap();
2074
2075        let record_ref = dbn::RecordRef::from(msg);
2076
2077        let (data1, data2) =
2078            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2079
2080        assert!(data1.is_some());
2081        assert!(data2.is_none());
2082        assert!(matches!(data1.unwrap(), Data::Trade(_)));
2083    }
2084}