nautilus_databento/
decode.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Databento message decoding functions.
17//!
18//! # Sentinel Values
19//!
20//! Databento uses sentinel values to represent undefined/null fields:
21//!
22//! | Sentinel          | Value      | Usage                       |
23//! |-------------------|------------|-----------------------------|
24//! | `UNDEF_PRICE`     | `i64::MAX` | Undefined price fields.     |
25//! | `UNDEF_TIMESTAMP` | `u64::MAX` | Undefined timestamp fields. |
26//!
27//! # Fields Potentially Undefined
28//!
29//! According to Databento documentation, the following fields can contain sentinel values:
30//!
31//! | Message Type       | Field                          | Handling                           |
32//! |--------------------|--------------------------------|------------------------------------|
33//! | `MboMsg`           | `price`                        | Passed through as `PRICE_UNDEF`.   |
34//! | `TradeMsg`         | `price`                        | Passed through as `PRICE_UNDEF`.   |
35//! | `OhlcvMsg`         | `open`, `high`, `low`, `close` | Passed through as `PRICE_UNDEF`.   |
36//! | `Mbp1Msg`          | `bid_px`, `ask_px`             | Quote skipped if either undefined. |
37//! | `InstrumentDefMsg` | `activation`                   | Defaults to 0 (epoch).             |
38//! | `InstrumentDefMsg` | `expiration`                   | Returns error if undefined.        |
39//! | `InstrumentDefMsg` | `strike_price`                 | Returns error if undefined.        |
40//!
41//! # References
42//!
43//! - [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)
44//! - [`UNDEF_TIMESTAMP`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_TIMESTAMP.html)
45//! - [Databento DBN Schema](https://databento.com/docs/schemas)
46
47use std::{ffi::c_char, num::NonZeroUsize};
48
49use databento::dbn::{self};
50use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
51use nautilus_model::{
52    data::{
53        Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
54        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
55    },
56    enums::{
57        AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
58        InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
59    },
60    identifiers::{InstrumentId, TradeId},
61    instruments::{
62        Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
63    },
64    types::{
65        Currency, Price, Quantity,
66        price::{PRICE_UNDEF, decode_raw_price_i64},
67    },
68};
69use ustr::Ustr;
70
71use super::{
72    enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
73    types::{DatabentoImbalance, DatabentoStatistics},
74};
75
/// Bar step of one (`NonZeroUsize::MIN` is the value `1`), used by the fixed bar specifications.
const STEP_ONE: NonZeroUsize = NonZeroUsize::MIN;
78
79const BAR_SPEC_1S: BarSpecification = BarSpecification {
80    step: STEP_ONE,
81    aggregation: BarAggregation::Second,
82    price_type: PriceType::Last,
83};
84const BAR_SPEC_1M: BarSpecification = BarSpecification {
85    step: STEP_ONE,
86    aggregation: BarAggregation::Minute,
87    price_type: PriceType::Last,
88};
89const BAR_SPEC_1H: BarSpecification = BarSpecification {
90    step: STEP_ONE,
91    aggregation: BarAggregation::Hour,
92    price_type: PriceType::Last,
93};
94const BAR_SPEC_1D: BarSpecification = BarSpecification {
95    step: STEP_ONE,
96    aggregation: BarAggregation::Day,
97    price_type: PriceType::Last,
98};
99
100const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
101const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
102const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
103const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
104
/// Interprets a `Y`/`N` flag character as an optional boolean.
///
/// Returns `Some(true)` for `'Y'`, `Some(false)` for `'N'` and `None` for any other value.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    const YES: u8 = b'Y';
    const NO: u8 = b'N';

    match c as u8 {
        YES => Some(true),
        NO => Some(false),
        _ => None,
    }
}
113
114#[must_use]
115pub const fn parse_order_side(c: c_char) -> OrderSide {
116    match c as u8 as char {
117        'A' => OrderSide::Sell,
118        'B' => OrderSide::Buy,
119        _ => OrderSide::NoOrderSide,
120    }
121}
122
123#[must_use]
124pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
125    match c as u8 as char {
126        'A' => AggressorSide::Seller,
127        'B' => AggressorSide::Buyer,
128        _ => AggressorSide::NoAggressor,
129    }
130}
131
132/// Parses a Databento book action character into a `BookAction` enum.
133///
134/// # Errors
135///
136/// Returns an error if `c` is not a valid `BookAction` character.
137pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
138    match c as u8 as char {
139        'A' => Ok(BookAction::Add),
140        'C' => Ok(BookAction::Delete),
141        'F' => Ok(BookAction::Update),
142        'M' => Ok(BookAction::Update),
143        'R' => Ok(BookAction::Clear),
144        invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
145    }
146}
147
148/// Parses a Databento option kind character into an `OptionKind` enum.
149///
150/// # Errors
151///
152/// Returns an error if `c` is not a valid `OptionKind` character.
153pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
154    match c as u8 as char {
155        'C' => Ok(OptionKind::Call),
156        'P' => Ok(OptionKind::Put),
157        invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
158    }
159}
160
161fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
162    match value {
163        Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
164            log::warn!("Unknown currency code '{value}', defaulting to USD");
165            Currency::USD()
166        }),
167        Ok(_) => Currency::USD(),
168        Err(e) => {
169            log::error!("Error parsing currency: {e}");
170            Currency::USD()
171        }
172    }
173}
174
175/// Parses a CFI (Classification of Financial Instruments) code to extract asset and instrument classes.
176///
177/// # Errors
178///
179/// Returns an error if `value` has fewer than 3 characters.
180pub fn parse_cfi_iso10926(
181    value: &str,
182) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
183    let chars: Vec<char> = value.chars().collect();
184    if chars.len() < 3 {
185        anyhow::bail!("Value string is too short");
186    }
187
188    // TODO: A proper CFI parser would be useful: https://en.wikipedia.org/wiki/ISO_10962
189    let cfi_category = chars[0];
190    let cfi_group = chars[1];
191    let cfi_attribute1 = chars[2];
192    // let cfi_attribute2 = value[3];
193    // let cfi_attribute3 = value[4];
194    // let cfi_attribute4 = value[5];
195
196    let mut asset_class = match cfi_category {
197        'D' => Some(AssetClass::Debt),
198        'E' => Some(AssetClass::Equity),
199        'S' => None,
200        _ => None,
201    };
202
203    let instrument_class = match cfi_group {
204        'I' => Some(InstrumentClass::Future),
205        _ => None,
206    };
207
208    if cfi_attribute1 == 'I' {
209        asset_class = Some(AssetClass::Index);
210    }
211
212    Ok((asset_class, instrument_class))
213}
214
215/// Parses a Databento status reason code into a human-readable string.
216///
217/// See: <https://databento.com/docs/schemas-and-data-formats/status#types-of-status-reasons>
218///
219/// # Errors
220///
221/// Returns an error if `value` is an invalid status reason code.
222pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
223    let value_str = match value {
224        0 => return Ok(None),
225        1 => "Scheduled",
226        2 => "Surveillance intervention",
227        3 => "Market event",
228        4 => "Instrument activation",
229        5 => "Instrument expiration",
230        6 => "Recovery in process",
231        10 => "Regulatory",
232        11 => "Administrative",
233        12 => "Non-compliance",
234        13 => "Filings not current",
235        14 => "SEC trading suspension",
236        15 => "New issue",
237        16 => "Issue available",
238        17 => "Issues reviewed",
239        18 => "Filing requirements satisfied",
240        30 => "News pending",
241        31 => "News released",
242        32 => "News and resumption times",
243        33 => "News not forthcoming",
244        40 => "Order imbalance",
245        50 => "LULD pause",
246        60 => "Operational",
247        70 => "Additional information requested",
248        80 => "Merger effective",
249        90 => "ETF",
250        100 => "Corporate action",
251        110 => "New Security offering",
252        120 => "Market wide halt level 1",
253        121 => "Market wide halt level 2",
254        122 => "Market wide halt level 3",
255        123 => "Market wide halt carryover",
256        124 => "Market wide halt resumption",
257        130 => "Quotation not available",
258        invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
259    };
260
261    Ok(Some(Ustr::from(value_str)))
262}
263
264/// Parses a Databento status trading event code into a human-readable string.
265///
266/// # Errors
267///
268/// Returns an error if `value` is an invalid status trading event code.
269pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
270    let value_str = match value {
271        0 => return Ok(None),
272        1 => "No cancel",
273        2 => "Change trading session",
274        3 => "Implied matching on",
275        4 => "Implied matching off",
276        _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
277    };
278
279    Ok(Some(Ustr::from(value_str)))
280}
281
282/// Decodes a price, returning an error if undefined.
283///
284/// Databento uses `i64::MAX` as a sentinel value for unset/null prices (see
285/// [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)).
286///
287/// # Errors
288///
289/// Returns an error if `value` is `i64::MAX` (undefined).
290#[inline(always)]
291pub fn decode_price(value: i64, precision: u8, field_name: &str) -> anyhow::Result<Price> {
292    if value == i64::MAX {
293        anyhow::bail!("Missing required price for `{field_name}`")
294    } else {
295        Ok(Price::from_raw(decode_raw_price_i64(value), precision))
296    }
297}
298
299/// Decodes a price from the given optional value, expressed in units of 1e-9.
300///
301/// Databento uses `i64::MAX` as a sentinel value for unset/null prices (see
302/// [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)).
303#[inline(always)]
304#[must_use]
305pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
306    if value == i64::MAX {
307        None
308    } else {
309        Some(Price::from_raw(decode_raw_price_i64(value), precision))
310    }
311}
312
313/// Decodes a price, returning `PRICE_UNDEF` if the value is undefined.
314///
315/// This is used for market data where undefined prices should pass through
316/// as `PRICE_UNDEF` rather than causing an error.
317#[inline(always)]
318#[must_use]
319pub fn decode_price_or_undef(value: i64, precision: u8) -> Price {
320    if value == i64::MAX {
321        Price::from_raw(PRICE_UNDEF, 0)
322    } else {
323        Price::from_raw(decode_raw_price_i64(value), precision)
324    }
325}
326
327/// Decodes a minimum price increment from the given value, expressed in units of 1e-9.
328#[inline(always)]
329#[must_use]
330pub fn decode_price_increment(value: i64, precision: u8) -> Price {
331    match value {
332        0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
333        _ => Price::from_raw(decode_raw_price_i64(value), precision),
334    }
335}
336
337/// Decodes a quantity from the given value, expressed in standard whole-number units.
338#[inline(always)]
339#[must_use]
340pub fn decode_quantity(value: u64) -> Quantity {
341    Quantity::from(value)
342}
343
344/// Decodes a quantity from the given optional value, where `i64::MAX` indicates missing data.
345#[inline(always)]
346#[must_use]
347pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
348    match value {
349        i64::MAX => None,
350        _ => Some(Quantity::from(value)),
351    }
352}
353
354/// Decodes a timestamp, returning an error if undefined.
355///
356/// Databento uses `u64::MAX` as `UNDEF_TIMESTAMP` sentinel for null timestamps.
357///
358/// # Errors
359///
360/// Returns an error if `value` is `u64::MAX` (undefined).
361#[inline(always)]
362pub fn decode_timestamp(value: u64, field_name: &str) -> anyhow::Result<UnixNanos> {
363    if value == dbn::UNDEF_TIMESTAMP {
364        anyhow::bail!("Missing required timestamp for `{field_name}`")
365    } else {
366        Ok(UnixNanos::from(value))
367    }
368}
369
370/// Decodes a timestamp from the given optional value.
371///
372/// Databento uses `u64::MAX` as `UNDEF_TIMESTAMP` sentinel for null timestamps.
373#[inline(always)]
374#[must_use]
375pub fn decode_optional_timestamp(value: u64) -> Option<UnixNanos> {
376    if value == dbn::UNDEF_TIMESTAMP {
377        None
378    } else {
379        Some(UnixNanos::from(value))
380    }
381}
382
383/// Decodes a multiplier from the given value, expressed in units of 1e-9.
384/// Uses exact integer arithmetic to avoid precision loss in financial calculations.
385///
386/// # Errors
387///
388/// Returns an error if value is negative (invalid multiplier).
389pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
390    const SCALE: u128 = 1_000_000_000;
391
392    match value {
393        0 | i64::MAX => Ok(Quantity::from(1)),
394        v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
395        v => {
396            // Work in integers: v is fixed-point with 9 fractional digits.
397            // Build a canonical decimal string without floating-point.
398            let abs = v as u128;
399            let int_part = abs / SCALE;
400            let frac_part = abs % SCALE;
401
402            // Format fractional part with exactly 9 digits, then trim trailing zeros
403            // to keep a canonical representation.
404            if frac_part == 0 {
405                // Pure integer
406                Ok(Quantity::from(int_part as u64))
407            } else {
408                let mut frac_str = format!("{frac_part:09}");
409                while frac_str.ends_with('0') {
410                    frac_str.pop();
411                }
412                let s = format!("{int_part}.{frac_str}");
413                Ok(Quantity::from(s))
414            }
415        }
416    }
417}
418
419/// Decodes a lot size from the given value, expressed in standard whole-number units.
420#[inline(always)]
421#[must_use]
422pub fn decode_lot_size(value: i32) -> Quantity {
423    match value {
424        0 | i32::MAX => Quantity::from(1),
425        value => Quantity::from(value),
426    }
427}
428
/// Returns `true` if `action` is the trade action character `'T'`.
#[inline(always)]
#[must_use]
fn is_trade_msg(action: c_char) -> bool {
    matches!(action as u8, b'T')
}
434
/// Returns `true` only when neither the bid nor the ask price is `i64::MAX`.
///
/// Databento uses `i64::MAX` as a sentinel value for undefined/null prices, and a
/// valid quote needs both sides to be defined.
#[inline(always)]
#[must_use]
fn has_valid_bid_ask(bid_px: i64, ask_px: i64) -> bool {
    const UNDEFINED: i64 = i64::MAX;

    !(bid_px == UNDEFINED || ask_px == UNDEFINED)
}
444
445/// Decodes a Databento MBO (Market by Order) message into an order book delta or trade.
446///
447/// Returns a tuple containing either an `OrderBookDelta` or a `TradeTick`, depending on
448/// whether the message represents an order book update or a trade execution.
449///
450/// # Errors
451///
452/// Returns an error if decoding the MBO message fails.
453pub fn decode_mbo_msg(
454    msg: &dbn::MboMsg,
455    instrument_id: InstrumentId,
456    price_precision: u8,
457    ts_init: Option<UnixNanos>,
458    include_trades: bool,
459) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
460    let side = parse_order_side(msg.side);
461    if is_trade_msg(msg.action) {
462        if include_trades && msg.size > 0 {
463            let price = decode_price_or_undef(msg.price, price_precision);
464            let size = decode_quantity(msg.size as u64);
465            let aggressor_side = parse_aggressor_side(msg.side);
466            let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
467            let ts_event = msg.ts_recv.into();
468            let ts_init = ts_init.unwrap_or(ts_event);
469
470            let trade = TradeTick::new(
471                instrument_id,
472                price,
473                size,
474                aggressor_side,
475                trade_id,
476                ts_event,
477                ts_init,
478            );
479            return Ok((None, Some(trade)));
480        }
481
482        return Ok((None, None));
483    }
484
485    let action = parse_book_action(msg.action)?;
486    let price = decode_price_or_undef(msg.price, price_precision);
487    let size = decode_quantity(msg.size as u64);
488    let order = BookOrder::new(side, price, size, msg.order_id);
489
490    let ts_event = msg.ts_recv.into();
491    let ts_init = ts_init.unwrap_or(ts_event);
492
493    let delta = OrderBookDelta::new(
494        instrument_id,
495        action,
496        order,
497        msg.flags.raw(),
498        msg.sequence.into(),
499        ts_event,
500        ts_init,
501    );
502
503    Ok((Some(delta), None))
504}
505
506/// Decodes a Databento Trade message into a `TradeTick`.
507///
508/// # Errors
509///
510/// Returns an error if decoding the Trade message fails.
511pub fn decode_trade_msg(
512    msg: &dbn::TradeMsg,
513    instrument_id: InstrumentId,
514    price_precision: u8,
515    ts_init: Option<UnixNanos>,
516) -> anyhow::Result<TradeTick> {
517    let ts_event = msg.ts_recv.into();
518    let ts_init = ts_init.unwrap_or(ts_event);
519
520    let trade = TradeTick::new(
521        instrument_id,
522        decode_price_or_undef(msg.price, price_precision),
523        decode_quantity(msg.size as u64),
524        parse_aggressor_side(msg.side),
525        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
526        ts_event,
527        ts_init,
528    );
529
530    Ok(trade)
531}
532
533/// Decodes a Databento TBBO (Top of Book with Trade) message into quote and trade ticks.
534///
535/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
536/// The trade is always returned.
537///
538/// # Errors
539///
540/// Returns an error if decoding the TBBO message fails.
541pub fn decode_tbbo_msg(
542    msg: &dbn::TbboMsg,
543    instrument_id: InstrumentId,
544    price_precision: u8,
545    ts_init: Option<UnixNanos>,
546) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
547    let top_level = &msg.levels[0];
548    let ts_event = msg.ts_recv.into();
549    let ts_init = ts_init.unwrap_or(ts_event);
550
551    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
552        Some(QuoteTick::new(
553            instrument_id,
554            decode_price_or_undef(top_level.bid_px, price_precision),
555            decode_price_or_undef(top_level.ask_px, price_precision),
556            decode_quantity(top_level.bid_sz as u64),
557            decode_quantity(top_level.ask_sz as u64),
558            ts_event,
559            ts_init,
560        ))
561    } else {
562        None
563    };
564
565    let trade = TradeTick::new(
566        instrument_id,
567        decode_price_or_undef(msg.price, price_precision),
568        decode_quantity(msg.size as u64),
569        parse_aggressor_side(msg.side),
570        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
571        ts_event,
572        ts_init,
573    );
574
575    Ok((maybe_quote, trade))
576}
577
578/// Decodes a Databento MBP1 (Market by Price Level 1) message into quote and optional trade ticks.
579///
580/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
581///
582/// # Errors
583///
584/// Returns an error if decoding the MBP1 message fails.
585pub fn decode_mbp1_msg(
586    msg: &dbn::Mbp1Msg,
587    instrument_id: InstrumentId,
588    price_precision: u8,
589    ts_init: Option<UnixNanos>,
590    include_trades: bool,
591) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
592    let top_level = &msg.levels[0];
593    let ts_event = msg.ts_recv.into();
594    let ts_init = ts_init.unwrap_or(ts_event);
595
596    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
597        Some(QuoteTick::new(
598            instrument_id,
599            decode_price_or_undef(top_level.bid_px, price_precision),
600            decode_price_or_undef(top_level.ask_px, price_precision),
601            decode_quantity(top_level.bid_sz as u64),
602            decode_quantity(top_level.ask_sz as u64),
603            ts_event,
604            ts_init,
605        ))
606    } else {
607        None
608    };
609
610    let maybe_trade = if include_trades && is_trade_msg(msg.action) {
611        Some(TradeTick::new(
612            instrument_id,
613            decode_price_or_undef(msg.price, price_precision),
614            decode_quantity(msg.size as u64),
615            parse_aggressor_side(msg.side),
616            TradeId::new(itoa::Buffer::new().format(msg.sequence)),
617            ts_event,
618            ts_init,
619        ))
620    } else {
621        None
622    };
623
624    Ok((maybe_quote, maybe_trade))
625}
626
627/// Decodes a Databento BBO (Best Bid and Offer) message into a `QuoteTick`.
628///
629/// Returns `None` if either bid or ask price is undefined (`i64::MAX`).
630///
631/// # Errors
632///
633/// Returns an error if decoding the BBO message fails.
634pub fn decode_bbo_msg(
635    msg: &dbn::BboMsg,
636    instrument_id: InstrumentId,
637    price_precision: u8,
638    ts_init: Option<UnixNanos>,
639) -> anyhow::Result<Option<QuoteTick>> {
640    let top_level = &msg.levels[0];
641    if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
642        return Ok(None);
643    }
644
645    let ts_event = msg.ts_recv.into();
646    let ts_init = ts_init.unwrap_or(ts_event);
647
648    let quote = QuoteTick::new(
649        instrument_id,
650        decode_price_or_undef(top_level.bid_px, price_precision),
651        decode_price_or_undef(top_level.ask_px, price_precision),
652        decode_quantity(top_level.bid_sz as u64),
653        decode_quantity(top_level.ask_sz as u64),
654        ts_event,
655        ts_init,
656    );
657
658    Ok(Some(quote))
659}
660
661/// Decodes a Databento MBP10 (Market by Price 10 levels) message into an `OrderBookDepth10`.
662///
663/// # Errors
664///
665/// Returns an error if the number of levels in `msg.levels` is not exactly `DEPTH10_LEN`.
666pub fn decode_mbp10_msg(
667    msg: &dbn::Mbp10Msg,
668    instrument_id: InstrumentId,
669    price_precision: u8,
670    ts_init: Option<UnixNanos>,
671) -> anyhow::Result<OrderBookDepth10> {
672    let mut bids = Vec::with_capacity(DEPTH10_LEN);
673    let mut asks = Vec::with_capacity(DEPTH10_LEN);
674    let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
675    let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
676
677    for level in &msg.levels {
678        let bid_order = BookOrder::new(
679            OrderSide::Buy,
680            decode_price_or_undef(level.bid_px, price_precision),
681            decode_quantity(level.bid_sz as u64),
682            0,
683        );
684
685        let ask_order = BookOrder::new(
686            OrderSide::Sell,
687            decode_price_or_undef(level.ask_px, price_precision),
688            decode_quantity(level.ask_sz as u64),
689            0,
690        );
691
692        bids.push(bid_order);
693        asks.push(ask_order);
694        bid_counts.push(level.bid_ct);
695        ask_counts.push(level.ask_ct);
696    }
697
698    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
699        anyhow::anyhow!(
700            "Expected exactly {DEPTH10_LEN} bid levels, received {}",
701            v.len()
702        )
703    })?;
704
705    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
706        anyhow::anyhow!(
707            "Expected exactly {DEPTH10_LEN} ask levels, received {}",
708            v.len()
709        )
710    })?;
711
712    let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
713        anyhow::anyhow!(
714            "Expected exactly {DEPTH10_LEN} bid counts, received {}",
715            v.len()
716        )
717    })?;
718
719    let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
720        anyhow::anyhow!(
721            "Expected exactly {DEPTH10_LEN} ask counts, received {}",
722            v.len()
723        )
724    })?;
725
726    let ts_event = msg.ts_recv.into();
727    let ts_init = ts_init.unwrap_or(ts_event);
728
729    let depth = OrderBookDepth10::new(
730        instrument_id,
731        bids,
732        asks,
733        bid_counts,
734        ask_counts,
735        msg.flags.raw(),
736        msg.sequence.into(),
737        ts_event,
738        ts_init,
739    );
740
741    Ok(depth)
742}
743
744/// Decodes a Databento CMBP1 (Consolidated Market by Price Level 1) message.
745///
746/// Returns a tuple containing an optional `QuoteTick` and an optional `TradeTick`.
747/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
748///
749/// # Errors
750///
751/// Returns an error if decoding the CMBP1 message fails.
752pub fn decode_cmbp1_msg(
753    msg: &dbn::Cmbp1Msg,
754    instrument_id: InstrumentId,
755    price_precision: u8,
756    ts_init: Option<UnixNanos>,
757    include_trades: bool,
758) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
759    let top_level = &msg.levels[0];
760    let ts_event = msg.ts_recv.into();
761    let ts_init = ts_init.unwrap_or(ts_event);
762
763    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
764        Some(QuoteTick::new(
765            instrument_id,
766            decode_price_or_undef(top_level.bid_px, price_precision),
767            decode_price_or_undef(top_level.ask_px, price_precision),
768            decode_quantity(top_level.bid_sz as u64),
769            decode_quantity(top_level.ask_sz as u64),
770            ts_event,
771            ts_init,
772        ))
773    } else {
774        None
775    };
776
777    let maybe_trade = if include_trades && is_trade_msg(msg.action) {
778        // Use UUID4 for trade ID as CMBP1 doesn't have a sequence field
779        Some(TradeTick::new(
780            instrument_id,
781            decode_price_or_undef(msg.price, price_precision),
782            decode_quantity(msg.size as u64),
783            parse_aggressor_side(msg.side),
784            TradeId::new(UUID4::new().as_str()),
785            ts_event,
786            ts_init,
787        ))
788    } else {
789        None
790    };
791
792    Ok((maybe_quote, maybe_trade))
793}
794
795/// Decodes a Databento CBBO (Consolidated Best Bid and Offer) message.
796///
797/// Returns `None` if either bid or ask price is undefined (`i64::MAX`).
798///
799/// # Errors
800///
801/// Returns an error if decoding the CBBO message fails.
802pub fn decode_cbbo_msg(
803    msg: &dbn::CbboMsg,
804    instrument_id: InstrumentId,
805    price_precision: u8,
806    ts_init: Option<UnixNanos>,
807) -> anyhow::Result<Option<QuoteTick>> {
808    let top_level = &msg.levels[0];
809    if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
810        return Ok(None);
811    }
812
813    let ts_event = msg.ts_recv.into();
814    let ts_init = ts_init.unwrap_or(ts_event);
815
816    let quote = QuoteTick::new(
817        instrument_id,
818        decode_price_or_undef(top_level.bid_px, price_precision),
819        decode_price_or_undef(top_level.ask_px, price_precision),
820        decode_quantity(top_level.bid_sz as u64),
821        decode_quantity(top_level.ask_sz as u64),
822        ts_event,
823        ts_init,
824    );
825
826    Ok(Some(quote))
827}
828
829/// Decodes a Databento TCBBO (Consolidated Top of Book with Trade) message.
830///
831/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
832/// The trade is always returned.
833///
834/// # Errors
835///
836/// Returns an error if decoding the TCBBO message fails.
837pub fn decode_tcbbo_msg(
838    msg: &dbn::CbboMsg,
839    instrument_id: InstrumentId,
840    price_precision: u8,
841    ts_init: Option<UnixNanos>,
842) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
843    let top_level = &msg.levels[0];
844    let ts_event = msg.ts_recv.into();
845    let ts_init = ts_init.unwrap_or(ts_event);
846
847    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
848        Some(QuoteTick::new(
849            instrument_id,
850            decode_price_or_undef(top_level.bid_px, price_precision),
851            decode_price_or_undef(top_level.ask_px, price_precision),
852            decode_quantity(top_level.bid_sz as u64),
853            decode_quantity(top_level.ask_sz as u64),
854            ts_event,
855            ts_init,
856        ))
857    } else {
858        None
859    };
860
861    // Use UUID4 for trade ID as TCBBO doesn't have a sequence field
862    let trade = TradeTick::new(
863        instrument_id,
864        decode_price_or_undef(msg.price, price_precision),
865        decode_quantity(msg.size as u64),
866        parse_aggressor_side(msg.side),
867        TradeId::new(UUID4::new().as_str()),
868        ts_event,
869        ts_init,
870    );
871
872    Ok((maybe_quote, trade))
873}
874
875/// # Errors
876///
877/// Returns an error if `rtype` is not a supported bar aggregation.
878pub fn decode_bar_type(
879    msg: &dbn::OhlcvMsg,
880    instrument_id: InstrumentId,
881) -> anyhow::Result<BarType> {
882    let bar_type = match msg.hd.rtype {
883        32 => {
884            // ohlcv-1s
885            BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
886        }
887        33 => {
888            // ohlcv-1m
889            BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
890        }
891        34 => {
892            // ohlcv-1h
893            BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
894        }
895        35 => {
896            // ohlcv-1d
897            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
898        }
899        36 => {
900            // ohlcv-eod
901            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
902        }
903        _ => anyhow::bail!(
904            "`rtype` is not a supported bar aggregation, was {}",
905            msg.hd.rtype
906        ),
907    };
908
909    Ok(bar_type)
910}
911
912/// # Errors
913///
914/// Returns an error if `rtype` is not a supported bar aggregation.
915pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
916    let adjustment = match msg.hd.rtype {
917        32 => {
918            // ohlcv-1s
919            BAR_CLOSE_ADJUSTMENT_1S
920        }
921        33 => {
922            // ohlcv-1m
923            BAR_CLOSE_ADJUSTMENT_1M
924        }
925        34 => {
926            // ohlcv-1h
927            BAR_CLOSE_ADJUSTMENT_1H
928        }
929        35 | 36 => {
930            // ohlcv-1d and ohlcv-eod
931            BAR_CLOSE_ADJUSTMENT_1D
932        }
933        _ => anyhow::bail!(
934            "`rtype` is not a supported bar aggregation, was {}",
935            msg.hd.rtype
936        ),
937    };
938
939    Ok(adjustment.into())
940}
941
942/// # Errors
943///
944/// Returns an error if decoding the OHLCV message fails.
945pub fn decode_ohlcv_msg(
946    msg: &dbn::OhlcvMsg,
947    instrument_id: InstrumentId,
948    price_precision: u8,
949    ts_init: Option<UnixNanos>,
950    timestamp_on_close: bool,
951) -> anyhow::Result<Bar> {
952    let bar_type = decode_bar_type(msg, instrument_id)?;
953    let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
954
955    let ts_event_raw = msg.hd.ts_event.into();
956    let ts_close = ts_event_raw + ts_event_adjustment;
957    let ts_init = ts_init.unwrap_or(ts_close); // received time or close time
958
959    let ts_event = if timestamp_on_close {
960        ts_close
961    } else {
962        ts_event_raw
963    };
964
965    let bar = Bar::new(
966        bar_type,
967        decode_price_or_undef(msg.open, price_precision),
968        decode_price_or_undef(msg.high, price_precision),
969        decode_price_or_undef(msg.low, price_precision),
970        decode_price_or_undef(msg.close, price_precision),
971        decode_quantity(msg.volume),
972        ts_event,
973        ts_init,
974    );
975
976    Ok(bar)
977}
978
979/// Decodes a Databento status message into an `InstrumentStatus` event.
980///
981/// # Errors
982///
983/// Returns an error if decoding the status message fails or if `msg.action` is not a valid `MarketStatusAction`.
984pub fn decode_status_msg(
985    msg: &dbn::StatusMsg,
986    instrument_id: InstrumentId,
987    ts_init: Option<UnixNanos>,
988) -> anyhow::Result<InstrumentStatus> {
989    let ts_event = msg.hd.ts_event.into();
990    let ts_init = ts_init.unwrap_or(ts_event);
991
992    let action = MarketStatusAction::from_u16(msg.action)
993        .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
994
995    let status = InstrumentStatus::new(
996        instrument_id,
997        action,
998        ts_event,
999        ts_init,
1000        parse_status_reason(msg.reason)?,
1001        parse_status_trading_event(msg.trading_event)?,
1002        parse_optional_bool(msg.is_trading),
1003        parse_optional_bool(msg.is_quoting),
1004        parse_optional_bool(msg.is_short_sell_restricted),
1005    );
1006
1007    Ok(status)
1008}
1009
1010/// # Errors
1011///
1012/// Returns an error if decoding the record type fails or encounters unsupported message.
1013pub fn decode_record(
1014    record: &dbn::RecordRef,
1015    instrument_id: InstrumentId,
1016    price_precision: u8,
1017    ts_init: Option<UnixNanos>,
1018    include_trades: bool,
1019    bars_timestamp_on_close: bool,
1020) -> anyhow::Result<(Option<Data>, Option<Data>)> {
1021    // Note: TBBO and TCBBO messages provide both quotes and trades.
1022    // TBBO is handled explicitly below, while TCBBO is handled by
1023    // the CbboMsg branch based on whether it has trade data.
1024    let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
1025        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1026        let result = decode_mbo_msg(
1027            msg,
1028            instrument_id,
1029            price_precision,
1030            Some(ts_init),
1031            include_trades,
1032        )?;
1033        match result {
1034            (Some(delta), None) => (Some(Data::Delta(delta)), None),
1035            (None, Some(trade)) => (Some(Data::Trade(trade)), None),
1036            (None, None) => (None, None),
1037            _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
1038        }
1039    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
1040        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1041        let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1042        (Some(Data::Trade(trade)), None)
1043    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
1044        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1045        let (maybe_quote, maybe_trade) = decode_mbp1_msg(
1046            msg,
1047            instrument_id,
1048            price_precision,
1049            Some(ts_init),
1050            include_trades,
1051        )?;
1052        (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1053    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
1054        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1055        let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1056        (maybe_quote.map(Data::Quote), None)
1057    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
1058        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1059        let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1060        (maybe_quote.map(Data::Quote), None)
1061    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
1062        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1063        let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1064        (Some(Data::from(depth)), None)
1065    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
1066        // if ts_init is None (like with historical data) we don't want it to be equal to ts_event
1067        // it will be set correctly in decode_ohlcv_msg instead
1068        let bar = decode_ohlcv_msg(
1069            msg,
1070            instrument_id,
1071            price_precision,
1072            ts_init,
1073            bars_timestamp_on_close,
1074        )?;
1075        (Some(Data::Bar(bar)), None)
1076    } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
1077        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1078        let (maybe_quote, maybe_trade) = decode_cmbp1_msg(
1079            msg,
1080            instrument_id,
1081            price_precision,
1082            Some(ts_init),
1083            include_trades,
1084        )?;
1085        (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1086    } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
1087        // TBBO always has a trade, quote may be skipped if prices undefined
1088        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1089        let (maybe_quote, trade) =
1090            decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1091        (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1092    } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
1093        // Check if this is a TCBBO or regular CBBO based on whether it has trade data
1094        if msg.price != i64::MAX && msg.size > 0 {
1095            // TCBBO - has a trade, quote may be skipped if prices undefined
1096            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1097            let (maybe_quote, trade) =
1098                decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1099            (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1100        } else {
1101            // Regular CBBO - quote only (may be None if prices undefined)
1102            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1103            let maybe_quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1104            (maybe_quote.map(Data::Quote), None)
1105        }
1106    } else {
1107        anyhow::bail!("DBN message type is not currently supported")
1108    };
1109
1110    Ok(result)
1111}
1112
1113const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
1114    match ts_init {
1115        Some(ts_init) => ts_init,
1116        None => msg_timestamp,
1117    }
1118}
1119
1120/// # Errors
1121///
1122/// Returns an error if decoding the `InstrumentDefMsg` fails.
1123pub fn decode_instrument_def_msg(
1124    msg: &dbn::InstrumentDefMsg,
1125    instrument_id: InstrumentId,
1126    ts_init: Option<UnixNanos>,
1127) -> anyhow::Result<InstrumentAny> {
1128    match msg.instrument_class as u8 as char {
1129        'K' => Ok(InstrumentAny::Equity(decode_equity(
1130            msg,
1131            instrument_id,
1132            ts_init,
1133        )?)),
1134        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1135            msg,
1136            instrument_id,
1137            ts_init,
1138        )?)),
1139        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1140            msg,
1141            instrument_id,
1142            ts_init,
1143        )?)),
1144        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1145            msg,
1146            instrument_id,
1147            ts_init,
1148        )?)),
1149        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1150            msg,
1151            instrument_id,
1152            ts_init,
1153        )?)),
1154        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1155        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1156        _ => anyhow::bail!(
1157            "Unsupported `instrument_class` '{}'",
1158            msg.instrument_class as u8 as char
1159        ),
1160    }
1161}
1162
1163/// Decodes a Databento instrument definition message into an `Equity` instrument.
1164///
1165/// # Errors
1166///
1167/// Returns an error if parsing or constructing `Equity` fails.
1168pub fn decode_equity(
1169    msg: &dbn::InstrumentDefMsg,
1170    instrument_id: InstrumentId,
1171    ts_init: Option<UnixNanos>,
1172) -> anyhow::Result<Equity> {
1173    let currency = parse_currency_or_usd_default(msg.currency());
1174    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1175    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1176    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1177    let ts_init = ts_init.unwrap_or(ts_event);
1178
1179    Ok(Equity::new(
1180        instrument_id,
1181        instrument_id.symbol,
1182        None, // No ISIN available yet
1183        currency,
1184        price_increment.precision,
1185        price_increment,
1186        Some(lot_size),
1187        None, // TBD
1188        None, // TBD
1189        None, // TBD
1190        None, // TBD
1191        None, // TBD
1192        None, // TBD
1193        None, // TBD
1194        None, // TBD
1195        ts_event,
1196        ts_init,
1197    ))
1198}
1199
1200/// Decodes a Databento instrument definition message into a `FuturesContract` instrument.
1201///
1202/// # Errors
1203///
1204/// Returns an error if parsing or constructing `FuturesContract` fails.
1205pub fn decode_futures_contract(
1206    msg: &dbn::InstrumentDefMsg,
1207    instrument_id: InstrumentId,
1208    ts_init: Option<UnixNanos>,
1209) -> anyhow::Result<FuturesContract> {
1210    let currency = parse_currency_or_usd_default(msg.currency());
1211    let exchange = Ustr::from(msg.exchange()?);
1212    let underlying = Ustr::from(msg.asset()?);
1213    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1214    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1215    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1216    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1217    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1218    let ts_init = ts_init.unwrap_or(ts_event);
1219
1220    FuturesContract::new_checked(
1221        instrument_id,
1222        instrument_id.symbol,
1223        asset_class.unwrap_or(AssetClass::Commodity),
1224        Some(exchange),
1225        underlying,
1226        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1227        decode_timestamp(msg.expiration, "expiration")?,
1228        currency,
1229        price_increment.precision,
1230        price_increment,
1231        multiplier,
1232        lot_size,
1233        None, // TBD
1234        None, // TBD
1235        None, // TBD
1236        None, // TBD
1237        None, // TBD
1238        None, // TBD
1239        None, // TBD
1240        None, // TBD
1241        ts_event,
1242        ts_init,
1243    )
1244}
1245
1246/// Decodes a Databento instrument definition message into a `FuturesSpread` instrument.
1247///
1248/// # Errors
1249///
1250/// Returns an error if parsing or constructing `FuturesSpread` fails.
1251pub fn decode_futures_spread(
1252    msg: &dbn::InstrumentDefMsg,
1253    instrument_id: InstrumentId,
1254    ts_init: Option<UnixNanos>,
1255) -> anyhow::Result<FuturesSpread> {
1256    let exchange = Ustr::from(msg.exchange()?);
1257    let underlying = Ustr::from(msg.asset()?);
1258    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1259    let strategy_type = Ustr::from(msg.secsubtype()?);
1260    let currency = parse_currency_or_usd_default(msg.currency());
1261    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1262    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1263    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1264    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1265    let ts_init = ts_init.unwrap_or(ts_event);
1266
1267    FuturesSpread::new_checked(
1268        instrument_id,
1269        instrument_id.symbol,
1270        asset_class.unwrap_or(AssetClass::Commodity),
1271        Some(exchange),
1272        underlying,
1273        strategy_type,
1274        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1275        decode_timestamp(msg.expiration, "expiration")?,
1276        currency,
1277        price_increment.precision,
1278        price_increment,
1279        multiplier,
1280        lot_size,
1281        None, // TBD
1282        None, // TBD
1283        None, // TBD
1284        None, // TBD
1285        None, // TBD
1286        None, // TBD
1287        None, // TBD
1288        None, // TBD
1289        ts_event,
1290        ts_init,
1291    )
1292}
1293
1294/// Decodes a Databento instrument definition message into an `OptionContract` instrument.
1295///
1296/// # Errors
1297///
1298/// Returns an error if parsing or constructing `OptionContract` fails.
1299pub fn decode_option_contract(
1300    msg: &dbn::InstrumentDefMsg,
1301    instrument_id: InstrumentId,
1302    ts_init: Option<UnixNanos>,
1303) -> anyhow::Result<OptionContract> {
1304    let currency = parse_currency_or_usd_default(msg.currency());
1305    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1306    let exchange = Ustr::from(msg.exchange()?);
1307    let underlying = Ustr::from(msg.underlying()?);
1308    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1309        Some(AssetClass::Equity)
1310    } else {
1311        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1312        asset_class
1313    };
1314    let option_kind = parse_option_kind(msg.instrument_class)?;
1315    let strike_price = decode_price(
1316        msg.strike_price,
1317        strike_price_currency.precision,
1318        "strike_price",
1319    )?;
1320    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1321    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1322    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1323    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1324    let ts_init = ts_init.unwrap_or(ts_event);
1325
1326    OptionContract::new_checked(
1327        instrument_id,
1328        instrument_id.symbol,
1329        asset_class_opt.unwrap_or(AssetClass::Commodity),
1330        Some(exchange),
1331        underlying,
1332        option_kind,
1333        strike_price,
1334        currency,
1335        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1336        decode_timestamp(msg.expiration, "expiration")?,
1337        price_increment.precision,
1338        price_increment,
1339        multiplier,
1340        lot_size,
1341        None, // TBD
1342        None, // TBD
1343        None, // TBD
1344        None, // TBD
1345        None, // TBD
1346        None, // TBD
1347        None, // TBD
1348        None, // TBD
1349        ts_event,
1350        ts_init,
1351    )
1352}
1353
1354/// Decodes a Databento instrument definition message into an `OptionSpread` instrument.
1355///
1356/// # Errors
1357///
1358/// Returns an error if parsing or constructing `OptionSpread` fails.
1359pub fn decode_option_spread(
1360    msg: &dbn::InstrumentDefMsg,
1361    instrument_id: InstrumentId,
1362    ts_init: Option<UnixNanos>,
1363) -> anyhow::Result<OptionSpread> {
1364    let exchange = Ustr::from(msg.exchange()?);
1365    let underlying = Ustr::from(msg.underlying()?);
1366    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1367        Some(AssetClass::Equity)
1368    } else {
1369        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1370        asset_class
1371    };
1372    let strategy_type = Ustr::from(msg.secsubtype()?);
1373    let currency = parse_currency_or_usd_default(msg.currency());
1374    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1375    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1376    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1377    let ts_event = msg.ts_recv.into(); // More accurate and reliable timestamp
1378    let ts_init = ts_init.unwrap_or(ts_event);
1379
1380    OptionSpread::new_checked(
1381        instrument_id,
1382        instrument_id.symbol,
1383        asset_class_opt.unwrap_or(AssetClass::Commodity),
1384        Some(exchange),
1385        underlying,
1386        strategy_type,
1387        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1388        decode_timestamp(msg.expiration, "expiration")?,
1389        currency,
1390        price_increment.precision,
1391        price_increment,
1392        multiplier,
1393        lot_size,
1394        None, // TBD
1395        None, // TBD
1396        None, // TBD
1397        None, // TBD
1398        None, // TBD
1399        None, // TBD
1400        None, // TBD
1401        None, // TBD
1402        ts_event,
1403        ts_init,
1404    )
1405}
1406
1407/// Decodes a Databento imbalance message into a `DatabentoImbalance` event.
1408///
1409/// # Errors
1410///
1411/// Returns an error if constructing `DatabentoImbalance` fails.
1412pub fn decode_imbalance_msg(
1413    msg: &dbn::ImbalanceMsg,
1414    instrument_id: InstrumentId,
1415    price_precision: u8,
1416    ts_init: Option<UnixNanos>,
1417) -> anyhow::Result<DatabentoImbalance> {
1418    let ts_event = msg.ts_recv.into();
1419    let ts_init = ts_init.unwrap_or(ts_event);
1420
1421    Ok(DatabentoImbalance::new(
1422        instrument_id,
1423        decode_price_or_undef(msg.ref_price, price_precision),
1424        decode_price_or_undef(msg.cont_book_clr_price, price_precision),
1425        decode_price_or_undef(msg.auct_interest_clr_price, price_precision),
1426        Quantity::new(f64::from(msg.paired_qty), 0),
1427        Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1428        parse_order_side(msg.side),
1429        msg.significant_imbalance as c_char,
1430        msg.hd.ts_event.into(),
1431        ts_event,
1432        ts_init,
1433    ))
1434}
1435
1436/// Decodes a Databento statistics message into a `DatabentoStatistics` event.
1437///
1438/// # Errors
1439///
1440/// Returns an error if constructing `DatabentoStatistics` fails or if `msg.stat_type` or
1441/// `msg.update_action` is not a valid enum variant.
1442pub fn decode_statistics_msg(
1443    msg: &dbn::StatMsg,
1444    instrument_id: InstrumentId,
1445    price_precision: u8,
1446    ts_init: Option<UnixNanos>,
1447) -> anyhow::Result<DatabentoStatistics> {
1448    let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1449        .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1450    let update_action =
1451        DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1452            anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1453        })?;
1454    let ts_event = msg.ts_recv.into();
1455    let ts_init = ts_init.unwrap_or(ts_event);
1456
1457    Ok(DatabentoStatistics::new(
1458        instrument_id,
1459        stat_type,
1460        update_action,
1461        decode_optional_price(msg.price, price_precision),
1462        decode_optional_quantity(msg.quantity),
1463        msg.channel_id,
1464        msg.stat_flags,
1465        msg.sequence,
1466        msg.ts_ref.into(),
1467        msg.ts_in_delta,
1468        msg.hd.ts_event.into(),
1469        ts_event,
1470        ts_init,
1471    ))
1472}
1473
1474#[cfg(test)]
1475mod tests {
1476    use std::path::{Path, PathBuf};
1477
1478    use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1479    use fallible_streaming_iterator::FallibleStreamingIterator;
1480    use nautilus_model::instruments::Instrument;
1481    use rstest::*;
1482
1483    use super::*;
1484
1485    fn test_data_path() -> PathBuf {
1486        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1487    }
1488
1489    #[rstest]
1490    #[case('Y' as c_char, Some(true))]
1491    #[case('N' as c_char, Some(false))]
1492    #[case('X' as c_char, None)]
1493    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1494        assert_eq!(parse_optional_bool(input), expected);
1495    }
1496
1497    #[rstest]
1498    #[case('A' as c_char, OrderSide::Sell)]
1499    #[case('B' as c_char, OrderSide::Buy)]
1500    #[case('X' as c_char, OrderSide::NoOrderSide)]
1501    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1502        assert_eq!(parse_order_side(input), expected);
1503    }
1504
1505    #[rstest]
1506    #[case('A' as c_char, AggressorSide::Seller)]
1507    #[case('B' as c_char, AggressorSide::Buyer)]
1508    #[case('X' as c_char, AggressorSide::NoAggressor)]
1509    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1510        assert_eq!(parse_aggressor_side(input), expected);
1511    }
1512
1513    #[rstest]
1514    #[case('T' as c_char, true)]
1515    #[case('A' as c_char, false)]
1516    #[case('C' as c_char, false)]
1517    #[case('F' as c_char, false)]
1518    #[case('M' as c_char, false)]
1519    #[case('R' as c_char, false)]
1520    fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1521        assert_eq!(is_trade_msg(action), expected);
1522    }
1523
1524    #[rstest]
1525    #[case('A' as c_char, Ok(BookAction::Add))]
1526    #[case('C' as c_char, Ok(BookAction::Delete))]
1527    #[case('F' as c_char, Ok(BookAction::Update))]
1528    #[case('M' as c_char, Ok(BookAction::Update))]
1529    #[case('R' as c_char, Ok(BookAction::Clear))]
1530    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1531    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1532        match parse_book_action(input) {
1533            Ok(action) => assert_eq!(Ok(action), expected),
1534            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1535        }
1536    }
1537
1538    #[rstest]
1539    #[case('C' as c_char, Ok(OptionKind::Call))]
1540    #[case('P' as c_char, Ok(OptionKind::Put))]
1541    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1542    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1543        match parse_option_kind(input) {
1544            Ok(kind) => assert_eq!(Ok(kind), expected),
1545            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1546        }
1547    }
1548
1549    #[rstest]
1550    #[case(Ok("USD"), Currency::USD())]
1551    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1552    #[case(Ok(""), Currency::USD())]
1553    #[case(Err("Error"), Currency::USD())]
1554    fn test_parse_currency_or_usd_default(
1555        #[case] input: Result<&str, &'static str>, // Using `&'static str` for errors
1556        #[case] expected: Currency,
1557    ) {
1558        let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1559        assert_eq!(actual, expected);
1560    }
1561
1562    #[rstest]
1563    #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1564    #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1565    #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1566    #[case("XXX", Ok((None, None)))]
1567    #[case("D", Err("Value string is too short"))]
1568    fn test_parse_cfi_iso10926(
1569        #[case] input: &str,
1570        #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1571    ) {
1572        match parse_cfi_iso10926(input) {
1573            Ok(result) => assert_eq!(Ok(result), expected),
1574            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1575        }
1576    }
1577
1578    #[rstest]
1579    #[case(0, 2, Price::from_raw(0, 2))]
1580    #[case(
1581        1_000_000_000,
1582        2,
1583        Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1584    )]
1585    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1586        let actual = decode_price(value, precision, "test_field").unwrap();
1587        assert_eq!(actual, expected);
1588    }
1589
1590    #[rstest]
1591    fn test_decode_price_undefined_errors() {
1592        let result = decode_price(i64::MAX, 2, "strike_price");
1593        assert!(result.is_err());
1594        assert!(result.unwrap_err().to_string().contains("strike_price"));
1595    }
1596
1597    #[rstest]
1598    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1599    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1600    #[case(
1601        10_000_000_000,
1602        2,
1603        Price::from_raw(decode_raw_price_i64(10_000_000_000), 2)
1604    )]
1605    fn test_decode_price_increment(
1606        #[case] value: i64,
1607        #[case] precision: u8,
1608        #[case] expected: Price,
1609    ) {
1610        let actual = decode_price_increment(value, precision);
1611        assert_eq!(actual, expected);
1612    }
1613
1614    #[rstest]
1615    #[case(i64::MAX, 2, None)] // None for i64::MAX
1616    #[case(0, 2, Some(Price::from_raw(0, 2)))] // 0 is valid here
1617    #[case(
1618        10_000_000_000,
1619        2,
1620        Some(Price::from_raw(decode_raw_price_i64(10_000_000_000), 2))
1621    )]
1622    fn test_decode_optional_price(
1623        #[case] value: i64,
1624        #[case] precision: u8,
1625        #[case] expected: Option<Price>,
1626    ) {
1627        let actual = decode_optional_price(value, precision);
1628        assert_eq!(actual, expected);
1629    }
1630
1631    #[rstest]
1632    #[case(0, 2, Price::from_raw(0, 2))]
1633    #[case(
1634        1_000_000_000,
1635        2,
1636        Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1637    )]
1638    #[case(i64::MAX, 2, Price::from_raw(PRICE_UNDEF, 0))] // Sentinel becomes PRICE_UNDEF
1639    fn test_decode_price_or_undef(
1640        #[case] value: i64,
1641        #[case] precision: u8,
1642        #[case] expected: Price,
1643    ) {
1644        let actual = decode_price_or_undef(value, precision);
1645        assert_eq!(actual, expected);
1646    }
1647
1648    #[rstest]
1649    #[case(i64::MAX, None)] // None for i32::MAX
1650    #[case(0, Some(Quantity::new(0.0, 0)))] // 0 is valid quantity
1651    #[case(10, Some(Quantity::new(10.0, 0)))] // Arbitrary valid quantity
1652    fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1653        let actual = decode_optional_quantity(value);
1654        assert_eq!(actual, expected);
1655    }
1656
1657    #[rstest]
1658    #[case(0, UnixNanos::from(0))]
1659    #[case(1_000_000_000, UnixNanos::from(1_000_000_000))]
1660    fn test_decode_timestamp(#[case] value: u64, #[case] expected: UnixNanos) {
1661        let actual = decode_timestamp(value, "test_field").unwrap();
1662        assert_eq!(actual, expected);
1663    }
1664
1665    #[rstest]
1666    fn test_decode_timestamp_undefined_errors() {
1667        let result = decode_timestamp(dbn::UNDEF_TIMESTAMP, "expiration");
1668        assert!(result.is_err());
1669        assert!(result.unwrap_err().to_string().contains("expiration"));
1670    }
1671
1672    #[rstest]
1673    #[case(0, Some(UnixNanos::from(0)))]
1674    #[case(1_000_000_000, Some(UnixNanos::from(1_000_000_000)))]
1675    #[case(dbn::UNDEF_TIMESTAMP, None)]
1676    fn test_decode_optional_timestamp(#[case] value: u64, #[case] expected: Option<UnixNanos>) {
1677        let actual = decode_optional_timestamp(value);
1678        assert_eq!(actual, expected);
1679    }
1680
1681    #[rstest]
1682    #[case(0, Quantity::from(1))] // Default fallback for 0
1683    #[case(i64::MAX, Quantity::from(1))] // Default fallback for i64::MAX
1684    #[case(50_000_000_000, Quantity::from("50"))] // 50.0 exactly
1685    #[case(12_500_000_000, Quantity::from("12.5"))] // 12.5 exactly
1686    #[case(1_000_000_000, Quantity::from("1"))] // 1.0 exactly
1687    #[case(1, Quantity::from("0.000000001"))] // Smallest positive value
1688    #[case(1_000_000_001, Quantity::from("1.000000001"))] // Just over 1.0
1689    #[case(999_999_999, Quantity::from("0.999999999"))] // Just under 1.0
1690    #[case(123_456_789_000, Quantity::from("123.456789"))] // Trailing zeros trimmed
1691    fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1692        assert_eq!(decode_multiplier(raw).unwrap(), expected);
1693    }
1694
1695    #[rstest]
1696    #[case(-1_500_000_000)] // Large negative value
1697    #[case(-1)] // Small negative value
1698    #[case(-999_999_999)] // Another negative value
1699    fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1700        let result = decode_multiplier(raw);
1701        assert!(result.is_err());
1702        assert!(
1703            result
1704                .unwrap_err()
1705                .to_string()
1706                .contains("Invalid negative multiplier")
1707        );
1708    }
1709
1710    #[rstest]
1711    #[case(100, Quantity::from(100))]
1712    #[case(1000, Quantity::from(1000))]
1713    #[case(5, Quantity::from(5))]
1714    fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1715        assert_eq!(decode_quantity(value), expected);
1716    }
1717
1718    #[rstest]
1719    #[case(0, Quantity::from(1))] // Default for 0
1720    #[case(i32::MAX, Quantity::from(1))] // Default for MAX
1721    #[case(100, Quantity::from(100))]
1722    #[case(1, Quantity::from(1))]
1723    #[case(1000, Quantity::from(1000))]
1724    fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1725        assert_eq!(decode_lot_size(value), expected);
1726    }
1727
1728    #[rstest]
1729    #[case(0, None)] // None for 0
1730    #[case(1, Some(Ustr::from("Scheduled")))]
1731    #[case(2, Some(Ustr::from("Surveillance intervention")))]
1732    #[case(3, Some(Ustr::from("Market event")))]
1733    #[case(10, Some(Ustr::from("Regulatory")))]
1734    #[case(30, Some(Ustr::from("News pending")))]
1735    #[case(40, Some(Ustr::from("Order imbalance")))]
1736    #[case(50, Some(Ustr::from("LULD pause")))]
1737    #[case(60, Some(Ustr::from("Operational")))]
1738    #[case(100, Some(Ustr::from("Corporate action")))]
1739    #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1740    fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1741        assert_eq!(parse_status_reason(value).unwrap(), expected);
1742    }
1743
1744    #[rstest]
1745    #[case(999)] // Invalid code
1746    fn test_parse_status_reason_invalid(#[case] value: u16) {
1747        assert!(parse_status_reason(value).is_err());
1748    }
1749
1750    #[rstest]
1751    #[case(0, None)] // None for 0
1752    #[case(1, Some(Ustr::from("No cancel")))]
1753    #[case(2, Some(Ustr::from("Change trading session")))]
1754    #[case(3, Some(Ustr::from("Implied matching on")))]
1755    #[case(4, Some(Ustr::from("Implied matching off")))]
1756    fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1757        assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1758    }
1759
1760    #[rstest]
1761    #[case(5)] // Invalid code
1762    #[case(100)] // Invalid code
1763    fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1764        assert!(parse_status_trading_event(value).is_err());
1765    }
1766
1767    #[rstest]
1768    fn test_decode_mbo_msg() {
1769        let path = test_data_path().join("test_data.mbo.dbn.zst");
1770        let mut dbn_stream = Decoder::from_zstd_file(path)
1771            .unwrap()
1772            .decode_stream::<dbn::MboMsg>();
1773        let msg = dbn_stream.next().unwrap().unwrap();
1774
1775        let instrument_id = InstrumentId::from("ESM4.GLBX");
1776        let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1777        let delta = delta.unwrap();
1778
1779        assert_eq!(delta.instrument_id, instrument_id);
1780        assert_eq!(delta.action, BookAction::Delete);
1781        assert_eq!(delta.order.side, OrderSide::Sell);
1782        assert_eq!(delta.order.price, Price::from("3722.75"));
1783        assert_eq!(delta.order.size, Quantity::from("1"));
1784        assert_eq!(delta.order.order_id, 647_784_973_705);
1785        assert_eq!(delta.flags, 128);
1786        assert_eq!(delta.sequence, 1_170_352);
1787        assert_eq!(delta.ts_event, msg.ts_recv);
1788        assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1789        assert_eq!(delta.ts_init, 0);
1790    }
1791
1792    #[rstest]
1793    fn test_decode_mbo_msg_clear_action() {
1794        // Create an MBO message with Clear action (action='R', side='N')
1795        let ts_recv = 1_609_160_400_000_000_000;
1796        let msg = dbn::MboMsg {
1797            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1798            order_id: 0,
1799            price: i64::MAX,
1800            size: 0,
1801            flags: dbn::FlagSet::empty(),
1802            channel_id: 0,
1803            action: 'R' as c_char,
1804            side: 'N' as c_char, // NoOrderSide for Clear
1805            ts_recv,
1806            ts_in_delta: 0,
1807            sequence: 1_000_000,
1808        };
1809
1810        let instrument_id = InstrumentId::from("ESM4.GLBX");
1811        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1812
1813        // Clear messages should produce OrderBookDelta, not TradeTick
1814        assert!(trade.is_none());
1815        let delta = delta.expect("Clear action should produce OrderBookDelta");
1816
1817        assert_eq!(delta.instrument_id, instrument_id);
1818        assert_eq!(delta.action, BookAction::Clear);
1819        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1820        assert_eq!(delta.order.size, Quantity::from("0"));
1821        assert_eq!(delta.order.order_id, 0);
1822        assert_eq!(delta.sequence, 1_000_000);
1823        assert_eq!(delta.ts_event, ts_recv);
1824        assert_eq!(delta.ts_init, 0);
1825        assert!(delta.order.price.is_undefined());
1826        assert_eq!(delta.order.price.precision, 0);
1827    }
1828
1829    #[rstest]
1830    fn test_decode_mbo_msg_price_undef_with_precision() {
1831        // Test that PRICE_UNDEF (i64::MAX) forces precision to 0 even when price_precision is non-zero
1832        let ts_recv = 1_609_160_400_000_000_000;
1833        let msg = dbn::MboMsg {
1834            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1835            order_id: 0,
1836            price: i64::MAX, // PRICE_UNDEF
1837            size: 0,
1838            flags: dbn::FlagSet::empty(),
1839            channel_id: 0,
1840            action: 'R' as c_char, // Clear
1841            side: 'N' as c_char,   // NoOrderSide
1842            ts_recv,
1843            ts_in_delta: 0,
1844            sequence: 0,
1845        };
1846
1847        let instrument_id = InstrumentId::from("ESM4.GLBX");
1848        let (delta, _) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1849        let delta = delta.unwrap();
1850
1851        assert!(delta.order.price.is_undefined());
1852        assert_eq!(delta.order.price.precision, 0);
1853        assert_eq!(delta.order.price.raw, PRICE_UNDEF);
1854    }
1855
1856    #[rstest]
1857    fn test_decode_mbo_msg_no_order_side_update() {
1858        // MBO messages with NoOrderSide are now passed through to the book
1859        // The book will resolve the side from its cache using the order_id
1860        let ts_recv = 1_609_160_400_000_000_000;
1861        let msg = dbn::MboMsg {
1862            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1863            order_id: 123_456_789,
1864            price: 4_800_250_000_000, // $4800.25 with precision 2
1865            size: 1,
1866            flags: dbn::FlagSet::empty(),
1867            channel_id: 1,
1868            action: 'M' as c_char, // Modify/Update action
1869            side: 'N' as c_char,   // NoOrderSide
1870            ts_recv,
1871            ts_in_delta: 0,
1872            sequence: 1_000_000,
1873        };
1874
1875        let instrument_id = InstrumentId::from("ESM4.GLBX");
1876        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1877
1878        // Delta should be created with NoOrderSide (book will resolve it)
1879        assert!(delta.is_some());
1880        assert!(trade.is_none());
1881        let delta = delta.unwrap();
1882        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1883        assert_eq!(delta.order.order_id, 123_456_789);
1884        assert_eq!(delta.action, BookAction::Update);
1885    }
1886
1887    #[rstest]
1888    fn test_decode_mbp1_msg() {
1889        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1890        let mut dbn_stream = Decoder::from_zstd_file(path)
1891            .unwrap()
1892            .decode_stream::<dbn::Mbp1Msg>();
1893        let msg = dbn_stream.next().unwrap().unwrap();
1894
1895        let instrument_id = InstrumentId::from("ESM4.GLBX");
1896        let (maybe_quote, _) =
1897            decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1898        let quote = maybe_quote.expect("Expected valid quote");
1899
1900        assert_eq!(quote.instrument_id, instrument_id);
1901        assert_eq!(quote.bid_price, Price::from("3720.25"));
1902        assert_eq!(quote.ask_price, Price::from("3720.50"));
1903        assert_eq!(quote.bid_size, Quantity::from("24"));
1904        assert_eq!(quote.ask_size, Quantity::from("11"));
1905        assert_eq!(quote.ts_event, msg.ts_recv);
1906        assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1907        assert_eq!(quote.ts_init, 0);
1908    }
1909
1910    #[rstest]
1911    fn test_decode_mbp1_msg_undefined_ask_skips_quote() {
1912        let ts_recv = 1_609_160_400_000_000_000;
1913        let msg = dbn::Mbp1Msg {
1914            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1915            price: 3_720_250_000_000, // Valid trade price
1916            size: 5,
1917            action: 'A' as c_char,
1918            side: 'B' as c_char,
1919            flags: dbn::FlagSet::empty(),
1920            depth: 0,
1921            ts_recv,
1922            ts_in_delta: 0,
1923            sequence: 1_170_352,
1924            levels: [dbn::BidAskPair {
1925                bid_px: 3_720_250_000_000, // Valid bid price
1926                ask_px: i64::MAX,          // Undefined ask price
1927                bid_sz: 24,
1928                ask_sz: 0,
1929                bid_ct: 1,
1930                ask_ct: 0,
1931            }],
1932        };
1933
1934        let instrument_id = InstrumentId::from("ESM4.GLBX");
1935        let (maybe_quote, _) =
1936            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1937
1938        // Quote should be None because ask price is undefined
1939        assert!(maybe_quote.is_none());
1940    }
1941
1942    #[rstest]
1943    fn test_decode_mbp1_msg_undefined_bid_skips_quote() {
1944        let ts_recv = 1_609_160_400_000_000_000;
1945        let msg = dbn::Mbp1Msg {
1946            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1947            price: 3_720_500_000_000, // Valid trade price
1948            size: 5,
1949            action: 'A' as c_char,
1950            side: 'A' as c_char,
1951            flags: dbn::FlagSet::empty(),
1952            depth: 0,
1953            ts_recv,
1954            ts_in_delta: 0,
1955            sequence: 1_170_352,
1956            levels: [dbn::BidAskPair {
1957                bid_px: i64::MAX,          // Undefined bid price
1958                ask_px: 3_720_500_000_000, // Valid ask price
1959                bid_sz: 0,
1960                ask_sz: 11,
1961                bid_ct: 0,
1962                ask_ct: 1,
1963            }],
1964        };
1965
1966        let instrument_id = InstrumentId::from("ESM4.GLBX");
1967        let (maybe_quote, _) =
1968            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1969
1970        // Quote should be None because bid price is undefined
1971        assert!(maybe_quote.is_none());
1972    }
1973
1974    #[rstest]
1975    fn test_decode_mbp1_msg_trade_still_returned_with_undefined_prices() {
1976        let ts_recv = 1_609_160_400_000_000_000;
1977        let msg = dbn::Mbp1Msg {
1978            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1979            price: 3_720_250_000_000, // Valid trade price
1980            size: 5,
1981            action: 'T' as c_char, // Trade action
1982            side: 'A' as c_char,
1983            flags: dbn::FlagSet::empty(),
1984            depth: 0,
1985            ts_recv,
1986            ts_in_delta: 0,
1987            sequence: 1_170_352,
1988            levels: [dbn::BidAskPair {
1989                bid_px: i64::MAX, // Undefined bid
1990                ask_px: i64::MAX, // Undefined ask
1991                bid_sz: 0,
1992                ask_sz: 0,
1993                bid_ct: 0,
1994                ask_ct: 0,
1995            }],
1996        };
1997
1998        let instrument_id = InstrumentId::from("ESM4.GLBX");
1999        let (maybe_quote, maybe_trade) =
2000            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), true).unwrap();
2001
2002        // Quote should be None because both prices are undefined
2003        assert!(maybe_quote.is_none());
2004
2005        // Trade should still be present
2006        let trade = maybe_trade.expect("Expected trade");
2007        assert_eq!(trade.instrument_id, instrument_id);
2008        assert_eq!(trade.price, Price::from("3720.25"));
2009        assert_eq!(trade.size, Quantity::from("5"));
2010    }
2011
2012    #[rstest]
2013    fn test_decode_bbo_1s_msg() {
2014        let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
2015        let mut dbn_stream = Decoder::from_zstd_file(path)
2016            .unwrap()
2017            .decode_stream::<dbn::BboMsg>();
2018        let msg = dbn_stream.next().unwrap().unwrap();
2019
2020        let instrument_id = InstrumentId::from("ESM4.GLBX");
2021        let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2022        let quote = maybe_quote.expect("Expected valid quote");
2023
2024        assert_eq!(quote.instrument_id, instrument_id);
2025        assert_eq!(quote.bid_price, Price::from("3702.25"));
2026        assert_eq!(quote.ask_price, Price::from("3702.75"));
2027        assert_eq!(quote.bid_size, Quantity::from("18"));
2028        assert_eq!(quote.ask_size, Quantity::from("13"));
2029        assert_eq!(quote.ts_event, msg.ts_recv);
2030        assert_eq!(quote.ts_event, 1609113600000000000);
2031        assert_eq!(quote.ts_init, 0);
2032    }
2033
2034    #[rstest]
2035    fn test_decode_bbo_1m_msg() {
2036        let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
2037        let mut dbn_stream = Decoder::from_zstd_file(path)
2038            .unwrap()
2039            .decode_stream::<dbn::BboMsg>();
2040        let msg = dbn_stream.next().unwrap().unwrap();
2041
2042        let instrument_id = InstrumentId::from("ESM4.GLBX");
2043        let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2044        let quote = maybe_quote.expect("Expected valid quote");
2045
2046        assert_eq!(quote.instrument_id, instrument_id);
2047        assert_eq!(quote.bid_price, Price::from("3702.25"));
2048        assert_eq!(quote.ask_price, Price::from("3702.75"));
2049        assert_eq!(quote.bid_size, Quantity::from("18"));
2050        assert_eq!(quote.ask_size, Quantity::from("13"));
2051        assert_eq!(quote.ts_event, msg.ts_recv);
2052        assert_eq!(quote.ts_event, 1609113600000000000);
2053        assert_eq!(quote.ts_init, 0);
2054    }
2055
2056    #[rstest]
2057    fn test_decode_mbp10_msg() {
2058        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
2059        let mut dbn_stream = Decoder::from_zstd_file(path)
2060            .unwrap()
2061            .decode_stream::<dbn::Mbp10Msg>();
2062        let msg = dbn_stream.next().unwrap().unwrap();
2063
2064        let instrument_id = InstrumentId::from("ESM4.GLBX");
2065        let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2066
2067        assert_eq!(depth10.instrument_id, instrument_id);
2068        assert_eq!(depth10.bids.len(), 10);
2069        assert_eq!(depth10.asks.len(), 10);
2070        assert_eq!(depth10.bid_counts.len(), 10);
2071        assert_eq!(depth10.ask_counts.len(), 10);
2072        assert_eq!(depth10.flags, 128);
2073        assert_eq!(depth10.sequence, 1_170_352);
2074        assert_eq!(depth10.ts_event, msg.ts_recv);
2075        assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
2076        assert_eq!(depth10.ts_init, 0);
2077    }
2078
2079    #[rstest]
2080    fn test_decode_trade_msg() {
2081        let path = test_data_path().join("test_data.trades.dbn.zst");
2082        let mut dbn_stream = Decoder::from_zstd_file(path)
2083            .unwrap()
2084            .decode_stream::<dbn::TradeMsg>();
2085        let msg = dbn_stream.next().unwrap().unwrap();
2086
2087        let instrument_id = InstrumentId::from("ESM4.GLBX");
2088        let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2089
2090        assert_eq!(trade.instrument_id, instrument_id);
2091        assert_eq!(trade.price, Price::from("3720.25"));
2092        assert_eq!(trade.size, Quantity::from("5"));
2093        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2094        assert_eq!(trade.trade_id.to_string(), "1170380");
2095        assert_eq!(trade.ts_event, msg.ts_recv);
2096        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2097        assert_eq!(trade.ts_init, 0);
2098    }
2099
2100    #[rstest]
2101    fn test_decode_tbbo_msg() {
2102        let path = test_data_path().join("test_data.tbbo.dbn.zst");
2103        let mut dbn_stream = Decoder::from_zstd_file(path)
2104            .unwrap()
2105            .decode_stream::<dbn::Mbp1Msg>();
2106        let msg = dbn_stream.next().unwrap().unwrap();
2107
2108        let instrument_id = InstrumentId::from("ESM4.GLBX");
2109        let (maybe_quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2110        let quote = maybe_quote.expect("Expected valid quote");
2111
2112        assert_eq!(quote.instrument_id, instrument_id);
2113        assert_eq!(quote.bid_price, Price::from("3720.25"));
2114        assert_eq!(quote.ask_price, Price::from("3720.50"));
2115        assert_eq!(quote.bid_size, Quantity::from("26"));
2116        assert_eq!(quote.ask_size, Quantity::from("7"));
2117        assert_eq!(quote.ts_event, msg.ts_recv);
2118        assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
2119        assert_eq!(quote.ts_init, 0);
2120
2121        assert_eq!(trade.instrument_id, instrument_id);
2122        assert_eq!(trade.price, Price::from("3720.25"));
2123        assert_eq!(trade.size, Quantity::from("5"));
2124        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2125        assert_eq!(trade.trade_id.to_string(), "1170380");
2126        assert_eq!(trade.ts_event, msg.ts_recv);
2127        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2128        assert_eq!(trade.ts_init, 0);
2129    }
2130
2131    #[rstest]
2132    fn test_decode_ohlcv_msg() {
2133        let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
2134        let mut dbn_stream = Decoder::from_zstd_file(path)
2135            .unwrap()
2136            .decode_stream::<dbn::OhlcvMsg>();
2137        let msg = dbn_stream.next().unwrap().unwrap();
2138
2139        let instrument_id = InstrumentId::from("ESM4.GLBX");
2140        let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2141
2142        assert_eq!(
2143            bar.bar_type,
2144            BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
2145        );
2146        assert_eq!(bar.open, Price::from("372025.00"));
2147        assert_eq!(bar.high, Price::from("372050.00"));
2148        assert_eq!(bar.low, Price::from("372025.00"));
2149        assert_eq!(bar.close, Price::from("372050.00"));
2150        assert_eq!(bar.volume, Quantity::from("57"));
2151        assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); // timestamp_on_close=true
2152        assert_eq!(bar.ts_init, 0); // ts_init was Some(0)
2153    }
2154
2155    #[rstest]
2156    fn test_decode_definition_msg() {
2157        let path = test_data_path().join("test_data.definition.dbn.zst");
2158        let mut dbn_stream = Decoder::from_zstd_file(path)
2159            .unwrap()
2160            .decode_stream::<dbn::InstrumentDefMsg>();
2161        let msg = dbn_stream.next().unwrap().unwrap();
2162
2163        let instrument_id = InstrumentId::from("ESM4.GLBX");
2164        let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
2165
2166        assert!(result.is_ok());
2167        assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
2168    }
2169
2170    #[rstest]
2171    fn test_decode_status_msg() {
2172        let path = test_data_path().join("test_data.status.dbn.zst");
2173        let mut dbn_stream = Decoder::from_zstd_file(path)
2174            .unwrap()
2175            .decode_stream::<dbn::StatusMsg>();
2176        let msg = dbn_stream.next().unwrap().unwrap();
2177
2178        let instrument_id = InstrumentId::from("ESM4.GLBX");
2179        let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
2180
2181        assert_eq!(status.instrument_id, instrument_id);
2182        assert_eq!(status.action, MarketStatusAction::Trading);
2183        assert_eq!(status.ts_event, msg.hd.ts_event);
2184        assert_eq!(status.ts_init, 0);
2185        assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
2186        assert_eq!(status.trading_event, None);
2187        assert_eq!(status.is_trading, Some(true));
2188        assert_eq!(status.is_quoting, Some(true));
2189        assert_eq!(status.is_short_sell_restricted, None);
2190    }
2191
2192    #[rstest]
2193    fn test_decode_imbalance_msg() {
2194        let path = test_data_path().join("test_data.imbalance.dbn.zst");
2195        let mut dbn_stream = Decoder::from_zstd_file(path)
2196            .unwrap()
2197            .decode_stream::<dbn::ImbalanceMsg>();
2198        let msg = dbn_stream.next().unwrap().unwrap();
2199
2200        let instrument_id = InstrumentId::from("ESM4.GLBX");
2201        let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2202
2203        assert_eq!(imbalance.instrument_id, instrument_id);
2204        assert_eq!(imbalance.ref_price, Price::from("229.43"));
2205        assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
2206        assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
2207        assert_eq!(imbalance.paired_qty, Quantity::from("0"));
2208        assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
2209        assert_eq!(imbalance.side, OrderSide::Buy);
2210        assert_eq!(imbalance.significant_imbalance, 126);
2211        assert_eq!(imbalance.ts_event, msg.hd.ts_event);
2212        assert_eq!(imbalance.ts_recv, msg.ts_recv);
2213        assert_eq!(imbalance.ts_init, 0);
2214    }
2215
2216    #[rstest]
2217    fn test_decode_statistics_msg() {
2218        let path = test_data_path().join("test_data.statistics.dbn.zst");
2219        let mut dbn_stream = Decoder::from_zstd_file(path)
2220            .unwrap()
2221            .decode_stream::<dbn::StatMsg>();
2222        let msg = dbn_stream.next().unwrap().unwrap();
2223
2224        let instrument_id = InstrumentId::from("ESM4.GLBX");
2225        let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2226
2227        assert_eq!(statistics.instrument_id, instrument_id);
2228        assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
2229        assert_eq!(
2230            statistics.update_action,
2231            DatabentoStatisticUpdateAction::Added
2232        );
2233        assert_eq!(statistics.price, Some(Price::from("100.00")));
2234        assert_eq!(statistics.quantity, None);
2235        assert_eq!(statistics.channel_id, 13);
2236        assert_eq!(statistics.stat_flags, 255);
2237        assert_eq!(statistics.sequence, 2);
2238        assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
2239        assert_eq!(statistics.ts_in_delta, 26961);
2240        assert_eq!(statistics.ts_event, msg.hd.ts_event);
2241        assert_eq!(statistics.ts_recv, msg.ts_recv);
2242        assert_eq!(statistics.ts_init, 0);
2243    }
2244
2245    #[rstest]
2246    fn test_decode_cmbp1_msg() {
2247        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
2248        let mut dbn_stream = Decoder::from_zstd_file(path)
2249            .unwrap()
2250            .decode_stream::<dbn::Cmbp1Msg>();
2251        let msg = dbn_stream.next().unwrap().unwrap();
2252
2253        let instrument_id = InstrumentId::from("ESM4.GLBX");
2254        let (maybe_quote, trade) =
2255            decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2256        let quote = maybe_quote.expect("Expected valid quote");
2257
2258        assert_eq!(quote.instrument_id, instrument_id);
2259        assert!(quote.bid_price.raw > 0);
2260        assert!(quote.ask_price.raw > 0);
2261        assert!(quote.bid_size.raw > 0);
2262        assert!(quote.ask_size.raw > 0);
2263        assert_eq!(quote.ts_event, msg.ts_recv);
2264        assert_eq!(quote.ts_init, 0);
2265
2266        // Check if trade is present based on action
2267        if is_trade_msg(msg.action) {
2268            assert!(trade.is_some());
2269            let trade = trade.unwrap();
2270            assert_eq!(trade.instrument_id, instrument_id);
2271        } else {
2272            assert!(trade.is_none());
2273        }
2274    }
2275
2276    #[rstest]
2277    fn test_decode_cbbo_1s_msg() {
2278        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2279        let mut dbn_stream = Decoder::from_zstd_file(path)
2280            .unwrap()
2281            .decode_stream::<dbn::CbboMsg>();
2282        let msg = dbn_stream.next().unwrap().unwrap();
2283
2284        let instrument_id = InstrumentId::from("ESM4.GLBX");
2285        let maybe_quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2286        let quote = maybe_quote.expect("Expected valid quote");
2287
2288        assert_eq!(quote.instrument_id, instrument_id);
2289        assert!(quote.bid_price.raw > 0);
2290        assert!(quote.ask_price.raw > 0);
2291        assert!(quote.bid_size.raw > 0);
2292        assert!(quote.ask_size.raw > 0);
2293        assert_eq!(quote.ts_event, msg.ts_recv);
2294        assert_eq!(quote.ts_init, 0);
2295    }
2296
2297    #[rstest]
2298    fn test_decode_mbp10_msg_with_all_levels() {
2299        let mut msg = dbn::Mbp10Msg::default();
2300        for i in 0..10 {
2301            msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
2302            msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
2303            msg.levels[i].bid_sz = 10 + i as u32;
2304            msg.levels[i].ask_sz = 10 + i as u32;
2305            msg.levels[i].bid_ct = 1 + i as u32;
2306            msg.levels[i].ask_ct = 1 + i as u32;
2307        }
2308        msg.ts_recv = 1_609_160_400_000_704_060;
2309
2310        let instrument_id = InstrumentId::from("TEST.VENUE");
2311        let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
2312
2313        assert!(result.is_ok());
2314        let depth = result.unwrap();
2315        assert_eq!(depth.bids.len(), 10);
2316        assert_eq!(depth.asks.len(), 10);
2317        assert_eq!(depth.bid_counts.len(), 10);
2318        assert_eq!(depth.ask_counts.len(), 10);
2319    }
2320
2321    #[rstest]
2322    fn test_array_conversion_error_handling() {
2323        let mut bids = Vec::new();
2324        let mut asks = Vec::new();
2325
2326        // Intentionally create fewer than DEPTH10_LEN elements
2327        for i in 0..5 {
2328            bids.push(BookOrder::new(
2329                OrderSide::Buy,
2330                Price::from(format!("{}.00", 100 - i)),
2331                Quantity::from(10),
2332                i as u64,
2333            ));
2334            asks.push(BookOrder::new(
2335                OrderSide::Sell,
2336                Price::from(format!("{}.00", 101 + i)),
2337                Quantity::from(10),
2338                i as u64,
2339            ));
2340        }
2341
2342        let result: Result<[BookOrder; DEPTH10_LEN], _> =
2343            bids.try_into().map_err(|v: Vec<BookOrder>| {
2344                anyhow::anyhow!(
2345                    "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2346                    v.len()
2347                )
2348            });
2349        assert!(result.is_err());
2350        assert!(
2351            result
2352                .unwrap_err()
2353                .to_string()
2354                .contains("Expected exactly 10 bid levels, received 5")
2355        );
2356    }
2357
2358    #[rstest]
2359    fn test_decode_tcbbo_msg() {
2360        // Use cbbo-1s as base since cbbo.dbn.zst was invalid
2361        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2362        let mut dbn_stream = Decoder::from_zstd_file(path)
2363            .unwrap()
2364            .decode_stream::<dbn::CbboMsg>();
2365        let msg = dbn_stream.next().unwrap().unwrap();
2366
2367        // Simulate TCBBO by adding trade data
2368        let mut tcbbo_msg = msg.clone();
2369        tcbbo_msg.price = 3702500000000;
2370        tcbbo_msg.size = 10;
2371
2372        let instrument_id = InstrumentId::from("ESM4.GLBX");
2373        let (maybe_quote, trade) =
2374            decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2375        let quote = maybe_quote.expect("Expected valid quote");
2376
2377        assert_eq!(quote.instrument_id, instrument_id);
2378        assert!(quote.bid_price.raw > 0);
2379        assert!(quote.ask_price.raw > 0);
2380        assert!(quote.bid_size.raw > 0);
2381        assert!(quote.ask_size.raw > 0);
2382        assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2383        assert_eq!(quote.ts_init, 0);
2384
2385        assert_eq!(trade.instrument_id, instrument_id);
2386        assert_eq!(trade.price, Price::from("3702.50"));
2387        assert_eq!(trade.size, Quantity::from(10));
2388        assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2389        assert_eq!(trade.ts_init, 0);
2390    }
2391
2392    #[rstest]
2393    fn test_decode_bar_type() {
2394        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2395        let instrument_id = InstrumentId::from("ESM4.GLBX");
2396
2397        // Test 1-second bar
2398        msg.hd.rtype = 32;
2399        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2400        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2401
2402        // Test 1-minute bar
2403        msg.hd.rtype = 33;
2404        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2405        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2406
2407        // Test 1-hour bar
2408        msg.hd.rtype = 34;
2409        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2410        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2411
2412        // Test 1-day bar
2413        msg.hd.rtype = 35;
2414        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2415        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2416
2417        // Test unsupported rtype
2418        msg.hd.rtype = 99;
2419        let result = decode_bar_type(&msg, instrument_id);
2420        assert!(result.is_err());
2421    }
2422
2423    #[rstest]
2424    fn test_decode_ts_event_adjustment() {
2425        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2426
2427        // Test 1-second bar adjustment
2428        msg.hd.rtype = 32;
2429        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2430        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2431
2432        // Test 1-minute bar adjustment
2433        msg.hd.rtype = 33;
2434        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2435        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2436
2437        // Test 1-hour bar adjustment
2438        msg.hd.rtype = 34;
2439        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2440        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2441
2442        // Test 1-day bar adjustment
2443        msg.hd.rtype = 35;
2444        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2445        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2446
2447        // Test eod bar adjustment (same as 1d)
2448        msg.hd.rtype = 36;
2449        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2450        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2451
2452        // Test unsupported rtype
2453        msg.hd.rtype = 99;
2454        let result = decode_ts_event_adjustment(&msg);
2455        assert!(result.is_err());
2456    }
2457
2458    #[rstest]
2459    fn test_decode_record() {
2460        // Test with MBO message
2461        let path = test_data_path().join("test_data.mbo.dbn.zst");
2462        let decoder = Decoder::from_zstd_file(path).unwrap();
2463        let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2464        let msg = dbn_stream.next().unwrap().unwrap();
2465
2466        let record_ref = dbn::RecordRef::from(msg);
2467        let instrument_id = InstrumentId::from("ESM4.GLBX");
2468
2469        let (data1, data2) =
2470            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2471
2472        assert!(data1.is_some());
2473        assert!(data2.is_none());
2474
2475        // Test with Trade message
2476        let path = test_data_path().join("test_data.trades.dbn.zst");
2477        let decoder = Decoder::from_zstd_file(path).unwrap();
2478        let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2479        let msg = dbn_stream.next().unwrap().unwrap();
2480
2481        let record_ref = dbn::RecordRef::from(msg);
2482
2483        let (data1, data2) =
2484            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2485
2486        assert!(data1.is_some());
2487        assert!(data2.is_none());
2488        assert!(matches!(data1.unwrap(), Data::Trade(_)));
2489    }
2490}