Skip to main content

nautilus_databento/
decode.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Databento message decoding functions.
17//!
18//! # Sentinel Values
19//!
20//! Databento uses sentinel values to represent undefined/null fields:
21//!
22//! | Sentinel          | Value      | Usage                       |
23//! |-------------------|------------|-----------------------------|
24//! | `UNDEF_PRICE`     | `i64::MAX` | Undefined price fields.     |
25//! | `UNDEF_TIMESTAMP` | `u64::MAX` | Undefined timestamp fields. |
26//!
27//! # Fields Potentially Undefined
28//!
29//! According to Databento documentation, the following fields can contain sentinel values:
30//!
31//! | Message Type       | Field                          | Handling                           |
32//! |--------------------|--------------------------------|------------------------------------|
33//! | `MboMsg`           | `price`                        | Passed through as `PRICE_UNDEF`.   |
34//! | `TradeMsg`         | `price`                        | Passed through as `PRICE_UNDEF`.   |
35//! | `OhlcvMsg`         | `open`, `high`, `low`, `close` | Passed through as `PRICE_UNDEF`.   |
36//! | `Mbp1Msg`          | `bid_px`, `ask_px`             | Quote skipped if either undefined. |
37//! | `InstrumentDefMsg` | `activation`                   | Defaults to 0 (epoch).             |
38//! | `InstrumentDefMsg` | `expiration`                   | Returns error if undefined.        |
39//! | `InstrumentDefMsg` | `strike_price`                 | Returns error if undefined.        |
40//!
41//! # References
42//!
43//! - [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)
44//! - [`UNDEF_TIMESTAMP`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_TIMESTAMP.html)
45//! - [Databento DBN Schema](https://databento.com/docs/schemas)
46
47use std::{ffi::c_char, num::NonZeroUsize};
48
49use databento::dbn::{self};
50use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
51use nautilus_model::{
52    data::{
53        Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
54        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
55    },
56    enums::{
57        AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
58        InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
59    },
60    identifiers::{InstrumentId, Symbol, TradeId},
61    instruments::{
62        Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
63    },
64    types::{
65        Currency, Price, Quantity,
66        price::{PRICE_UNDEF, decode_raw_price_i64},
67    },
68};
69use ustr::Ustr;
70
71use super::{
72    enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
73    types::{DatabentoImbalance, DatabentoStatistics},
74};
75
/// Bar step of exactly one unit, shared by every bar specification in this module.
///
/// `NonZeroUsize::MIN` is the smallest non-zero value, i.e. `1`.
const STEP_ONE: NonZeroUsize = NonZeroUsize::MIN;
78
79const BAR_SPEC_1S: BarSpecification = BarSpecification {
80    step: STEP_ONE,
81    aggregation: BarAggregation::Second,
82    price_type: PriceType::Last,
83};
84const BAR_SPEC_1M: BarSpecification = BarSpecification {
85    step: STEP_ONE,
86    aggregation: BarAggregation::Minute,
87    price_type: PriceType::Last,
88};
89const BAR_SPEC_1H: BarSpecification = BarSpecification {
90    step: STEP_ONE,
91    aggregation: BarAggregation::Hour,
92    price_type: PriceType::Last,
93};
94const BAR_SPEC_1D: BarSpecification = BarSpecification {
95    step: STEP_ONE,
96    aggregation: BarAggregation::Day,
97    price_type: PriceType::Last,
98};
99
100const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
101const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
102const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
103const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
104
/// Interprets a `Y`/`N` flag character as an optional boolean.
///
/// Returns `Some(true)` for `'Y'`, `Some(false)` for `'N'`, and `None` for any other value.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    // Compare on the raw byte; `c_char` signedness differs between platforms.
    match c as u8 {
        b'Y' => Some(true),
        b'N' => Some(false),
        _ => None,
    }
}
113
114#[must_use]
115pub const fn parse_order_side(c: c_char) -> OrderSide {
116    match c as u8 as char {
117        'A' => OrderSide::Sell,
118        'B' => OrderSide::Buy,
119        _ => OrderSide::NoOrderSide,
120    }
121}
122
123#[must_use]
124pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
125    match c as u8 as char {
126        'A' => AggressorSide::Seller,
127        'B' => AggressorSide::Buyer,
128        _ => AggressorSide::NoAggressor,
129    }
130}
131
132/// Parses a Databento book action character into a `BookAction` enum.
133///
134/// # Errors
135///
136/// Returns an error if `c` is not a valid `BookAction` character.
137pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
138    match c as u8 as char {
139        'A' => Ok(BookAction::Add),
140        'C' => Ok(BookAction::Delete),
141        'F' => Ok(BookAction::Update),
142        'M' => Ok(BookAction::Update),
143        'R' => Ok(BookAction::Clear),
144        invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
145    }
146}
147
148/// Parses a Databento option kind character into an `OptionKind` enum.
149///
150/// # Errors
151///
152/// Returns an error if `c` is not a valid `OptionKind` character.
153pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
154    match c as u8 as char {
155        'C' => Ok(OptionKind::Call),
156        'P' => Ok(OptionKind::Put),
157        invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
158    }
159}
160
161fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
162    match value {
163        Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
164            log::warn!("Unknown currency code '{value}', defaulting to USD");
165            Currency::USD()
166        }),
167        Ok(_) => Currency::USD(),
168        Err(e) => {
169            log::error!("Error parsing currency: {e}");
170            Currency::USD()
171        }
172    }
173}
174
175/// Parses a CFI (Classification of Financial Instruments) code to extract asset and instrument classes.
176///
177/// Returns `(None, None)` if `value` has fewer than 3 characters.
178#[must_use]
179pub fn parse_cfi_iso10926(value: &str) -> (Option<AssetClass>, Option<InstrumentClass>) {
180    let chars: Vec<char> = value.chars().collect();
181    if chars.len() < 3 {
182        return (None, None);
183    }
184
185    // TODO: A proper CFI parser would be useful: https://en.wikipedia.org/wiki/ISO_10962
186    let cfi_category = chars[0];
187    let cfi_group = chars[1];
188    let cfi_attribute1 = chars[2];
189    // let cfi_attribute2 = value[3];
190    // let cfi_attribute3 = value[4];
191    // let cfi_attribute4 = value[5];
192
193    let mut asset_class = match cfi_category {
194        'D' => Some(AssetClass::Debt),
195        'E' => Some(AssetClass::Equity),
196        'S' => None,
197        _ => None,
198    };
199
200    let instrument_class = match cfi_group {
201        'I' => Some(InstrumentClass::Future),
202        _ => None,
203    };
204
205    if cfi_attribute1 == 'I' {
206        asset_class = Some(AssetClass::Index);
207    }
208
209    (asset_class, instrument_class)
210}
211
212fn decode_underlying(underlying_str: &str, symbol: &Symbol) -> Ustr {
213    if underlying_str.is_empty() {
214        // Fall back to first whitespace-separated token from symbol
215        symbol
216            .as_str()
217            .split_whitespace()
218            .next()
219            .map_or_else(|| symbol.inner(), Ustr::from)
220    } else {
221        Ustr::from(underlying_str)
222    }
223}
224
225/// Parses a Databento status reason code into a human-readable string.
226///
227/// See: <https://databento.com/docs/schemas-and-data-formats/status#types-of-status-reasons>
228///
229/// # Errors
230///
231/// Returns an error if `value` is an invalid status reason code.
232pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
233    let value_str = match value {
234        0 => return Ok(None),
235        1 => "Scheduled",
236        2 => "Surveillance intervention",
237        3 => "Market event",
238        4 => "Instrument activation",
239        5 => "Instrument expiration",
240        6 => "Recovery in process",
241        10 => "Regulatory",
242        11 => "Administrative",
243        12 => "Non-compliance",
244        13 => "Filings not current",
245        14 => "SEC trading suspension",
246        15 => "New issue",
247        16 => "Issue available",
248        17 => "Issues reviewed",
249        18 => "Filing requirements satisfied",
250        30 => "News pending",
251        31 => "News released",
252        32 => "News and resumption times",
253        33 => "News not forthcoming",
254        40 => "Order imbalance",
255        50 => "LULD pause",
256        60 => "Operational",
257        70 => "Additional information requested",
258        80 => "Merger effective",
259        90 => "ETF",
260        100 => "Corporate action",
261        110 => "New Security offering",
262        120 => "Market wide halt level 1",
263        121 => "Market wide halt level 2",
264        122 => "Market wide halt level 3",
265        123 => "Market wide halt carryover",
266        124 => "Market wide halt resumption",
267        130 => "Quotation not available",
268        invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
269    };
270
271    Ok(Some(Ustr::from(value_str)))
272}
273
274/// Parses a Databento status trading event code into a human-readable string.
275///
276/// # Errors
277///
278/// Returns an error if `value` is an invalid status trading event code.
279pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
280    let value_str = match value {
281        0 => return Ok(None),
282        1 => "No cancel",
283        2 => "Change trading session",
284        3 => "Implied matching on",
285        4 => "Implied matching off",
286        _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
287    };
288
289    Ok(Some(Ustr::from(value_str)))
290}
291
292/// Decodes a price, returning an error if undefined.
293///
294/// Databento uses `i64::MAX` as a sentinel value for unset/null prices (see
295/// [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)).
296///
297/// # Errors
298///
299/// Returns an error if `value` is `i64::MAX` (undefined).
300#[inline(always)]
301pub fn decode_price(value: i64, precision: u8, field_name: &str) -> anyhow::Result<Price> {
302    if value == i64::MAX {
303        anyhow::bail!("Missing required price for `{field_name}`")
304    } else {
305        Ok(Price::from_raw(decode_raw_price_i64(value), precision))
306    }
307}
308
309/// Decodes a price from the given optional value, expressed in units of 1e-9.
310///
311/// Databento uses `i64::MAX` as a sentinel value for unset/null prices (see
312/// [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)).
313#[inline(always)]
314#[must_use]
315pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
316    if value == i64::MAX {
317        None
318    } else {
319        Some(Price::from_raw(decode_raw_price_i64(value), precision))
320    }
321}
322
323/// Decodes a price, returning `PRICE_UNDEF` if the value is undefined.
324///
325/// This is used for market data where undefined prices should pass through
326/// as `PRICE_UNDEF` rather than causing an error.
327#[inline(always)]
328#[must_use]
329pub fn decode_price_or_undef(value: i64, precision: u8) -> Price {
330    if value == i64::MAX {
331        Price::from_raw(PRICE_UNDEF, 0)
332    } else {
333        Price::from_raw(decode_raw_price_i64(value), precision)
334    }
335}
336
337/// Decodes a minimum price increment from the given value, expressed in units of 1e-9.
338#[inline(always)]
339#[must_use]
340pub fn decode_price_increment(value: i64, precision: u8) -> Price {
341    match value {
342        0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
343        _ => Price::from_raw(decode_raw_price_i64(value), precision),
344    }
345}
346
347/// Decodes a quantity from the given value, expressed in standard whole-number units.
348#[inline(always)]
349#[must_use]
350pub fn decode_quantity(value: u64) -> Quantity {
351    Quantity::from(value)
352}
353
354/// Decodes a quantity from the given optional value, where `i64::MAX` indicates missing data.
355#[inline(always)]
356#[must_use]
357pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
358    match value {
359        i64::MAX => None,
360        _ => Some(Quantity::from(value)),
361    }
362}
363
364/// Decodes a timestamp, returning an error if undefined.
365///
366/// Databento uses `u64::MAX` as `UNDEF_TIMESTAMP` sentinel for null timestamps.
367///
368/// # Errors
369///
370/// Returns an error if `value` is `u64::MAX` (undefined).
371#[inline(always)]
372pub fn decode_timestamp(value: u64, field_name: &str) -> anyhow::Result<UnixNanos> {
373    if value == dbn::UNDEF_TIMESTAMP {
374        anyhow::bail!("Missing required timestamp for `{field_name}`")
375    } else {
376        Ok(UnixNanos::from(value))
377    }
378}
379
380/// Decodes a timestamp from the given optional value.
381///
382/// Databento uses `u64::MAX` as `UNDEF_TIMESTAMP` sentinel for null timestamps.
383#[inline(always)]
384#[must_use]
385pub fn decode_optional_timestamp(value: u64) -> Option<UnixNanos> {
386    if value == dbn::UNDEF_TIMESTAMP {
387        None
388    } else {
389        Some(UnixNanos::from(value))
390    }
391}
392
393/// Decodes a multiplier from the given value, expressed in units of 1e-9.
394/// Uses exact integer arithmetic to avoid precision loss in financial calculations.
395///
396/// # Errors
397///
398/// Returns an error if value is negative (invalid multiplier).
399pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
400    const SCALE: u128 = 1_000_000_000;
401
402    match value {
403        0 | i64::MAX => Ok(Quantity::from(1)),
404        v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
405        v => {
406            // Work in integers: v is fixed-point with 9 fractional digits.
407            // Build a canonical decimal string without floating-point.
408            let abs = v as u128;
409            let int_part = abs / SCALE;
410            let frac_part = abs % SCALE;
411
412            // Format fractional part with exactly 9 digits, then trim trailing zeros
413            // to keep a canonical representation.
414            if frac_part == 0 {
415                // Pure integer
416                Ok(Quantity::from(int_part as u64))
417            } else {
418                let mut frac_str = format!("{frac_part:09}");
419                while frac_str.ends_with('0') {
420                    frac_str.pop();
421                }
422                let s = format!("{int_part}.{frac_str}");
423                Ok(Quantity::from(s))
424            }
425        }
426    }
427}
428
429/// Decodes a lot size from the given value, expressed in standard whole-number units.
430#[inline(always)]
431#[must_use]
432pub fn decode_lot_size(value: i32) -> Quantity {
433    match value {
434        0 | i32::MAX => Quantity::from(1),
435        value => Quantity::from(value),
436    }
437}
438
/// Returns `true` if the action character is `'T'`.
#[inline(always)]
#[must_use]
fn is_trade_msg(action: c_char) -> bool {
    // Compare on the raw byte; `c_char` signedness differs between platforms.
    action as u8 == b'T'
}
444
/// Returns `true` only when neither the bid nor the ask price is `i64::MAX`.
///
/// Databento uses `i64::MAX` as a sentinel value for undefined/null prices.
/// A valid quote requires both sides to be defined.
#[inline(always)]
#[must_use]
fn has_valid_bid_ask(bid_px: i64, ask_px: i64) -> bool {
    let any_undefined = bid_px == i64::MAX || ask_px == i64::MAX;
    !any_undefined
}
454
455/// Decodes a Databento MBO (Market by Order) message into an order book delta or trade.
456///
457/// Returns a tuple containing either an `OrderBookDelta` or a `TradeTick`, depending on
458/// whether the message represents an order book update or a trade execution.
459///
460/// # Errors
461///
462/// Returns an error if decoding the MBO message fails.
463pub fn decode_mbo_msg(
464    msg: &dbn::MboMsg,
465    instrument_id: InstrumentId,
466    price_precision: u8,
467    ts_init: Option<UnixNanos>,
468    include_trades: bool,
469) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
470    let side = parse_order_side(msg.side);
471    if is_trade_msg(msg.action) {
472        if include_trades && msg.size > 0 {
473            let price = decode_price_or_undef(msg.price, price_precision);
474            let size = decode_quantity(msg.size as u64);
475            let aggressor_side = parse_aggressor_side(msg.side);
476            let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
477            let ts_event = msg.ts_recv.into();
478            let ts_init = ts_init.unwrap_or(ts_event);
479
480            let trade = TradeTick::new(
481                instrument_id,
482                price,
483                size,
484                aggressor_side,
485                trade_id,
486                ts_event,
487                ts_init,
488            );
489            return Ok((None, Some(trade)));
490        }
491
492        return Ok((None, None));
493    }
494
495    let action = parse_book_action(msg.action)?;
496    let price = decode_price_or_undef(msg.price, price_precision);
497    let size = decode_quantity(msg.size as u64);
498    let order = BookOrder::new(side, price, size, msg.order_id);
499
500    let ts_event = msg.ts_recv.into();
501    let ts_init = ts_init.unwrap_or(ts_event);
502
503    let delta = OrderBookDelta::new(
504        instrument_id,
505        action,
506        order,
507        msg.flags.raw(),
508        msg.sequence.into(),
509        ts_event,
510        ts_init,
511    );
512
513    Ok((Some(delta), None))
514}
515
516/// Decodes a Databento Trade message into a `TradeTick`.
517///
518/// # Errors
519///
520/// Returns an error if decoding the Trade message fails.
521pub fn decode_trade_msg(
522    msg: &dbn::TradeMsg,
523    instrument_id: InstrumentId,
524    price_precision: u8,
525    ts_init: Option<UnixNanos>,
526) -> anyhow::Result<TradeTick> {
527    let ts_event = msg.ts_recv.into();
528    let ts_init = ts_init.unwrap_or(ts_event);
529
530    let trade = TradeTick::new(
531        instrument_id,
532        decode_price_or_undef(msg.price, price_precision),
533        decode_quantity(msg.size as u64),
534        parse_aggressor_side(msg.side),
535        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
536        ts_event,
537        ts_init,
538    );
539
540    Ok(trade)
541}
542
543/// Decodes a Databento TBBO (Top of Book with Trade) message into quote and trade ticks.
544///
545/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
546/// The trade is always returned.
547///
548/// # Errors
549///
550/// Returns an error if decoding the TBBO message fails.
551pub fn decode_tbbo_msg(
552    msg: &dbn::TbboMsg,
553    instrument_id: InstrumentId,
554    price_precision: u8,
555    ts_init: Option<UnixNanos>,
556) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
557    let top_level = &msg.levels[0];
558    let ts_event = msg.ts_recv.into();
559    let ts_init = ts_init.unwrap_or(ts_event);
560
561    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
562        Some(QuoteTick::new(
563            instrument_id,
564            decode_price_or_undef(top_level.bid_px, price_precision),
565            decode_price_or_undef(top_level.ask_px, price_precision),
566            decode_quantity(top_level.bid_sz as u64),
567            decode_quantity(top_level.ask_sz as u64),
568            ts_event,
569            ts_init,
570        ))
571    } else {
572        None
573    };
574
575    let trade = TradeTick::new(
576        instrument_id,
577        decode_price_or_undef(msg.price, price_precision),
578        decode_quantity(msg.size as u64),
579        parse_aggressor_side(msg.side),
580        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
581        ts_event,
582        ts_init,
583    );
584
585    Ok((maybe_quote, trade))
586}
587
588/// Decodes a Databento MBP1 (Market by Price Level 1) message into quote and optional trade ticks.
589///
590/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
591///
592/// # Errors
593///
594/// Returns an error if decoding the MBP1 message fails.
595pub fn decode_mbp1_msg(
596    msg: &dbn::Mbp1Msg,
597    instrument_id: InstrumentId,
598    price_precision: u8,
599    ts_init: Option<UnixNanos>,
600    include_trades: bool,
601) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
602    let top_level = &msg.levels[0];
603    let ts_event = msg.ts_recv.into();
604    let ts_init = ts_init.unwrap_or(ts_event);
605
606    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
607        Some(QuoteTick::new(
608            instrument_id,
609            decode_price_or_undef(top_level.bid_px, price_precision),
610            decode_price_or_undef(top_level.ask_px, price_precision),
611            decode_quantity(top_level.bid_sz as u64),
612            decode_quantity(top_level.ask_sz as u64),
613            ts_event,
614            ts_init,
615        ))
616    } else {
617        None
618    };
619
620    let maybe_trade = if include_trades && is_trade_msg(msg.action) {
621        Some(TradeTick::new(
622            instrument_id,
623            decode_price_or_undef(msg.price, price_precision),
624            decode_quantity(msg.size as u64),
625            parse_aggressor_side(msg.side),
626            TradeId::new(itoa::Buffer::new().format(msg.sequence)),
627            ts_event,
628            ts_init,
629        ))
630    } else {
631        None
632    };
633
634    Ok((maybe_quote, maybe_trade))
635}
636
637/// Decodes a Databento BBO (Best Bid and Offer) message into a `QuoteTick`.
638///
639/// Returns `None` if either bid or ask price is undefined (`i64::MAX`).
640///
641/// # Errors
642///
643/// Returns an error if decoding the BBO message fails.
644pub fn decode_bbo_msg(
645    msg: &dbn::BboMsg,
646    instrument_id: InstrumentId,
647    price_precision: u8,
648    ts_init: Option<UnixNanos>,
649) -> anyhow::Result<Option<QuoteTick>> {
650    let top_level = &msg.levels[0];
651    if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
652        return Ok(None);
653    }
654
655    let ts_event = msg.ts_recv.into();
656    let ts_init = ts_init.unwrap_or(ts_event);
657
658    let quote = QuoteTick::new(
659        instrument_id,
660        decode_price_or_undef(top_level.bid_px, price_precision),
661        decode_price_or_undef(top_level.ask_px, price_precision),
662        decode_quantity(top_level.bid_sz as u64),
663        decode_quantity(top_level.ask_sz as u64),
664        ts_event,
665        ts_init,
666    );
667
668    Ok(Some(quote))
669}
670
671/// Decodes a Databento MBP10 (Market by Price 10 levels) message into an `OrderBookDepth10`.
672///
673/// # Errors
674///
675/// Returns an error if the number of levels in `msg.levels` is not exactly `DEPTH10_LEN`.
676pub fn decode_mbp10_msg(
677    msg: &dbn::Mbp10Msg,
678    instrument_id: InstrumentId,
679    price_precision: u8,
680    ts_init: Option<UnixNanos>,
681) -> anyhow::Result<OrderBookDepth10> {
682    let mut bids = Vec::with_capacity(DEPTH10_LEN);
683    let mut asks = Vec::with_capacity(DEPTH10_LEN);
684    let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
685    let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
686
687    for level in &msg.levels {
688        let bid_order = BookOrder::new(
689            OrderSide::Buy,
690            decode_price_or_undef(level.bid_px, price_precision),
691            decode_quantity(level.bid_sz as u64),
692            0,
693        );
694
695        let ask_order = BookOrder::new(
696            OrderSide::Sell,
697            decode_price_or_undef(level.ask_px, price_precision),
698            decode_quantity(level.ask_sz as u64),
699            0,
700        );
701
702        bids.push(bid_order);
703        asks.push(ask_order);
704        bid_counts.push(level.bid_ct);
705        ask_counts.push(level.ask_ct);
706    }
707
708    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
709        anyhow::anyhow!(
710            "Expected exactly {DEPTH10_LEN} bid levels, received {}",
711            v.len()
712        )
713    })?;
714
715    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
716        anyhow::anyhow!(
717            "Expected exactly {DEPTH10_LEN} ask levels, received {}",
718            v.len()
719        )
720    })?;
721
722    let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
723        anyhow::anyhow!(
724            "Expected exactly {DEPTH10_LEN} bid counts, received {}",
725            v.len()
726        )
727    })?;
728
729    let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
730        anyhow::anyhow!(
731            "Expected exactly {DEPTH10_LEN} ask counts, received {}",
732            v.len()
733        )
734    })?;
735
736    let ts_event = msg.ts_recv.into();
737    let ts_init = ts_init.unwrap_or(ts_event);
738
739    let depth = OrderBookDepth10::new(
740        instrument_id,
741        bids,
742        asks,
743        bid_counts,
744        ask_counts,
745        msg.flags.raw(),
746        msg.sequence.into(),
747        ts_event,
748        ts_init,
749    );
750
751    Ok(depth)
752}
753
754/// Decodes a Databento CMBP1 (Consolidated Market by Price Level 1) message.
755///
756/// Returns a tuple containing an optional `QuoteTick` and an optional `TradeTick`.
757/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
758///
759/// # Errors
760///
761/// Returns an error if decoding the CMBP1 message fails.
762pub fn decode_cmbp1_msg(
763    msg: &dbn::Cmbp1Msg,
764    instrument_id: InstrumentId,
765    price_precision: u8,
766    ts_init: Option<UnixNanos>,
767    include_trades: bool,
768) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
769    let top_level = &msg.levels[0];
770    let ts_event = msg.ts_recv.into();
771    let ts_init = ts_init.unwrap_or(ts_event);
772
773    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
774        Some(QuoteTick::new(
775            instrument_id,
776            decode_price_or_undef(top_level.bid_px, price_precision),
777            decode_price_or_undef(top_level.ask_px, price_precision),
778            decode_quantity(top_level.bid_sz as u64),
779            decode_quantity(top_level.ask_sz as u64),
780            ts_event,
781            ts_init,
782        ))
783    } else {
784        None
785    };
786
787    let maybe_trade = if include_trades && is_trade_msg(msg.action) {
788        // Use UUID4 for trade ID as CMBP1 doesn't have a sequence field
789        Some(TradeTick::new(
790            instrument_id,
791            decode_price_or_undef(msg.price, price_precision),
792            decode_quantity(msg.size as u64),
793            parse_aggressor_side(msg.side),
794            TradeId::new(UUID4::new().as_str()),
795            ts_event,
796            ts_init,
797        ))
798    } else {
799        None
800    };
801
802    Ok((maybe_quote, maybe_trade))
803}
804
805/// Decodes a Databento CBBO (Consolidated Best Bid and Offer) message.
806///
807/// Returns `None` if either bid or ask price is undefined (`i64::MAX`).
808///
809/// # Errors
810///
811/// Returns an error if decoding the CBBO message fails.
812pub fn decode_cbbo_msg(
813    msg: &dbn::CbboMsg,
814    instrument_id: InstrumentId,
815    price_precision: u8,
816    ts_init: Option<UnixNanos>,
817) -> anyhow::Result<Option<QuoteTick>> {
818    let top_level = &msg.levels[0];
819    if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
820        return Ok(None);
821    }
822
823    let ts_event = msg.ts_recv.into();
824    let ts_init = ts_init.unwrap_or(ts_event);
825
826    let quote = QuoteTick::new(
827        instrument_id,
828        decode_price_or_undef(top_level.bid_px, price_precision),
829        decode_price_or_undef(top_level.ask_px, price_precision),
830        decode_quantity(top_level.bid_sz as u64),
831        decode_quantity(top_level.ask_sz as u64),
832        ts_event,
833        ts_init,
834    );
835
836    Ok(Some(quote))
837}
838
839/// Decodes a Databento TCBBO (Consolidated Top of Book with Trade) message.
840///
841/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
842/// The trade is always returned.
843///
844/// # Errors
845///
846/// Returns an error if decoding the TCBBO message fails.
847pub fn decode_tcbbo_msg(
848    msg: &dbn::CbboMsg,
849    instrument_id: InstrumentId,
850    price_precision: u8,
851    ts_init: Option<UnixNanos>,
852) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
853    let top_level = &msg.levels[0];
854    let ts_event = msg.ts_recv.into();
855    let ts_init = ts_init.unwrap_or(ts_event);
856
857    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
858        Some(QuoteTick::new(
859            instrument_id,
860            decode_price_or_undef(top_level.bid_px, price_precision),
861            decode_price_or_undef(top_level.ask_px, price_precision),
862            decode_quantity(top_level.bid_sz as u64),
863            decode_quantity(top_level.ask_sz as u64),
864            ts_event,
865            ts_init,
866        ))
867    } else {
868        None
869    };
870
871    // Use UUID4 for trade ID as TCBBO doesn't have a sequence field
872    let trade = TradeTick::new(
873        instrument_id,
874        decode_price_or_undef(msg.price, price_precision),
875        decode_quantity(msg.size as u64),
876        parse_aggressor_side(msg.side),
877        TradeId::new(UUID4::new().as_str()),
878        ts_event,
879        ts_init,
880    );
881
882    Ok((maybe_quote, trade))
883}
884
885/// # Errors
886///
887/// Returns an error if `rtype` is not a supported bar aggregation.
888pub fn decode_bar_type(
889    msg: &dbn::OhlcvMsg,
890    instrument_id: InstrumentId,
891) -> anyhow::Result<BarType> {
892    let bar_type = match msg.hd.rtype {
893        32 => {
894            // ohlcv-1s
895            BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
896        }
897        33 => {
898            // ohlcv-1m
899            BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
900        }
901        34 => {
902            // ohlcv-1h
903            BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
904        }
905        35 => {
906            // ohlcv-1d
907            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
908        }
909        36 => {
910            // ohlcv-eod
911            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
912        }
913        _ => anyhow::bail!(
914            "`rtype` is not a supported bar aggregation, was {}",
915            msg.hd.rtype
916        ),
917    };
918
919    Ok(bar_type)
920}
921
922/// # Errors
923///
924/// Returns an error if `rtype` is not a supported bar aggregation.
925pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
926    let adjustment = match msg.hd.rtype {
927        32 => {
928            // ohlcv-1s
929            BAR_CLOSE_ADJUSTMENT_1S
930        }
931        33 => {
932            // ohlcv-1m
933            BAR_CLOSE_ADJUSTMENT_1M
934        }
935        34 => {
936            // ohlcv-1h
937            BAR_CLOSE_ADJUSTMENT_1H
938        }
939        35 | 36 => {
940            // ohlcv-1d and ohlcv-eod
941            BAR_CLOSE_ADJUSTMENT_1D
942        }
943        _ => anyhow::bail!(
944            "`rtype` is not a supported bar aggregation, was {}",
945            msg.hd.rtype
946        ),
947    };
948
949    Ok(adjustment.into())
950}
951
952/// # Errors
953///
954/// Returns an error if decoding the OHLCV message fails.
955pub fn decode_ohlcv_msg(
956    msg: &dbn::OhlcvMsg,
957    instrument_id: InstrumentId,
958    price_precision: u8,
959    ts_init: Option<UnixNanos>,
960    timestamp_on_close: bool,
961) -> anyhow::Result<Bar> {
962    let bar_type = decode_bar_type(msg, instrument_id)?;
963    let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
964
965    let ts_event_raw = msg.hd.ts_event.into();
966    let ts_close = ts_event_raw + ts_event_adjustment;
967    let ts_init = ts_init.unwrap_or(ts_close); // received time or close time
968
969    let ts_event = if timestamp_on_close {
970        ts_close
971    } else {
972        ts_event_raw
973    };
974
975    let bar = Bar::new(
976        bar_type,
977        decode_price_or_undef(msg.open, price_precision),
978        decode_price_or_undef(msg.high, price_precision),
979        decode_price_or_undef(msg.low, price_precision),
980        decode_price_or_undef(msg.close, price_precision),
981        decode_quantity(msg.volume),
982        ts_event,
983        ts_init,
984    );
985
986    Ok(bar)
987}
988
989/// Decodes a Databento status message into an `InstrumentStatus` event.
990///
991/// # Errors
992///
993/// Returns an error if decoding the status message fails or if `msg.action` is not a valid `MarketStatusAction`.
994pub fn decode_status_msg(
995    msg: &dbn::StatusMsg,
996    instrument_id: InstrumentId,
997    ts_init: Option<UnixNanos>,
998) -> anyhow::Result<InstrumentStatus> {
999    let ts_event = msg.hd.ts_event.into();
1000    let ts_init = ts_init.unwrap_or(ts_event);
1001
1002    let action = MarketStatusAction::from_u16(msg.action)
1003        .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
1004
1005    let status = InstrumentStatus::new(
1006        instrument_id,
1007        action,
1008        ts_event,
1009        ts_init,
1010        parse_status_reason(msg.reason)?,
1011        parse_status_trading_event(msg.trading_event)?,
1012        parse_optional_bool(msg.is_trading),
1013        parse_optional_bool(msg.is_quoting),
1014        parse_optional_bool(msg.is_short_sell_restricted),
1015    );
1016
1017    Ok(status)
1018}
1019
1020/// # Errors
1021///
1022/// Returns an error if decoding the record type fails or encounters unsupported message.
1023pub fn decode_record(
1024    record: &dbn::RecordRef,
1025    instrument_id: InstrumentId,
1026    price_precision: u8,
1027    ts_init: Option<UnixNanos>,
1028    include_trades: bool,
1029    bars_timestamp_on_close: bool,
1030) -> anyhow::Result<(Option<Data>, Option<Data>)> {
1031    // Note: TBBO and TCBBO messages provide both quotes and trades.
1032    // TBBO is handled explicitly below, while TCBBO is handled by
1033    // the CbboMsg branch based on whether it has trade data.
1034    let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
1035        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1036        let result = decode_mbo_msg(
1037            msg,
1038            instrument_id,
1039            price_precision,
1040            Some(ts_init),
1041            include_trades,
1042        )?;
1043        match result {
1044            (Some(delta), None) => (Some(Data::Delta(delta)), None),
1045            (None, Some(trade)) => (Some(Data::Trade(trade)), None),
1046            (None, None) => (None, None),
1047            _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
1048        }
1049    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
1050        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1051        let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1052        (Some(Data::Trade(trade)), None)
1053    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
1054        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1055        let (maybe_quote, maybe_trade) = decode_mbp1_msg(
1056            msg,
1057            instrument_id,
1058            price_precision,
1059            Some(ts_init),
1060            include_trades,
1061        )?;
1062        (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1063    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
1064        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1065        let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1066        (maybe_quote.map(Data::Quote), None)
1067    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
1068        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1069        let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1070        (maybe_quote.map(Data::Quote), None)
1071    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
1072        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1073        let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1074        (Some(Data::from(depth)), None)
1075    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
1076        // if ts_init is None (like with historical data) we don't want it to be equal to ts_event
1077        // it will be set correctly in decode_ohlcv_msg instead
1078        let bar = decode_ohlcv_msg(
1079            msg,
1080            instrument_id,
1081            price_precision,
1082            ts_init,
1083            bars_timestamp_on_close,
1084        )?;
1085        (Some(Data::Bar(bar)), None)
1086    } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
1087        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1088        let (maybe_quote, maybe_trade) = decode_cmbp1_msg(
1089            msg,
1090            instrument_id,
1091            price_precision,
1092            Some(ts_init),
1093            include_trades,
1094        )?;
1095        (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1096    } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
1097        // TBBO always has a trade, quote may be skipped if prices undefined
1098        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1099        let (maybe_quote, trade) =
1100            decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1101        (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1102    } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
1103        // Check if this is a TCBBO or regular CBBO based on whether it has trade data
1104        if msg.price != i64::MAX && msg.size > 0 {
1105            // TCBBO - has a trade, quote may be skipped if prices undefined
1106            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1107            let (maybe_quote, trade) =
1108                decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1109            (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1110        } else {
1111            // Regular CBBO - quote only (may be None if prices undefined)
1112            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1113            let maybe_quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1114            (maybe_quote.map(Data::Quote), None)
1115        }
1116    } else {
1117        anyhow::bail!("DBN message type is not currently supported")
1118    };
1119
1120    Ok(result)
1121}
1122
1123const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
1124    match ts_init {
1125        Some(ts_init) => ts_init,
1126        None => msg_timestamp,
1127    }
1128}
1129
1130/// # Errors
1131///
1132/// Returns an error if decoding the `InstrumentDefMsg` fails.
1133pub fn decode_instrument_def_msg(
1134    msg: &dbn::InstrumentDefMsg,
1135    instrument_id: InstrumentId,
1136    ts_init: Option<UnixNanos>,
1137) -> anyhow::Result<InstrumentAny> {
1138    match msg.instrument_class as u8 as char {
1139        'K' => Ok(InstrumentAny::Equity(decode_equity(
1140            msg,
1141            instrument_id,
1142            ts_init,
1143        )?)),
1144        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1145            msg,
1146            instrument_id,
1147            ts_init,
1148        )?)),
1149        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1150            msg,
1151            instrument_id,
1152            ts_init,
1153        )?)),
1154        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1155            msg,
1156            instrument_id,
1157            ts_init,
1158        )?)),
1159        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1160            msg,
1161            instrument_id,
1162            ts_init,
1163        )?)),
1164        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1165        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1166        _ => anyhow::bail!(
1167            "Unsupported `instrument_class` '{}'",
1168            msg.instrument_class as u8 as char
1169        ),
1170    }
1171}
1172
1173/// Decodes a Databento instrument definition message into an `Equity` instrument.
1174///
1175/// # Errors
1176///
1177/// Returns an error if parsing or constructing `Equity` fails.
1178pub fn decode_equity(
1179    msg: &dbn::InstrumentDefMsg,
1180    instrument_id: InstrumentId,
1181    ts_init: Option<UnixNanos>,
1182) -> anyhow::Result<Equity> {
1183    let currency = parse_currency_or_usd_default(msg.currency());
1184    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1185    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1186    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1187    let ts_init = ts_init.unwrap_or(ts_event);
1188
1189    Ok(Equity::new(
1190        instrument_id,
1191        instrument_id.symbol,
1192        None, // No ISIN available yet
1193        currency,
1194        price_increment.precision,
1195        price_increment,
1196        Some(lot_size),
1197        None, // TBD
1198        None, // TBD
1199        None, // TBD
1200        None, // TBD
1201        None, // TBD
1202        None, // TBD
1203        None, // TBD
1204        None, // TBD
1205        ts_event,
1206        ts_init,
1207    ))
1208}
1209
1210/// Decodes a Databento instrument definition message into a `FuturesContract` instrument.
1211///
1212/// # Errors
1213///
1214/// Returns an error if parsing or constructing `FuturesContract` fails.
1215pub fn decode_futures_contract(
1216    msg: &dbn::InstrumentDefMsg,
1217    instrument_id: InstrumentId,
1218    ts_init: Option<UnixNanos>,
1219) -> anyhow::Result<FuturesContract> {
1220    let currency = parse_currency_or_usd_default(msg.currency());
1221    let exchange = Ustr::from(msg.exchange()?);
1222    let underlying = decode_underlying(msg.asset()?, &instrument_id.symbol);
1223    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1224    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1225    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1226    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1227    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1228    let ts_init = ts_init.unwrap_or(ts_event);
1229
1230    FuturesContract::new_checked(
1231        instrument_id,
1232        instrument_id.symbol,
1233        asset_class.unwrap_or(AssetClass::Commodity),
1234        Some(exchange),
1235        underlying,
1236        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1237        decode_timestamp(msg.expiration, "expiration")?,
1238        currency,
1239        price_increment.precision,
1240        price_increment,
1241        multiplier,
1242        lot_size,
1243        None, // TBD
1244        None, // TBD
1245        None, // TBD
1246        None, // TBD
1247        None, // TBD
1248        None, // TBD
1249        None, // TBD
1250        None, // TBD
1251        ts_event,
1252        ts_init,
1253    )
1254}
1255
1256/// Decodes a Databento instrument definition message into a `FuturesSpread` instrument.
1257///
1258/// # Errors
1259///
1260/// Returns an error if parsing or constructing `FuturesSpread` fails.
1261pub fn decode_futures_spread(
1262    msg: &dbn::InstrumentDefMsg,
1263    instrument_id: InstrumentId,
1264    ts_init: Option<UnixNanos>,
1265) -> anyhow::Result<FuturesSpread> {
1266    let exchange = Ustr::from(msg.exchange()?);
1267    let underlying = decode_underlying(msg.asset()?, &instrument_id.symbol);
1268    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1269    let strategy_type = Ustr::from(msg.secsubtype()?);
1270    let currency = parse_currency_or_usd_default(msg.currency());
1271    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1272    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1273    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1274    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1275    let ts_init = ts_init.unwrap_or(ts_event);
1276
1277    FuturesSpread::new_checked(
1278        instrument_id,
1279        instrument_id.symbol,
1280        asset_class.unwrap_or(AssetClass::Commodity),
1281        Some(exchange),
1282        underlying,
1283        strategy_type,
1284        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1285        decode_timestamp(msg.expiration, "expiration")?,
1286        currency,
1287        price_increment.precision,
1288        price_increment,
1289        multiplier,
1290        lot_size,
1291        None, // TBD
1292        None, // TBD
1293        None, // TBD
1294        None, // TBD
1295        None, // TBD
1296        None, // TBD
1297        None, // TBD
1298        None, // TBD
1299        ts_event,
1300        ts_init,
1301    )
1302}
1303
1304/// Decodes a Databento instrument definition message into an `OptionContract` instrument.
1305///
1306/// # Errors
1307///
1308/// Returns an error if parsing or constructing `OptionContract` fails.
1309pub fn decode_option_contract(
1310    msg: &dbn::InstrumentDefMsg,
1311    instrument_id: InstrumentId,
1312    ts_init: Option<UnixNanos>,
1313) -> anyhow::Result<OptionContract> {
1314    let currency = parse_currency_or_usd_default(msg.currency());
1315    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1316    let exchange = Ustr::from(msg.exchange()?);
1317    let underlying = decode_underlying(msg.underlying()?, &instrument_id.symbol);
1318    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1319        Some(AssetClass::Equity)
1320    } else {
1321        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1322        asset_class
1323    };
1324    let option_kind = parse_option_kind(msg.instrument_class)?;
1325    let strike_price = decode_price(
1326        msg.strike_price,
1327        strike_price_currency.precision,
1328        "strike_price",
1329    )?;
1330    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1331    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1332    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1333    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1334    let ts_init = ts_init.unwrap_or(ts_event);
1335
1336    OptionContract::new_checked(
1337        instrument_id,
1338        instrument_id.symbol,
1339        asset_class_opt.unwrap_or(AssetClass::Commodity),
1340        Some(exchange),
1341        underlying,
1342        option_kind,
1343        strike_price,
1344        currency,
1345        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1346        decode_timestamp(msg.expiration, "expiration")?,
1347        price_increment.precision,
1348        price_increment,
1349        multiplier,
1350        lot_size,
1351        None, // TBD
1352        None, // TBD
1353        None, // TBD
1354        None, // TBD
1355        None, // TBD
1356        None, // TBD
1357        None, // TBD
1358        None, // TBD
1359        ts_event,
1360        ts_init,
1361    )
1362}
1363
1364/// Decodes a Databento instrument definition message into an `OptionSpread` instrument.
1365///
1366/// # Errors
1367///
1368/// Returns an error if parsing or constructing `OptionSpread` fails.
1369pub fn decode_option_spread(
1370    msg: &dbn::InstrumentDefMsg,
1371    instrument_id: InstrumentId,
1372    ts_init: Option<UnixNanos>,
1373) -> anyhow::Result<OptionSpread> {
1374    let exchange = Ustr::from(msg.exchange()?);
1375    let underlying = decode_underlying(msg.underlying()?, &instrument_id.symbol);
1376    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1377        Some(AssetClass::Equity)
1378    } else {
1379        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1380        asset_class
1381    };
1382    let strategy_type = Ustr::from(msg.secsubtype()?);
1383    let currency = parse_currency_or_usd_default(msg.currency());
1384    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1385    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1386    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1387    let ts_event = msg.ts_recv.into(); // More accurate and reliable timestamp
1388    let ts_init = ts_init.unwrap_or(ts_event);
1389
1390    OptionSpread::new_checked(
1391        instrument_id,
1392        instrument_id.symbol,
1393        asset_class_opt.unwrap_or(AssetClass::Commodity),
1394        Some(exchange),
1395        underlying,
1396        strategy_type,
1397        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1398        decode_timestamp(msg.expiration, "expiration")?,
1399        currency,
1400        price_increment.precision,
1401        price_increment,
1402        multiplier,
1403        lot_size,
1404        None, // TBD
1405        None, // TBD
1406        None, // TBD
1407        None, // TBD
1408        None, // TBD
1409        None, // TBD
1410        None, // TBD
1411        None, // TBD
1412        ts_event,
1413        ts_init,
1414    )
1415}
1416
1417/// Decodes a Databento imbalance message into a `DatabentoImbalance` event.
1418///
1419/// # Errors
1420///
1421/// Returns an error if constructing `DatabentoImbalance` fails.
1422pub fn decode_imbalance_msg(
1423    msg: &dbn::ImbalanceMsg,
1424    instrument_id: InstrumentId,
1425    price_precision: u8,
1426    ts_init: Option<UnixNanos>,
1427) -> anyhow::Result<DatabentoImbalance> {
1428    let ts_event = msg.ts_recv.into();
1429    let ts_init = ts_init.unwrap_or(ts_event);
1430
1431    Ok(DatabentoImbalance::new(
1432        instrument_id,
1433        decode_price_or_undef(msg.ref_price, price_precision),
1434        decode_price_or_undef(msg.cont_book_clr_price, price_precision),
1435        decode_price_or_undef(msg.auct_interest_clr_price, price_precision),
1436        Quantity::new(f64::from(msg.paired_qty), 0),
1437        Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1438        parse_order_side(msg.side),
1439        msg.significant_imbalance as c_char,
1440        msg.hd.ts_event.into(),
1441        ts_event,
1442        ts_init,
1443    ))
1444}
1445
1446/// Decodes a Databento statistics message into a `DatabentoStatistics` event.
1447///
1448/// # Errors
1449///
1450/// Returns an error if constructing `DatabentoStatistics` fails or if `msg.stat_type` or
1451/// `msg.update_action` is not a valid enum variant.
1452pub fn decode_statistics_msg(
1453    msg: &dbn::StatMsg,
1454    instrument_id: InstrumentId,
1455    price_precision: u8,
1456    ts_init: Option<UnixNanos>,
1457) -> anyhow::Result<DatabentoStatistics> {
1458    let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1459        .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1460    let update_action =
1461        DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1462            anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1463        })?;
1464    let ts_event = msg.ts_recv.into();
1465    let ts_init = ts_init.unwrap_or(ts_event);
1466
1467    Ok(DatabentoStatistics::new(
1468        instrument_id,
1469        stat_type,
1470        update_action,
1471        decode_optional_price(msg.price, price_precision),
1472        decode_optional_quantity(msg.quantity),
1473        msg.channel_id,
1474        msg.stat_flags,
1475        msg.sequence,
1476        msg.ts_ref.into(),
1477        msg.ts_in_delta,
1478        msg.hd.ts_event.into(),
1479        ts_event,
1480        ts_init,
1481    ))
1482}
1483
1484#[cfg(test)]
1485mod tests {
1486    use std::path::{Path, PathBuf};
1487
1488    use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1489    use fallible_streaming_iterator::FallibleStreamingIterator;
1490    use nautilus_model::instruments::Instrument;
1491    use rstest::*;
1492
1493    use super::*;
1494
1495    fn test_data_path() -> PathBuf {
1496        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1497    }
1498
1499    #[rstest]
1500    #[case('Y' as c_char, Some(true))]
1501    #[case('N' as c_char, Some(false))]
1502    #[case('X' as c_char, None)]
1503    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1504        assert_eq!(parse_optional_bool(input), expected);
1505    }
1506
1507    #[rstest]
1508    #[case('A' as c_char, OrderSide::Sell)]
1509    #[case('B' as c_char, OrderSide::Buy)]
1510    #[case('X' as c_char, OrderSide::NoOrderSide)]
1511    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1512        assert_eq!(parse_order_side(input), expected);
1513    }
1514
1515    #[rstest]
1516    #[case('A' as c_char, AggressorSide::Seller)]
1517    #[case('B' as c_char, AggressorSide::Buyer)]
1518    #[case('X' as c_char, AggressorSide::NoAggressor)]
1519    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1520        assert_eq!(parse_aggressor_side(input), expected);
1521    }
1522
1523    #[rstest]
1524    #[case('T' as c_char, true)]
1525    #[case('A' as c_char, false)]
1526    #[case('C' as c_char, false)]
1527    #[case('F' as c_char, false)]
1528    #[case('M' as c_char, false)]
1529    #[case('R' as c_char, false)]
1530    fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1531        assert_eq!(is_trade_msg(action), expected);
1532    }
1533
1534    #[rstest]
1535    #[case('A' as c_char, Ok(BookAction::Add))]
1536    #[case('C' as c_char, Ok(BookAction::Delete))]
1537    #[case('F' as c_char, Ok(BookAction::Update))]
1538    #[case('M' as c_char, Ok(BookAction::Update))]
1539    #[case('R' as c_char, Ok(BookAction::Clear))]
1540    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1541    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1542        match parse_book_action(input) {
1543            Ok(action) => assert_eq!(Ok(action), expected),
1544            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1545        }
1546    }
1547
1548    #[rstest]
1549    #[case('C' as c_char, Ok(OptionKind::Call))]
1550    #[case('P' as c_char, Ok(OptionKind::Put))]
1551    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1552    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1553        match parse_option_kind(input) {
1554            Ok(kind) => assert_eq!(Ok(kind), expected),
1555            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1556        }
1557    }
1558
1559    #[rstest]
1560    #[case(Ok("USD"), Currency::USD())]
1561    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1562    #[case(Ok(""), Currency::USD())]
1563    #[case(Err("Error"), Currency::USD())]
1564    fn test_parse_currency_or_usd_default(
1565        #[case] input: Result<&str, &'static str>, // Using `&'static str` for errors
1566        #[case] expected: Currency,
1567    ) {
1568        let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1569        assert_eq!(actual, expected);
1570    }
1571
1572    #[rstest]
1573    #[case("DII", (Some(AssetClass::Index), Some(InstrumentClass::Future)))]
1574    #[case("EII", (Some(AssetClass::Index), Some(InstrumentClass::Future)))]
1575    #[case("EIA", (Some(AssetClass::Equity), Some(InstrumentClass::Future)))]
1576    #[case("XXX", (None, None))]
1577    #[case("D", (None, None))]
1578    #[case("", (None, None))]
1579    fn test_parse_cfi_iso10926(
1580        #[case] input: &str,
1581        #[case] expected: (Option<AssetClass>, Option<InstrumentClass>),
1582    ) {
1583        let result = parse_cfi_iso10926(input);
1584        assert_eq!(result, expected);
1585    }
1586
1587    #[rstest]
1588    #[case(0, 2, Price::from_raw(0, 2))]
1589    #[case(
1590        1_000_000_000,
1591        2,
1592        Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1593    )]
1594    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1595        let actual = decode_price(value, precision, "test_field").unwrap();
1596        assert_eq!(actual, expected);
1597    }
1598
1599    #[rstest]
1600    fn test_decode_price_undefined_errors() {
1601        let result = decode_price(i64::MAX, 2, "strike_price");
1602        assert!(result.is_err());
1603        assert!(result.unwrap_err().to_string().contains("strike_price"));
1604    }
1605
1606    #[rstest]
1607    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1608    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1609    #[case(
1610        10_000_000_000,
1611        2,
1612        Price::from_raw(decode_raw_price_i64(10_000_000_000), 2)
1613    )]
1614    fn test_decode_price_increment(
1615        #[case] value: i64,
1616        #[case] precision: u8,
1617        #[case] expected: Price,
1618    ) {
1619        let actual = decode_price_increment(value, precision);
1620        assert_eq!(actual, expected);
1621    }
1622
1623    #[rstest]
1624    #[case(i64::MAX, 2, None)] // None for i64::MAX
1625    #[case(0, 2, Some(Price::from_raw(0, 2)))] // 0 is valid here
1626    #[case(
1627        10_000_000_000,
1628        2,
1629        Some(Price::from_raw(decode_raw_price_i64(10_000_000_000), 2))
1630    )]
1631    fn test_decode_optional_price(
1632        #[case] value: i64,
1633        #[case] precision: u8,
1634        #[case] expected: Option<Price>,
1635    ) {
1636        let actual = decode_optional_price(value, precision);
1637        assert_eq!(actual, expected);
1638    }
1639
1640    #[rstest]
1641    #[case(0, 2, Price::from_raw(0, 2))]
1642    #[case(
1643        1_000_000_000,
1644        2,
1645        Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1646    )]
1647    #[case(i64::MAX, 2, Price::from_raw(PRICE_UNDEF, 0))] // Sentinel becomes PRICE_UNDEF
1648    fn test_decode_price_or_undef(
1649        #[case] value: i64,
1650        #[case] precision: u8,
1651        #[case] expected: Price,
1652    ) {
1653        let actual = decode_price_or_undef(value, precision);
1654        assert_eq!(actual, expected);
1655    }
1656
1657    #[rstest]
1658    #[case(i64::MAX, None)] // None for i32::MAX
1659    #[case(0, Some(Quantity::new(0.0, 0)))] // 0 is valid quantity
1660    #[case(10, Some(Quantity::new(10.0, 0)))] // Arbitrary valid quantity
1661    fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1662        let actual = decode_optional_quantity(value);
1663        assert_eq!(actual, expected);
1664    }
1665
1666    #[rstest]
1667    #[case(0, UnixNanos::from(0))]
1668    #[case(1_000_000_000, UnixNanos::from(1_000_000_000))]
1669    fn test_decode_timestamp(#[case] value: u64, #[case] expected: UnixNanos) {
1670        let actual = decode_timestamp(value, "test_field").unwrap();
1671        assert_eq!(actual, expected);
1672    }
1673
1674    #[rstest]
1675    fn test_decode_timestamp_undefined_errors() {
1676        let result = decode_timestamp(dbn::UNDEF_TIMESTAMP, "expiration");
1677        assert!(result.is_err());
1678        assert!(result.unwrap_err().to_string().contains("expiration"));
1679    }
1680
1681    #[rstest]
1682    #[case(0, Some(UnixNanos::from(0)))]
1683    #[case(1_000_000_000, Some(UnixNanos::from(1_000_000_000)))]
1684    #[case(dbn::UNDEF_TIMESTAMP, None)]
1685    fn test_decode_optional_timestamp(#[case] value: u64, #[case] expected: Option<UnixNanos>) {
1686        let actual = decode_optional_timestamp(value);
1687        assert_eq!(actual, expected);
1688    }
1689
1690    #[rstest]
1691    #[case(0, Quantity::from(1))] // Default fallback for 0
1692    #[case(i64::MAX, Quantity::from(1))] // Default fallback for i64::MAX
1693    #[case(50_000_000_000, Quantity::from("50"))] // 50.0 exactly
1694    #[case(12_500_000_000, Quantity::from("12.5"))] // 12.5 exactly
1695    #[case(1_000_000_000, Quantity::from("1"))] // 1.0 exactly
1696    #[case(1, Quantity::from("0.000000001"))] // Smallest positive value
1697    #[case(1_000_000_001, Quantity::from("1.000000001"))] // Just over 1.0
1698    #[case(999_999_999, Quantity::from("0.999999999"))] // Just under 1.0
1699    #[case(123_456_789_000, Quantity::from("123.456789"))] // Trailing zeros trimmed
1700    fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1701        assert_eq!(decode_multiplier(raw).unwrap(), expected);
1702    }
1703
1704    #[rstest]
1705    #[case(-1_500_000_000)] // Large negative value
1706    #[case(-1)] // Small negative value
1707    #[case(-999_999_999)] // Another negative value
1708    fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1709        let result = decode_multiplier(raw);
1710        assert!(result.is_err());
1711        assert!(
1712            result
1713                .unwrap_err()
1714                .to_string()
1715                .contains("Invalid negative multiplier")
1716        );
1717    }
1718
1719    #[rstest]
1720    #[case(100, Quantity::from(100))]
1721    #[case(1000, Quantity::from(1000))]
1722    #[case(5, Quantity::from(5))]
1723    fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1724        assert_eq!(decode_quantity(value), expected);
1725    }
1726
1727    #[rstest]
1728    #[case(0, Quantity::from(1))] // Default for 0
1729    #[case(i32::MAX, Quantity::from(1))] // Default for MAX
1730    #[case(100, Quantity::from(100))]
1731    #[case(1, Quantity::from(1))]
1732    #[case(1000, Quantity::from(1000))]
1733    fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1734        assert_eq!(decode_lot_size(value), expected);
1735    }
1736
1737    #[rstest]
1738    #[case(0, None)] // None for 0
1739    #[case(1, Some(Ustr::from("Scheduled")))]
1740    #[case(2, Some(Ustr::from("Surveillance intervention")))]
1741    #[case(3, Some(Ustr::from("Market event")))]
1742    #[case(10, Some(Ustr::from("Regulatory")))]
1743    #[case(30, Some(Ustr::from("News pending")))]
1744    #[case(40, Some(Ustr::from("Order imbalance")))]
1745    #[case(50, Some(Ustr::from("LULD pause")))]
1746    #[case(60, Some(Ustr::from("Operational")))]
1747    #[case(100, Some(Ustr::from("Corporate action")))]
1748    #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1749    fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1750        assert_eq!(parse_status_reason(value).unwrap(), expected);
1751    }
1752
1753    #[rstest]
1754    #[case(999)] // Invalid code
1755    fn test_parse_status_reason_invalid(#[case] value: u16) {
1756        assert!(parse_status_reason(value).is_err());
1757    }
1758
1759    #[rstest]
1760    #[case(0, None)] // None for 0
1761    #[case(1, Some(Ustr::from("No cancel")))]
1762    #[case(2, Some(Ustr::from("Change trading session")))]
1763    #[case(3, Some(Ustr::from("Implied matching on")))]
1764    #[case(4, Some(Ustr::from("Implied matching off")))]
1765    fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1766        assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1767    }
1768
1769    #[rstest]
1770    #[case(5)] // Invalid code
1771    #[case(100)] // Invalid code
1772    fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1773        assert!(parse_status_trading_event(value).is_err());
1774    }
1775
1776    #[rstest]
1777    fn test_decode_mbo_msg() {
1778        let path = test_data_path().join("test_data.mbo.dbn.zst");
1779        let mut dbn_stream = Decoder::from_zstd_file(path)
1780            .unwrap()
1781            .decode_stream::<dbn::MboMsg>();
1782        let msg = dbn_stream.next().unwrap().unwrap();
1783
1784        let instrument_id = InstrumentId::from("ESM4.GLBX");
1785        let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1786        let delta = delta.unwrap();
1787
1788        assert_eq!(delta.instrument_id, instrument_id);
1789        assert_eq!(delta.action, BookAction::Delete);
1790        assert_eq!(delta.order.side, OrderSide::Sell);
1791        assert_eq!(delta.order.price, Price::from("3722.75"));
1792        assert_eq!(delta.order.size, Quantity::from("1"));
1793        assert_eq!(delta.order.order_id, 647_784_973_705);
1794        assert_eq!(delta.flags, 128);
1795        assert_eq!(delta.sequence, 1_170_352);
1796        assert_eq!(delta.ts_event, msg.ts_recv);
1797        assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1798        assert_eq!(delta.ts_init, 0);
1799    }
1800
1801    #[rstest]
1802    fn test_decode_mbo_msg_clear_action() {
1803        // Create an MBO message with Clear action (action='R', side='N')
1804        let ts_recv = 1_609_160_400_000_000_000;
1805        let msg = dbn::MboMsg {
1806            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1807            order_id: 0,
1808            price: i64::MAX,
1809            size: 0,
1810            flags: dbn::FlagSet::empty(),
1811            channel_id: 0,
1812            action: 'R' as c_char,
1813            side: 'N' as c_char, // NoOrderSide for Clear
1814            ts_recv,
1815            ts_in_delta: 0,
1816            sequence: 1_000_000,
1817        };
1818
1819        let instrument_id = InstrumentId::from("ESM4.GLBX");
1820        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1821
1822        // Clear messages should produce OrderBookDelta, not TradeTick
1823        assert!(trade.is_none());
1824        let delta = delta.expect("Clear action should produce OrderBookDelta");
1825
1826        assert_eq!(delta.instrument_id, instrument_id);
1827        assert_eq!(delta.action, BookAction::Clear);
1828        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1829        assert_eq!(delta.order.size, Quantity::from("0"));
1830        assert_eq!(delta.order.order_id, 0);
1831        assert_eq!(delta.sequence, 1_000_000);
1832        assert_eq!(delta.ts_event, ts_recv);
1833        assert_eq!(delta.ts_init, 0);
1834        assert!(delta.order.price.is_undefined());
1835        assert_eq!(delta.order.price.precision, 0);
1836    }
1837
1838    #[rstest]
1839    fn test_decode_mbo_msg_price_undef_with_precision() {
1840        // Test that PRICE_UNDEF (i64::MAX) forces precision to 0 even when price_precision is non-zero
1841        let ts_recv = 1_609_160_400_000_000_000;
1842        let msg = dbn::MboMsg {
1843            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1844            order_id: 0,
1845            price: i64::MAX, // PRICE_UNDEF
1846            size: 0,
1847            flags: dbn::FlagSet::empty(),
1848            channel_id: 0,
1849            action: 'R' as c_char, // Clear
1850            side: 'N' as c_char,   // NoOrderSide
1851            ts_recv,
1852            ts_in_delta: 0,
1853            sequence: 0,
1854        };
1855
1856        let instrument_id = InstrumentId::from("ESM4.GLBX");
1857        let (delta, _) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1858        let delta = delta.unwrap();
1859
1860        assert!(delta.order.price.is_undefined());
1861        assert_eq!(delta.order.price.precision, 0);
1862        assert_eq!(delta.order.price.raw, PRICE_UNDEF);
1863    }
1864
1865    #[rstest]
1866    fn test_decode_mbo_msg_no_order_side_update() {
1867        // MBO messages with NoOrderSide are now passed through to the book
1868        // The book will resolve the side from its cache using the order_id
1869        let ts_recv = 1_609_160_400_000_000_000;
1870        let msg = dbn::MboMsg {
1871            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1872            order_id: 123_456_789,
1873            price: 4_800_250_000_000, // $4800.25 with precision 2
1874            size: 1,
1875            flags: dbn::FlagSet::empty(),
1876            channel_id: 1,
1877            action: 'M' as c_char, // Modify/Update action
1878            side: 'N' as c_char,   // NoOrderSide
1879            ts_recv,
1880            ts_in_delta: 0,
1881            sequence: 1_000_000,
1882        };
1883
1884        let instrument_id = InstrumentId::from("ESM4.GLBX");
1885        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1886
1887        // Delta should be created with NoOrderSide (book will resolve it)
1888        assert!(delta.is_some());
1889        assert!(trade.is_none());
1890        let delta = delta.unwrap();
1891        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1892        assert_eq!(delta.order.order_id, 123_456_789);
1893        assert_eq!(delta.action, BookAction::Update);
1894    }
1895
1896    #[rstest]
1897    fn test_decode_mbp1_msg() {
1898        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1899        let mut dbn_stream = Decoder::from_zstd_file(path)
1900            .unwrap()
1901            .decode_stream::<dbn::Mbp1Msg>();
1902        let msg = dbn_stream.next().unwrap().unwrap();
1903
1904        let instrument_id = InstrumentId::from("ESM4.GLBX");
1905        let (maybe_quote, _) =
1906            decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1907        let quote = maybe_quote.expect("Expected valid quote");
1908
1909        assert_eq!(quote.instrument_id, instrument_id);
1910        assert_eq!(quote.bid_price, Price::from("3720.25"));
1911        assert_eq!(quote.ask_price, Price::from("3720.50"));
1912        assert_eq!(quote.bid_size, Quantity::from("24"));
1913        assert_eq!(quote.ask_size, Quantity::from("11"));
1914        assert_eq!(quote.ts_event, msg.ts_recv);
1915        assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1916        assert_eq!(quote.ts_init, 0);
1917    }
1918
1919    #[rstest]
1920    fn test_decode_mbp1_msg_undefined_ask_skips_quote() {
1921        let ts_recv = 1_609_160_400_000_000_000;
1922        let msg = dbn::Mbp1Msg {
1923            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1924            price: 3_720_250_000_000, // Valid trade price
1925            size: 5,
1926            action: 'A' as c_char,
1927            side: 'B' as c_char,
1928            flags: dbn::FlagSet::empty(),
1929            depth: 0,
1930            ts_recv,
1931            ts_in_delta: 0,
1932            sequence: 1_170_352,
1933            levels: [dbn::BidAskPair {
1934                bid_px: 3_720_250_000_000, // Valid bid price
1935                ask_px: i64::MAX,          // Undefined ask price
1936                bid_sz: 24,
1937                ask_sz: 0,
1938                bid_ct: 1,
1939                ask_ct: 0,
1940            }],
1941        };
1942
1943        let instrument_id = InstrumentId::from("ESM4.GLBX");
1944        let (maybe_quote, _) =
1945            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1946
1947        // Quote should be None because ask price is undefined
1948        assert!(maybe_quote.is_none());
1949    }
1950
1951    #[rstest]
1952    fn test_decode_mbp1_msg_undefined_bid_skips_quote() {
1953        let ts_recv = 1_609_160_400_000_000_000;
1954        let msg = dbn::Mbp1Msg {
1955            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1956            price: 3_720_500_000_000, // Valid trade price
1957            size: 5,
1958            action: 'A' as c_char,
1959            side: 'A' as c_char,
1960            flags: dbn::FlagSet::empty(),
1961            depth: 0,
1962            ts_recv,
1963            ts_in_delta: 0,
1964            sequence: 1_170_352,
1965            levels: [dbn::BidAskPair {
1966                bid_px: i64::MAX,          // Undefined bid price
1967                ask_px: 3_720_500_000_000, // Valid ask price
1968                bid_sz: 0,
1969                ask_sz: 11,
1970                bid_ct: 0,
1971                ask_ct: 1,
1972            }],
1973        };
1974
1975        let instrument_id = InstrumentId::from("ESM4.GLBX");
1976        let (maybe_quote, _) =
1977            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1978
1979        // Quote should be None because bid price is undefined
1980        assert!(maybe_quote.is_none());
1981    }
1982
1983    #[rstest]
1984    fn test_decode_mbp1_msg_trade_still_returned_with_undefined_prices() {
1985        let ts_recv = 1_609_160_400_000_000_000;
1986        let msg = dbn::Mbp1Msg {
1987            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1988            price: 3_720_250_000_000, // Valid trade price
1989            size: 5,
1990            action: 'T' as c_char, // Trade action
1991            side: 'A' as c_char,
1992            flags: dbn::FlagSet::empty(),
1993            depth: 0,
1994            ts_recv,
1995            ts_in_delta: 0,
1996            sequence: 1_170_352,
1997            levels: [dbn::BidAskPair {
1998                bid_px: i64::MAX, // Undefined bid
1999                ask_px: i64::MAX, // Undefined ask
2000                bid_sz: 0,
2001                ask_sz: 0,
2002                bid_ct: 0,
2003                ask_ct: 0,
2004            }],
2005        };
2006
2007        let instrument_id = InstrumentId::from("ESM4.GLBX");
2008        let (maybe_quote, maybe_trade) =
2009            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), true).unwrap();
2010
2011        // Quote should be None because both prices are undefined
2012        assert!(maybe_quote.is_none());
2013
2014        // Trade should still be present
2015        let trade = maybe_trade.expect("Expected trade");
2016        assert_eq!(trade.instrument_id, instrument_id);
2017        assert_eq!(trade.price, Price::from("3720.25"));
2018        assert_eq!(trade.size, Quantity::from("5"));
2019    }
2020
2021    #[rstest]
2022    fn test_decode_bbo_1s_msg() {
2023        let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
2024        let mut dbn_stream = Decoder::from_zstd_file(path)
2025            .unwrap()
2026            .decode_stream::<dbn::BboMsg>();
2027        let msg = dbn_stream.next().unwrap().unwrap();
2028
2029        let instrument_id = InstrumentId::from("ESM4.GLBX");
2030        let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2031        let quote = maybe_quote.expect("Expected valid quote");
2032
2033        assert_eq!(quote.instrument_id, instrument_id);
2034        assert_eq!(quote.bid_price, Price::from("3702.25"));
2035        assert_eq!(quote.ask_price, Price::from("3702.75"));
2036        assert_eq!(quote.bid_size, Quantity::from("18"));
2037        assert_eq!(quote.ask_size, Quantity::from("13"));
2038        assert_eq!(quote.ts_event, msg.ts_recv);
2039        assert_eq!(quote.ts_event, 1609113600000000000);
2040        assert_eq!(quote.ts_init, 0);
2041    }
2042
2043    #[rstest]
2044    fn test_decode_bbo_1m_msg() {
2045        let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
2046        let mut dbn_stream = Decoder::from_zstd_file(path)
2047            .unwrap()
2048            .decode_stream::<dbn::BboMsg>();
2049        let msg = dbn_stream.next().unwrap().unwrap();
2050
2051        let instrument_id = InstrumentId::from("ESM4.GLBX");
2052        let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2053        let quote = maybe_quote.expect("Expected valid quote");
2054
2055        assert_eq!(quote.instrument_id, instrument_id);
2056        assert_eq!(quote.bid_price, Price::from("3702.25"));
2057        assert_eq!(quote.ask_price, Price::from("3702.75"));
2058        assert_eq!(quote.bid_size, Quantity::from("18"));
2059        assert_eq!(quote.ask_size, Quantity::from("13"));
2060        assert_eq!(quote.ts_event, msg.ts_recv);
2061        assert_eq!(quote.ts_event, 1609113600000000000);
2062        assert_eq!(quote.ts_init, 0);
2063    }
2064
2065    #[rstest]
2066    fn test_decode_mbp10_msg() {
2067        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
2068        let mut dbn_stream = Decoder::from_zstd_file(path)
2069            .unwrap()
2070            .decode_stream::<dbn::Mbp10Msg>();
2071        let msg = dbn_stream.next().unwrap().unwrap();
2072
2073        let instrument_id = InstrumentId::from("ESM4.GLBX");
2074        let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2075
2076        assert_eq!(depth10.instrument_id, instrument_id);
2077        assert_eq!(depth10.bids.len(), 10);
2078        assert_eq!(depth10.asks.len(), 10);
2079        assert_eq!(depth10.bid_counts.len(), 10);
2080        assert_eq!(depth10.ask_counts.len(), 10);
2081        assert_eq!(depth10.flags, 128);
2082        assert_eq!(depth10.sequence, 1_170_352);
2083        assert_eq!(depth10.ts_event, msg.ts_recv);
2084        assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
2085        assert_eq!(depth10.ts_init, 0);
2086    }
2087
2088    #[rstest]
2089    fn test_decode_trade_msg() {
2090        let path = test_data_path().join("test_data.trades.dbn.zst");
2091        let mut dbn_stream = Decoder::from_zstd_file(path)
2092            .unwrap()
2093            .decode_stream::<dbn::TradeMsg>();
2094        let msg = dbn_stream.next().unwrap().unwrap();
2095
2096        let instrument_id = InstrumentId::from("ESM4.GLBX");
2097        let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2098
2099        assert_eq!(trade.instrument_id, instrument_id);
2100        assert_eq!(trade.price, Price::from("3720.25"));
2101        assert_eq!(trade.size, Quantity::from("5"));
2102        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2103        assert_eq!(trade.trade_id.to_string(), "1170380");
2104        assert_eq!(trade.ts_event, msg.ts_recv);
2105        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2106        assert_eq!(trade.ts_init, 0);
2107    }
2108
2109    #[rstest]
2110    fn test_decode_tbbo_msg() {
2111        let path = test_data_path().join("test_data.tbbo.dbn.zst");
2112        let mut dbn_stream = Decoder::from_zstd_file(path)
2113            .unwrap()
2114            .decode_stream::<dbn::Mbp1Msg>();
2115        let msg = dbn_stream.next().unwrap().unwrap();
2116
2117        let instrument_id = InstrumentId::from("ESM4.GLBX");
2118        let (maybe_quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2119        let quote = maybe_quote.expect("Expected valid quote");
2120
2121        assert_eq!(quote.instrument_id, instrument_id);
2122        assert_eq!(quote.bid_price, Price::from("3720.25"));
2123        assert_eq!(quote.ask_price, Price::from("3720.50"));
2124        assert_eq!(quote.bid_size, Quantity::from("26"));
2125        assert_eq!(quote.ask_size, Quantity::from("7"));
2126        assert_eq!(quote.ts_event, msg.ts_recv);
2127        assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
2128        assert_eq!(quote.ts_init, 0);
2129
2130        assert_eq!(trade.instrument_id, instrument_id);
2131        assert_eq!(trade.price, Price::from("3720.25"));
2132        assert_eq!(trade.size, Quantity::from("5"));
2133        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2134        assert_eq!(trade.trade_id.to_string(), "1170380");
2135        assert_eq!(trade.ts_event, msg.ts_recv);
2136        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2137        assert_eq!(trade.ts_init, 0);
2138    }
2139
2140    #[rstest]
2141    fn test_decode_ohlcv_msg() {
2142        let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
2143        let mut dbn_stream = Decoder::from_zstd_file(path)
2144            .unwrap()
2145            .decode_stream::<dbn::OhlcvMsg>();
2146        let msg = dbn_stream.next().unwrap().unwrap();
2147
2148        let instrument_id = InstrumentId::from("ESM4.GLBX");
2149        let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2150
2151        assert_eq!(
2152            bar.bar_type,
2153            BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
2154        );
2155        assert_eq!(bar.open, Price::from("372025.00"));
2156        assert_eq!(bar.high, Price::from("372050.00"));
2157        assert_eq!(bar.low, Price::from("372025.00"));
2158        assert_eq!(bar.close, Price::from("372050.00"));
2159        assert_eq!(bar.volume, Quantity::from("57"));
2160        assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); // timestamp_on_close=true
2161        assert_eq!(bar.ts_init, 0); // ts_init was Some(0)
2162    }
2163
2164    #[rstest]
2165    fn test_decode_definition_msg() {
2166        let path = test_data_path().join("test_data.definition.dbn.zst");
2167        let mut dbn_stream = Decoder::from_zstd_file(path)
2168            .unwrap()
2169            .decode_stream::<dbn::InstrumentDefMsg>();
2170        let msg = dbn_stream.next().unwrap().unwrap();
2171
2172        let instrument_id = InstrumentId::from("ESM4.GLBX");
2173        let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
2174
2175        assert!(result.is_ok());
2176        assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
2177    }
2178
2179    #[rstest]
2180    fn test_decode_status_msg() {
2181        let path = test_data_path().join("test_data.status.dbn.zst");
2182        let mut dbn_stream = Decoder::from_zstd_file(path)
2183            .unwrap()
2184            .decode_stream::<dbn::StatusMsg>();
2185        let msg = dbn_stream.next().unwrap().unwrap();
2186
2187        let instrument_id = InstrumentId::from("ESM4.GLBX");
2188        let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
2189
2190        assert_eq!(status.instrument_id, instrument_id);
2191        assert_eq!(status.action, MarketStatusAction::Trading);
2192        assert_eq!(status.ts_event, msg.hd.ts_event);
2193        assert_eq!(status.ts_init, 0);
2194        assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
2195        assert_eq!(status.trading_event, None);
2196        assert_eq!(status.is_trading, Some(true));
2197        assert_eq!(status.is_quoting, Some(true));
2198        assert_eq!(status.is_short_sell_restricted, None);
2199    }
2200
2201    #[rstest]
2202    fn test_decode_imbalance_msg() {
2203        let path = test_data_path().join("test_data.imbalance.dbn.zst");
2204        let mut dbn_stream = Decoder::from_zstd_file(path)
2205            .unwrap()
2206            .decode_stream::<dbn::ImbalanceMsg>();
2207        let msg = dbn_stream.next().unwrap().unwrap();
2208
2209        let instrument_id = InstrumentId::from("ESM4.GLBX");
2210        let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2211
2212        assert_eq!(imbalance.instrument_id, instrument_id);
2213        assert_eq!(imbalance.ref_price, Price::from("229.43"));
2214        assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
2215        assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
2216        assert_eq!(imbalance.paired_qty, Quantity::from("0"));
2217        assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
2218        assert_eq!(imbalance.side, OrderSide::Buy);
2219        assert_eq!(imbalance.significant_imbalance, 126);
2220        assert_eq!(imbalance.ts_event, msg.hd.ts_event);
2221        assert_eq!(imbalance.ts_recv, msg.ts_recv);
2222        assert_eq!(imbalance.ts_init, 0);
2223    }
2224
2225    #[rstest]
2226    fn test_decode_statistics_msg() {
2227        let path = test_data_path().join("test_data.statistics.dbn.zst");
2228        let mut dbn_stream = Decoder::from_zstd_file(path)
2229            .unwrap()
2230            .decode_stream::<dbn::StatMsg>();
2231        let msg = dbn_stream.next().unwrap().unwrap();
2232
2233        let instrument_id = InstrumentId::from("ESM4.GLBX");
2234        let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2235
2236        assert_eq!(statistics.instrument_id, instrument_id);
2237        assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
2238        assert_eq!(
2239            statistics.update_action,
2240            DatabentoStatisticUpdateAction::Added
2241        );
2242        assert_eq!(statistics.price, Some(Price::from("100.00")));
2243        assert_eq!(statistics.quantity, None);
2244        assert_eq!(statistics.channel_id, 13);
2245        assert_eq!(statistics.stat_flags, 255);
2246        assert_eq!(statistics.sequence, 2);
2247        assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
2248        assert_eq!(statistics.ts_in_delta, 26961);
2249        assert_eq!(statistics.ts_event, msg.hd.ts_event);
2250        assert_eq!(statistics.ts_recv, msg.ts_recv);
2251        assert_eq!(statistics.ts_init, 0);
2252    }
2253
2254    #[rstest]
2255    fn test_decode_cmbp1_msg() {
2256        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
2257        let mut dbn_stream = Decoder::from_zstd_file(path)
2258            .unwrap()
2259            .decode_stream::<dbn::Cmbp1Msg>();
2260        let msg = dbn_stream.next().unwrap().unwrap();
2261
2262        let instrument_id = InstrumentId::from("ESM4.GLBX");
2263        let (maybe_quote, trade) =
2264            decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2265        let quote = maybe_quote.expect("Expected valid quote");
2266
2267        assert_eq!(quote.instrument_id, instrument_id);
2268        assert!(quote.bid_price.raw > 0);
2269        assert!(quote.ask_price.raw > 0);
2270        assert!(quote.bid_size.raw > 0);
2271        assert!(quote.ask_size.raw > 0);
2272        assert_eq!(quote.ts_event, msg.ts_recv);
2273        assert_eq!(quote.ts_init, 0);
2274
2275        // Check if trade is present based on action
2276        if is_trade_msg(msg.action) {
2277            assert!(trade.is_some());
2278            let trade = trade.unwrap();
2279            assert_eq!(trade.instrument_id, instrument_id);
2280        } else {
2281            assert!(trade.is_none());
2282        }
2283    }
2284
2285    #[rstest]
2286    fn test_decode_cbbo_1s_msg() {
2287        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2288        let mut dbn_stream = Decoder::from_zstd_file(path)
2289            .unwrap()
2290            .decode_stream::<dbn::CbboMsg>();
2291        let msg = dbn_stream.next().unwrap().unwrap();
2292
2293        let instrument_id = InstrumentId::from("ESM4.GLBX");
2294        let maybe_quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2295        let quote = maybe_quote.expect("Expected valid quote");
2296
2297        assert_eq!(quote.instrument_id, instrument_id);
2298        assert!(quote.bid_price.raw > 0);
2299        assert!(quote.ask_price.raw > 0);
2300        assert!(quote.bid_size.raw > 0);
2301        assert!(quote.ask_size.raw > 0);
2302        assert_eq!(quote.ts_event, msg.ts_recv);
2303        assert_eq!(quote.ts_init, 0);
2304    }
2305
2306    #[rstest]
2307    fn test_decode_mbp10_msg_with_all_levels() {
2308        let mut msg = dbn::Mbp10Msg::default();
2309        for i in 0..10 {
2310            msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
2311            msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
2312            msg.levels[i].bid_sz = 10 + i as u32;
2313            msg.levels[i].ask_sz = 10 + i as u32;
2314            msg.levels[i].bid_ct = 1 + i as u32;
2315            msg.levels[i].ask_ct = 1 + i as u32;
2316        }
2317        msg.ts_recv = 1_609_160_400_000_704_060;
2318
2319        let instrument_id = InstrumentId::from("TEST.VENUE");
2320        let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
2321
2322        assert!(result.is_ok());
2323        let depth = result.unwrap();
2324        assert_eq!(depth.bids.len(), 10);
2325        assert_eq!(depth.asks.len(), 10);
2326        assert_eq!(depth.bid_counts.len(), 10);
2327        assert_eq!(depth.ask_counts.len(), 10);
2328    }
2329
2330    #[rstest]
2331    fn test_array_conversion_error_handling() {
2332        let mut bids = Vec::new();
2333        let mut asks = Vec::new();
2334
2335        // Intentionally create fewer than DEPTH10_LEN elements
2336        for i in 0..5 {
2337            bids.push(BookOrder::new(
2338                OrderSide::Buy,
2339                Price::from(format!("{}.00", 100 - i)),
2340                Quantity::from(10),
2341                i as u64,
2342            ));
2343            asks.push(BookOrder::new(
2344                OrderSide::Sell,
2345                Price::from(format!("{}.00", 101 + i)),
2346                Quantity::from(10),
2347                i as u64,
2348            ));
2349        }
2350
2351        let result: Result<[BookOrder; DEPTH10_LEN], _> =
2352            bids.try_into().map_err(|v: Vec<BookOrder>| {
2353                anyhow::anyhow!(
2354                    "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2355                    v.len()
2356                )
2357            });
2358        assert!(result.is_err());
2359        assert!(
2360            result
2361                .unwrap_err()
2362                .to_string()
2363                .contains("Expected exactly 10 bid levels, received 5")
2364        );
2365    }
2366
2367    #[rstest]
2368    fn test_decode_tcbbo_msg() {
2369        // Use cbbo-1s as base since cbbo.dbn.zst was invalid
2370        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2371        let mut dbn_stream = Decoder::from_zstd_file(path)
2372            .unwrap()
2373            .decode_stream::<dbn::CbboMsg>();
2374        let msg = dbn_stream.next().unwrap().unwrap();
2375
2376        // Simulate TCBBO by adding trade data
2377        let mut tcbbo_msg = msg.clone();
2378        tcbbo_msg.price = 3702500000000;
2379        tcbbo_msg.size = 10;
2380
2381        let instrument_id = InstrumentId::from("ESM4.GLBX");
2382        let (maybe_quote, trade) =
2383            decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2384        let quote = maybe_quote.expect("Expected valid quote");
2385
2386        assert_eq!(quote.instrument_id, instrument_id);
2387        assert!(quote.bid_price.raw > 0);
2388        assert!(quote.ask_price.raw > 0);
2389        assert!(quote.bid_size.raw > 0);
2390        assert!(quote.ask_size.raw > 0);
2391        assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2392        assert_eq!(quote.ts_init, 0);
2393
2394        assert_eq!(trade.instrument_id, instrument_id);
2395        assert_eq!(trade.price, Price::from("3702.50"));
2396        assert_eq!(trade.size, Quantity::from(10));
2397        assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2398        assert_eq!(trade.ts_init, 0);
2399    }
2400
2401    #[rstest]
2402    fn test_decode_bar_type() {
2403        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2404        let instrument_id = InstrumentId::from("ESM4.GLBX");
2405
2406        // Test 1-second bar
2407        msg.hd.rtype = 32;
2408        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2409        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2410
2411        // Test 1-minute bar
2412        msg.hd.rtype = 33;
2413        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2414        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2415
2416        // Test 1-hour bar
2417        msg.hd.rtype = 34;
2418        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2419        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2420
2421        // Test 1-day bar
2422        msg.hd.rtype = 35;
2423        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2424        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2425
2426        // Test unsupported rtype
2427        msg.hd.rtype = 99;
2428        let result = decode_bar_type(&msg, instrument_id);
2429        assert!(result.is_err());
2430    }
2431
2432    #[rstest]
2433    fn test_decode_ts_event_adjustment() {
2434        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2435
2436        // Test 1-second bar adjustment
2437        msg.hd.rtype = 32;
2438        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2439        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2440
2441        // Test 1-minute bar adjustment
2442        msg.hd.rtype = 33;
2443        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2444        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2445
2446        // Test 1-hour bar adjustment
2447        msg.hd.rtype = 34;
2448        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2449        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2450
2451        // Test 1-day bar adjustment
2452        msg.hd.rtype = 35;
2453        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2454        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2455
2456        // Test eod bar adjustment (same as 1d)
2457        msg.hd.rtype = 36;
2458        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2459        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2460
2461        // Test unsupported rtype
2462        msg.hd.rtype = 99;
2463        let result = decode_ts_event_adjustment(&msg);
2464        assert!(result.is_err());
2465    }
2466
2467    #[rstest]
2468    fn test_decode_record() {
2469        // Test with MBO message
2470        let path = test_data_path().join("test_data.mbo.dbn.zst");
2471        let decoder = Decoder::from_zstd_file(path).unwrap();
2472        let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2473        let msg = dbn_stream.next().unwrap().unwrap();
2474
2475        let record_ref = dbn::RecordRef::from(msg);
2476        let instrument_id = InstrumentId::from("ESM4.GLBX");
2477
2478        let (data1, data2) =
2479            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2480
2481        assert!(data1.is_some());
2482        assert!(data2.is_none());
2483
2484        // Test with Trade message
2485        let path = test_data_path().join("test_data.trades.dbn.zst");
2486        let decoder = Decoder::from_zstd_file(path).unwrap();
2487        let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2488        let msg = dbn_stream.next().unwrap().unwrap();
2489
2490        let record_ref = dbn::RecordRef::from(msg);
2491
2492        let (data1, data2) =
2493            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2494
2495        assert!(data1.is_some());
2496        assert!(data2.is_none());
2497        assert!(matches!(data1.unwrap(), Data::Trade(_)));
2498    }
2499}