nautilus_databento/
decode.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
20use nautilus_model::{
21    data::{
22        Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24    },
25    enums::{
26        AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27        InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28    },
29    identifiers::{InstrumentId, TradeId},
30    instruments::{
31        Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32    },
33    types::{
34        Currency, Price, Quantity,
35        price::{PRICE_UNDEF, decode_raw_price_i64},
36    },
37};
38use ustr::Ustr;
39
40use super::{
41    enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
42    types::{DatabentoImbalance, DatabentoStatistics},
43};
44
/// Bar step of one, used by every fixed bar specification in this module.
// `NonZeroUsize::MIN` is the value 1, so no fallible construction is required.
const STEP_ONE: NonZeroUsize = NonZeroUsize::MIN;
47
48const BAR_SPEC_1S: BarSpecification = BarSpecification {
49    step: STEP_ONE,
50    aggregation: BarAggregation::Second,
51    price_type: PriceType::Last,
52};
53const BAR_SPEC_1M: BarSpecification = BarSpecification {
54    step: STEP_ONE,
55    aggregation: BarAggregation::Minute,
56    price_type: PriceType::Last,
57};
58const BAR_SPEC_1H: BarSpecification = BarSpecification {
59    step: STEP_ONE,
60    aggregation: BarAggregation::Hour,
61    price_type: PriceType::Last,
62};
63const BAR_SPEC_1D: BarSpecification = BarSpecification {
64    step: STEP_ONE,
65    aggregation: BarAggregation::Day,
66    price_type: PriceType::Last,
67};
68
69const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
70const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
71const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
72const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
73
/// Interprets a yes/no flag character.
///
/// `'Y'` yields `Some(true)`, `'N'` yields `Some(false)`, and any other byte yields `None`.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    match c as u8 {
        b'Y' => Some(true),
        b'N' => Some(false),
        _ => None,
    }
}
82
83#[must_use]
84pub const fn parse_order_side(c: c_char) -> OrderSide {
85    match c as u8 as char {
86        'A' => OrderSide::Sell,
87        'B' => OrderSide::Buy,
88        _ => OrderSide::NoOrderSide,
89    }
90}
91
92#[must_use]
93pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
94    match c as u8 as char {
95        'A' => AggressorSide::Seller,
96        'B' => AggressorSide::Buyer,
97        _ => AggressorSide::NoAggressor,
98    }
99}
100
101/// Parses a Databento book action character into a `BookAction` enum.
102///
103/// # Errors
104///
105/// Returns an error if `c` is not a valid `BookAction` character.
106pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
107    match c as u8 as char {
108        'A' => Ok(BookAction::Add),
109        'C' => Ok(BookAction::Delete),
110        'F' => Ok(BookAction::Update),
111        'M' => Ok(BookAction::Update),
112        'R' => Ok(BookAction::Clear),
113        invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
114    }
115}
116
117/// Parses a Databento option kind character into an `OptionKind` enum.
118///
119/// # Errors
120///
121/// Returns an error if `c` is not a valid `OptionKind` character.
122pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
123    match c as u8 as char {
124        'C' => Ok(OptionKind::Call),
125        'P' => Ok(OptionKind::Put),
126        invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
127    }
128}
129
130fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
131    match value {
132        Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
133            tracing::warn!("Unknown currency code '{value}', defaulting to USD");
134            Currency::USD()
135        }),
136        Ok(_) => Currency::USD(),
137        Err(e) => {
138            tracing::error!("Error parsing currency: {e}");
139            Currency::USD()
140        }
141    }
142}
143
144/// Parses a CFI (Classification of Financial Instruments) code to extract asset and instrument classes.
145///
146/// # Errors
147///
148/// Returns an error if `value` has fewer than 3 characters.
149pub fn parse_cfi_iso10926(
150    value: &str,
151) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
152    let chars: Vec<char> = value.chars().collect();
153    if chars.len() < 3 {
154        anyhow::bail!("Value string is too short");
155    }
156
157    // TODO: A proper CFI parser would be useful: https://en.wikipedia.org/wiki/ISO_10962
158    let cfi_category = chars[0];
159    let cfi_group = chars[1];
160    let cfi_attribute1 = chars[2];
161    // let cfi_attribute2 = value[3];
162    // let cfi_attribute3 = value[4];
163    // let cfi_attribute4 = value[5];
164
165    let mut asset_class = match cfi_category {
166        'D' => Some(AssetClass::Debt),
167        'E' => Some(AssetClass::Equity),
168        'S' => None,
169        _ => None,
170    };
171
172    let instrument_class = match cfi_group {
173        'I' => Some(InstrumentClass::Future),
174        _ => None,
175    };
176
177    if cfi_attribute1 == 'I' {
178        asset_class = Some(AssetClass::Index);
179    }
180
181    Ok((asset_class, instrument_class))
182}
183
184/// Parses a Databento status reason code into a human-readable string.
185///
186/// See: <https://databento.com/docs/schemas-and-data-formats/status#types-of-status-reasons>
187///
188/// # Errors
189///
190/// Returns an error if `value` is an invalid status reason code.
191pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
192    let value_str = match value {
193        0 => return Ok(None),
194        1 => "Scheduled",
195        2 => "Surveillance intervention",
196        3 => "Market event",
197        4 => "Instrument activation",
198        5 => "Instrument expiration",
199        6 => "Recovery in process",
200        10 => "Regulatory",
201        11 => "Administrative",
202        12 => "Non-compliance",
203        13 => "Filings not current",
204        14 => "SEC trading suspension",
205        15 => "New issue",
206        16 => "Issue available",
207        17 => "Issues reviewed",
208        18 => "Filing requirements satisfied",
209        30 => "News pending",
210        31 => "News released",
211        32 => "News and resumption times",
212        33 => "News not forthcoming",
213        40 => "Order imbalance",
214        50 => "LULD pause",
215        60 => "Operational",
216        70 => "Additional information requested",
217        80 => "Merger effective",
218        90 => "ETF",
219        100 => "Corporate action",
220        110 => "New Security offering",
221        120 => "Market wide halt level 1",
222        121 => "Market wide halt level 2",
223        122 => "Market wide halt level 3",
224        123 => "Market wide halt carryover",
225        124 => "Market wide halt resumption",
226        130 => "Quotation not available",
227        invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
228    };
229
230    Ok(Some(Ustr::from(value_str)))
231}
232
233/// Parses a Databento status trading event code into a human-readable string.
234///
235/// # Errors
236///
237/// Returns an error if `value` is an invalid status trading event code.
238pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
239    let value_str = match value {
240        0 => return Ok(None),
241        1 => "No cancel",
242        2 => "Change trading session",
243        3 => "Implied matching on",
244        4 => "Implied matching off",
245        _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
246    };
247
248    Ok(Some(Ustr::from(value_str)))
249}
250
251/// Decodes a price from the given value, expressed in units of 1e-9.
252#[must_use]
253pub fn decode_price(value: i64, precision: u8) -> Price {
254    Price::from_raw(decode_raw_price_i64(value), precision)
255}
256
257/// Decodes a quantity from the given value, expressed in standard whole-number units.
258#[must_use]
259pub fn decode_quantity(value: u64) -> Quantity {
260    Quantity::from(value)
261}
262
263/// Decodes a minimum price increment from the given value, expressed in units of 1e-9.
264#[must_use]
265pub fn decode_price_increment(value: i64, precision: u8) -> Price {
266    match value {
267        0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
268        _ => decode_price(value, precision),
269    }
270}
271
272/// Decodes a price from the given optional value, expressed in units of 1e-9.
273#[must_use]
274pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
275    match value {
276        i64::MAX => None,
277        _ => Some(decode_price(value, precision)),
278    }
279}
280
281/// Decodes a quantity from the given optional value, where `i64::MAX` indicates missing data.
282#[must_use]
283pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
284    match value {
285        i64::MAX => None,
286        _ => Some(Quantity::from(value)),
287    }
288}
289
290/// Decodes a multiplier from the given value, expressed in units of 1e-9.
291/// Uses exact integer arithmetic to avoid precision loss in financial calculations.
292///
293/// # Errors
294///
295/// Returns an error if value is negative (invalid multiplier).
296pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
297    match value {
298        0 | i64::MAX => Ok(Quantity::from(1)),
299        v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
300        v => {
301            // Work in integers: v is fixed-point with 9 fractional digits.
302            // Build a canonical decimal string without floating-point.
303            let abs = v as u128;
304
305            const SCALE: u128 = 1_000_000_000;
306            let int_part = abs / SCALE;
307            let frac_part = abs % SCALE;
308
309            // Format fractional part with exactly 9 digits, then trim trailing zeros
310            // to keep a canonical representation.
311            if frac_part == 0 {
312                // Pure integer
313                Ok(Quantity::from(int_part as u64))
314            } else {
315                let mut frac_str = format!("{frac_part:09}");
316                while frac_str.ends_with('0') {
317                    frac_str.pop();
318                }
319                let s = format!("{int_part}.{frac_str}");
320                Ok(Quantity::from(s))
321            }
322        }
323    }
324}
325
326/// Decodes a lot size from the given value, expressed in standard whole-number units.
327#[must_use]
328pub fn decode_lot_size(value: i32) -> Quantity {
329    match value {
330        0 | i32::MAX => Quantity::from(1),
331        value => Quantity::from(value),
332    }
333}
334
/// Returns `true` when the action character is `'T'`.
#[must_use]
fn is_trade_msg(action: c_char) -> bool {
    action as u8 == b'T'
}
339
340/// Decodes a Databento MBO (Market by Order) message into an order book delta or trade.
341///
342/// Returns a tuple containing either an `OrderBookDelta` or a `TradeTick`, depending on
343/// whether the message represents an order book update or a trade execution.
344///
345/// # Errors
346///
347/// Returns an error if decoding the MBO message fails.
348pub fn decode_mbo_msg(
349    msg: &dbn::MboMsg,
350    instrument_id: InstrumentId,
351    price_precision: u8,
352    ts_init: Option<UnixNanos>,
353    include_trades: bool,
354) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
355    let side = parse_order_side(msg.side);
356    if is_trade_msg(msg.action) {
357        if include_trades && msg.size > 0 {
358            let price = Price::from_raw(decode_raw_price_i64(msg.price), price_precision);
359            let size = Quantity::from(msg.size);
360            let aggressor_side = parse_aggressor_side(msg.side);
361            let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
362            let ts_event = msg.ts_recv.into();
363            let ts_init = ts_init.unwrap_or(ts_event);
364
365            let trade = TradeTick::new(
366                instrument_id,
367                price,
368                size,
369                aggressor_side,
370                trade_id,
371                ts_event,
372                ts_init,
373            );
374            return Ok((None, Some(trade)));
375        }
376
377        return Ok((None, None));
378    }
379
380    let action = parse_book_action(msg.action)?;
381    let price = if msg.price == i64::MAX {
382        Price::from_raw(PRICE_UNDEF, 0)
383    } else {
384        Price::from_raw(decode_raw_price_i64(msg.price), price_precision)
385    };
386    let size = Quantity::from(msg.size);
387    let order = BookOrder::new(side, price, size, msg.order_id);
388
389    let ts_event = msg.ts_recv.into();
390    let ts_init = ts_init.unwrap_or(ts_event);
391
392    let delta = OrderBookDelta::new(
393        instrument_id,
394        action,
395        order,
396        msg.flags.raw(),
397        msg.sequence.into(),
398        ts_event,
399        ts_init,
400    );
401
402    Ok((Some(delta), None))
403}
404
405/// Decodes a Databento Trade message into a `TradeTick`.
406///
407/// # Errors
408///
409/// Returns an error if decoding the Trade message fails.
410pub fn decode_trade_msg(
411    msg: &dbn::TradeMsg,
412    instrument_id: InstrumentId,
413    price_precision: u8,
414    ts_init: Option<UnixNanos>,
415) -> anyhow::Result<TradeTick> {
416    let ts_event = msg.ts_recv.into();
417    let ts_init = ts_init.unwrap_or(ts_event);
418
419    let trade = TradeTick::new(
420        instrument_id,
421        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
422        Quantity::from(msg.size),
423        parse_aggressor_side(msg.side),
424        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
425        ts_event,
426        ts_init,
427    );
428
429    Ok(trade)
430}
431
432/// Decodes a Databento TBBO (Top of Book with Trade) message into quote and trade ticks.
433///
434/// # Errors
435///
436/// Returns an error if decoding the TBBO message fails.
437pub fn decode_tbbo_msg(
438    msg: &dbn::TbboMsg,
439    instrument_id: InstrumentId,
440    price_precision: u8,
441    ts_init: Option<UnixNanos>,
442) -> anyhow::Result<(QuoteTick, TradeTick)> {
443    let top_level = &msg.levels[0];
444    let ts_event = msg.ts_recv.into();
445    let ts_init = ts_init.unwrap_or(ts_event);
446
447    let quote = QuoteTick::new(
448        instrument_id,
449        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
450        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
451        Quantity::from(top_level.bid_sz),
452        Quantity::from(top_level.ask_sz),
453        ts_event,
454        ts_init,
455    );
456
457    let trade = TradeTick::new(
458        instrument_id,
459        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
460        Quantity::from(msg.size),
461        parse_aggressor_side(msg.side),
462        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
463        ts_event,
464        ts_init,
465    );
466
467    Ok((quote, trade))
468}
469
470/// Decodes a Databento MBP1 (Market by Price Level 1) message into quote and optional trade ticks.
471///
472/// # Errors
473///
474/// Returns an error if decoding the MBP1 message fails.
475pub fn decode_mbp1_msg(
476    msg: &dbn::Mbp1Msg,
477    instrument_id: InstrumentId,
478    price_precision: u8,
479    ts_init: Option<UnixNanos>,
480    include_trades: bool,
481) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
482    let top_level = &msg.levels[0];
483    let ts_event = msg.ts_recv.into();
484    let ts_init = ts_init.unwrap_or(ts_event);
485
486    let quote = QuoteTick::new(
487        instrument_id,
488        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
489        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
490        Quantity::from(top_level.bid_sz),
491        Quantity::from(top_level.ask_sz),
492        ts_event,
493        ts_init,
494    );
495
496    let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
497        Some(TradeTick::new(
498            instrument_id,
499            Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
500            Quantity::from(msg.size),
501            parse_aggressor_side(msg.side),
502            TradeId::new(itoa::Buffer::new().format(msg.sequence)),
503            ts_event,
504            ts_init,
505        ))
506    } else {
507        None
508    };
509
510    Ok((quote, maybe_trade))
511}
512
513/// Decodes a Databento BBO (Best Bid and Offer) message into a `QuoteTick`.
514///
515/// # Errors
516///
517/// Returns an error if decoding the BBO message fails.
518pub fn decode_bbo_msg(
519    msg: &dbn::BboMsg,
520    instrument_id: InstrumentId,
521    price_precision: u8,
522    ts_init: Option<UnixNanos>,
523) -> anyhow::Result<QuoteTick> {
524    let top_level = &msg.levels[0];
525    let ts_event = msg.ts_recv.into();
526    let ts_init = ts_init.unwrap_or(ts_event);
527
528    let quote = QuoteTick::new(
529        instrument_id,
530        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
531        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
532        Quantity::from(top_level.bid_sz),
533        Quantity::from(top_level.ask_sz),
534        ts_event,
535        ts_init,
536    );
537
538    Ok(quote)
539}
540
541/// Decodes a Databento MBP10 (Market by Price 10 levels) message into an `OrderBookDepth10`.
542///
543/// # Errors
544///
545/// Returns an error if the number of levels in `msg.levels` is not exactly `DEPTH10_LEN`.
546pub fn decode_mbp10_msg(
547    msg: &dbn::Mbp10Msg,
548    instrument_id: InstrumentId,
549    price_precision: u8,
550    ts_init: Option<UnixNanos>,
551) -> anyhow::Result<OrderBookDepth10> {
552    let mut bids = Vec::with_capacity(DEPTH10_LEN);
553    let mut asks = Vec::with_capacity(DEPTH10_LEN);
554    let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
555    let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
556
557    for level in &msg.levels {
558        let bid_order = BookOrder::new(
559            OrderSide::Buy,
560            Price::from_raw(decode_raw_price_i64(level.bid_px), price_precision),
561            Quantity::from(level.bid_sz),
562            0,
563        );
564
565        let ask_order = BookOrder::new(
566            OrderSide::Sell,
567            Price::from_raw(decode_raw_price_i64(level.ask_px), price_precision),
568            Quantity::from(level.ask_sz),
569            0,
570        );
571
572        bids.push(bid_order);
573        asks.push(ask_order);
574        bid_counts.push(level.bid_ct);
575        ask_counts.push(level.ask_ct);
576    }
577
578    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
579        anyhow::anyhow!(
580            "Expected exactly {DEPTH10_LEN} bid levels, received {}",
581            v.len()
582        )
583    })?;
584
585    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
586        anyhow::anyhow!(
587            "Expected exactly {DEPTH10_LEN} ask levels, received {}",
588            v.len()
589        )
590    })?;
591
592    let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
593        anyhow::anyhow!(
594            "Expected exactly {DEPTH10_LEN} bid counts, received {}",
595            v.len()
596        )
597    })?;
598
599    let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
600        anyhow::anyhow!(
601            "Expected exactly {DEPTH10_LEN} ask counts, received {}",
602            v.len()
603        )
604    })?;
605
606    let ts_event = msg.ts_recv.into();
607    let ts_init = ts_init.unwrap_or(ts_event);
608
609    let depth = OrderBookDepth10::new(
610        instrument_id,
611        bids,
612        asks,
613        bid_counts,
614        ask_counts,
615        msg.flags.raw(),
616        msg.sequence.into(),
617        ts_event,
618        ts_init,
619    );
620
621    Ok(depth)
622}
623
624/// Decodes a Databento CMBP1 (Consolidated Market by Price Level 1) message.
625///
626/// Returns a tuple containing a `QuoteTick` and an optional `TradeTick` based on the message content.
627///
628/// # Errors
629///
630/// Returns an error if decoding the CMBP1 message fails.
631pub fn decode_cmbp1_msg(
632    msg: &dbn::Cmbp1Msg,
633    instrument_id: InstrumentId,
634    price_precision: u8,
635    ts_init: Option<UnixNanos>,
636    include_trades: bool,
637) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
638    let top_level = &msg.levels[0];
639    let ts_event = msg.ts_recv.into();
640    let ts_init = ts_init.unwrap_or(ts_event);
641
642    let quote = QuoteTick::new(
643        instrument_id,
644        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
645        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
646        Quantity::from(top_level.bid_sz),
647        Quantity::from(top_level.ask_sz),
648        ts_event,
649        ts_init,
650    );
651
652    let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
653        // Use UUID4 for trade ID as CMBP1 doesn't have a sequence field
654        Some(TradeTick::new(
655            instrument_id,
656            Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
657            Quantity::from(msg.size),
658            parse_aggressor_side(msg.side),
659            TradeId::new(UUID4::new().to_string()),
660            ts_event,
661            ts_init,
662        ))
663    } else {
664        None
665    };
666
667    Ok((quote, maybe_trade))
668}
669
670/// Decodes a Databento CBBO (Consolidated Best Bid and Offer) message.
671///
672/// Returns a `QuoteTick` representing the consolidated best bid and offer.
673///
674/// # Errors
675///
676/// Returns an error if decoding the CBBO message fails.
677pub fn decode_cbbo_msg(
678    msg: &dbn::CbboMsg,
679    instrument_id: InstrumentId,
680    price_precision: u8,
681    ts_init: Option<UnixNanos>,
682) -> anyhow::Result<QuoteTick> {
683    let top_level = &msg.levels[0];
684    let ts_event = msg.ts_recv.into();
685    let ts_init = ts_init.unwrap_or(ts_event);
686
687    let quote = QuoteTick::new(
688        instrument_id,
689        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
690        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
691        Quantity::from(top_level.bid_sz),
692        Quantity::from(top_level.ask_sz),
693        ts_event,
694        ts_init,
695    );
696
697    Ok(quote)
698}
699
700/// Decodes a Databento TCBBO (Consolidated Top of Book with Trade) message.
701///
702/// Returns a tuple containing both a `QuoteTick` and a `TradeTick`.
703///
704/// # Errors
705///
706/// Returns an error if decoding the TCBBO message fails.
707pub fn decode_tcbbo_msg(
708    msg: &dbn::CbboMsg,
709    instrument_id: InstrumentId,
710    price_precision: u8,
711    ts_init: Option<UnixNanos>,
712) -> anyhow::Result<(QuoteTick, TradeTick)> {
713    let top_level = &msg.levels[0];
714    let ts_event = msg.ts_recv.into();
715    let ts_init = ts_init.unwrap_or(ts_event);
716
717    let quote = QuoteTick::new(
718        instrument_id,
719        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
720        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
721        Quantity::from(top_level.bid_sz),
722        Quantity::from(top_level.ask_sz),
723        ts_event,
724        ts_init,
725    );
726
727    // Use UUID4 for trade ID as TCBBO doesn't have a sequence field
728    let trade = TradeTick::new(
729        instrument_id,
730        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
731        Quantity::from(msg.size),
732        parse_aggressor_side(msg.side),
733        TradeId::new(UUID4::new().to_string()),
734        ts_event,
735        ts_init,
736    );
737
738    Ok((quote, trade))
739}
740
741/// # Errors
742///
743/// Returns an error if `rtype` is not a supported bar aggregation.
744pub fn decode_bar_type(
745    msg: &dbn::OhlcvMsg,
746    instrument_id: InstrumentId,
747) -> anyhow::Result<BarType> {
748    let bar_type = match msg.hd.rtype {
749        32 => {
750            // ohlcv-1s
751            BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
752        }
753        33 => {
754            // ohlcv-1m
755            BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
756        }
757        34 => {
758            // ohlcv-1h
759            BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
760        }
761        35 => {
762            // ohlcv-1d
763            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
764        }
765        36 => {
766            // ohlcv-eod
767            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
768        }
769        _ => anyhow::bail!(
770            "`rtype` is not a supported bar aggregation, was {}",
771            msg.hd.rtype
772        ),
773    };
774
775    Ok(bar_type)
776}
777
778/// # Errors
779///
780/// Returns an error if `rtype` is not a supported bar aggregation.
781pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
782    let adjustment = match msg.hd.rtype {
783        32 => {
784            // ohlcv-1s
785            BAR_CLOSE_ADJUSTMENT_1S
786        }
787        33 => {
788            // ohlcv-1m
789            BAR_CLOSE_ADJUSTMENT_1M
790        }
791        34 => {
792            // ohlcv-1h
793            BAR_CLOSE_ADJUSTMENT_1H
794        }
795        35 | 36 => {
796            // ohlcv-1d and ohlcv-eod
797            BAR_CLOSE_ADJUSTMENT_1D
798        }
799        _ => anyhow::bail!(
800            "`rtype` is not a supported bar aggregation, was {}",
801            msg.hd.rtype
802        ),
803    };
804
805    Ok(adjustment.into())
806}
807
808/// # Errors
809///
810/// Returns an error if decoding the OHLCV message fails.
811pub fn decode_ohlcv_msg(
812    msg: &dbn::OhlcvMsg,
813    instrument_id: InstrumentId,
814    price_precision: u8,
815    ts_init: Option<UnixNanos>,
816    timestamp_on_close: bool,
817) -> anyhow::Result<Bar> {
818    let bar_type = decode_bar_type(msg, instrument_id)?;
819    let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
820
821    let ts_event_raw = msg.hd.ts_event.into();
822    let ts_close = ts_event_raw + ts_event_adjustment;
823    let ts_init = ts_init.unwrap_or(ts_close); // received time or close time
824
825    let ts_event = if timestamp_on_close {
826        ts_close
827    } else {
828        ts_event_raw
829    };
830
831    let bar = Bar::new(
832        bar_type,
833        Price::from_raw(decode_raw_price_i64(msg.open), price_precision),
834        Price::from_raw(decode_raw_price_i64(msg.high), price_precision),
835        Price::from_raw(decode_raw_price_i64(msg.low), price_precision),
836        Price::from_raw(decode_raw_price_i64(msg.close), price_precision),
837        Quantity::from(msg.volume),
838        ts_event,
839        ts_init,
840    );
841
842    Ok(bar)
843}
844
845/// Decodes a Databento status message into an `InstrumentStatus` event.
846///
847/// # Errors
848///
849/// Returns an error if decoding the status message fails or if `msg.action` is not a valid `MarketStatusAction`.
850pub fn decode_status_msg(
851    msg: &dbn::StatusMsg,
852    instrument_id: InstrumentId,
853    ts_init: Option<UnixNanos>,
854) -> anyhow::Result<InstrumentStatus> {
855    let ts_event = msg.hd.ts_event.into();
856    let ts_init = ts_init.unwrap_or(ts_event);
857
858    let action = MarketStatusAction::from_u16(msg.action)
859        .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
860
861    let status = InstrumentStatus::new(
862        instrument_id,
863        action,
864        ts_event,
865        ts_init,
866        parse_status_reason(msg.reason)?,
867        parse_status_trading_event(msg.trading_event)?,
868        parse_optional_bool(msg.is_trading),
869        parse_optional_bool(msg.is_quoting),
870        parse_optional_bool(msg.is_short_sell_restricted),
871    );
872
873    Ok(status)
874}
875
876/// # Errors
877///
878/// Returns an error if decoding the record type fails or encounters unsupported message.
879pub fn decode_record(
880    record: &dbn::RecordRef,
881    instrument_id: InstrumentId,
882    price_precision: u8,
883    ts_init: Option<UnixNanos>,
884    include_trades: bool,
885    bars_timestamp_on_close: bool,
886) -> anyhow::Result<(Option<Data>, Option<Data>)> {
887    // Note: TBBO and TCBBO messages provide both quotes and trades.
888    // TBBO is handled explicitly below, while TCBBO is handled by
889    // the CbboMsg branch based on whether it has trade data.
890    let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
891        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
892        let result = decode_mbo_msg(
893            msg,
894            instrument_id,
895            price_precision,
896            Some(ts_init),
897            include_trades,
898        )?;
899        match result {
900            (Some(delta), None) => (Some(Data::Delta(delta)), None),
901            (None, Some(trade)) => (Some(Data::Trade(trade)), None),
902            (None, None) => (None, None),
903            _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
904        }
905    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
906        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
907        let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
908        (Some(Data::Trade(trade)), None)
909    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
910        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
911        let result = decode_mbp1_msg(
912            msg,
913            instrument_id,
914            price_precision,
915            Some(ts_init),
916            include_trades,
917        )?;
918        match result {
919            (quote, None) => (Some(Data::Quote(quote)), None),
920            (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
921        }
922    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
923        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
924        let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
925        (Some(Data::Quote(quote)), None)
926    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
927        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
928        let quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
929        (Some(Data::Quote(quote)), None)
930    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
931        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
932        let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
933        (Some(Data::from(depth)), None)
934    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
935        // if ts_init is None (like with historical data) we don't want it to be equal to ts_event
936        // it will be set correctly in decode_ohlcv_msg instead
937        let bar = decode_ohlcv_msg(
938            msg,
939            instrument_id,
940            price_precision,
941            ts_init,
942            bars_timestamp_on_close,
943        )?;
944        (Some(Data::Bar(bar)), None)
945    } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
946        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
947        let result = decode_cmbp1_msg(
948            msg,
949            instrument_id,
950            price_precision,
951            Some(ts_init),
952            include_trades,
953        )?;
954        match result {
955            (quote, None) => (Some(Data::Quote(quote)), None),
956            (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
957        }
958    } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
959        // TBBO always has both quote and trade
960        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
961        let (quote, trade) = decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
962        (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
963    } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
964        // Check if this is a TCBBO or regular CBBO based on whether it has trade data
965        if msg.price != i64::MAX && msg.size > 0 {
966            // TCBBO - has both quote and trade
967            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
968            let (quote, trade) =
969                decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
970            (Some(Data::Quote(quote)), Some(Data::Trade(trade)))
971        } else {
972            // Regular CBBO - quote only
973            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
974            let quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
975            (Some(Data::Quote(quote)), None)
976        }
977    } else {
978        anyhow::bail!("DBN message type is not currently supported")
979    };
980
981    Ok(result)
982}
983
984const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
985    match ts_init {
986        Some(ts_init) => ts_init,
987        None => msg_timestamp,
988    }
989}
990
991/// # Errors
992///
993/// Returns an error if decoding the `InstrumentDefMsg` fails.
994pub fn decode_instrument_def_msg(
995    msg: &dbn::InstrumentDefMsg,
996    instrument_id: InstrumentId,
997    ts_init: Option<UnixNanos>,
998) -> anyhow::Result<InstrumentAny> {
999    match msg.instrument_class as u8 as char {
1000        'K' => Ok(InstrumentAny::Equity(decode_equity(
1001            msg,
1002            instrument_id,
1003            ts_init,
1004        )?)),
1005        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1006            msg,
1007            instrument_id,
1008            ts_init,
1009        )?)),
1010        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1011            msg,
1012            instrument_id,
1013            ts_init,
1014        )?)),
1015        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1016            msg,
1017            instrument_id,
1018            ts_init,
1019        )?)),
1020        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1021            msg,
1022            instrument_id,
1023            ts_init,
1024        )?)),
1025        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1026        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1027        _ => anyhow::bail!(
1028            "Unsupported `instrument_class` '{}'",
1029            msg.instrument_class as u8 as char
1030        ),
1031    }
1032}
1033
1034/// Decodes a Databento instrument definition message into an `Equity` instrument.
1035///
1036/// # Errors
1037///
1038/// Returns an error if parsing or constructing `Equity` fails.
1039pub fn decode_equity(
1040    msg: &dbn::InstrumentDefMsg,
1041    instrument_id: InstrumentId,
1042    ts_init: Option<UnixNanos>,
1043) -> anyhow::Result<Equity> {
1044    let currency = parse_currency_or_usd_default(msg.currency());
1045    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1046    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1047    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1048    let ts_init = ts_init.unwrap_or(ts_event);
1049
1050    Ok(Equity::new(
1051        instrument_id,
1052        instrument_id.symbol,
1053        None, // No ISIN available yet
1054        currency,
1055        price_increment.precision,
1056        price_increment,
1057        Some(lot_size),
1058        None, // TBD
1059        None, // TBD
1060        None, // TBD
1061        None, // TBD
1062        None, // TBD
1063        None, // TBD
1064        None, // TBD
1065        None, // TBD
1066        ts_event,
1067        ts_init,
1068    ))
1069}
1070
1071/// Decodes a Databento instrument definition message into a `FuturesContract` instrument.
1072///
1073/// # Errors
1074///
1075/// Returns an error if parsing or constructing `FuturesContract` fails.
1076pub fn decode_futures_contract(
1077    msg: &dbn::InstrumentDefMsg,
1078    instrument_id: InstrumentId,
1079    ts_init: Option<UnixNanos>,
1080) -> anyhow::Result<FuturesContract> {
1081    let currency = parse_currency_or_usd_default(msg.currency());
1082    let exchange = Ustr::from(msg.exchange()?);
1083    let underlying = Ustr::from(msg.asset()?);
1084    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1085    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1086    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1087    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1088    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1089    let ts_init = ts_init.unwrap_or(ts_event);
1090
1091    FuturesContract::new_checked(
1092        instrument_id,
1093        instrument_id.symbol,
1094        asset_class.unwrap_or(AssetClass::Commodity),
1095        Some(exchange),
1096        underlying,
1097        msg.activation.into(),
1098        msg.expiration.into(),
1099        currency,
1100        price_increment.precision,
1101        price_increment,
1102        multiplier,
1103        lot_size,
1104        None, // TBD
1105        None, // TBD
1106        None, // TBD
1107        None, // TBD
1108        None, // TBD
1109        None, // TBD
1110        None, // TBD
1111        None, // TBD
1112        ts_event,
1113        ts_init,
1114    )
1115}
1116
1117/// Decodes a Databento instrument definition message into a `FuturesSpread` instrument.
1118///
1119/// # Errors
1120///
1121/// Returns an error if parsing or constructing `FuturesSpread` fails.
1122pub fn decode_futures_spread(
1123    msg: &dbn::InstrumentDefMsg,
1124    instrument_id: InstrumentId,
1125    ts_init: Option<UnixNanos>,
1126) -> anyhow::Result<FuturesSpread> {
1127    let exchange = Ustr::from(msg.exchange()?);
1128    let underlying = Ustr::from(msg.asset()?);
1129    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1130    let strategy_type = Ustr::from(msg.secsubtype()?);
1131    let currency = parse_currency_or_usd_default(msg.currency());
1132    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1133    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1134    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1135    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1136    let ts_init = ts_init.unwrap_or(ts_event);
1137
1138    FuturesSpread::new_checked(
1139        instrument_id,
1140        instrument_id.symbol,
1141        asset_class.unwrap_or(AssetClass::Commodity),
1142        Some(exchange),
1143        underlying,
1144        strategy_type,
1145        msg.activation.into(),
1146        msg.expiration.into(),
1147        currency,
1148        price_increment.precision,
1149        price_increment,
1150        multiplier,
1151        lot_size,
1152        None, // TBD
1153        None, // TBD
1154        None, // TBD
1155        None, // TBD
1156        None, // TBD
1157        None, // TBD
1158        None, // TBD
1159        None, // TBD
1160        ts_event,
1161        ts_init,
1162    )
1163}
1164
1165/// Decodes a Databento instrument definition message into an `OptionContract` instrument.
1166///
1167/// # Errors
1168///
1169/// Returns an error if parsing or constructing `OptionContract` fails.
1170pub fn decode_option_contract(
1171    msg: &dbn::InstrumentDefMsg,
1172    instrument_id: InstrumentId,
1173    ts_init: Option<UnixNanos>,
1174) -> anyhow::Result<OptionContract> {
1175    let currency = parse_currency_or_usd_default(msg.currency());
1176    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1177    let exchange = Ustr::from(msg.exchange()?);
1178    let underlying = Ustr::from(msg.underlying()?);
1179    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1180        Some(AssetClass::Equity)
1181    } else {
1182        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1183        asset_class
1184    };
1185    let option_kind = parse_option_kind(msg.instrument_class)?;
1186    let strike_price = Price::from_raw(
1187        decode_raw_price_i64(msg.strike_price),
1188        strike_price_currency.precision,
1189    );
1190    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1191    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1192    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1193    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1194    let ts_init = ts_init.unwrap_or(ts_event);
1195
1196    OptionContract::new_checked(
1197        instrument_id,
1198        instrument_id.symbol,
1199        asset_class_opt.unwrap_or(AssetClass::Commodity),
1200        Some(exchange),
1201        underlying,
1202        option_kind,
1203        strike_price,
1204        currency,
1205        msg.activation.into(),
1206        msg.expiration.into(),
1207        price_increment.precision,
1208        price_increment,
1209        multiplier,
1210        lot_size,
1211        None, // TBD
1212        None, // TBD
1213        None, // TBD
1214        None, // TBD
1215        None, // TBD
1216        None, // TBD
1217        None, // TBD
1218        None, // TBD
1219        ts_event,
1220        ts_init,
1221    )
1222}
1223
1224/// Decodes a Databento instrument definition message into an `OptionSpread` instrument.
1225///
1226/// # Errors
1227///
1228/// Returns an error if parsing or constructing `OptionSpread` fails.
1229pub fn decode_option_spread(
1230    msg: &dbn::InstrumentDefMsg,
1231    instrument_id: InstrumentId,
1232    ts_init: Option<UnixNanos>,
1233) -> anyhow::Result<OptionSpread> {
1234    let exchange = Ustr::from(msg.exchange()?);
1235    let underlying = Ustr::from(msg.underlying()?);
1236    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1237        Some(AssetClass::Equity)
1238    } else {
1239        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1240        asset_class
1241    };
1242    let strategy_type = Ustr::from(msg.secsubtype()?);
1243    let currency = parse_currency_or_usd_default(msg.currency());
1244    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1245    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1246    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1247    let ts_event = msg.ts_recv.into(); // More accurate and reliable timestamp
1248    let ts_init = ts_init.unwrap_or(ts_event);
1249
1250    OptionSpread::new_checked(
1251        instrument_id,
1252        instrument_id.symbol,
1253        asset_class_opt.unwrap_or(AssetClass::Commodity),
1254        Some(exchange),
1255        underlying,
1256        strategy_type,
1257        msg.activation.into(),
1258        msg.expiration.into(),
1259        currency,
1260        price_increment.precision,
1261        price_increment,
1262        multiplier,
1263        lot_size,
1264        None, // TBD
1265        None, // TBD
1266        None, // TBD
1267        None, // TBD
1268        None, // TBD
1269        None, // TBD
1270        None, // TBD
1271        None, // TBD
1272        ts_event,
1273        ts_init,
1274    )
1275}
1276
1277/// Decodes a Databento imbalance message into a `DatabentoImbalance` event.
1278///
1279/// # Errors
1280///
1281/// Returns an error if constructing `DatabentoImbalance` fails.
1282pub fn decode_imbalance_msg(
1283    msg: &dbn::ImbalanceMsg,
1284    instrument_id: InstrumentId,
1285    price_precision: u8,
1286    ts_init: Option<UnixNanos>,
1287) -> anyhow::Result<DatabentoImbalance> {
1288    let ts_event = msg.ts_recv.into();
1289    let ts_init = ts_init.unwrap_or(ts_event);
1290
1291    Ok(DatabentoImbalance::new(
1292        instrument_id,
1293        Price::from_raw(decode_raw_price_i64(msg.ref_price), price_precision),
1294        Price::from_raw(
1295            decode_raw_price_i64(msg.cont_book_clr_price),
1296            price_precision,
1297        ),
1298        Price::from_raw(
1299            decode_raw_price_i64(msg.auct_interest_clr_price),
1300            price_precision,
1301        ),
1302        Quantity::new(f64::from(msg.paired_qty), 0),
1303        Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1304        parse_order_side(msg.side),
1305        msg.significant_imbalance as c_char,
1306        msg.hd.ts_event.into(),
1307        ts_event,
1308        ts_init,
1309    ))
1310}
1311
1312/// Decodes a Databento statistics message into a `DatabentoStatistics` event.
1313///
1314/// # Errors
1315///
1316/// Returns an error if constructing `DatabentoStatistics` fails or if `msg.stat_type` or
1317/// `msg.update_action` is not a valid enum variant.
1318pub fn decode_statistics_msg(
1319    msg: &dbn::StatMsg,
1320    instrument_id: InstrumentId,
1321    price_precision: u8,
1322    ts_init: Option<UnixNanos>,
1323) -> anyhow::Result<DatabentoStatistics> {
1324    let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1325        .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1326    let update_action =
1327        DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1328            anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1329        })?;
1330    let ts_event = msg.ts_recv.into();
1331    let ts_init = ts_init.unwrap_or(ts_event);
1332
1333    Ok(DatabentoStatistics::new(
1334        instrument_id,
1335        stat_type,
1336        update_action,
1337        decode_optional_price(msg.price, price_precision),
1338        decode_optional_quantity(msg.quantity),
1339        msg.channel_id,
1340        msg.stat_flags,
1341        msg.sequence,
1342        msg.ts_ref.into(),
1343        msg.ts_in_delta,
1344        msg.hd.ts_event.into(),
1345        ts_event,
1346        ts_init,
1347    ))
1348}
1349
1350////////////////////////////////////////////////////////////////////////////////
1351// Tests
1352////////////////////////////////////////////////////////////////////////////////
1353#[cfg(test)]
1354mod tests {
1355    use std::path::{Path, PathBuf};
1356
1357    use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1358    use fallible_streaming_iterator::FallibleStreamingIterator;
1359    use nautilus_model::instruments::Instrument;
1360    use rstest::*;
1361
1362    use super::*;
1363
1364    fn test_data_path() -> PathBuf {
1365        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1366    }
1367
1368    #[rstest]
1369    #[case('Y' as c_char, Some(true))]
1370    #[case('N' as c_char, Some(false))]
1371    #[case('X' as c_char, None)]
1372    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1373        assert_eq!(parse_optional_bool(input), expected);
1374    }
1375
1376    #[rstest]
1377    #[case('A' as c_char, OrderSide::Sell)]
1378    #[case('B' as c_char, OrderSide::Buy)]
1379    #[case('X' as c_char, OrderSide::NoOrderSide)]
1380    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1381        assert_eq!(parse_order_side(input), expected);
1382    }
1383
1384    #[rstest]
1385    #[case('A' as c_char, AggressorSide::Seller)]
1386    #[case('B' as c_char, AggressorSide::Buyer)]
1387    #[case('X' as c_char, AggressorSide::NoAggressor)]
1388    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1389        assert_eq!(parse_aggressor_side(input), expected);
1390    }
1391
1392    #[rstest]
1393    #[case('T' as c_char, true)]
1394    #[case('A' as c_char, false)]
1395    #[case('C' as c_char, false)]
1396    #[case('F' as c_char, false)]
1397    #[case('M' as c_char, false)]
1398    #[case('R' as c_char, false)]
1399    fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1400        assert_eq!(is_trade_msg(action), expected);
1401    }
1402
1403    #[rstest]
1404    #[case('A' as c_char, Ok(BookAction::Add))]
1405    #[case('C' as c_char, Ok(BookAction::Delete))]
1406    #[case('F' as c_char, Ok(BookAction::Update))]
1407    #[case('M' as c_char, Ok(BookAction::Update))]
1408    #[case('R' as c_char, Ok(BookAction::Clear))]
1409    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1410    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1411        match parse_book_action(input) {
1412            Ok(action) => assert_eq!(Ok(action), expected),
1413            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1414        }
1415    }
1416
1417    #[rstest]
1418    #[case('C' as c_char, Ok(OptionKind::Call))]
1419    #[case('P' as c_char, Ok(OptionKind::Put))]
1420    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1421    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1422        match parse_option_kind(input) {
1423            Ok(kind) => assert_eq!(Ok(kind), expected),
1424            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1425        }
1426    }
1427
1428    #[rstest]
1429    #[case(Ok("USD"), Currency::USD())]
1430    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1431    #[case(Ok(""), Currency::USD())]
1432    #[case(Err("Error"), Currency::USD())]
1433    fn test_parse_currency_or_usd_default(
1434        #[case] input: Result<&str, &'static str>, // Using `&'static str` for errors
1435        #[case] expected: Currency,
1436    ) {
1437        let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1438        assert_eq!(actual, expected);
1439    }
1440
1441    #[rstest]
1442    #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1443    #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1444    #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1445    #[case("XXX", Ok((None, None)))]
1446    #[case("D", Err("Value string is too short"))]
1447    fn test_parse_cfi_iso10926(
1448        #[case] input: &str,
1449        #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1450    ) {
1451        match parse_cfi_iso10926(input) {
1452            Ok(result) => assert_eq!(Ok(result), expected),
1453            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1454        }
1455    }
1456
1457    #[rstest]
1458    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1459    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1460    #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] // Arbitrary valid price
1461    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1462        let actual = decode_price_increment(value, precision);
1463        assert_eq!(actual, expected);
1464    }
1465
1466    #[rstest]
1467    #[case(i64::MAX, 2, None)] // None for i64::MAX
1468    #[case(0, 2, Some(Price::from_raw(0, 2)))] // 0 is valid here
1469    #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] // Arbitrary valid price
1470    fn test_decode_optional_price(
1471        #[case] value: i64,
1472        #[case] precision: u8,
1473        #[case] expected: Option<Price>,
1474    ) {
1475        let actual = decode_optional_price(value, precision);
1476        assert_eq!(actual, expected);
1477    }
1478
1479    #[rstest]
1480    #[case(i64::MAX, None)] // None for i32::MAX
1481    #[case(0, Some(Quantity::new(0.0, 0)))] // 0 is valid quantity
1482    #[case(10, Some(Quantity::new(10.0, 0)))] // Arbitrary valid quantity
1483    fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1484        let actual = decode_optional_quantity(value);
1485        assert_eq!(actual, expected);
1486    }
1487
1488    #[rstest]
1489    #[case(0, Quantity::from(1))] // Default fallback for 0
1490    #[case(i64::MAX, Quantity::from(1))] // Default fallback for i64::MAX
1491    #[case(50_000_000_000, Quantity::from("50"))] // 50.0 exactly
1492    #[case(12_500_000_000, Quantity::from("12.5"))] // 12.5 exactly
1493    #[case(1_000_000_000, Quantity::from("1"))] // 1.0 exactly
1494    #[case(1, Quantity::from("0.000000001"))] // Smallest positive value
1495    #[case(1_000_000_001, Quantity::from("1.000000001"))] // Just over 1.0
1496    #[case(999_999_999, Quantity::from("0.999999999"))] // Just under 1.0
1497    #[case(123_456_789_000, Quantity::from("123.456789"))] // Trailing zeros trimmed
1498    fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1499        assert_eq!(decode_multiplier(raw).unwrap(), expected);
1500    }
1501
1502    #[rstest]
1503    #[case(-1_500_000_000)] // Large negative value
1504    #[case(-1)] // Small negative value
1505    #[case(-999_999_999)] // Another negative value
1506    fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1507        let result = decode_multiplier(raw);
1508        assert!(result.is_err());
1509        assert!(
1510            result
1511                .unwrap_err()
1512                .to_string()
1513                .contains("Invalid negative multiplier")
1514        );
1515    }
1516
1517    #[rstest]
1518    #[case(100, Quantity::from(100))]
1519    #[case(1000, Quantity::from(1000))]
1520    #[case(5, Quantity::from(5))]
1521    fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1522        assert_eq!(decode_quantity(value), expected);
1523    }
1524
1525    #[rstest]
1526    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1527    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1528    #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] // Arbitrary valid price
1529    fn test_decode_price_increment(
1530        #[case] value: i64,
1531        #[case] precision: u8,
1532        #[case] expected: Price,
1533    ) {
1534        assert_eq!(decode_price_increment(value, precision), expected);
1535    }
1536
1537    #[rstest]
1538    #[case(0, Quantity::from(1))] // Default for 0
1539    #[case(i32::MAX, Quantity::from(1))] // Default for MAX
1540    #[case(100, Quantity::from(100))]
1541    #[case(1, Quantity::from(1))]
1542    #[case(1000, Quantity::from(1000))]
1543    fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1544        assert_eq!(decode_lot_size(value), expected);
1545    }
1546
1547    #[rstest]
1548    #[case(0, None)] // None for 0
1549    #[case(1, Some(Ustr::from("Scheduled")))]
1550    #[case(2, Some(Ustr::from("Surveillance intervention")))]
1551    #[case(3, Some(Ustr::from("Market event")))]
1552    #[case(10, Some(Ustr::from("Regulatory")))]
1553    #[case(30, Some(Ustr::from("News pending")))]
1554    #[case(40, Some(Ustr::from("Order imbalance")))]
1555    #[case(50, Some(Ustr::from("LULD pause")))]
1556    #[case(60, Some(Ustr::from("Operational")))]
1557    #[case(100, Some(Ustr::from("Corporate action")))]
1558    #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1559    fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1560        assert_eq!(parse_status_reason(value).unwrap(), expected);
1561    }
1562
1563    #[rstest]
1564    #[case(999)] // Invalid code
1565    fn test_parse_status_reason_invalid(#[case] value: u16) {
1566        assert!(parse_status_reason(value).is_err());
1567    }
1568
1569    #[rstest]
1570    #[case(0, None)] // None for 0
1571    #[case(1, Some(Ustr::from("No cancel")))]
1572    #[case(2, Some(Ustr::from("Change trading session")))]
1573    #[case(3, Some(Ustr::from("Implied matching on")))]
1574    #[case(4, Some(Ustr::from("Implied matching off")))]
1575    fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1576        assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1577    }
1578
1579    #[rstest]
1580    #[case(5)] // Invalid code
1581    #[case(100)] // Invalid code
1582    fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1583        assert!(parse_status_trading_event(value).is_err());
1584    }
1585
1586    #[rstest]
1587    fn test_decode_mbo_msg() {
1588        let path = test_data_path().join("test_data.mbo.dbn.zst");
1589        let mut dbn_stream = Decoder::from_zstd_file(path)
1590            .unwrap()
1591            .decode_stream::<dbn::MboMsg>();
1592        let msg = dbn_stream.next().unwrap().unwrap();
1593
1594        let instrument_id = InstrumentId::from("ESM4.GLBX");
1595        let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1596        let delta = delta.unwrap();
1597
1598        assert_eq!(delta.instrument_id, instrument_id);
1599        assert_eq!(delta.action, BookAction::Delete);
1600        assert_eq!(delta.order.side, OrderSide::Sell);
1601        assert_eq!(delta.order.price, Price::from("3722.75"));
1602        assert_eq!(delta.order.size, Quantity::from("1"));
1603        assert_eq!(delta.order.order_id, 647_784_973_705);
1604        assert_eq!(delta.flags, 128);
1605        assert_eq!(delta.sequence, 1_170_352);
1606        assert_eq!(delta.ts_event, msg.ts_recv);
1607        assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1608        assert_eq!(delta.ts_init, 0);
1609    }
1610
1611    #[rstest]
1612    fn test_decode_mbo_msg_clear_action() {
1613        // Create an MBO message with Clear action (action='R', side='N')
1614        let ts_recv = 1_609_160_400_000_000_000;
1615        let msg = dbn::MboMsg {
1616            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1617            order_id: 0,
1618            price: i64::MAX,
1619            size: 0,
1620            flags: dbn::FlagSet::empty(),
1621            channel_id: 0,
1622            action: 'R' as c_char,
1623            side: 'N' as c_char, // NoOrderSide for Clear
1624            ts_recv,
1625            ts_in_delta: 0,
1626            sequence: 1_000_000,
1627        };
1628
1629        let instrument_id = InstrumentId::from("ESM4.GLBX");
1630        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1631
1632        // Clear messages should produce OrderBookDelta, not TradeTick
1633        assert!(trade.is_none());
1634        let delta = delta.expect("Clear action should produce OrderBookDelta");
1635
1636        assert_eq!(delta.instrument_id, instrument_id);
1637        assert_eq!(delta.action, BookAction::Clear);
1638        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1639        assert_eq!(delta.order.size, Quantity::from("0"));
1640        assert_eq!(delta.order.order_id, 0);
1641        assert_eq!(delta.sequence, 1_000_000);
1642        assert_eq!(delta.ts_event, ts_recv);
1643        assert_eq!(delta.ts_init, 0);
1644    }
1645
1646    #[rstest]
1647    fn test_decode_mbo_msg_no_order_side_update() {
1648        // MBO messages with NoOrderSide are now passed through to the book
1649        // The book will resolve the side from its cache using the order_id
1650        let ts_recv = 1_609_160_400_000_000_000;
1651        let msg = dbn::MboMsg {
1652            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1653            order_id: 123_456_789,
1654            price: 4_800_250_000_000, // $4800.25 with precision 2
1655            size: 1,
1656            flags: dbn::FlagSet::empty(),
1657            channel_id: 1,
1658            action: 'M' as c_char, // Modify/Update action
1659            side: 'N' as c_char,   // NoOrderSide
1660            ts_recv,
1661            ts_in_delta: 0,
1662            sequence: 1_000_000,
1663        };
1664
1665        let instrument_id = InstrumentId::from("ESM4.GLBX");
1666        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1667
1668        // Delta should be created with NoOrderSide (book will resolve it)
1669        assert!(delta.is_some());
1670        assert!(trade.is_none());
1671        let delta = delta.unwrap();
1672        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1673        assert_eq!(delta.order.order_id, 123_456_789);
1674        assert_eq!(delta.action, BookAction::Update);
1675    }
1676
1677    #[rstest]
1678    fn test_decode_mbp1_msg() {
1679        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1680        let mut dbn_stream = Decoder::from_zstd_file(path)
1681            .unwrap()
1682            .decode_stream::<dbn::Mbp1Msg>();
1683        let msg = dbn_stream.next().unwrap().unwrap();
1684
1685        let instrument_id = InstrumentId::from("ESM4.GLBX");
1686        let (quote, _) = decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1687
1688        assert_eq!(quote.instrument_id, instrument_id);
1689        assert_eq!(quote.bid_price, Price::from("3720.25"));
1690        assert_eq!(quote.ask_price, Price::from("3720.50"));
1691        assert_eq!(quote.bid_size, Quantity::from("24"));
1692        assert_eq!(quote.ask_size, Quantity::from("11"));
1693        assert_eq!(quote.ts_event, msg.ts_recv);
1694        assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1695        assert_eq!(quote.ts_init, 0);
1696    }
1697
1698    #[rstest]
1699    fn test_decode_bbo_1s_msg() {
1700        let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1701        let mut dbn_stream = Decoder::from_zstd_file(path)
1702            .unwrap()
1703            .decode_stream::<dbn::BboMsg>();
1704        let msg = dbn_stream.next().unwrap().unwrap();
1705
1706        let instrument_id = InstrumentId::from("ESM4.GLBX");
1707        let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1708
1709        assert_eq!(quote.instrument_id, instrument_id);
1710        assert_eq!(quote.bid_price, Price::from("3702.25"));
1711        assert_eq!(quote.ask_price, Price::from("3702.75"));
1712        assert_eq!(quote.bid_size, Quantity::from("18"));
1713        assert_eq!(quote.ask_size, Quantity::from("13"));
1714        assert_eq!(quote.ts_event, msg.ts_recv);
1715        assert_eq!(quote.ts_event, 1609113600000000000);
1716        assert_eq!(quote.ts_init, 0);
1717    }
1718
1719    #[rstest]
1720    fn test_decode_bbo_1m_msg() {
1721        let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1722        let mut dbn_stream = Decoder::from_zstd_file(path)
1723            .unwrap()
1724            .decode_stream::<dbn::BboMsg>();
1725        let msg = dbn_stream.next().unwrap().unwrap();
1726
1727        let instrument_id = InstrumentId::from("ESM4.GLBX");
1728        let quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1729
1730        assert_eq!(quote.instrument_id, instrument_id);
1731        assert_eq!(quote.bid_price, Price::from("3702.25"));
1732        assert_eq!(quote.ask_price, Price::from("3702.75"));
1733        assert_eq!(quote.bid_size, Quantity::from("18"));
1734        assert_eq!(quote.ask_size, Quantity::from("13"));
1735        assert_eq!(quote.ts_event, msg.ts_recv);
1736        assert_eq!(quote.ts_event, 1609113600000000000);
1737        assert_eq!(quote.ts_init, 0);
1738    }
1739
1740    #[rstest]
1741    fn test_decode_mbp10_msg() {
1742        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1743        let mut dbn_stream = Decoder::from_zstd_file(path)
1744            .unwrap()
1745            .decode_stream::<dbn::Mbp10Msg>();
1746        let msg = dbn_stream.next().unwrap().unwrap();
1747
1748        let instrument_id = InstrumentId::from("ESM4.GLBX");
1749        let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1750
1751        assert_eq!(depth10.instrument_id, instrument_id);
1752        assert_eq!(depth10.bids.len(), 10);
1753        assert_eq!(depth10.asks.len(), 10);
1754        assert_eq!(depth10.bid_counts.len(), 10);
1755        assert_eq!(depth10.ask_counts.len(), 10);
1756        assert_eq!(depth10.flags, 128);
1757        assert_eq!(depth10.sequence, 1_170_352);
1758        assert_eq!(depth10.ts_event, msg.ts_recv);
1759        assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1760        assert_eq!(depth10.ts_init, 0);
1761    }
1762
1763    #[rstest]
1764    fn test_decode_trade_msg() {
1765        let path = test_data_path().join("test_data.trades.dbn.zst");
1766        let mut dbn_stream = Decoder::from_zstd_file(path)
1767            .unwrap()
1768            .decode_stream::<dbn::TradeMsg>();
1769        let msg = dbn_stream.next().unwrap().unwrap();
1770
1771        let instrument_id = InstrumentId::from("ESM4.GLBX");
1772        let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1773
1774        assert_eq!(trade.instrument_id, instrument_id);
1775        assert_eq!(trade.price, Price::from("3720.25"));
1776        assert_eq!(trade.size, Quantity::from("5"));
1777        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1778        assert_eq!(trade.trade_id.to_string(), "1170380");
1779        assert_eq!(trade.ts_event, msg.ts_recv);
1780        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1781        assert_eq!(trade.ts_init, 0);
1782    }
1783
1784    #[rstest]
1785    fn test_decode_tbbo_msg() {
1786        let path = test_data_path().join("test_data.tbbo.dbn.zst");
1787        let mut dbn_stream = Decoder::from_zstd_file(path)
1788            .unwrap()
1789            .decode_stream::<dbn::Mbp1Msg>();
1790        let msg = dbn_stream.next().unwrap().unwrap();
1791
1792        let instrument_id = InstrumentId::from("ESM4.GLBX");
1793        let (quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1794
1795        assert_eq!(quote.instrument_id, instrument_id);
1796        assert_eq!(quote.bid_price, Price::from("3720.25"));
1797        assert_eq!(quote.ask_price, Price::from("3720.50"));
1798        assert_eq!(quote.bid_size, Quantity::from("26"));
1799        assert_eq!(quote.ask_size, Quantity::from("7"));
1800        assert_eq!(quote.ts_event, msg.ts_recv);
1801        assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
1802        assert_eq!(quote.ts_init, 0);
1803
1804        assert_eq!(trade.instrument_id, instrument_id);
1805        assert_eq!(trade.price, Price::from("3720.25"));
1806        assert_eq!(trade.size, Quantity::from("5"));
1807        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1808        assert_eq!(trade.trade_id.to_string(), "1170380");
1809        assert_eq!(trade.ts_event, msg.ts_recv);
1810        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1811        assert_eq!(trade.ts_init, 0);
1812    }
1813
1814    #[rstest]
1815    fn test_decode_ohlcv_msg() {
1816        let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
1817        let mut dbn_stream = Decoder::from_zstd_file(path)
1818            .unwrap()
1819            .decode_stream::<dbn::OhlcvMsg>();
1820        let msg = dbn_stream.next().unwrap().unwrap();
1821
1822        let instrument_id = InstrumentId::from("ESM4.GLBX");
1823        let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1824
1825        assert_eq!(
1826            bar.bar_type,
1827            BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
1828        );
1829        assert_eq!(bar.open, Price::from("372025.00"));
1830        assert_eq!(bar.high, Price::from("372050.00"));
1831        assert_eq!(bar.low, Price::from("372025.00"));
1832        assert_eq!(bar.close, Price::from("372050.00"));
1833        assert_eq!(bar.volume, Quantity::from("57"));
1834        assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); // timestamp_on_close=true
1835        assert_eq!(bar.ts_init, 0); // ts_init was Some(0)
1836    }
1837
1838    #[rstest]
1839    fn test_decode_definition_msg() {
1840        let path = test_data_path().join("test_data.definition.dbn.zst");
1841        let mut dbn_stream = Decoder::from_zstd_file(path)
1842            .unwrap()
1843            .decode_stream::<dbn::InstrumentDefMsg>();
1844        let msg = dbn_stream.next().unwrap().unwrap();
1845
1846        let instrument_id = InstrumentId::from("ESM4.GLBX");
1847        let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
1848
1849        assert!(result.is_ok());
1850        assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
1851    }
1852
1853    #[rstest]
1854    fn test_decode_status_msg() {
1855        let path = test_data_path().join("test_data.status.dbn.zst");
1856        let mut dbn_stream = Decoder::from_zstd_file(path)
1857            .unwrap()
1858            .decode_stream::<dbn::StatusMsg>();
1859        let msg = dbn_stream.next().unwrap().unwrap();
1860
1861        let instrument_id = InstrumentId::from("ESM4.GLBX");
1862        let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
1863
1864        assert_eq!(status.instrument_id, instrument_id);
1865        assert_eq!(status.action, MarketStatusAction::Trading);
1866        assert_eq!(status.ts_event, msg.hd.ts_event);
1867        assert_eq!(status.ts_init, 0);
1868        assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
1869        assert_eq!(status.trading_event, None);
1870        assert_eq!(status.is_trading, Some(true));
1871        assert_eq!(status.is_quoting, Some(true));
1872        assert_eq!(status.is_short_sell_restricted, None);
1873    }
1874
1875    #[rstest]
1876    fn test_decode_imbalance_msg() {
1877        let path = test_data_path().join("test_data.imbalance.dbn.zst");
1878        let mut dbn_stream = Decoder::from_zstd_file(path)
1879            .unwrap()
1880            .decode_stream::<dbn::ImbalanceMsg>();
1881        let msg = dbn_stream.next().unwrap().unwrap();
1882
1883        let instrument_id = InstrumentId::from("ESM4.GLBX");
1884        let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1885
1886        assert_eq!(imbalance.instrument_id, instrument_id);
1887        assert_eq!(imbalance.ref_price, Price::from("229.43"));
1888        assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
1889        assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
1890        assert_eq!(imbalance.paired_qty, Quantity::from("0"));
1891        assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
1892        assert_eq!(imbalance.side, OrderSide::Buy);
1893        assert_eq!(imbalance.significant_imbalance, 126);
1894        assert_eq!(imbalance.ts_event, msg.hd.ts_event);
1895        assert_eq!(imbalance.ts_recv, msg.ts_recv);
1896        assert_eq!(imbalance.ts_init, 0);
1897    }
1898
1899    #[rstest]
1900    fn test_decode_statistics_msg() {
1901        let path = test_data_path().join("test_data.statistics.dbn.zst");
1902        let mut dbn_stream = Decoder::from_zstd_file(path)
1903            .unwrap()
1904            .decode_stream::<dbn::StatMsg>();
1905        let msg = dbn_stream.next().unwrap().unwrap();
1906
1907        let instrument_id = InstrumentId::from("ESM4.GLBX");
1908        let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1909
1910        assert_eq!(statistics.instrument_id, instrument_id);
1911        assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
1912        assert_eq!(
1913            statistics.update_action,
1914            DatabentoStatisticUpdateAction::Added
1915        );
1916        assert_eq!(statistics.price, Some(Price::from("100.00")));
1917        assert_eq!(statistics.quantity, None);
1918        assert_eq!(statistics.channel_id, 13);
1919        assert_eq!(statistics.stat_flags, 255);
1920        assert_eq!(statistics.sequence, 2);
1921        assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
1922        assert_eq!(statistics.ts_in_delta, 26961);
1923        assert_eq!(statistics.ts_event, msg.hd.ts_event);
1924        assert_eq!(statistics.ts_recv, msg.ts_recv);
1925        assert_eq!(statistics.ts_init, 0);
1926    }
1927
1928    #[rstest]
1929    fn test_decode_cmbp1_msg() {
1930        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
1931        let mut dbn_stream = Decoder::from_zstd_file(path)
1932            .unwrap()
1933            .decode_stream::<dbn::Cmbp1Msg>();
1934        let msg = dbn_stream.next().unwrap().unwrap();
1935
1936        let instrument_id = InstrumentId::from("ESM4.GLBX");
1937        let (quote, trade) = decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
1938
1939        assert_eq!(quote.instrument_id, instrument_id);
1940        assert!(quote.bid_price.raw > 0);
1941        assert!(quote.ask_price.raw > 0);
1942        assert!(quote.bid_size.raw > 0);
1943        assert!(quote.ask_size.raw > 0);
1944        assert_eq!(quote.ts_event, msg.ts_recv);
1945        assert_eq!(quote.ts_init, 0);
1946
1947        // Check if trade is present based on action
1948        if msg.action as u8 as char == 'T' {
1949            assert!(trade.is_some());
1950            let trade = trade.unwrap();
1951            assert_eq!(trade.instrument_id, instrument_id);
1952        } else {
1953            assert!(trade.is_none());
1954        }
1955    }
1956
1957    #[rstest]
1958    fn test_decode_cbbo_1s_msg() {
1959        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
1960        let mut dbn_stream = Decoder::from_zstd_file(path)
1961            .unwrap()
1962            .decode_stream::<dbn::CbboMsg>();
1963        let msg = dbn_stream.next().unwrap().unwrap();
1964
1965        let instrument_id = InstrumentId::from("ESM4.GLBX");
1966        let quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1967
1968        assert_eq!(quote.instrument_id, instrument_id);
1969        assert!(quote.bid_price.raw > 0);
1970        assert!(quote.ask_price.raw > 0);
1971        assert!(quote.bid_size.raw > 0);
1972        assert!(quote.ask_size.raw > 0);
1973        assert_eq!(quote.ts_event, msg.ts_recv);
1974        assert_eq!(quote.ts_init, 0);
1975    }
1976
1977    #[rstest]
1978    fn test_decode_mbp10_msg_with_all_levels() {
1979        let mut msg = dbn::Mbp10Msg::default();
1980        for i in 0..10 {
1981            msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
1982            msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
1983            msg.levels[i].bid_sz = 10 + i as u32;
1984            msg.levels[i].ask_sz = 10 + i as u32;
1985            msg.levels[i].bid_ct = 1 + i as u32;
1986            msg.levels[i].ask_ct = 1 + i as u32;
1987        }
1988        msg.ts_recv = 1_609_160_400_000_704_060;
1989
1990        let instrument_id = InstrumentId::from("TEST.VENUE");
1991        let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
1992
1993        assert!(result.is_ok());
1994        let depth = result.unwrap();
1995        assert_eq!(depth.bids.len(), 10);
1996        assert_eq!(depth.asks.len(), 10);
1997        assert_eq!(depth.bid_counts.len(), 10);
1998        assert_eq!(depth.ask_counts.len(), 10);
1999    }
2000
2001    #[rstest]
2002    fn test_array_conversion_error_handling() {
2003        let mut bids = Vec::new();
2004        let mut asks = Vec::new();
2005
2006        // Intentionally create fewer than DEPTH10_LEN elements
2007        for i in 0..5 {
2008            bids.push(BookOrder::new(
2009                OrderSide::Buy,
2010                Price::from(format!("{}.00", 100 - i)),
2011                Quantity::from(10),
2012                i as u64,
2013            ));
2014            asks.push(BookOrder::new(
2015                OrderSide::Sell,
2016                Price::from(format!("{}.00", 101 + i)),
2017                Quantity::from(10),
2018                i as u64,
2019            ));
2020        }
2021
2022        let result: Result<[BookOrder; DEPTH10_LEN], _> =
2023            bids.try_into().map_err(|v: Vec<BookOrder>| {
2024                anyhow::anyhow!(
2025                    "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2026                    v.len()
2027                )
2028            });
2029        assert!(result.is_err());
2030        assert!(
2031            result
2032                .unwrap_err()
2033                .to_string()
2034                .contains("Expected exactly 10 bid levels, received 5")
2035        );
2036    }
2037
2038    #[rstest]
2039    fn test_decode_tcbbo_msg() {
2040        // Use cbbo-1s as base since cbbo.dbn.zst was invalid
2041        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2042        let mut dbn_stream = Decoder::from_zstd_file(path)
2043            .unwrap()
2044            .decode_stream::<dbn::CbboMsg>();
2045        let msg = dbn_stream.next().unwrap().unwrap();
2046
2047        // Simulate TCBBO by adding trade data
2048        let mut tcbbo_msg = msg.clone();
2049        tcbbo_msg.price = 3702500000000;
2050        tcbbo_msg.size = 10;
2051
2052        let instrument_id = InstrumentId::from("ESM4.GLBX");
2053        let (quote, trade) =
2054            decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2055
2056        assert_eq!(quote.instrument_id, instrument_id);
2057        assert!(quote.bid_price.raw > 0);
2058        assert!(quote.ask_price.raw > 0);
2059        assert!(quote.bid_size.raw > 0);
2060        assert!(quote.ask_size.raw > 0);
2061        assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2062        assert_eq!(quote.ts_init, 0);
2063
2064        assert_eq!(trade.instrument_id, instrument_id);
2065        assert_eq!(trade.price, Price::from("3702.50"));
2066        assert_eq!(trade.size, Quantity::from(10));
2067        assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2068        assert_eq!(trade.ts_init, 0);
2069    }
2070
2071    #[rstest]
2072    fn test_decode_bar_type() {
2073        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2074        let instrument_id = InstrumentId::from("ESM4.GLBX");
2075
2076        // Test 1-second bar
2077        msg.hd.rtype = 32;
2078        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2079        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2080
2081        // Test 1-minute bar
2082        msg.hd.rtype = 33;
2083        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2084        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2085
2086        // Test 1-hour bar
2087        msg.hd.rtype = 34;
2088        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2089        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2090
2091        // Test 1-day bar
2092        msg.hd.rtype = 35;
2093        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2094        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2095
2096        // Test unsupported rtype
2097        msg.hd.rtype = 99;
2098        let result = decode_bar_type(&msg, instrument_id);
2099        assert!(result.is_err());
2100    }
2101
2102    #[rstest]
2103    fn test_decode_ts_event_adjustment() {
2104        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2105
2106        // Test 1-second bar adjustment
2107        msg.hd.rtype = 32;
2108        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2109        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2110
2111        // Test 1-minute bar adjustment
2112        msg.hd.rtype = 33;
2113        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2114        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2115
2116        // Test 1-hour bar adjustment
2117        msg.hd.rtype = 34;
2118        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2119        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2120
2121        // Test 1-day bar adjustment
2122        msg.hd.rtype = 35;
2123        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2124        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2125
2126        // Test eod bar adjustment (same as 1d)
2127        msg.hd.rtype = 36;
2128        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2129        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2130
2131        // Test unsupported rtype
2132        msg.hd.rtype = 99;
2133        let result = decode_ts_event_adjustment(&msg);
2134        assert!(result.is_err());
2135    }
2136
2137    #[rstest]
2138    fn test_decode_record() {
2139        // Test with MBO message
2140        let path = test_data_path().join("test_data.mbo.dbn.zst");
2141        let decoder = Decoder::from_zstd_file(path).unwrap();
2142        let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2143        let msg = dbn_stream.next().unwrap().unwrap();
2144
2145        let record_ref = dbn::RecordRef::from(msg);
2146        let instrument_id = InstrumentId::from("ESM4.GLBX");
2147
2148        let (data1, data2) =
2149            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2150
2151        assert!(data1.is_some());
2152        assert!(data2.is_none());
2153
2154        // Test with Trade message
2155        let path = test_data_path().join("test_data.trades.dbn.zst");
2156        let decoder = Decoder::from_zstd_file(path).unwrap();
2157        let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2158        let msg = dbn_stream.next().unwrap().unwrap();
2159
2160        let record_ref = dbn::RecordRef::from(msg);
2161
2162        let (data1, data2) =
2163            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2164
2165        assert!(data1.is_some());
2166        assert!(data2.is_none());
2167        assert!(matches!(data1.unwrap(), Data::Trade(_)));
2168    }
2169}