nautilus_databento/
decode.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
20use nautilus_model::{
21    data::{
22        Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24    },
25    enums::{
26        AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27        InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28    },
29    identifiers::{InstrumentId, TradeId},
30    instruments::{
31        Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32    },
33    types::{
34        Currency, Price, Quantity,
35        price::{PRICE_UNDEF, PriceRaw, decode_raw_price_i64},
36    },
37};
38use ustr::Ustr;
39
40use super::{
41    enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
42    types::{DatabentoImbalance, DatabentoStatistics},
43};
44
// Bar step of one; `NonZeroUsize::MIN` is the value 1, so no fallible construction is needed.
const STEP_ONE: NonZeroUsize = NonZeroUsize::MIN;
47
// Bar specifications used by `decode_bar_type` for the supported OHLCV record types.
// Each one has a step of one and uses `PriceType::Last`; only the aggregation differs.

// One-second bars.
const BAR_SPEC_1S: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Second,
    price_type: PriceType::Last,
};
// One-minute bars.
const BAR_SPEC_1M: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
// One-hour bars.
const BAR_SPEC_1H: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Hour,
    price_type: PriceType::Last,
};
// One-day bars (also used for the `ohlcv-eod` record type).
const BAR_SPEC_1D: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Day,
    price_type: PriceType::Last,
};
68
69const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
70const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
71const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
72const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
73
/// Converts a `Y`/`N` flag character into an optional boolean.
///
/// Returns `Some(true)` for `'Y'`, `Some(false)` for `'N'`, and `None` for any other character.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    match c as u8 {
        b'Y' => Some(true),
        b'N' => Some(false),
        _ => None,
    }
}
82
83#[must_use]
84pub const fn parse_order_side(c: c_char) -> OrderSide {
85    match c as u8 as char {
86        'A' => OrderSide::Sell,
87        'B' => OrderSide::Buy,
88        _ => OrderSide::NoOrderSide,
89    }
90}
91
92#[must_use]
93pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
94    match c as u8 as char {
95        'A' => AggressorSide::Seller,
96        'B' => AggressorSide::Buyer,
97        _ => AggressorSide::NoAggressor,
98    }
99}
100
101/// Parses a Databento book action character into a `BookAction` enum.
102///
103/// # Errors
104///
105/// Returns an error if `c` is not a valid `BookAction` character.
106pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
107    match c as u8 as char {
108        'A' => Ok(BookAction::Add),
109        'C' => Ok(BookAction::Delete),
110        'F' => Ok(BookAction::Update),
111        'M' => Ok(BookAction::Update),
112        'R' => Ok(BookAction::Clear),
113        invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
114    }
115}
116
117/// Parses a Databento option kind character into an `OptionKind` enum.
118///
119/// # Errors
120///
121/// Returns an error if `c` is not a valid `OptionKind` character.
122pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
123    match c as u8 as char {
124        'C' => Ok(OptionKind::Call),
125        'P' => Ok(OptionKind::Put),
126        invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
127    }
128}
129
130fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
131    match value {
132        Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
133            tracing::warn!("Unknown currency code '{value}', defaulting to USD");
134            Currency::USD()
135        }),
136        Ok(_) => Currency::USD(),
137        Err(e) => {
138            tracing::error!("Error parsing currency: {e}");
139            Currency::USD()
140        }
141    }
142}
143
144/// Parses a CFI (Classification of Financial Instruments) code to extract asset and instrument classes.
145///
146/// # Errors
147///
148/// Returns an error if `value` has fewer than 3 characters.
149pub fn parse_cfi_iso10926(
150    value: &str,
151) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
152    let chars: Vec<char> = value.chars().collect();
153    if chars.len() < 3 {
154        anyhow::bail!("Value string is too short");
155    }
156
157    // TODO: A proper CFI parser would be useful: https://en.wikipedia.org/wiki/ISO_10962
158    let cfi_category = chars[0];
159    let cfi_group = chars[1];
160    let cfi_attribute1 = chars[2];
161    // let cfi_attribute2 = value[3];
162    // let cfi_attribute3 = value[4];
163    // let cfi_attribute4 = value[5];
164
165    let mut asset_class = match cfi_category {
166        'D' => Some(AssetClass::Debt),
167        'E' => Some(AssetClass::Equity),
168        'S' => None,
169        _ => None,
170    };
171
172    let instrument_class = match cfi_group {
173        'I' => Some(InstrumentClass::Future),
174        _ => None,
175    };
176
177    if cfi_attribute1 == 'I' {
178        asset_class = Some(AssetClass::Index);
179    }
180
181    Ok((asset_class, instrument_class))
182}
183
184/// Parses a Databento status reason code into a human-readable string.
185///
186/// See: <https://databento.com/docs/schemas-and-data-formats/status#types-of-status-reasons>
187///
188/// # Errors
189///
190/// Returns an error if `value` is an invalid status reason code.
191pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
192    let value_str = match value {
193        0 => return Ok(None),
194        1 => "Scheduled",
195        2 => "Surveillance intervention",
196        3 => "Market event",
197        4 => "Instrument activation",
198        5 => "Instrument expiration",
199        6 => "Recovery in process",
200        10 => "Regulatory",
201        11 => "Administrative",
202        12 => "Non-compliance",
203        13 => "Filings not current",
204        14 => "SEC trading suspension",
205        15 => "New issue",
206        16 => "Issue available",
207        17 => "Issues reviewed",
208        18 => "Filing requirements satisfied",
209        30 => "News pending",
210        31 => "News released",
211        32 => "News and resumption times",
212        33 => "News not forthcoming",
213        40 => "Order imbalance",
214        50 => "LULD pause",
215        60 => "Operational",
216        70 => "Additional information requested",
217        80 => "Merger effective",
218        90 => "ETF",
219        100 => "Corporate action",
220        110 => "New Security offering",
221        120 => "Market wide halt level 1",
222        121 => "Market wide halt level 2",
223        122 => "Market wide halt level 3",
224        123 => "Market wide halt carryover",
225        124 => "Market wide halt resumption",
226        130 => "Quotation not available",
227        invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
228    };
229
230    Ok(Some(Ustr::from(value_str)))
231}
232
233/// Parses a Databento status trading event code into a human-readable string.
234///
235/// # Errors
236///
237/// Returns an error if `value` is an invalid status trading event code.
238pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
239    let value_str = match value {
240        0 => return Ok(None),
241        1 => "No cancel",
242        2 => "Change trading session",
243        3 => "Implied matching on",
244        4 => "Implied matching off",
245        _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
246    };
247
248    Ok(Some(Ustr::from(value_str)))
249}
250
251/// Decodes a raw price from an i64 value and returns the appropriate precision.
252///
253/// If the decoded raw value equals `PRICE_UNDEF`, precision is forced to 0
254/// as required by the `Price` type invariants.
255///
256/// Databento uses `i64::MAX` as a sentinel value for unset/null prices (see
257/// [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)).
258#[inline(always)]
259#[must_use]
260fn decode_raw_price_with_precision(value: i64, precision: u8) -> (PriceRaw, u8) {
261    let raw = if value == i64::MAX {
262        PRICE_UNDEF
263    } else {
264        decode_raw_price_i64(value)
265    };
266
267    // PRICE_UNDEF must always have precision 0
268    let precision = if raw == PRICE_UNDEF { 0 } else { precision };
269    (raw, precision)
270}
271
272/// Decodes a price from the given value, expressed in units of 1e-9.
273#[inline(always)]
274#[must_use]
275pub fn decode_price(value: i64, precision: u8) -> Price {
276    let (raw, precision) = decode_raw_price_with_precision(value, precision);
277    Price::from_raw(raw, precision)
278}
279
280/// Decodes a minimum price increment from the given value, expressed in units of 1e-9.
281#[inline(always)]
282#[must_use]
283pub fn decode_price_increment(value: i64, precision: u8) -> Price {
284    match value {
285        0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
286        _ => decode_price(value, precision),
287    }
288}
289
290/// Decodes a price from the given optional value, expressed in units of 1e-9.
291#[inline(always)]
292#[must_use]
293pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
294    match value {
295        i64::MAX => None,
296        _ => Some(decode_price(value, precision)),
297    }
298}
299
300/// Decodes a quantity from the given value, expressed in standard whole-number units.
301#[inline(always)]
302#[must_use]
303pub fn decode_quantity(value: u64) -> Quantity {
304    Quantity::from(value)
305}
306
307/// Decodes a quantity from the given optional value, where `i64::MAX` indicates missing data.
308#[inline(always)]
309#[must_use]
310pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
311    match value {
312        i64::MAX => None,
313        _ => Some(Quantity::from(value)),
314    }
315}
316
317/// Decodes a multiplier from the given value, expressed in units of 1e-9.
318/// Uses exact integer arithmetic to avoid precision loss in financial calculations.
319///
320/// # Errors
321///
322/// Returns an error if value is negative (invalid multiplier).
323pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
324    match value {
325        0 | i64::MAX => Ok(Quantity::from(1)),
326        v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
327        v => {
328            // Work in integers: v is fixed-point with 9 fractional digits.
329            // Build a canonical decimal string without floating-point.
330            let abs = v as u128;
331
332            const SCALE: u128 = 1_000_000_000;
333            let int_part = abs / SCALE;
334            let frac_part = abs % SCALE;
335
336            // Format fractional part with exactly 9 digits, then trim trailing zeros
337            // to keep a canonical representation.
338            if frac_part == 0 {
339                // Pure integer
340                Ok(Quantity::from(int_part as u64))
341            } else {
342                let mut frac_str = format!("{frac_part:09}");
343                while frac_str.ends_with('0') {
344                    frac_str.pop();
345                }
346                let s = format!("{int_part}.{frac_str}");
347                Ok(Quantity::from(s))
348            }
349        }
350    }
351}
352
353/// Decodes a lot size from the given value, expressed in standard whole-number units.
354#[inline(always)]
355#[must_use]
356pub fn decode_lot_size(value: i32) -> Quantity {
357    match value {
358        0 | i32::MAX => Quantity::from(1),
359        value => Quantity::from(value),
360    }
361}
362
/// Returns `true` when the action character is `'T'`, which marks a trade message.
#[inline(always)]
#[must_use]
fn is_trade_msg(action: c_char) -> bool {
    matches!(action as u8, b'T')
}
368
369/// Decodes a Databento MBO (Market by Order) message into an order book delta or trade.
370///
371/// Returns a tuple containing either an `OrderBookDelta` or a `TradeTick`, depending on
372/// whether the message represents an order book update or a trade execution.
373///
374/// # Errors
375///
376/// Returns an error if decoding the MBO message fails.
377pub fn decode_mbo_msg(
378    msg: &dbn::MboMsg,
379    instrument_id: InstrumentId,
380    price_precision: u8,
381    ts_init: Option<UnixNanos>,
382    include_trades: bool,
383) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
384    let side = parse_order_side(msg.side);
385    if is_trade_msg(msg.action) {
386        if include_trades && msg.size > 0 {
387            let price = decode_price(msg.price, price_precision);
388            let size = decode_quantity(msg.size as u64);
389            let aggressor_side = parse_aggressor_side(msg.side);
390            let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
391            let ts_event = msg.ts_recv.into();
392            let ts_init = ts_init.unwrap_or(ts_event);
393
394            let trade = TradeTick::new(
395                instrument_id,
396                price,
397                size,
398                aggressor_side,
399                trade_id,
400                ts_event,
401                ts_init,
402            );
403            return Ok((None, Some(trade)));
404        }
405
406        return Ok((None, None));
407    }
408
409    let action = parse_book_action(msg.action)?;
410    let (raw_price, precision) = decode_raw_price_with_precision(msg.price, price_precision);
411    let price = Price::from_raw(raw_price, precision);
412    let size = decode_quantity(msg.size as u64);
413    let order = BookOrder::new(side, price, size, msg.order_id);
414
415    let ts_event = msg.ts_recv.into();
416    let ts_init = ts_init.unwrap_or(ts_event);
417
418    let delta = OrderBookDelta::new(
419        instrument_id,
420        action,
421        order,
422        msg.flags.raw(),
423        msg.sequence.into(),
424        ts_event,
425        ts_init,
426    );
427
428    Ok((Some(delta), None))
429}
430
431/// Decodes a Databento Trade message into a `TradeTick`.
432///
433/// # Errors
434///
435/// Returns an error if decoding the Trade message fails.
436pub fn decode_trade_msg(
437    msg: &dbn::TradeMsg,
438    instrument_id: InstrumentId,
439    price_precision: u8,
440    ts_init: Option<UnixNanos>,
441) -> anyhow::Result<TradeTick> {
442    let ts_event = msg.ts_recv.into();
443    let ts_init = ts_init.unwrap_or(ts_event);
444
445    let trade = TradeTick::new(
446        instrument_id,
447        decode_price(msg.price, price_precision),
448        decode_quantity(msg.size as u64),
449        parse_aggressor_side(msg.side),
450        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
451        ts_event,
452        ts_init,
453    );
454
455    Ok(trade)
456}
457
458/// Decodes a Databento TBBO (Top of Book with Trade) message into quote and trade ticks.
459///
460/// # Errors
461///
462/// Returns an error if decoding the TBBO message fails.
463pub fn decode_tbbo_msg(
464    msg: &dbn::TbboMsg,
465    instrument_id: InstrumentId,
466    price_precision: u8,
467    ts_init: Option<UnixNanos>,
468) -> anyhow::Result<(QuoteTick, TradeTick)> {
469    let top_level = &msg.levels[0];
470    let ts_event = msg.ts_recv.into();
471    let ts_init = ts_init.unwrap_or(ts_event);
472
473    let quote = QuoteTick::new(
474        instrument_id,
475        decode_price(top_level.bid_px, price_precision),
476        decode_price(top_level.ask_px, price_precision),
477        decode_quantity(top_level.bid_sz as u64),
478        decode_quantity(top_level.ask_sz as u64),
479        ts_event,
480        ts_init,
481    );
482
483    let trade = TradeTick::new(
484        instrument_id,
485        decode_price(msg.price, price_precision),
486        decode_quantity(msg.size as u64),
487        parse_aggressor_side(msg.side),
488        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
489        ts_event,
490        ts_init,
491    );
492
493    Ok((quote, trade))
494}
495
496/// Decodes a Databento MBP1 (Market by Price Level 1) message into quote and optional trade ticks.
497///
498/// # Errors
499///
500/// Returns an error if decoding the MBP1 message fails.
501pub fn decode_mbp1_msg(
502    msg: &dbn::Mbp1Msg,
503    instrument_id: InstrumentId,
504    price_precision: u8,
505    ts_init: Option<UnixNanos>,
506    include_trades: bool,
507) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
508    let top_level = &msg.levels[0];
509    let ts_event = msg.ts_recv.into();
510    let ts_init = ts_init.unwrap_or(ts_event);
511
512    let quote = QuoteTick::new(
513        instrument_id,
514        decode_price(top_level.bid_px, price_precision),
515        decode_price(top_level.ask_px, price_precision),
516        decode_quantity(top_level.bid_sz as u64),
517        decode_quantity(top_level.ask_sz as u64),
518        ts_event,
519        ts_init,
520    );
521
522    let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
523        Some(TradeTick::new(
524            instrument_id,
525            decode_price(msg.price, price_precision),
526            decode_quantity(msg.size as u64),
527            parse_aggressor_side(msg.side),
528            TradeId::new(itoa::Buffer::new().format(msg.sequence)),
529            ts_event,
530            ts_init,
531        ))
532    } else {
533        None
534    };
535
536    Ok((quote, maybe_trade))
537}
538
539/// Decodes a Databento BBO (Best Bid and Offer) message into a `QuoteTick`.
540///
541/// # Errors
542///
543/// Returns an error if decoding the BBO message fails.
544pub fn decode_bbo_msg(
545    msg: &dbn::BboMsg,
546    instrument_id: InstrumentId,
547    price_precision: u8,
548    ts_init: Option<UnixNanos>,
549) -> anyhow::Result<QuoteTick> {
550    let top_level = &msg.levels[0];
551    let ts_event = msg.ts_recv.into();
552    let ts_init = ts_init.unwrap_or(ts_event);
553
554    let quote = QuoteTick::new(
555        instrument_id,
556        decode_price(top_level.bid_px, price_precision),
557        decode_price(top_level.ask_px, price_precision),
558        decode_quantity(top_level.bid_sz as u64),
559        decode_quantity(top_level.ask_sz as u64),
560        ts_event,
561        ts_init,
562    );
563
564    Ok(quote)
565}
566
567/// Decodes a Databento MBP10 (Market by Price 10 levels) message into an `OrderBookDepth10`.
568///
569/// # Errors
570///
571/// Returns an error if the number of levels in `msg.levels` is not exactly `DEPTH10_LEN`.
572pub fn decode_mbp10_msg(
573    msg: &dbn::Mbp10Msg,
574    instrument_id: InstrumentId,
575    price_precision: u8,
576    ts_init: Option<UnixNanos>,
577) -> anyhow::Result<OrderBookDepth10> {
578    let mut bids = Vec::with_capacity(DEPTH10_LEN);
579    let mut asks = Vec::with_capacity(DEPTH10_LEN);
580    let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
581    let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
582
583    for level in &msg.levels {
584        let bid_order = BookOrder::new(
585            OrderSide::Buy,
586            decode_price(level.bid_px, price_precision),
587            decode_quantity(level.bid_sz as u64),
588            0,
589        );
590
591        let ask_order = BookOrder::new(
592            OrderSide::Sell,
593            decode_price(level.ask_px, price_precision),
594            decode_quantity(level.ask_sz as u64),
595            0,
596        );
597
598        bids.push(bid_order);
599        asks.push(ask_order);
600        bid_counts.push(level.bid_ct);
601        ask_counts.push(level.ask_ct);
602    }
603
604    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
605        anyhow::anyhow!(
606            "Expected exactly {DEPTH10_LEN} bid levels, received {}",
607            v.len()
608        )
609    })?;
610
611    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
612        anyhow::anyhow!(
613            "Expected exactly {DEPTH10_LEN} ask levels, received {}",
614            v.len()
615        )
616    })?;
617
618    let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
619        anyhow::anyhow!(
620            "Expected exactly {DEPTH10_LEN} bid counts, received {}",
621            v.len()
622        )
623    })?;
624
625    let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
626        anyhow::anyhow!(
627            "Expected exactly {DEPTH10_LEN} ask counts, received {}",
628            v.len()
629        )
630    })?;
631
632    let ts_event = msg.ts_recv.into();
633    let ts_init = ts_init.unwrap_or(ts_event);
634
635    let depth = OrderBookDepth10::new(
636        instrument_id,
637        bids,
638        asks,
639        bid_counts,
640        ask_counts,
641        msg.flags.raw(),
642        msg.sequence.into(),
643        ts_event,
644        ts_init,
645    );
646
647    Ok(depth)
648}
649
650/// Decodes a Databento CMBP1 (Consolidated Market by Price Level 1) message.
651///
652/// Returns a tuple containing a `QuoteTick` and an optional `TradeTick` based on the message content.
653///
654/// # Errors
655///
656/// Returns an error if decoding the CMBP1 message fails.
657pub fn decode_cmbp1_msg(
658    msg: &dbn::Cmbp1Msg,
659    instrument_id: InstrumentId,
660    price_precision: u8,
661    ts_init: Option<UnixNanos>,
662    include_trades: bool,
663) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
664    let top_level = &msg.levels[0];
665    let ts_event = msg.ts_recv.into();
666    let ts_init = ts_init.unwrap_or(ts_event);
667
668    let quote = QuoteTick::new(
669        instrument_id,
670        decode_price(top_level.bid_px, price_precision),
671        decode_price(top_level.ask_px, price_precision),
672        decode_quantity(top_level.bid_sz as u64),
673        decode_quantity(top_level.ask_sz as u64),
674        ts_event,
675        ts_init,
676    );
677
678    let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
679        // Use UUID4 for trade ID as CMBP1 doesn't have a sequence field
680        Some(TradeTick::new(
681            instrument_id,
682            decode_price(msg.price, price_precision),
683            decode_quantity(msg.size as u64),
684            parse_aggressor_side(msg.side),
685            TradeId::new(UUID4::new().to_string()),
686            ts_event,
687            ts_init,
688        ))
689    } else {
690        None
691    };
692
693    Ok((quote, maybe_trade))
694}
695
696/// Decodes a Databento CBBO (Consolidated Best Bid and Offer) message.
697///
698/// Returns a `QuoteTick` representing the consolidated best bid and offer.
699///
700/// # Errors
701///
702/// Returns an error if decoding the CBBO message fails.
703pub fn decode_cbbo_msg(
704    msg: &dbn::CbboMsg,
705    instrument_id: InstrumentId,
706    price_precision: u8,
707    ts_init: Option<UnixNanos>,
708) -> anyhow::Result<QuoteTick> {
709    let top_level = &msg.levels[0];
710    let ts_event = msg.ts_recv.into();
711    let ts_init = ts_init.unwrap_or(ts_event);
712
713    let quote = QuoteTick::new(
714        instrument_id,
715        decode_price(top_level.bid_px, price_precision),
716        decode_price(top_level.ask_px, price_precision),
717        decode_quantity(top_level.bid_sz as u64),
718        decode_quantity(top_level.ask_sz as u64),
719        ts_event,
720        ts_init,
721    );
722
723    Ok(quote)
724}
725
726/// Decodes a Databento TCBBO (Consolidated Top of Book with Trade) message.
727///
728/// Returns a tuple containing both a `QuoteTick` and a `TradeTick`.
729///
730/// # Errors
731///
732/// Returns an error if decoding the TCBBO message fails.
733pub fn decode_tcbbo_msg(
734    msg: &dbn::CbboMsg,
735    instrument_id: InstrumentId,
736    price_precision: u8,
737    ts_init: Option<UnixNanos>,
738) -> anyhow::Result<(QuoteTick, TradeTick)> {
739    let top_level = &msg.levels[0];
740    let ts_event = msg.ts_recv.into();
741    let ts_init = ts_init.unwrap_or(ts_event);
742
743    let quote = QuoteTick::new(
744        instrument_id,
745        decode_price(top_level.bid_px, price_precision),
746        decode_price(top_level.ask_px, price_precision),
747        decode_quantity(top_level.bid_sz as u64),
748        decode_quantity(top_level.ask_sz as u64),
749        ts_event,
750        ts_init,
751    );
752
753    // Use UUID4 for trade ID as TCBBO doesn't have a sequence field
754    let trade = TradeTick::new(
755        instrument_id,
756        decode_price(msg.price, price_precision),
757        decode_quantity(msg.size as u64),
758        parse_aggressor_side(msg.side),
759        TradeId::new(UUID4::new().to_string()),
760        ts_event,
761        ts_init,
762    );
763
764    Ok((quote, trade))
765}
766
767/// # Errors
768///
769/// Returns an error if `rtype` is not a supported bar aggregation.
770pub fn decode_bar_type(
771    msg: &dbn::OhlcvMsg,
772    instrument_id: InstrumentId,
773) -> anyhow::Result<BarType> {
774    let bar_type = match msg.hd.rtype {
775        32 => {
776            // ohlcv-1s
777            BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
778        }
779        33 => {
780            // ohlcv-1m
781            BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
782        }
783        34 => {
784            // ohlcv-1h
785            BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
786        }
787        35 => {
788            // ohlcv-1d
789            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
790        }
791        36 => {
792            // ohlcv-eod
793            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
794        }
795        _ => anyhow::bail!(
796            "`rtype` is not a supported bar aggregation, was {}",
797            msg.hd.rtype
798        ),
799    };
800
801    Ok(bar_type)
802}
803
804/// # Errors
805///
806/// Returns an error if `rtype` is not a supported bar aggregation.
807pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
808    let adjustment = match msg.hd.rtype {
809        32 => {
810            // ohlcv-1s
811            BAR_CLOSE_ADJUSTMENT_1S
812        }
813        33 => {
814            // ohlcv-1m
815            BAR_CLOSE_ADJUSTMENT_1M
816        }
817        34 => {
818            // ohlcv-1h
819            BAR_CLOSE_ADJUSTMENT_1H
820        }
821        35 | 36 => {
822            // ohlcv-1d and ohlcv-eod
823            BAR_CLOSE_ADJUSTMENT_1D
824        }
825        _ => anyhow::bail!(
826            "`rtype` is not a supported bar aggregation, was {}",
827            msg.hd.rtype
828        ),
829    };
830
831    Ok(adjustment.into())
832}
833
834/// # Errors
835///
836/// Returns an error if decoding the OHLCV message fails.
837pub fn decode_ohlcv_msg(
838    msg: &dbn::OhlcvMsg,
839    instrument_id: InstrumentId,
840    price_precision: u8,
841    ts_init: Option<UnixNanos>,
842    timestamp_on_close: bool,
843) -> anyhow::Result<Bar> {
844    let bar_type = decode_bar_type(msg, instrument_id)?;
845    let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
846
847    let ts_event_raw = msg.hd.ts_event.into();
848    let ts_close = ts_event_raw + ts_event_adjustment;
849    let ts_init = ts_init.unwrap_or(ts_close); // received time or close time
850
851    let ts_event = if timestamp_on_close {
852        ts_close
853    } else {
854        ts_event_raw
855    };
856
857    let bar = Bar::new(
858        bar_type,
859        decode_price(msg.open, price_precision),
860        decode_price(msg.high, price_precision),
861        decode_price(msg.low, price_precision),
862        decode_price(msg.close, price_precision),
863        decode_quantity(msg.volume),
864        ts_event,
865        ts_init,
866    );
867
868    Ok(bar)
869}
870
871/// Decodes a Databento status message into an `InstrumentStatus` event.
872///
873/// # Errors
874///
875/// Returns an error if decoding the status message fails or if `msg.action` is not a valid `MarketStatusAction`.
876pub fn decode_status_msg(
877    msg: &dbn::StatusMsg,
878    instrument_id: InstrumentId,
879    ts_init: Option<UnixNanos>,
880) -> anyhow::Result<InstrumentStatus> {
881    let ts_event = msg.hd.ts_event.into();
882    let ts_init = ts_init.unwrap_or(ts_event);
883
884    let action = MarketStatusAction::from_u16(msg.action)
885        .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
886
887    let status = InstrumentStatus::new(
888        instrument_id,
889        action,
890        ts_event,
891        ts_init,
892        parse_status_reason(msg.reason)?,
893        parse_status_trading_event(msg.trading_event)?,
894        parse_optional_bool(msg.is_trading),
895        parse_optional_bool(msg.is_quoting),
896        parse_optional_bool(msg.is_short_sell_restricted),
897    );
898
899    Ok(status)
900}
901
902/// # Errors
903///
904/// Returns an error if decoding the record type fails or encounters unsupported message.
905pub fn decode_record(
906    record: &dbn::RecordRef,
907    instrument_id: InstrumentId,
908    price_precision: u8,
909    ts_init: Option<UnixNanos>,
910    include_trades: bool,
911    bars_timestamp_on_close: bool,
912) -> anyhow::Result<(Option<Data>, Option<Data>)> {
913    // Note: TBBO and TCBBO messages provide both quotes and trades.
914    // TBBO is handled explicitly below, while TCBBO is handled by
915    // the CbboMsg branch based on whether it has trade data.
916    let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
917        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
918        let result = decode_mbo_msg(
919            msg,
920            instrument_id,
921            price_precision,
922            Some(ts_init),
923            include_trades,
924        )?;
925        match result {
926            (Some(delta), None) => (Some(Data::Delta(delta)), None),
927            (None, Some(trade)) => (Some(Data::Trade(trade)), None),
928            (None, None) => (None, None),
929            _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
930        }
931    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
932        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
933        let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
934        (Some(Data::Trade(trade)), None)
935    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
936        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
937        let result = decode_mbp1_msg(
938            msg,
939            instrument_id,
940            price_precision,
941            Some(ts_init),
942            include_trades,
943        )?;
944        match result {
945            (quote, None) => (Some(Data::Quote(quote)), None),
946            (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
947        }
948    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
949        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
950        let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
951        (Some(Data::Quote(quote)), None)
952    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
953        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
954        let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
955        (Some(Data::Quote(quote)), None)
956    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
957        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
958        let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
959        (Some(Data::from(depth)), None)
960    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
961        // if ts_init is None (like with historical data) we don't want it to be equal to ts_event
962        // it will be set correctly in decode_ohlcv_msg instead
963        let bar = decode_ohlcv_msg(
964            msg,
965            instrument_id,
966            price_precision,
967            ts_init,
968            bars_timestamp_on_close,
969        )?;
970        (Some(Data::Bar(bar)), None)
971    } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
972        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
973        let result = decode_cmbp1_msg(
974            msg,
975            instrument_id,
976            price_precision,
977            Some(ts_init),
978            include_trades,
979        )?;
980        match result {
981            (quote, None) => (Some(Data::Quote(quote)), None),
982            (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
983        }
984    } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
985        // TBBO always has both quote and trade
986        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
987        let (quote, trade) = decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
988        (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
989    } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
990        // Check if this is a TCBBO or regular CBBO based on whether it has trade data
991        if msg.price != i64::MAX && msg.size > 0 {
992            // TCBBO - has both quote and trade
993            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
994            let (quote, trade) =
995                decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
996            (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
997        } else {
998            // Regular CBBO - quote only
999            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1000            let quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1001            (Some(Data::Quote(quote)), None)
1002        }
1003    } else {
1004        anyhow::bail!("DBN message type is not currently supported")
1005    };
1006
1007    Ok(result)
1008}
1009
1010const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
1011    match ts_init {
1012        Some(ts_init) => ts_init,
1013        None => msg_timestamp,
1014    }
1015}
1016
1017/// # Errors
1018///
1019/// Returns an error if decoding the `InstrumentDefMsg` fails.
1020pub fn decode_instrument_def_msg(
1021    msg: &dbn::InstrumentDefMsg,
1022    instrument_id: InstrumentId,
1023    ts_init: Option<UnixNanos>,
1024) -> anyhow::Result<InstrumentAny> {
1025    match msg.instrument_class as u8 as char {
1026        'K' => Ok(InstrumentAny::Equity(decode_equity(
1027            msg,
1028            instrument_id,
1029            ts_init,
1030        )?)),
1031        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1032            msg,
1033            instrument_id,
1034            ts_init,
1035        )?)),
1036        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1037            msg,
1038            instrument_id,
1039            ts_init,
1040        )?)),
1041        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1042            msg,
1043            instrument_id,
1044            ts_init,
1045        )?)),
1046        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1047            msg,
1048            instrument_id,
1049            ts_init,
1050        )?)),
1051        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1052        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1053        _ => anyhow::bail!(
1054            "Unsupported `instrument_class` '{}'",
1055            msg.instrument_class as u8 as char
1056        ),
1057    }
1058}
1059
1060/// Decodes a Databento instrument definition message into an `Equity` instrument.
1061///
1062/// # Errors
1063///
1064/// Returns an error if parsing or constructing `Equity` fails.
1065pub fn decode_equity(
1066    msg: &dbn::InstrumentDefMsg,
1067    instrument_id: InstrumentId,
1068    ts_init: Option<UnixNanos>,
1069) -> anyhow::Result<Equity> {
1070    let currency = parse_currency_or_usd_default(msg.currency());
1071    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1072    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1073    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1074    let ts_init = ts_init.unwrap_or(ts_event);
1075
1076    Ok(Equity::new(
1077        instrument_id,
1078        instrument_id.symbol,
1079        None, // No ISIN available yet
1080        currency,
1081        price_increment.precision,
1082        price_increment,
1083        Some(lot_size),
1084        None, // TBD
1085        None, // TBD
1086        None, // TBD
1087        None, // TBD
1088        None, // TBD
1089        None, // TBD
1090        None, // TBD
1091        None, // TBD
1092        ts_event,
1093        ts_init,
1094    ))
1095}
1096
1097/// Decodes a Databento instrument definition message into a `FuturesContract` instrument.
1098///
1099/// # Errors
1100///
1101/// Returns an error if parsing or constructing `FuturesContract` fails.
1102pub fn decode_futures_contract(
1103    msg: &dbn::InstrumentDefMsg,
1104    instrument_id: InstrumentId,
1105    ts_init: Option<UnixNanos>,
1106) -> anyhow::Result<FuturesContract> {
1107    let currency = parse_currency_or_usd_default(msg.currency());
1108    let exchange = Ustr::from(msg.exchange()?);
1109    let underlying = Ustr::from(msg.asset()?);
1110    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1111    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1112    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1113    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1114    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1115    let ts_init = ts_init.unwrap_or(ts_event);
1116
1117    FuturesContract::new_checked(
1118        instrument_id,
1119        instrument_id.symbol,
1120        asset_class.unwrap_or(AssetClass::Commodity),
1121        Some(exchange),
1122        underlying,
1123        msg.activation.into(),
1124        msg.expiration.into(),
1125        currency,
1126        price_increment.precision,
1127        price_increment,
1128        multiplier,
1129        lot_size,
1130        None, // TBD
1131        None, // TBD
1132        None, // TBD
1133        None, // TBD
1134        None, // TBD
1135        None, // TBD
1136        None, // TBD
1137        None, // TBD
1138        ts_event,
1139        ts_init,
1140    )
1141}
1142
1143/// Decodes a Databento instrument definition message into a `FuturesSpread` instrument.
1144///
1145/// # Errors
1146///
1147/// Returns an error if parsing or constructing `FuturesSpread` fails.
1148pub fn decode_futures_spread(
1149    msg: &dbn::InstrumentDefMsg,
1150    instrument_id: InstrumentId,
1151    ts_init: Option<UnixNanos>,
1152) -> anyhow::Result<FuturesSpread> {
1153    let exchange = Ustr::from(msg.exchange()?);
1154    let underlying = Ustr::from(msg.asset()?);
1155    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1156    let strategy_type = Ustr::from(msg.secsubtype()?);
1157    let currency = parse_currency_or_usd_default(msg.currency());
1158    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1159    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1160    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1161    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1162    let ts_init = ts_init.unwrap_or(ts_event);
1163
1164    FuturesSpread::new_checked(
1165        instrument_id,
1166        instrument_id.symbol,
1167        asset_class.unwrap_or(AssetClass::Commodity),
1168        Some(exchange),
1169        underlying,
1170        strategy_type,
1171        msg.activation.into(),
1172        msg.expiration.into(),
1173        currency,
1174        price_increment.precision,
1175        price_increment,
1176        multiplier,
1177        lot_size,
1178        None, // TBD
1179        None, // TBD
1180        None, // TBD
1181        None, // TBD
1182        None, // TBD
1183        None, // TBD
1184        None, // TBD
1185        None, // TBD
1186        ts_event,
1187        ts_init,
1188    )
1189}
1190
1191/// Decodes a Databento instrument definition message into an `OptionContract` instrument.
1192///
1193/// # Errors
1194///
1195/// Returns an error if parsing or constructing `OptionContract` fails.
1196pub fn decode_option_contract(
1197    msg: &dbn::InstrumentDefMsg,
1198    instrument_id: InstrumentId,
1199    ts_init: Option<UnixNanos>,
1200) -> anyhow::Result<OptionContract> {
1201    let currency = parse_currency_or_usd_default(msg.currency());
1202    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1203    let exchange = Ustr::from(msg.exchange()?);
1204    let underlying = Ustr::from(msg.underlying()?);
1205    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1206        Some(AssetClass::Equity)
1207    } else {
1208        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1209        asset_class
1210    };
1211    let option_kind = parse_option_kind(msg.instrument_class)?;
1212    let strike_price = Price::from_raw(
1213        decode_raw_price_i64(msg.strike_price),
1214        strike_price_currency.precision,
1215    );
1216    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1217    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1218    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1219    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1220    let ts_init = ts_init.unwrap_or(ts_event);
1221
1222    OptionContract::new_checked(
1223        instrument_id,
1224        instrument_id.symbol,
1225        asset_class_opt.unwrap_or(AssetClass::Commodity),
1226        Some(exchange),
1227        underlying,
1228        option_kind,
1229        strike_price,
1230        currency,
1231        msg.activation.into(),
1232        msg.expiration.into(),
1233        price_increment.precision,
1234        price_increment,
1235        multiplier,
1236        lot_size,
1237        None, // TBD
1238        None, // TBD
1239        None, // TBD
1240        None, // TBD
1241        None, // TBD
1242        None, // TBD
1243        None, // TBD
1244        None, // TBD
1245        ts_event,
1246        ts_init,
1247    )
1248}
1249
1250/// Decodes a Databento instrument definition message into an `OptionSpread` instrument.
1251///
1252/// # Errors
1253///
1254/// Returns an error if parsing or constructing `OptionSpread` fails.
1255pub fn decode_option_spread(
1256    msg: &dbn::InstrumentDefMsg,
1257    instrument_id: InstrumentId,
1258    ts_init: Option<UnixNanos>,
1259) -> anyhow::Result<OptionSpread> {
1260    let exchange = Ustr::from(msg.exchange()?);
1261    let underlying = Ustr::from(msg.underlying()?);
1262    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1263        Some(AssetClass::Equity)
1264    } else {
1265        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1266        asset_class
1267    };
1268    let strategy_type = Ustr::from(msg.secsubtype()?);
1269    let currency = parse_currency_or_usd_default(msg.currency());
1270    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1271    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1272    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1273    let ts_event = msg.ts_recv.into(); // More accurate and reliable timestamp
1274    let ts_init = ts_init.unwrap_or(ts_event);
1275
1276    OptionSpread::new_checked(
1277        instrument_id,
1278        instrument_id.symbol,
1279        asset_class_opt.unwrap_or(AssetClass::Commodity),
1280        Some(exchange),
1281        underlying,
1282        strategy_type,
1283        msg.activation.into(),
1284        msg.expiration.into(),
1285        currency,
1286        price_increment.precision,
1287        price_increment,
1288        multiplier,
1289        lot_size,
1290        None, // TBD
1291        None, // TBD
1292        None, // TBD
1293        None, // TBD
1294        None, // TBD
1295        None, // TBD
1296        None, // TBD
1297        None, // TBD
1298        ts_event,
1299        ts_init,
1300    )
1301}
1302
1303/// Decodes a Databento imbalance message into a `DatabentoImbalance` event.
1304///
1305/// # Errors
1306///
1307/// Returns an error if constructing `DatabentoImbalance` fails.
1308pub fn decode_imbalance_msg(
1309    msg: &dbn::ImbalanceMsg,
1310    instrument_id: InstrumentId,
1311    price_precision: u8,
1312    ts_init: Option<UnixNanos>,
1313) -> anyhow::Result<DatabentoImbalance> {
1314    let ts_event = msg.ts_recv.into();
1315    let ts_init = ts_init.unwrap_or(ts_event);
1316
1317    Ok(DatabentoImbalance::new(
1318        instrument_id,
1319        decode_price(msg.ref_price, price_precision),
1320        decode_price(msg.cont_book_clr_price, price_precision),
1321        decode_price(msg.auct_interest_clr_price, price_precision),
1322        Quantity::new(f64::from(msg.paired_qty), 0),
1323        Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1324        parse_order_side(msg.side),
1325        msg.significant_imbalance as c_char,
1326        msg.hd.ts_event.into(),
1327        ts_event,
1328        ts_init,
1329    ))
1330}
1331
1332/// Decodes a Databento statistics message into a `DatabentoStatistics` event.
1333///
1334/// # Errors
1335///
1336/// Returns an error if constructing `DatabentoStatistics` fails or if `msg.stat_type` or
1337/// `msg.update_action` is not a valid enum variant.
1338pub fn decode_statistics_msg(
1339    msg: &dbn::StatMsg,
1340    instrument_id: InstrumentId,
1341    price_precision: u8,
1342    ts_init: Option<UnixNanos>,
1343) -> anyhow::Result<DatabentoStatistics> {
1344    let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1345        .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1346    let update_action =
1347        DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1348            anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1349        })?;
1350    let ts_event = msg.ts_recv.into();
1351    let ts_init = ts_init.unwrap_or(ts_event);
1352
1353    Ok(DatabentoStatistics::new(
1354        instrument_id,
1355        stat_type,
1356        update_action,
1357        decode_optional_price(msg.price, price_precision),
1358        decode_optional_quantity(msg.quantity),
1359        msg.channel_id,
1360        msg.stat_flags,
1361        msg.sequence,
1362        msg.ts_ref.into(),
1363        msg.ts_in_delta,
1364        msg.hd.ts_event.into(),
1365        ts_event,
1366        ts_init,
1367    ))
1368}
1369
1370////////////////////////////////////////////////////////////////////////////////
1371// Tests
1372////////////////////////////////////////////////////////////////////////////////
1373#[cfg(test)]
1374mod tests {
1375    use std::path::{Path, PathBuf};
1376
1377    use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1378    use fallible_streaming_iterator::FallibleStreamingIterator;
1379    use nautilus_model::instruments::Instrument;
1380    use rstest::*;
1381
1382    use super::*;
1383
1384    fn test_data_path() -> PathBuf {
1385        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1386    }
1387
1388    #[rstest]
1389    #[case('Y' as c_char, Some(true))]
1390    #[case('N' as c_char, Some(false))]
1391    #[case('X' as c_char, None)]
1392    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1393        assert_eq!(parse_optional_bool(input), expected);
1394    }
1395
1396    #[rstest]
1397    #[case('A' as c_char, OrderSide::Sell)]
1398    #[case('B' as c_char, OrderSide::Buy)]
1399    #[case('X' as c_char, OrderSide::NoOrderSide)]
1400    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1401        assert_eq!(parse_order_side(input), expected);
1402    }
1403
1404    #[rstest]
1405    #[case('A' as c_char, AggressorSide::Seller)]
1406    #[case('B' as c_char, AggressorSide::Buyer)]
1407    #[case('X' as c_char, AggressorSide::NoAggressor)]
1408    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1409        assert_eq!(parse_aggressor_side(input), expected);
1410    }
1411
1412    #[rstest]
1413    #[case('T' as c_char, true)]
1414    #[case('A' as c_char, false)]
1415    #[case('C' as c_char, false)]
1416    #[case('F' as c_char, false)]
1417    #[case('M' as c_char, false)]
1418    #[case('R' as c_char, false)]
1419    fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1420        assert_eq!(is_trade_msg(action), expected);
1421    }
1422
1423    #[rstest]
1424    #[case('A' as c_char, Ok(BookAction::Add))]
1425    #[case('C' as c_char, Ok(BookAction::Delete))]
1426    #[case('F' as c_char, Ok(BookAction::Update))]
1427    #[case('M' as c_char, Ok(BookAction::Update))]
1428    #[case('R' as c_char, Ok(BookAction::Clear))]
1429    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1430    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1431        match parse_book_action(input) {
1432            Ok(action) => assert_eq!(Ok(action), expected),
1433            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1434        }
1435    }
1436
1437    #[rstest]
1438    #[case('C' as c_char, Ok(OptionKind::Call))]
1439    #[case('P' as c_char, Ok(OptionKind::Put))]
1440    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1441    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1442        match parse_option_kind(input) {
1443            Ok(kind) => assert_eq!(Ok(kind), expected),
1444            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1445        }
1446    }
1447
1448    #[rstest]
1449    #[case(Ok("USD"), Currency::USD())]
1450    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1451    #[case(Ok(""), Currency::USD())]
1452    #[case(Err("Error"), Currency::USD())]
1453    fn test_parse_currency_or_usd_default(
1454        #[case] input: Result<&str, &'static str>, // Using `&'static str` for errors
1455        #[case] expected: Currency,
1456    ) {
1457        let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1458        assert_eq!(actual, expected);
1459    }
1460
1461    #[rstest]
1462    #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1463    #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1464    #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1465    #[case("XXX", Ok((None, None)))]
1466    #[case("D", Err("Value string is too short"))]
1467    fn test_parse_cfi_iso10926(
1468        #[case] input: &str,
1469        #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1470    ) {
1471        match parse_cfi_iso10926(input) {
1472            Ok(result) => assert_eq!(Ok(result), expected),
1473            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1474        }
1475    }
1476
1477    #[rstest]
1478    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1479    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1480    #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] // Arbitrary valid price
1481    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1482        let actual = decode_price_increment(value, precision);
1483        assert_eq!(actual, expected);
1484    }
1485
1486    #[rstest]
1487    #[case(i64::MAX, 2, None)] // None for i64::MAX
1488    #[case(0, 2, Some(Price::from_raw(0, 2)))] // 0 is valid here
1489    #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] // Arbitrary valid price
1490    fn test_decode_optional_price(
1491        #[case] value: i64,
1492        #[case] precision: u8,
1493        #[case] expected: Option<Price>,
1494    ) {
1495        let actual = decode_optional_price(value, precision);
1496        assert_eq!(actual, expected);
1497    }
1498
1499    #[rstest]
1500    #[case(i64::MAX, None)] // None for i32::MAX
1501    #[case(0, Some(Quantity::new(0.0, 0)))] // 0 is valid quantity
1502    #[case(10, Some(Quantity::new(10.0, 0)))] // Arbitrary valid quantity
1503    fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1504        let actual = decode_optional_quantity(value);
1505        assert_eq!(actual, expected);
1506    }
1507
1508    #[rstest]
1509    #[case(0, Quantity::from(1))] // Default fallback for 0
1510    #[case(i64::MAX, Quantity::from(1))] // Default fallback for i64::MAX
1511    #[case(50_000_000_000, Quantity::from("50"))] // 50.0 exactly
1512    #[case(12_500_000_000, Quantity::from("12.5"))] // 12.5 exactly
1513    #[case(1_000_000_000, Quantity::from("1"))] // 1.0 exactly
1514    #[case(1, Quantity::from("0.000000001"))] // Smallest positive value
1515    #[case(1_000_000_001, Quantity::from("1.000000001"))] // Just over 1.0
1516    #[case(999_999_999, Quantity::from("0.999999999"))] // Just under 1.0
1517    #[case(123_456_789_000, Quantity::from("123.456789"))] // Trailing zeros trimmed
1518    fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1519        assert_eq!(decode_multiplier(raw).unwrap(), expected);
1520    }
1521
1522    #[rstest]
1523    #[case(-1_500_000_000)] // Large negative value
1524    #[case(-1)] // Small negative value
1525    #[case(-999_999_999)] // Another negative value
1526    fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1527        let result = decode_multiplier(raw);
1528        assert!(result.is_err());
1529        assert!(
1530            result
1531                .unwrap_err()
1532                .to_string()
1533                .contains("Invalid negative multiplier")
1534        );
1535    }
1536
1537    #[rstest]
1538    #[case(100, Quantity::from(100))]
1539    #[case(1000, Quantity::from(1000))]
1540    #[case(5, Quantity::from(5))]
1541    fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1542        assert_eq!(decode_quantity(value), expected);
1543    }
1544
1545    #[rstest]
1546    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1547    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1548    #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] // Arbitrary valid price
1549    fn test_decode_price_increment(
1550        #[case] value: i64,
1551        #[case] precision: u8,
1552        #[case] expected: Price,
1553    ) {
1554        assert_eq!(decode_price_increment(value, precision), expected);
1555    }
1556
1557    #[rstest]
1558    #[case(0, Quantity::from(1))] // Default for 0
1559    #[case(i32::MAX, Quantity::from(1))] // Default for MAX
1560    #[case(100, Quantity::from(100))]
1561    #[case(1, Quantity::from(1))]
1562    #[case(1000, Quantity::from(1000))]
1563    fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1564        assert_eq!(decode_lot_size(value), expected);
1565    }
1566
1567    #[rstest]
1568    #[case(0, None)] // None for 0
1569    #[case(1, Some(Ustr::from("Scheduled")))]
1570    #[case(2, Some(Ustr::from("Surveillance intervention")))]
1571    #[case(3, Some(Ustr::from("Market event")))]
1572    #[case(10, Some(Ustr::from("Regulatory")))]
1573    #[case(30, Some(Ustr::from("News pending")))]
1574    #[case(40, Some(Ustr::from("Order imbalance")))]
1575    #[case(50, Some(Ustr::from("LULD pause")))]
1576    #[case(60, Some(Ustr::from("Operational")))]
1577    #[case(100, Some(Ustr::from("Corporate action")))]
1578    #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1579    fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1580        assert_eq!(parse_status_reason(value).unwrap(), expected);
1581    }
1582
1583    #[rstest]
1584    #[case(999)] // Invalid code
1585    fn test_parse_status_reason_invalid(#[case] value: u16) {
1586        assert!(parse_status_reason(value).is_err());
1587    }
1588
1589    #[rstest]
1590    #[case(0, None)] // None for 0
1591    #[case(1, Some(Ustr::from("No cancel")))]
1592    #[case(2, Some(Ustr::from("Change trading session")))]
1593    #[case(3, Some(Ustr::from("Implied matching on")))]
1594    #[case(4, Some(Ustr::from("Implied matching off")))]
1595    fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1596        assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1597    }
1598
1599    #[rstest]
1600    #[case(5)] // Invalid code
1601    #[case(100)] // Invalid code
1602    fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1603        assert!(parse_status_trading_event(value).is_err());
1604    }
1605
1606    #[rstest]
1607    fn test_decode_mbo_msg() {
1608        let path = test_data_path().join("test_data.mbo.dbn.zst");
1609        let mut dbn_stream = Decoder::from_zstd_file(path)
1610            .unwrap()
1611            .decode_stream::<dbn::MboMsg>();
1612        let msg = dbn_stream.next().unwrap().unwrap();
1613
1614        let instrument_id = InstrumentId::from("ESM4.GLBX");
1615        let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1616        let delta = delta.unwrap();
1617
1618        assert_eq!(delta.instrument_id, instrument_id);
1619        assert_eq!(delta.action, BookAction::Delete);
1620        assert_eq!(delta.order.side, OrderSide::Sell);
1621        assert_eq!(delta.order.price, Price::from("3722.75"));
1622        assert_eq!(delta.order.size, Quantity::from("1"));
1623        assert_eq!(delta.order.order_id, 647_784_973_705);
1624        assert_eq!(delta.flags, 128);
1625        assert_eq!(delta.sequence, 1_170_352);
1626        assert_eq!(delta.ts_event, msg.ts_recv);
1627        assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1628        assert_eq!(delta.ts_init, 0);
1629    }
1630
1631    #[rstest]
1632    fn test_decode_mbo_msg_clear_action() {
1633        // Create an MBO message with Clear action (action='R', side='N')
1634        let ts_recv = 1_609_160_400_000_000_000;
1635        let msg = dbn::MboMsg {
1636            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1637            order_id: 0,
1638            price: i64::MAX,
1639            size: 0,
1640            flags: dbn::FlagSet::empty(),
1641            channel_id: 0,
1642            action: 'R' as c_char,
1643            side: 'N' as c_char, // NoOrderSide for Clear
1644            ts_recv,
1645            ts_in_delta: 0,
1646            sequence: 1_000_000,
1647        };
1648
1649        let instrument_id = InstrumentId::from("ESM4.GLBX");
1650        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1651
1652        // Clear messages should produce OrderBookDelta, not TradeTick
1653        assert!(trade.is_none());
1654        let delta = delta.expect("Clear action should produce OrderBookDelta");
1655
1656        assert_eq!(delta.instrument_id, instrument_id);
1657        assert_eq!(delta.action, BookAction::Clear);
1658        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1659        assert_eq!(delta.order.size, Quantity::from("0"));
1660        assert_eq!(delta.order.order_id, 0);
1661        assert_eq!(delta.sequence, 1_000_000);
1662        assert_eq!(delta.ts_event, ts_recv);
1663        assert_eq!(delta.ts_init, 0);
1664        assert!(delta.order.price.is_undefined());
1665        assert_eq!(delta.order.price.precision, 0);
1666    }
1667
1668    #[rstest]
1669    fn test_decode_mbo_msg_price_undef_with_precision() {
1670        // Test that PRICE_UNDEF (i64::MAX) forces precision to 0 even when price_precision is non-zero
1671        let ts_recv = 1_609_160_400_000_000_000;
1672        let msg = dbn::MboMsg {
1673            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1674            order_id: 0,
1675            price: i64::MAX, // PRICE_UNDEF
1676            size: 0,
1677            flags: dbn::FlagSet::empty(),
1678            channel_id: 0,
1679            action: 'R' as c_char, // Clear
1680            side: 'N' as c_char,   // NoOrderSide
1681            ts_recv,
1682            ts_in_delta: 0,
1683            sequence: 0,
1684        };
1685
1686        let instrument_id = InstrumentId::from("ESM4.GLBX");
1687        let (delta, _) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1688        let delta = delta.unwrap();
1689
1690        assert!(delta.order.price.is_undefined());
1691        assert_eq!(delta.order.price.precision, 0);
1692        assert_eq!(delta.order.price.raw, PRICE_UNDEF);
1693    }
1694
1695    #[rstest]
1696    fn test_decode_mbo_msg_no_order_side_update() {
1697        // MBO messages with NoOrderSide are now passed through to the book
1698        // The book will resolve the side from its cache using the order_id
1699        let ts_recv = 1_609_160_400_000_000_000;
1700        let msg = dbn::MboMsg {
1701            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1702            order_id: 123_456_789,
1703            price: 4_800_250_000_000, // $4800.25 with precision 2
1704            size: 1,
1705            flags: dbn::FlagSet::empty(),
1706            channel_id: 1,
1707            action: 'M' as c_char, // Modify/Update action
1708            side: 'N' as c_char,   // NoOrderSide
1709            ts_recv,
1710            ts_in_delta: 0,
1711            sequence: 1_000_000,
1712        };
1713
1714        let instrument_id = InstrumentId::from("ESM4.GLBX");
1715        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1716
1717        // Delta should be created with NoOrderSide (book will resolve it)
1718        assert!(delta.is_some());
1719        assert!(trade.is_none());
1720        let delta = delta.unwrap();
1721        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1722        assert_eq!(delta.order.order_id, 123_456_789);
1723        assert_eq!(delta.action, BookAction::Update);
1724    }
1725
1726    #[rstest]
1727    fn test_decode_mbp1_msg() {
1728        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1729        let mut dbn_stream = Decoder::from_zstd_file(path)
1730            .unwrap()
1731            .decode_stream::<dbn::Mbp1Msg>();
1732        let msg = dbn_stream.next().unwrap().unwrap();
1733
1734        let instrument_id = InstrumentId::from("ESM4.GLBX");
1735        let (quote, _) = decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1736
1737        assert_eq!(quote.instrument_id, instrument_id);
1738        assert_eq!(quote.bid_price, Price::from("3720.25"));
1739        assert_eq!(quote.ask_price, Price::from("3720.50"));
1740        assert_eq!(quote.bid_size, Quantity::from("24"));
1741        assert_eq!(quote.ask_size, Quantity::from("11"));
1742        assert_eq!(quote.ts_event, msg.ts_recv);
1743        assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1744        assert_eq!(quote.ts_init, 0);
1745    }
1746
1747    #[rstest]
1748    fn test_decode_bbo_1s_msg() {
1749        let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1750        let mut dbn_stream = Decoder::from_zstd_file(path)
1751            .unwrap()
1752            .decode_stream::<dbn::BboMsg>();
1753        let msg = dbn_stream.next().unwrap().unwrap();
1754
1755        let instrument_id = InstrumentId::from("ESM4.GLBX");
1756        let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1757
1758        assert_eq!(quote.instrument_id, instrument_id);
1759        assert_eq!(quote.bid_price, Price::from("3702.25"));
1760        assert_eq!(quote.ask_price, Price::from("3702.75"));
1761        assert_eq!(quote.bid_size, Quantity::from("18"));
1762        assert_eq!(quote.ask_size, Quantity::from("13"));
1763        assert_eq!(quote.ts_event, msg.ts_recv);
1764        assert_eq!(quote.ts_event, 1609113600000000000);
1765        assert_eq!(quote.ts_init, 0);
1766    }
1767
1768    #[rstest]
1769    fn test_decode_bbo_1m_msg() {
1770        let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1771        let mut dbn_stream = Decoder::from_zstd_file(path)
1772            .unwrap()
1773            .decode_stream::<dbn::BboMsg>();
1774        let msg = dbn_stream.next().unwrap().unwrap();
1775
1776        let instrument_id = InstrumentId::from("ESM4.GLBX");
1777        let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1778
1779        assert_eq!(quote.instrument_id, instrument_id);
1780        assert_eq!(quote.bid_price, Price::from("3702.25"));
1781        assert_eq!(quote.ask_price, Price::from("3702.75"));
1782        assert_eq!(quote.bid_size, Quantity::from("18"));
1783        assert_eq!(quote.ask_size, Quantity::from("13"));
1784        assert_eq!(quote.ts_event, msg.ts_recv);
1785        assert_eq!(quote.ts_event, 1609113600000000000);
1786        assert_eq!(quote.ts_init, 0);
1787    }
1788
1789    #[rstest]
1790    fn test_decode_mbp10_msg() {
1791        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1792        let mut dbn_stream = Decoder::from_zstd_file(path)
1793            .unwrap()
1794            .decode_stream::<dbn::Mbp10Msg>();
1795        let msg = dbn_stream.next().unwrap().unwrap();
1796
1797        let instrument_id = InstrumentId::from("ESM4.GLBX");
1798        let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1799
1800        assert_eq!(depth10.instrument_id, instrument_id);
1801        assert_eq!(depth10.bids.len(), 10);
1802        assert_eq!(depth10.asks.len(), 10);
1803        assert_eq!(depth10.bid_counts.len(), 10);
1804        assert_eq!(depth10.ask_counts.len(), 10);
1805        assert_eq!(depth10.flags, 128);
1806        assert_eq!(depth10.sequence, 1_170_352);
1807        assert_eq!(depth10.ts_event, msg.ts_recv);
1808        assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1809        assert_eq!(depth10.ts_init, 0);
1810    }
1811
1812    #[rstest]
1813    fn test_decode_trade_msg() {
1814        let path = test_data_path().join("test_data.trades.dbn.zst");
1815        let mut dbn_stream = Decoder::from_zstd_file(path)
1816            .unwrap()
1817            .decode_stream::<dbn::TradeMsg>();
1818        let msg = dbn_stream.next().unwrap().unwrap();
1819
1820        let instrument_id = InstrumentId::from("ESM4.GLBX");
1821        let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1822
1823        assert_eq!(trade.instrument_id, instrument_id);
1824        assert_eq!(trade.price, Price::from("3720.25"));
1825        assert_eq!(trade.size, Quantity::from("5"));
1826        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1827        assert_eq!(trade.trade_id.to_string(), "1170380");
1828        assert_eq!(trade.ts_event, msg.ts_recv);
1829        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1830        assert_eq!(trade.ts_init, 0);
1831    }
1832
1833    #[rstest]
1834    fn test_decode_tbbo_msg() {
1835        let path = test_data_path().join("test_data.tbbo.dbn.zst");
1836        let mut dbn_stream = Decoder::from_zstd_file(path)
1837            .unwrap()
1838            .decode_stream::<dbn::Mbp1Msg>();
1839        let msg = dbn_stream.next().unwrap().unwrap();
1840
1841        let instrument_id = InstrumentId::from("ESM4.GLBX");
1842        let (quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1843
1844        assert_eq!(quote.instrument_id, instrument_id);
1845        assert_eq!(quote.bid_price, Price::from("3720.25"));
1846        assert_eq!(quote.ask_price, Price::from("3720.50"));
1847        assert_eq!(quote.bid_size, Quantity::from("26"));
1848        assert_eq!(quote.ask_size, Quantity::from("7"));
1849        assert_eq!(quote.ts_event, msg.ts_recv);
1850        assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
1851        assert_eq!(quote.ts_init, 0);
1852
1853        assert_eq!(trade.instrument_id, instrument_id);
1854        assert_eq!(trade.price, Price::from("3720.25"));
1855        assert_eq!(trade.size, Quantity::from("5"));
1856        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1857        assert_eq!(trade.trade_id.to_string(), "1170380");
1858        assert_eq!(trade.ts_event, msg.ts_recv);
1859        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1860        assert_eq!(trade.ts_init, 0);
1861    }
1862
1863    #[rstest]
1864    fn test_decode_ohlcv_msg() {
1865        let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
1866        let mut dbn_stream = Decoder::from_zstd_file(path)
1867            .unwrap()
1868            .decode_stream::<dbn::OhlcvMsg>();
1869        let msg = dbn_stream.next().unwrap().unwrap();
1870
1871        let instrument_id = InstrumentId::from("ESM4.GLBX");
1872        let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1873
1874        assert_eq!(
1875            bar.bar_type,
1876            BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
1877        );
1878        assert_eq!(bar.open, Price::from("372025.00"));
1879        assert_eq!(bar.high, Price::from("372050.00"));
1880        assert_eq!(bar.low, Price::from("372025.00"));
1881        assert_eq!(bar.close, Price::from("372050.00"));
1882        assert_eq!(bar.volume, Quantity::from("57"));
1883        assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); // timestamp_on_close=true
1884        assert_eq!(bar.ts_init, 0); // ts_init was Some(0)
1885    }
1886
1887    #[rstest]
1888    fn test_decode_definition_msg() {
1889        let path = test_data_path().join("test_data.definition.dbn.zst");
1890        let mut dbn_stream = Decoder::from_zstd_file(path)
1891            .unwrap()
1892            .decode_stream::<dbn::InstrumentDefMsg>();
1893        let msg = dbn_stream.next().unwrap().unwrap();
1894
1895        let instrument_id = InstrumentId::from("ESM4.GLBX");
1896        let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
1897
1898        assert!(result.is_ok());
1899        assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
1900    }
1901
1902    #[rstest]
1903    fn test_decode_status_msg() {
1904        let path = test_data_path().join("test_data.status.dbn.zst");
1905        let mut dbn_stream = Decoder::from_zstd_file(path)
1906            .unwrap()
1907            .decode_stream::<dbn::StatusMsg>();
1908        let msg = dbn_stream.next().unwrap().unwrap();
1909
1910        let instrument_id = InstrumentId::from("ESM4.GLBX");
1911        let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
1912
1913        assert_eq!(status.instrument_id, instrument_id);
1914        assert_eq!(status.action, MarketStatusAction::Trading);
1915        assert_eq!(status.ts_event, msg.hd.ts_event);
1916        assert_eq!(status.ts_init, 0);
1917        assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
1918        assert_eq!(status.trading_event, None);
1919        assert_eq!(status.is_trading, Some(true));
1920        assert_eq!(status.is_quoting, Some(true));
1921        assert_eq!(status.is_short_sell_restricted, None);
1922    }
1923
1924    #[rstest]
1925    fn test_decode_imbalance_msg() {
1926        let path = test_data_path().join("test_data.imbalance.dbn.zst");
1927        let mut dbn_stream = Decoder::from_zstd_file(path)
1928            .unwrap()
1929            .decode_stream::<dbn::ImbalanceMsg>();
1930        let msg = dbn_stream.next().unwrap().unwrap();
1931
1932        let instrument_id = InstrumentId::from("ESM4.GLBX");
1933        let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1934
1935        assert_eq!(imbalance.instrument_id, instrument_id);
1936        assert_eq!(imbalance.ref_price, Price::from("229.43"));
1937        assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
1938        assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
1939        assert_eq!(imbalance.paired_qty, Quantity::from("0"));
1940        assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
1941        assert_eq!(imbalance.side, OrderSide::Buy);
1942        assert_eq!(imbalance.significant_imbalance, 126);
1943        assert_eq!(imbalance.ts_event, msg.hd.ts_event);
1944        assert_eq!(imbalance.ts_recv, msg.ts_recv);
1945        assert_eq!(imbalance.ts_init, 0);
1946    }
1947
1948    #[rstest]
1949    fn test_decode_statistics_msg() {
1950        let path = test_data_path().join("test_data.statistics.dbn.zst");
1951        let mut dbn_stream = Decoder::from_zstd_file(path)
1952            .unwrap()
1953            .decode_stream::<dbn::StatMsg>();
1954        let msg = dbn_stream.next().unwrap().unwrap();
1955
1956        let instrument_id = InstrumentId::from("ESM4.GLBX");
1957        let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1958
1959        assert_eq!(statistics.instrument_id, instrument_id);
1960        assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
1961        assert_eq!(
1962            statistics.update_action,
1963            DatabentoStatisticUpdateAction::Added
1964        );
1965        assert_eq!(statistics.price, Some(Price::from("100.00")));
1966        assert_eq!(statistics.quantity, None);
1967        assert_eq!(statistics.channel_id, 13);
1968        assert_eq!(statistics.stat_flags, 255);
1969        assert_eq!(statistics.sequence, 2);
1970        assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
1971        assert_eq!(statistics.ts_in_delta, 26961);
1972        assert_eq!(statistics.ts_event, msg.hd.ts_event);
1973        assert_eq!(statistics.ts_recv, msg.ts_recv);
1974        assert_eq!(statistics.ts_init, 0);
1975    }
1976
1977    #[rstest]
1978    fn test_decode_cmbp1_msg() {
1979        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
1980        let mut dbn_stream = Decoder::from_zstd_file(path)
1981            .unwrap()
1982            .decode_stream::<dbn::Cmbp1Msg>();
1983        let msg = dbn_stream.next().unwrap().unwrap();
1984
1985        let instrument_id = InstrumentId::from("ESM4.GLBX");
1986        let (quote, trade) = decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1987
1988        assert_eq!(quote.instrument_id, instrument_id);
1989        assert!(quote.bid_price.raw > 0);
1990        assert!(quote.ask_price.raw > 0);
1991        assert!(quote.bid_size.raw > 0);
1992        assert!(quote.ask_size.raw > 0);
1993        assert_eq!(quote.ts_event, msg.ts_recv);
1994        assert_eq!(quote.ts_init, 0);
1995
1996        // Check if trade is present based on action
1997        if msg.action as u8 as char == 'T' {
1998            assert!(trade.is_some());
1999            let trade = trade.unwrap();
2000            assert_eq!(trade.instrument_id, instrument_id);
2001        } else {
2002            assert!(trade.is_none());
2003        }
2004    }
2005
2006    #[rstest]
2007    fn test_decode_cbbo_1s_msg() {
2008        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2009        let mut dbn_stream = Decoder::from_zstd_file(path)
2010            .unwrap()
2011            .decode_stream::<dbn::CbboMsg>();
2012        let msg = dbn_stream.next().unwrap().unwrap();
2013
2014        let instrument_id = InstrumentId::from("ESM4.GLBX");
2015        let quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2016
2017        assert_eq!(quote.instrument_id, instrument_id);
2018        assert!(quote.bid_price.raw > 0);
2019        assert!(quote.ask_price.raw > 0);
2020        assert!(quote.bid_size.raw > 0);
2021        assert!(quote.ask_size.raw > 0);
2022        assert_eq!(quote.ts_event, msg.ts_recv);
2023        assert_eq!(quote.ts_init, 0);
2024    }
2025
2026    #[rstest]
2027    fn test_decode_mbp10_msg_with_all_levels() {
2028        let mut msg = dbn::Mbp10Msg::default();
2029        for i in 0..10 {
2030            msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
2031            msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
2032            msg.levels[i].bid_sz = 10 + i as u32;
2033            msg.levels[i].ask_sz = 10 + i as u32;
2034            msg.levels[i].bid_ct = 1 + i as u32;
2035            msg.levels[i].ask_ct = 1 + i as u32;
2036        }
2037        msg.ts_recv = 1_609_160_400_000_704_060;
2038
2039        let instrument_id = InstrumentId::from("TEST.VENUE");
2040        let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
2041
2042        assert!(result.is_ok());
2043        let depth = result.unwrap();
2044        assert_eq!(depth.bids.len(), 10);
2045        assert_eq!(depth.asks.len(), 10);
2046        assert_eq!(depth.bid_counts.len(), 10);
2047        assert_eq!(depth.ask_counts.len(), 10);
2048    }
2049
2050    #[rstest]
2051    fn test_array_conversion_error_handling() {
2052        let mut bids = Vec::new();
2053        let mut asks = Vec::new();
2054
2055        // Intentionally create fewer than DEPTH10_LEN elements
2056        for i in 0..5 {
2057            bids.push(BookOrder::new(
2058                OrderSide::Buy,
2059                Price::from(format!("{}.00", 100 - i)),
2060                Quantity::from(10),
2061                i as u64,
2062            ));
2063            asks.push(BookOrder::new(
2064                OrderSide::Sell,
2065                Price::from(format!("{}.00", 101 + i)),
2066                Quantity::from(10),
2067                i as u64,
2068            ));
2069        }
2070
2071        let result: Result<[BookOrder; DEPTH10_LEN], _> =
2072            bids.try_into().map_err(|v: Vec<BookOrder>| {
2073                anyhow::anyhow!(
2074                    "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2075                    v.len()
2076                )
2077            });
2078        assert!(result.is_err());
2079        assert!(
2080            result
2081                .unwrap_err()
2082                .to_string()
2083                .contains("Expected exactly 10 bid levels, received 5")
2084        );
2085    }
2086
2087    #[rstest]
2088    fn test_decode_tcbbo_msg() {
2089        // Use cbbo-1s as base since cbbo.dbn.zst was invalid
2090        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2091        let mut dbn_stream = Decoder::from_zstd_file(path)
2092            .unwrap()
2093            .decode_stream::<dbn::CbboMsg>();
2094        let msg = dbn_stream.next().unwrap().unwrap();
2095
2096        // Simulate TCBBO by adding trade data
2097        let mut tcbbo_msg = msg.clone();
2098        tcbbo_msg.price = 3702500000000;
2099        tcbbo_msg.size = 10;
2100
2101        let instrument_id = InstrumentId::from("ESM4.GLBX");
2102        let (quote, trade) =
2103            decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2104
2105        assert_eq!(quote.instrument_id, instrument_id);
2106        assert!(quote.bid_price.raw > 0);
2107        assert!(quote.ask_price.raw > 0);
2108        assert!(quote.bid_size.raw > 0);
2109        assert!(quote.ask_size.raw > 0);
2110        assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2111        assert_eq!(quote.ts_init, 0);
2112
2113        assert_eq!(trade.instrument_id, instrument_id);
2114        assert_eq!(trade.price, Price::from("3702.50"));
2115        assert_eq!(trade.size, Quantity::from(10));
2116        assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2117        assert_eq!(trade.ts_init, 0);
2118    }
2119
2120    #[rstest]
2121    fn test_decode_bar_type() {
2122        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2123        let instrument_id = InstrumentId::from("ESM4.GLBX");
2124
2125        // Test 1-second bar
2126        msg.hd.rtype = 32;
2127        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2128        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2129
2130        // Test 1-minute bar
2131        msg.hd.rtype = 33;
2132        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2133        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2134
2135        // Test 1-hour bar
2136        msg.hd.rtype = 34;
2137        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2138        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2139
2140        // Test 1-day bar
2141        msg.hd.rtype = 35;
2142        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2143        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2144
2145        // Test unsupported rtype
2146        msg.hd.rtype = 99;
2147        let result = decode_bar_type(&msg, instrument_id);
2148        assert!(result.is_err());
2149    }
2150
2151    #[rstest]
2152    fn test_decode_ts_event_adjustment() {
2153        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2154
2155        // Test 1-second bar adjustment
2156        msg.hd.rtype = 32;
2157        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2158        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2159
2160        // Test 1-minute bar adjustment
2161        msg.hd.rtype = 33;
2162        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2163        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2164
2165        // Test 1-hour bar adjustment
2166        msg.hd.rtype = 34;
2167        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2168        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2169
2170        // Test 1-day bar adjustment
2171        msg.hd.rtype = 35;
2172        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2173        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2174
2175        // Test eod bar adjustment (same as 1d)
2176        msg.hd.rtype = 36;
2177        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2178        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2179
2180        // Test unsupported rtype
2181        msg.hd.rtype = 99;
2182        let result = decode_ts_event_adjustment(&msg);
2183        assert!(result.is_err());
2184    }
2185
2186    #[rstest]
2187    fn test_decode_record() {
2188        // Test with MBO message
2189        let path = test_data_path().join("test_data.mbo.dbn.zst");
2190        let decoder = Decoder::from_zstd_file(path).unwrap();
2191        let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2192        let msg = dbn_stream.next().unwrap().unwrap();
2193
2194        let record_ref = dbn::RecordRef::from(msg);
2195        let instrument_id = InstrumentId::from("ESM4.GLBX");
2196
2197        let (data1, data2) =
2198            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2199
2200        assert!(data1.is_some());
2201        assert!(data2.is_none());
2202
2203        // Test with Trade message
2204        let path = test_data_path().join("test_data.trades.dbn.zst");
2205        let decoder = Decoder::from_zstd_file(path).unwrap();
2206        let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2207        let msg = dbn_stream.next().unwrap().unwrap();
2208
2209        let record_ref = dbn::RecordRef::from(msg);
2210
2211        let (data1, data2) =
2212            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2213
2214        assert!(data1.is_some());
2215        assert!(data2.is_none());
2216        assert!(matches!(data1.unwrap(), Data::Trade(_)));
2217    }
2218}