nautilus_databento/
decode.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND, uuid::UUID4};
20use nautilus_model::{
21    data::{
22        Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24    },
25    enums::{
26        AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27        InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28    },
29    identifiers::{InstrumentId, TradeId},
30    instruments::{
31        Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32    },
33    types::{
34        Currency, Price, Quantity,
35        price::{PRICE_UNDEF, PriceRaw, decode_raw_price_i64},
36    },
37};
38use ustr::Ustr;
39
40use super::{
41    enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
42    types::{DatabentoImbalance, DatabentoStatistics},
43};
44
45// SAFETY: Known valid value
46const STEP_ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
47
48const BAR_SPEC_1S: BarSpecification = BarSpecification {
49    step: STEP_ONE,
50    aggregation: BarAggregation::Second,
51    price_type: PriceType::Last,
52};
53const BAR_SPEC_1M: BarSpecification = BarSpecification {
54    step: STEP_ONE,
55    aggregation: BarAggregation::Minute,
56    price_type: PriceType::Last,
57};
58const BAR_SPEC_1H: BarSpecification = BarSpecification {
59    step: STEP_ONE,
60    aggregation: BarAggregation::Hour,
61    price_type: PriceType::Last,
62};
63const BAR_SPEC_1D: BarSpecification = BarSpecification {
64    step: STEP_ONE,
65    aggregation: BarAggregation::Day,
66    price_type: PriceType::Last,
67};
68
69const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
70const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
71const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
72const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
73
74#[must_use]
75pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
76    match c as u8 as char {
77        'Y' => Some(true),
78        'N' => Some(false),
79        _ => None,
80    }
81}
82
83#[must_use]
84pub const fn parse_order_side(c: c_char) -> OrderSide {
85    match c as u8 as char {
86        'A' => OrderSide::Sell,
87        'B' => OrderSide::Buy,
88        _ => OrderSide::NoOrderSide,
89    }
90}
91
92#[must_use]
93pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
94    match c as u8 as char {
95        'A' => AggressorSide::Seller,
96        'B' => AggressorSide::Buyer,
97        _ => AggressorSide::NoAggressor,
98    }
99}
100
101/// Parses a Databento book action character into a `BookAction` enum.
102///
103/// # Errors
104///
105/// Returns an error if `c` is not a valid `BookAction` character.
106pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
107    match c as u8 as char {
108        'A' => Ok(BookAction::Add),
109        'C' => Ok(BookAction::Delete),
110        'F' => Ok(BookAction::Update),
111        'M' => Ok(BookAction::Update),
112        'R' => Ok(BookAction::Clear),
113        invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
114    }
115}
116
117/// Parses a Databento option kind character into an `OptionKind` enum.
118///
119/// # Errors
120///
121/// Returns an error if `c` is not a valid `OptionKind` character.
122pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
123    match c as u8 as char {
124        'C' => Ok(OptionKind::Call),
125        'P' => Ok(OptionKind::Put),
126        invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
127    }
128}
129
130fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
131    match value {
132        Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
133            tracing::warn!("Unknown currency code '{value}', defaulting to USD");
134            Currency::USD()
135        }),
136        Ok(_) => Currency::USD(),
137        Err(e) => {
138            tracing::error!("Error parsing currency: {e}");
139            Currency::USD()
140        }
141    }
142}
143
144/// Parses a CFI (Classification of Financial Instruments) code to extract asset and instrument classes.
145///
146/// # Errors
147///
148/// Returns an error if `value` has fewer than 3 characters.
149pub fn parse_cfi_iso10926(
150    value: &str,
151) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
152    let chars: Vec<char> = value.chars().collect();
153    if chars.len() < 3 {
154        anyhow::bail!("Value string is too short");
155    }
156
157    // TODO: A proper CFI parser would be useful: https://en.wikipedia.org/wiki/ISO_10962
158    let cfi_category = chars[0];
159    let cfi_group = chars[1];
160    let cfi_attribute1 = chars[2];
161    // let cfi_attribute2 = value[3];
162    // let cfi_attribute3 = value[4];
163    // let cfi_attribute4 = value[5];
164
165    let mut asset_class = match cfi_category {
166        'D' => Some(AssetClass::Debt),
167        'E' => Some(AssetClass::Equity),
168        'S' => None,
169        _ => None,
170    };
171
172    let instrument_class = match cfi_group {
173        'I' => Some(InstrumentClass::Future),
174        _ => None,
175    };
176
177    if cfi_attribute1 == 'I' {
178        asset_class = Some(AssetClass::Index);
179    }
180
181    Ok((asset_class, instrument_class))
182}
183
184/// Parses a Databento status reason code into a human-readable string.
185///
186/// See: <https://databento.com/docs/schemas-and-data-formats/status#types-of-status-reasons>
187///
188/// # Errors
189///
190/// Returns an error if `value` is an invalid status reason code.
191pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
192    let value_str = match value {
193        0 => return Ok(None),
194        1 => "Scheduled",
195        2 => "Surveillance intervention",
196        3 => "Market event",
197        4 => "Instrument activation",
198        5 => "Instrument expiration",
199        6 => "Recovery in process",
200        10 => "Regulatory",
201        11 => "Administrative",
202        12 => "Non-compliance",
203        13 => "Filings not current",
204        14 => "SEC trading suspension",
205        15 => "New issue",
206        16 => "Issue available",
207        17 => "Issues reviewed",
208        18 => "Filing requirements satisfied",
209        30 => "News pending",
210        31 => "News released",
211        32 => "News and resumption times",
212        33 => "News not forthcoming",
213        40 => "Order imbalance",
214        50 => "LULD pause",
215        60 => "Operational",
216        70 => "Additional information requested",
217        80 => "Merger effective",
218        90 => "ETF",
219        100 => "Corporate action",
220        110 => "New Security offering",
221        120 => "Market wide halt level 1",
222        121 => "Market wide halt level 2",
223        122 => "Market wide halt level 3",
224        123 => "Market wide halt carryover",
225        124 => "Market wide halt resumption",
226        130 => "Quotation not available",
227        invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
228    };
229
230    Ok(Some(Ustr::from(value_str)))
231}
232
233/// Parses a Databento status trading event code into a human-readable string.
234///
235/// # Errors
236///
237/// Returns an error if `value` is an invalid status trading event code.
238pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
239    let value_str = match value {
240        0 => return Ok(None),
241        1 => "No cancel",
242        2 => "Change trading session",
243        3 => "Implied matching on",
244        4 => "Implied matching off",
245        _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
246    };
247
248    Ok(Some(Ustr::from(value_str)))
249}
250
251/// Decodes a raw price from an i64 value and returns the appropriate precision.
252///
253/// If the decoded raw value equals `PRICE_UNDEF`, precision is forced to 0
254/// as required by the `Price` type invariants.
255///
256/// Databento uses `i64::MAX` as a sentinel value for unset/null prices (see
257/// [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)).
258#[inline(always)]
259#[must_use]
260fn decode_raw_price_with_precision(value: i64, precision: u8) -> (PriceRaw, u8) {
261    let raw = if value == i64::MAX {
262        PRICE_UNDEF
263    } else {
264        decode_raw_price_i64(value)
265    };
266
267    // PRICE_UNDEF must always have precision 0
268    let precision = if raw == PRICE_UNDEF { 0 } else { precision };
269    (raw, precision)
270}
271
272/// Decodes a price from the given value, expressed in units of 1e-9.
273#[inline(always)]
274#[must_use]
275pub fn decode_price(value: i64, precision: u8) -> Price {
276    let (raw, precision) = decode_raw_price_with_precision(value, precision);
277    Price::from_raw(raw, precision)
278}
279
280/// Decodes a minimum price increment from the given value, expressed in units of 1e-9.
281#[inline(always)]
282#[must_use]
283pub fn decode_price_increment(value: i64, precision: u8) -> Price {
284    match value {
285        0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
286        _ => decode_price(value, precision),
287    }
288}
289
290/// Decodes a price from the given optional value, expressed in units of 1e-9.
291#[inline(always)]
292#[must_use]
293pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
294    match value {
295        i64::MAX => None,
296        _ => Some(decode_price(value, precision)),
297    }
298}
299
300/// Decodes a quantity from the given value, expressed in standard whole-number units.
301#[inline(always)]
302#[must_use]
303pub fn decode_quantity(value: u64) -> Quantity {
304    Quantity::from(value)
305}
306
307/// Decodes a quantity from the given optional value, where `i64::MAX` indicates missing data.
308#[inline(always)]
309#[must_use]
310pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
311    match value {
312        i64::MAX => None,
313        _ => Some(Quantity::from(value)),
314    }
315}
316
317/// Decodes a multiplier from the given value, expressed in units of 1e-9.
318/// Uses exact integer arithmetic to avoid precision loss in financial calculations.
319///
320/// # Errors
321///
322/// Returns an error if value is negative (invalid multiplier).
323pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
324    match value {
325        0 | i64::MAX => Ok(Quantity::from(1)),
326        v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
327        v => {
328            // Work in integers: v is fixed-point with 9 fractional digits.
329            // Build a canonical decimal string without floating-point.
330            let abs = v as u128;
331
332            const SCALE: u128 = 1_000_000_000;
333            let int_part = abs / SCALE;
334            let frac_part = abs % SCALE;
335
336            // Format fractional part with exactly 9 digits, then trim trailing zeros
337            // to keep a canonical representation.
338            if frac_part == 0 {
339                // Pure integer
340                Ok(Quantity::from(int_part as u64))
341            } else {
342                let mut frac_str = format!("{frac_part:09}");
343                while frac_str.ends_with('0') {
344                    frac_str.pop();
345                }
346                let s = format!("{int_part}.{frac_str}");
347                Ok(Quantity::from(s))
348            }
349        }
350    }
351}
352
353/// Decodes a lot size from the given value, expressed in standard whole-number units.
354#[inline(always)]
355#[must_use]
356pub fn decode_lot_size(value: i32) -> Quantity {
357    match value {
358        0 | i32::MAX => Quantity::from(1),
359        value => Quantity::from(value),
360    }
361}
362
363#[inline(always)]
364#[must_use]
365fn is_trade_msg(action: c_char) -> bool {
366    action as u8 as char == 'T'
367}
368
369/// Returns `true` if both bid and ask prices are defined (not `i64::MAX`).
370///
371/// Databento uses `i64::MAX` as a sentinel value for undefined/null prices.
372/// A valid quote requires both sides to be defined.
373#[inline(always)]
374#[must_use]
375fn has_valid_bid_ask(bid_px: i64, ask_px: i64) -> bool {
376    bid_px != i64::MAX && ask_px != i64::MAX
377}
378
379/// Decodes a Databento MBO (Market by Order) message into an order book delta or trade.
380///
381/// Returns a tuple containing either an `OrderBookDelta` or a `TradeTick`, depending on
382/// whether the message represents an order book update or a trade execution.
383///
384/// # Errors
385///
386/// Returns an error if decoding the MBO message fails.
387pub fn decode_mbo_msg(
388    msg: &dbn::MboMsg,
389    instrument_id: InstrumentId,
390    price_precision: u8,
391    ts_init: Option<UnixNanos>,
392    include_trades: bool,
393) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
394    let side = parse_order_side(msg.side);
395    if is_trade_msg(msg.action) {
396        if include_trades && msg.size > 0 {
397            let price = decode_price(msg.price, price_precision);
398            let size = decode_quantity(msg.size as u64);
399            let aggressor_side = parse_aggressor_side(msg.side);
400            let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
401            let ts_event = msg.ts_recv.into();
402            let ts_init = ts_init.unwrap_or(ts_event);
403
404            let trade = TradeTick::new(
405                instrument_id,
406                price,
407                size,
408                aggressor_side,
409                trade_id,
410                ts_event,
411                ts_init,
412            );
413            return Ok((None, Some(trade)));
414        }
415
416        return Ok((None, None));
417    }
418
419    let action = parse_book_action(msg.action)?;
420    let (raw_price, precision) = decode_raw_price_with_precision(msg.price, price_precision);
421    let price = Price::from_raw(raw_price, precision);
422    let size = decode_quantity(msg.size as u64);
423    let order = BookOrder::new(side, price, size, msg.order_id);
424
425    let ts_event = msg.ts_recv.into();
426    let ts_init = ts_init.unwrap_or(ts_event);
427
428    let delta = OrderBookDelta::new(
429        instrument_id,
430        action,
431        order,
432        msg.flags.raw(),
433        msg.sequence.into(),
434        ts_event,
435        ts_init,
436    );
437
438    Ok((Some(delta), None))
439}
440
441/// Decodes a Databento Trade message into a `TradeTick`.
442///
443/// # Errors
444///
445/// Returns an error if decoding the Trade message fails.
446pub fn decode_trade_msg(
447    msg: &dbn::TradeMsg,
448    instrument_id: InstrumentId,
449    price_precision: u8,
450    ts_init: Option<UnixNanos>,
451) -> anyhow::Result<TradeTick> {
452    let ts_event = msg.ts_recv.into();
453    let ts_init = ts_init.unwrap_or(ts_event);
454
455    let trade = TradeTick::new(
456        instrument_id,
457        decode_price(msg.price, price_precision),
458        decode_quantity(msg.size as u64),
459        parse_aggressor_side(msg.side),
460        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
461        ts_event,
462        ts_init,
463    );
464
465    Ok(trade)
466}
467
468/// Decodes a Databento TBBO (Top of Book with Trade) message into quote and trade ticks.
469///
470/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
471/// The trade is always returned.
472///
473/// # Errors
474///
475/// Returns an error if decoding the TBBO message fails.
476pub fn decode_tbbo_msg(
477    msg: &dbn::TbboMsg,
478    instrument_id: InstrumentId,
479    price_precision: u8,
480    ts_init: Option<UnixNanos>,
481) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
482    let top_level = &msg.levels[0];
483    let ts_event = msg.ts_recv.into();
484    let ts_init = ts_init.unwrap_or(ts_event);
485
486    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
487        Some(QuoteTick::new(
488            instrument_id,
489            decode_price(top_level.bid_px, price_precision),
490            decode_price(top_level.ask_px, price_precision),
491            decode_quantity(top_level.bid_sz as u64),
492            decode_quantity(top_level.ask_sz as u64),
493            ts_event,
494            ts_init,
495        ))
496    } else {
497        None
498    };
499
500    let trade = TradeTick::new(
501        instrument_id,
502        decode_price(msg.price, price_precision),
503        decode_quantity(msg.size as u64),
504        parse_aggressor_side(msg.side),
505        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
506        ts_event,
507        ts_init,
508    );
509
510    Ok((maybe_quote, trade))
511}
512
513/// Decodes a Databento MBP1 (Market by Price Level 1) message into quote and optional trade ticks.
514///
515/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
516///
517/// # Errors
518///
519/// Returns an error if decoding the MBP1 message fails.
520pub fn decode_mbp1_msg(
521    msg: &dbn::Mbp1Msg,
522    instrument_id: InstrumentId,
523    price_precision: u8,
524    ts_init: Option<UnixNanos>,
525    include_trades: bool,
526) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
527    let top_level = &msg.levels[0];
528    let ts_event = msg.ts_recv.into();
529    let ts_init = ts_init.unwrap_or(ts_event);
530
531    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
532        Some(QuoteTick::new(
533            instrument_id,
534            decode_price(top_level.bid_px, price_precision),
535            decode_price(top_level.ask_px, price_precision),
536            decode_quantity(top_level.bid_sz as u64),
537            decode_quantity(top_level.ask_sz as u64),
538            ts_event,
539            ts_init,
540        ))
541    } else {
542        None
543    };
544
545    let maybe_trade = if include_trades && is_trade_msg(msg.action) {
546        Some(TradeTick::new(
547            instrument_id,
548            decode_price(msg.price, price_precision),
549            decode_quantity(msg.size as u64),
550            parse_aggressor_side(msg.side),
551            TradeId::new(itoa::Buffer::new().format(msg.sequence)),
552            ts_event,
553            ts_init,
554        ))
555    } else {
556        None
557    };
558
559    Ok((maybe_quote, maybe_trade))
560}
561
562/// Decodes a Databento BBO (Best Bid and Offer) message into a `QuoteTick`.
563///
564/// Returns `None` if either bid or ask price is undefined (`i64::MAX`).
565///
566/// # Errors
567///
568/// Returns an error if decoding the BBO message fails.
569pub fn decode_bbo_msg(
570    msg: &dbn::BboMsg,
571    instrument_id: InstrumentId,
572    price_precision: u8,
573    ts_init: Option<UnixNanos>,
574) -> anyhow::Result<Option<QuoteTick>> {
575    let top_level = &msg.levels[0];
576    if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
577        return Ok(None);
578    }
579
580    let ts_event = msg.ts_recv.into();
581    let ts_init = ts_init.unwrap_or(ts_event);
582
583    let quote = QuoteTick::new(
584        instrument_id,
585        decode_price(top_level.bid_px, price_precision),
586        decode_price(top_level.ask_px, price_precision),
587        decode_quantity(top_level.bid_sz as u64),
588        decode_quantity(top_level.ask_sz as u64),
589        ts_event,
590        ts_init,
591    );
592
593    Ok(Some(quote))
594}
595
596/// Decodes a Databento MBP10 (Market by Price 10 levels) message into an `OrderBookDepth10`.
597///
598/// # Errors
599///
600/// Returns an error if the number of levels in `msg.levels` is not exactly `DEPTH10_LEN`.
601pub fn decode_mbp10_msg(
602    msg: &dbn::Mbp10Msg,
603    instrument_id: InstrumentId,
604    price_precision: u8,
605    ts_init: Option<UnixNanos>,
606) -> anyhow::Result<OrderBookDepth10> {
607    let mut bids = Vec::with_capacity(DEPTH10_LEN);
608    let mut asks = Vec::with_capacity(DEPTH10_LEN);
609    let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
610    let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
611
612    for level in &msg.levels {
613        let bid_order = BookOrder::new(
614            OrderSide::Buy,
615            decode_price(level.bid_px, price_precision),
616            decode_quantity(level.bid_sz as u64),
617            0,
618        );
619
620        let ask_order = BookOrder::new(
621            OrderSide::Sell,
622            decode_price(level.ask_px, price_precision),
623            decode_quantity(level.ask_sz as u64),
624            0,
625        );
626
627        bids.push(bid_order);
628        asks.push(ask_order);
629        bid_counts.push(level.bid_ct);
630        ask_counts.push(level.ask_ct);
631    }
632
633    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
634        anyhow::anyhow!(
635            "Expected exactly {DEPTH10_LEN} bid levels, received {}",
636            v.len()
637        )
638    })?;
639
640    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
641        anyhow::anyhow!(
642            "Expected exactly {DEPTH10_LEN} ask levels, received {}",
643            v.len()
644        )
645    })?;
646
647    let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
648        anyhow::anyhow!(
649            "Expected exactly {DEPTH10_LEN} bid counts, received {}",
650            v.len()
651        )
652    })?;
653
654    let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
655        anyhow::anyhow!(
656            "Expected exactly {DEPTH10_LEN} ask counts, received {}",
657            v.len()
658        )
659    })?;
660
661    let ts_event = msg.ts_recv.into();
662    let ts_init = ts_init.unwrap_or(ts_event);
663
664    let depth = OrderBookDepth10::new(
665        instrument_id,
666        bids,
667        asks,
668        bid_counts,
669        ask_counts,
670        msg.flags.raw(),
671        msg.sequence.into(),
672        ts_event,
673        ts_init,
674    );
675
676    Ok(depth)
677}
678
679/// Decodes a Databento CMBP1 (Consolidated Market by Price Level 1) message.
680///
681/// Returns a tuple containing an optional `QuoteTick` and an optional `TradeTick`.
682/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
683///
684/// # Errors
685///
686/// Returns an error if decoding the CMBP1 message fails.
687pub fn decode_cmbp1_msg(
688    msg: &dbn::Cmbp1Msg,
689    instrument_id: InstrumentId,
690    price_precision: u8,
691    ts_init: Option<UnixNanos>,
692    include_trades: bool,
693) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
694    let top_level = &msg.levels[0];
695    let ts_event = msg.ts_recv.into();
696    let ts_init = ts_init.unwrap_or(ts_event);
697
698    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
699        Some(QuoteTick::new(
700            instrument_id,
701            decode_price(top_level.bid_px, price_precision),
702            decode_price(top_level.ask_px, price_precision),
703            decode_quantity(top_level.bid_sz as u64),
704            decode_quantity(top_level.ask_sz as u64),
705            ts_event,
706            ts_init,
707        ))
708    } else {
709        None
710    };
711
712    let maybe_trade = if include_trades && is_trade_msg(msg.action) {
713        // Use UUID4 for trade ID as CMBP1 doesn't have a sequence field
714        Some(TradeTick::new(
715            instrument_id,
716            decode_price(msg.price, price_precision),
717            decode_quantity(msg.size as u64),
718            parse_aggressor_side(msg.side),
719            TradeId::new(UUID4::new().as_str()),
720            ts_event,
721            ts_init,
722        ))
723    } else {
724        None
725    };
726
727    Ok((maybe_quote, maybe_trade))
728}
729
730/// Decodes a Databento CBBO (Consolidated Best Bid and Offer) message.
731///
732/// Returns `None` if either bid or ask price is undefined (`i64::MAX`).
733///
734/// # Errors
735///
736/// Returns an error if decoding the CBBO message fails.
737pub fn decode_cbbo_msg(
738    msg: &dbn::CbboMsg,
739    instrument_id: InstrumentId,
740    price_precision: u8,
741    ts_init: Option<UnixNanos>,
742) -> anyhow::Result<Option<QuoteTick>> {
743    let top_level = &msg.levels[0];
744    if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
745        return Ok(None);
746    }
747
748    let ts_event = msg.ts_recv.into();
749    let ts_init = ts_init.unwrap_or(ts_event);
750
751    let quote = QuoteTick::new(
752        instrument_id,
753        decode_price(top_level.bid_px, price_precision),
754        decode_price(top_level.ask_px, price_precision),
755        decode_quantity(top_level.bid_sz as u64),
756        decode_quantity(top_level.ask_sz as u64),
757        ts_event,
758        ts_init,
759    );
760
761    Ok(Some(quote))
762}
763
764/// Decodes a Databento TCBBO (Consolidated Top of Book with Trade) message.
765///
766/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
767/// The trade is always returned.
768///
769/// # Errors
770///
771/// Returns an error if decoding the TCBBO message fails.
772pub fn decode_tcbbo_msg(
773    msg: &dbn::CbboMsg,
774    instrument_id: InstrumentId,
775    price_precision: u8,
776    ts_init: Option<UnixNanos>,
777) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
778    let top_level = &msg.levels[0];
779    let ts_event = msg.ts_recv.into();
780    let ts_init = ts_init.unwrap_or(ts_event);
781
782    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
783        Some(QuoteTick::new(
784            instrument_id,
785            decode_price(top_level.bid_px, price_precision),
786            decode_price(top_level.ask_px, price_precision),
787            decode_quantity(top_level.bid_sz as u64),
788            decode_quantity(top_level.ask_sz as u64),
789            ts_event,
790            ts_init,
791        ))
792    } else {
793        None
794    };
795
796    // Use UUID4 for trade ID as TCBBO doesn't have a sequence field
797    let trade = TradeTick::new(
798        instrument_id,
799        decode_price(msg.price, price_precision),
800        decode_quantity(msg.size as u64),
801        parse_aggressor_side(msg.side),
802        TradeId::new(UUID4::new().as_str()),
803        ts_event,
804        ts_init,
805    );
806
807    Ok((maybe_quote, trade))
808}
809
810/// # Errors
811///
812/// Returns an error if `rtype` is not a supported bar aggregation.
813pub fn decode_bar_type(
814    msg: &dbn::OhlcvMsg,
815    instrument_id: InstrumentId,
816) -> anyhow::Result<BarType> {
817    let bar_type = match msg.hd.rtype {
818        32 => {
819            // ohlcv-1s
820            BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
821        }
822        33 => {
823            // ohlcv-1m
824            BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
825        }
826        34 => {
827            // ohlcv-1h
828            BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
829        }
830        35 => {
831            // ohlcv-1d
832            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
833        }
834        36 => {
835            // ohlcv-eod
836            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
837        }
838        _ => anyhow::bail!(
839            "`rtype` is not a supported bar aggregation, was {}",
840            msg.hd.rtype
841        ),
842    };
843
844    Ok(bar_type)
845}
846
847/// # Errors
848///
849/// Returns an error if `rtype` is not a supported bar aggregation.
850pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
851    let adjustment = match msg.hd.rtype {
852        32 => {
853            // ohlcv-1s
854            BAR_CLOSE_ADJUSTMENT_1S
855        }
856        33 => {
857            // ohlcv-1m
858            BAR_CLOSE_ADJUSTMENT_1M
859        }
860        34 => {
861            // ohlcv-1h
862            BAR_CLOSE_ADJUSTMENT_1H
863        }
864        35 | 36 => {
865            // ohlcv-1d and ohlcv-eod
866            BAR_CLOSE_ADJUSTMENT_1D
867        }
868        _ => anyhow::bail!(
869            "`rtype` is not a supported bar aggregation, was {}",
870            msg.hd.rtype
871        ),
872    };
873
874    Ok(adjustment.into())
875}
876
877/// # Errors
878///
879/// Returns an error if decoding the OHLCV message fails.
880pub fn decode_ohlcv_msg(
881    msg: &dbn::OhlcvMsg,
882    instrument_id: InstrumentId,
883    price_precision: u8,
884    ts_init: Option<UnixNanos>,
885    timestamp_on_close: bool,
886) -> anyhow::Result<Bar> {
887    let bar_type = decode_bar_type(msg, instrument_id)?;
888    let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
889
890    let ts_event_raw = msg.hd.ts_event.into();
891    let ts_close = ts_event_raw + ts_event_adjustment;
892    let ts_init = ts_init.unwrap_or(ts_close); // received time or close time
893
894    let ts_event = if timestamp_on_close {
895        ts_close
896    } else {
897        ts_event_raw
898    };
899
900    let bar = Bar::new(
901        bar_type,
902        decode_price(msg.open, price_precision),
903        decode_price(msg.high, price_precision),
904        decode_price(msg.low, price_precision),
905        decode_price(msg.close, price_precision),
906        decode_quantity(msg.volume),
907        ts_event,
908        ts_init,
909    );
910
911    Ok(bar)
912}
913
914/// Decodes a Databento status message into an `InstrumentStatus` event.
915///
916/// # Errors
917///
918/// Returns an error if decoding the status message fails or if `msg.action` is not a valid `MarketStatusAction`.
919pub fn decode_status_msg(
920    msg: &dbn::StatusMsg,
921    instrument_id: InstrumentId,
922    ts_init: Option<UnixNanos>,
923) -> anyhow::Result<InstrumentStatus> {
924    let ts_event = msg.hd.ts_event.into();
925    let ts_init = ts_init.unwrap_or(ts_event);
926
927    let action = MarketStatusAction::from_u16(msg.action)
928        .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
929
930    let status = InstrumentStatus::new(
931        instrument_id,
932        action,
933        ts_event,
934        ts_init,
935        parse_status_reason(msg.reason)?,
936        parse_status_trading_event(msg.trading_event)?,
937        parse_optional_bool(msg.is_trading),
938        parse_optional_bool(msg.is_quoting),
939        parse_optional_bool(msg.is_short_sell_restricted),
940    );
941
942    Ok(status)
943}
944
945/// # Errors
946///
947/// Returns an error if decoding the record type fails or encounters unsupported message.
948pub fn decode_record(
949    record: &dbn::RecordRef,
950    instrument_id: InstrumentId,
951    price_precision: u8,
952    ts_init: Option<UnixNanos>,
953    include_trades: bool,
954    bars_timestamp_on_close: bool,
955) -> anyhow::Result<(Option<Data>, Option<Data>)> {
956    // Note: TBBO and TCBBO messages provide both quotes and trades.
957    // TBBO is handled explicitly below, while TCBBO is handled by
958    // the CbboMsg branch based on whether it has trade data.
959    let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
960        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
961        let result = decode_mbo_msg(
962            msg,
963            instrument_id,
964            price_precision,
965            Some(ts_init),
966            include_trades,
967        )?;
968        match result {
969            (Some(delta), None) => (Some(Data::Delta(delta)), None),
970            (None, Some(trade)) => (Some(Data::Trade(trade)), None),
971            (None, None) => (None, None),
972            _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
973        }
974    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
975        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
976        let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
977        (Some(Data::Trade(trade)), None)
978    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
979        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
980        let (maybe_quote, maybe_trade) = decode_mbp1_msg(
981            msg,
982            instrument_id,
983            price_precision,
984            Some(ts_init),
985            include_trades,
986        )?;
987        (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
988    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
989        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
990        let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
991        (maybe_quote.map(Data::Quote), None)
992    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
993        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
994        let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
995        (maybe_quote.map(Data::Quote), None)
996    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
997        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
998        let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
999        (Some(Data::from(depth)), None)
1000    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
1001        // if ts_init is None (like with historical data) we don't want it to be equal to ts_event
1002        // it will be set correctly in decode_ohlcv_msg instead
1003        let bar = decode_ohlcv_msg(
1004            msg,
1005            instrument_id,
1006            price_precision,
1007            ts_init,
1008            bars_timestamp_on_close,
1009        )?;
1010        (Some(Data::Bar(bar)), None)
1011    } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
1012        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1013        let (maybe_quote, maybe_trade) = decode_cmbp1_msg(
1014            msg,
1015            instrument_id,
1016            price_precision,
1017            Some(ts_init),
1018            include_trades,
1019        )?;
1020        (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1021    } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
1022        // TBBO always has a trade, quote may be skipped if prices undefined
1023        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1024        let (maybe_quote, trade) =
1025            decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1026        (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1027    } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
1028        // Check if this is a TCBBO or regular CBBO based on whether it has trade data
1029        if msg.price != i64::MAX && msg.size > 0 {
1030            // TCBBO - has a trade, quote may be skipped if prices undefined
1031            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1032            let (maybe_quote, trade) =
1033                decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1034            (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1035        } else {
1036            // Regular CBBO - quote only (may be None if prices undefined)
1037            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1038            let maybe_quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1039            (maybe_quote.map(Data::Quote), None)
1040        }
1041    } else {
1042        anyhow::bail!("DBN message type is not currently supported")
1043    };
1044
1045    Ok(result)
1046}
1047
1048const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
1049    match ts_init {
1050        Some(ts_init) => ts_init,
1051        None => msg_timestamp,
1052    }
1053}
1054
1055/// # Errors
1056///
1057/// Returns an error if decoding the `InstrumentDefMsg` fails.
1058pub fn decode_instrument_def_msg(
1059    msg: &dbn::InstrumentDefMsg,
1060    instrument_id: InstrumentId,
1061    ts_init: Option<UnixNanos>,
1062) -> anyhow::Result<InstrumentAny> {
1063    match msg.instrument_class as u8 as char {
1064        'K' => Ok(InstrumentAny::Equity(decode_equity(
1065            msg,
1066            instrument_id,
1067            ts_init,
1068        )?)),
1069        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1070            msg,
1071            instrument_id,
1072            ts_init,
1073        )?)),
1074        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1075            msg,
1076            instrument_id,
1077            ts_init,
1078        )?)),
1079        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1080            msg,
1081            instrument_id,
1082            ts_init,
1083        )?)),
1084        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1085            msg,
1086            instrument_id,
1087            ts_init,
1088        )?)),
1089        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1090        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1091        _ => anyhow::bail!(
1092            "Unsupported `instrument_class` '{}'",
1093            msg.instrument_class as u8 as char
1094        ),
1095    }
1096}
1097
1098/// Decodes a Databento instrument definition message into an `Equity` instrument.
1099///
1100/// # Errors
1101///
1102/// Returns an error if parsing or constructing `Equity` fails.
1103pub fn decode_equity(
1104    msg: &dbn::InstrumentDefMsg,
1105    instrument_id: InstrumentId,
1106    ts_init: Option<UnixNanos>,
1107) -> anyhow::Result<Equity> {
1108    let currency = parse_currency_or_usd_default(msg.currency());
1109    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1110    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1111    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1112    let ts_init = ts_init.unwrap_or(ts_event);
1113
1114    Ok(Equity::new(
1115        instrument_id,
1116        instrument_id.symbol,
1117        None, // No ISIN available yet
1118        currency,
1119        price_increment.precision,
1120        price_increment,
1121        Some(lot_size),
1122        None, // TBD
1123        None, // TBD
1124        None, // TBD
1125        None, // TBD
1126        None, // TBD
1127        None, // TBD
1128        None, // TBD
1129        None, // TBD
1130        ts_event,
1131        ts_init,
1132    ))
1133}
1134
1135/// Decodes a Databento instrument definition message into a `FuturesContract` instrument.
1136///
1137/// # Errors
1138///
1139/// Returns an error if parsing or constructing `FuturesContract` fails.
1140pub fn decode_futures_contract(
1141    msg: &dbn::InstrumentDefMsg,
1142    instrument_id: InstrumentId,
1143    ts_init: Option<UnixNanos>,
1144) -> anyhow::Result<FuturesContract> {
1145    let currency = parse_currency_or_usd_default(msg.currency());
1146    let exchange = Ustr::from(msg.exchange()?);
1147    let underlying = Ustr::from(msg.asset()?);
1148    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1149    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1150    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1151    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1152    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1153    let ts_init = ts_init.unwrap_or(ts_event);
1154
1155    FuturesContract::new_checked(
1156        instrument_id,
1157        instrument_id.symbol,
1158        asset_class.unwrap_or(AssetClass::Commodity),
1159        Some(exchange),
1160        underlying,
1161        msg.activation.into(),
1162        msg.expiration.into(),
1163        currency,
1164        price_increment.precision,
1165        price_increment,
1166        multiplier,
1167        lot_size,
1168        None, // TBD
1169        None, // TBD
1170        None, // TBD
1171        None, // TBD
1172        None, // TBD
1173        None, // TBD
1174        None, // TBD
1175        None, // TBD
1176        ts_event,
1177        ts_init,
1178    )
1179}
1180
1181/// Decodes a Databento instrument definition message into a `FuturesSpread` instrument.
1182///
1183/// # Errors
1184///
1185/// Returns an error if parsing or constructing `FuturesSpread` fails.
1186pub fn decode_futures_spread(
1187    msg: &dbn::InstrumentDefMsg,
1188    instrument_id: InstrumentId,
1189    ts_init: Option<UnixNanos>,
1190) -> anyhow::Result<FuturesSpread> {
1191    let exchange = Ustr::from(msg.exchange()?);
1192    let underlying = Ustr::from(msg.asset()?);
1193    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1194    let strategy_type = Ustr::from(msg.secsubtype()?);
1195    let currency = parse_currency_or_usd_default(msg.currency());
1196    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1197    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1198    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1199    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1200    let ts_init = ts_init.unwrap_or(ts_event);
1201
1202    FuturesSpread::new_checked(
1203        instrument_id,
1204        instrument_id.symbol,
1205        asset_class.unwrap_or(AssetClass::Commodity),
1206        Some(exchange),
1207        underlying,
1208        strategy_type,
1209        msg.activation.into(),
1210        msg.expiration.into(),
1211        currency,
1212        price_increment.precision,
1213        price_increment,
1214        multiplier,
1215        lot_size,
1216        None, // TBD
1217        None, // TBD
1218        None, // TBD
1219        None, // TBD
1220        None, // TBD
1221        None, // TBD
1222        None, // TBD
1223        None, // TBD
1224        ts_event,
1225        ts_init,
1226    )
1227}
1228
1229/// Decodes a Databento instrument definition message into an `OptionContract` instrument.
1230///
1231/// # Errors
1232///
1233/// Returns an error if parsing or constructing `OptionContract` fails.
1234pub fn decode_option_contract(
1235    msg: &dbn::InstrumentDefMsg,
1236    instrument_id: InstrumentId,
1237    ts_init: Option<UnixNanos>,
1238) -> anyhow::Result<OptionContract> {
1239    let currency = parse_currency_or_usd_default(msg.currency());
1240    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1241    let exchange = Ustr::from(msg.exchange()?);
1242    let underlying = Ustr::from(msg.underlying()?);
1243    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1244        Some(AssetClass::Equity)
1245    } else {
1246        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1247        asset_class
1248    };
1249    let option_kind = parse_option_kind(msg.instrument_class)?;
1250    let strike_price = Price::from_raw(
1251        decode_raw_price_i64(msg.strike_price),
1252        strike_price_currency.precision,
1253    );
1254    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1255    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1256    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1257    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1258    let ts_init = ts_init.unwrap_or(ts_event);
1259
1260    OptionContract::new_checked(
1261        instrument_id,
1262        instrument_id.symbol,
1263        asset_class_opt.unwrap_or(AssetClass::Commodity),
1264        Some(exchange),
1265        underlying,
1266        option_kind,
1267        strike_price,
1268        currency,
1269        msg.activation.into(),
1270        msg.expiration.into(),
1271        price_increment.precision,
1272        price_increment,
1273        multiplier,
1274        lot_size,
1275        None, // TBD
1276        None, // TBD
1277        None, // TBD
1278        None, // TBD
1279        None, // TBD
1280        None, // TBD
1281        None, // TBD
1282        None, // TBD
1283        ts_event,
1284        ts_init,
1285    )
1286}
1287
1288/// Decodes a Databento instrument definition message into an `OptionSpread` instrument.
1289///
1290/// # Errors
1291///
1292/// Returns an error if parsing or constructing `OptionSpread` fails.
1293pub fn decode_option_spread(
1294    msg: &dbn::InstrumentDefMsg,
1295    instrument_id: InstrumentId,
1296    ts_init: Option<UnixNanos>,
1297) -> anyhow::Result<OptionSpread> {
1298    let exchange = Ustr::from(msg.exchange()?);
1299    let underlying = Ustr::from(msg.underlying()?);
1300    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1301        Some(AssetClass::Equity)
1302    } else {
1303        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1304        asset_class
1305    };
1306    let strategy_type = Ustr::from(msg.secsubtype()?);
1307    let currency = parse_currency_or_usd_default(msg.currency());
1308    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1309    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1310    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1311    let ts_event = msg.ts_recv.into(); // More accurate and reliable timestamp
1312    let ts_init = ts_init.unwrap_or(ts_event);
1313
1314    OptionSpread::new_checked(
1315        instrument_id,
1316        instrument_id.symbol,
1317        asset_class_opt.unwrap_or(AssetClass::Commodity),
1318        Some(exchange),
1319        underlying,
1320        strategy_type,
1321        msg.activation.into(),
1322        msg.expiration.into(),
1323        currency,
1324        price_increment.precision,
1325        price_increment,
1326        multiplier,
1327        lot_size,
1328        None, // TBD
1329        None, // TBD
1330        None, // TBD
1331        None, // TBD
1332        None, // TBD
1333        None, // TBD
1334        None, // TBD
1335        None, // TBD
1336        ts_event,
1337        ts_init,
1338    )
1339}
1340
1341/// Decodes a Databento imbalance message into a `DatabentoImbalance` event.
1342///
1343/// # Errors
1344///
1345/// Returns an error if constructing `DatabentoImbalance` fails.
1346pub fn decode_imbalance_msg(
1347    msg: &dbn::ImbalanceMsg,
1348    instrument_id: InstrumentId,
1349    price_precision: u8,
1350    ts_init: Option<UnixNanos>,
1351) -> anyhow::Result<DatabentoImbalance> {
1352    let ts_event = msg.ts_recv.into();
1353    let ts_init = ts_init.unwrap_or(ts_event);
1354
1355    Ok(DatabentoImbalance::new(
1356        instrument_id,
1357        decode_price(msg.ref_price, price_precision),
1358        decode_price(msg.cont_book_clr_price, price_precision),
1359        decode_price(msg.auct_interest_clr_price, price_precision),
1360        Quantity::new(f64::from(msg.paired_qty), 0),
1361        Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1362        parse_order_side(msg.side),
1363        msg.significant_imbalance as c_char,
1364        msg.hd.ts_event.into(),
1365        ts_event,
1366        ts_init,
1367    ))
1368}
1369
1370/// Decodes a Databento statistics message into a `DatabentoStatistics` event.
1371///
1372/// # Errors
1373///
1374/// Returns an error if constructing `DatabentoStatistics` fails or if `msg.stat_type` or
1375/// `msg.update_action` is not a valid enum variant.
1376pub fn decode_statistics_msg(
1377    msg: &dbn::StatMsg,
1378    instrument_id: InstrumentId,
1379    price_precision: u8,
1380    ts_init: Option<UnixNanos>,
1381) -> anyhow::Result<DatabentoStatistics> {
1382    let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1383        .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1384    let update_action =
1385        DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1386            anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1387        })?;
1388    let ts_event = msg.ts_recv.into();
1389    let ts_init = ts_init.unwrap_or(ts_event);
1390
1391    Ok(DatabentoStatistics::new(
1392        instrument_id,
1393        stat_type,
1394        update_action,
1395        decode_optional_price(msg.price, price_precision),
1396        decode_optional_quantity(msg.quantity),
1397        msg.channel_id,
1398        msg.stat_flags,
1399        msg.sequence,
1400        msg.ts_ref.into(),
1401        msg.ts_in_delta,
1402        msg.hd.ts_event.into(),
1403        ts_event,
1404        ts_init,
1405    ))
1406}
1407
1408#[cfg(test)]
1409mod tests {
1410    use std::path::{Path, PathBuf};
1411
1412    use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1413    use fallible_streaming_iterator::FallibleStreamingIterator;
1414    use nautilus_model::instruments::Instrument;
1415    use rstest::*;
1416
1417    use super::*;
1418
1419    fn test_data_path() -> PathBuf {
1420        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1421    }
1422
1423    #[rstest]
1424    #[case('Y' as c_char, Some(true))]
1425    #[case('N' as c_char, Some(false))]
1426    #[case('X' as c_char, None)]
1427    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1428        assert_eq!(parse_optional_bool(input), expected);
1429    }
1430
1431    #[rstest]
1432    #[case('A' as c_char, OrderSide::Sell)]
1433    #[case('B' as c_char, OrderSide::Buy)]
1434    #[case('X' as c_char, OrderSide::NoOrderSide)]
1435    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1436        assert_eq!(parse_order_side(input), expected);
1437    }
1438
1439    #[rstest]
1440    #[case('A' as c_char, AggressorSide::Seller)]
1441    #[case('B' as c_char, AggressorSide::Buyer)]
1442    #[case('X' as c_char, AggressorSide::NoAggressor)]
1443    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1444        assert_eq!(parse_aggressor_side(input), expected);
1445    }
1446
1447    #[rstest]
1448    #[case('T' as c_char, true)]
1449    #[case('A' as c_char, false)]
1450    #[case('C' as c_char, false)]
1451    #[case('F' as c_char, false)]
1452    #[case('M' as c_char, false)]
1453    #[case('R' as c_char, false)]
1454    fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1455        assert_eq!(is_trade_msg(action), expected);
1456    }
1457
1458    #[rstest]
1459    #[case('A' as c_char, Ok(BookAction::Add))]
1460    #[case('C' as c_char, Ok(BookAction::Delete))]
1461    #[case('F' as c_char, Ok(BookAction::Update))]
1462    #[case('M' as c_char, Ok(BookAction::Update))]
1463    #[case('R' as c_char, Ok(BookAction::Clear))]
1464    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1465    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1466        match parse_book_action(input) {
1467            Ok(action) => assert_eq!(Ok(action), expected),
1468            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1469        }
1470    }
1471
1472    #[rstest]
1473    #[case('C' as c_char, Ok(OptionKind::Call))]
1474    #[case('P' as c_char, Ok(OptionKind::Put))]
1475    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1476    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1477        match parse_option_kind(input) {
1478            Ok(kind) => assert_eq!(Ok(kind), expected),
1479            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1480        }
1481    }
1482
1483    #[rstest]
1484    #[case(Ok("USD"), Currency::USD())]
1485    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1486    #[case(Ok(""), Currency::USD())]
1487    #[case(Err("Error"), Currency::USD())]
1488    fn test_parse_currency_or_usd_default(
1489        #[case] input: Result<&str, &'static str>, // Using `&'static str` for errors
1490        #[case] expected: Currency,
1491    ) {
1492        let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1493        assert_eq!(actual, expected);
1494    }
1495
1496    #[rstest]
1497    #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1498    #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
1499    #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
1500    #[case("XXX", Ok((None, None)))]
1501    #[case("D", Err("Value string is too short"))]
1502    fn test_parse_cfi_iso10926(
1503        #[case] input: &str,
1504        #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
1505    ) {
1506        match parse_cfi_iso10926(input) {
1507            Ok(result) => assert_eq!(Ok(result), expected),
1508            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1509        }
1510    }
1511
1512    #[rstest]
1513    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1514    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1515    #[case(
1516        10_000_000_000,
1517        2,
1518        Price::from_raw(decode_raw_price_i64(10_000_000_000), 2)
1519    )]
1520    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1521        let actual = decode_price_increment(value, precision);
1522        assert_eq!(actual, expected);
1523    }
1524
1525    #[rstest]
1526    #[case(i64::MAX, 2, None)] // None for i64::MAX
1527    #[case(0, 2, Some(Price::from_raw(0, 2)))] // 0 is valid here
1528    #[case(
1529        10_000_000_000,
1530        2,
1531        Some(Price::from_raw(decode_raw_price_i64(10_000_000_000), 2))
1532    )]
1533    fn test_decode_optional_price(
1534        #[case] value: i64,
1535        #[case] precision: u8,
1536        #[case] expected: Option<Price>,
1537    ) {
1538        let actual = decode_optional_price(value, precision);
1539        assert_eq!(actual, expected);
1540    }
1541
1542    #[rstest]
1543    #[case(i64::MAX, None)] // None for i32::MAX
1544    #[case(0, Some(Quantity::new(0.0, 0)))] // 0 is valid quantity
1545    #[case(10, Some(Quantity::new(10.0, 0)))] // Arbitrary valid quantity
1546    fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1547        let actual = decode_optional_quantity(value);
1548        assert_eq!(actual, expected);
1549    }
1550
1551    #[rstest]
1552    #[case(0, Quantity::from(1))] // Default fallback for 0
1553    #[case(i64::MAX, Quantity::from(1))] // Default fallback for i64::MAX
1554    #[case(50_000_000_000, Quantity::from("50"))] // 50.0 exactly
1555    #[case(12_500_000_000, Quantity::from("12.5"))] // 12.5 exactly
1556    #[case(1_000_000_000, Quantity::from("1"))] // 1.0 exactly
1557    #[case(1, Quantity::from("0.000000001"))] // Smallest positive value
1558    #[case(1_000_000_001, Quantity::from("1.000000001"))] // Just over 1.0
1559    #[case(999_999_999, Quantity::from("0.999999999"))] // Just under 1.0
1560    #[case(123_456_789_000, Quantity::from("123.456789"))] // Trailing zeros trimmed
1561    fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1562        assert_eq!(decode_multiplier(raw).unwrap(), expected);
1563    }
1564
1565    #[rstest]
1566    #[case(-1_500_000_000)] // Large negative value
1567    #[case(-1)] // Small negative value
1568    #[case(-999_999_999)] // Another negative value
1569    fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1570        let result = decode_multiplier(raw);
1571        assert!(result.is_err());
1572        assert!(
1573            result
1574                .unwrap_err()
1575                .to_string()
1576                .contains("Invalid negative multiplier")
1577        );
1578    }
1579
1580    #[rstest]
1581    #[case(100, Quantity::from(100))]
1582    #[case(1000, Quantity::from(1000))]
1583    #[case(5, Quantity::from(5))]
1584    fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1585        assert_eq!(decode_quantity(value), expected);
1586    }
1587
1588    #[rstest]
1589    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1590    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1591    #[case(
1592        10_000_000_000,
1593        2,
1594        Price::from_raw(decode_raw_price_i64(10_000_000_000), 2)
1595    )]
1596    fn test_decode_price_increment(
1597        #[case] value: i64,
1598        #[case] precision: u8,
1599        #[case] expected: Price,
1600    ) {
1601        assert_eq!(decode_price_increment(value, precision), expected);
1602    }
1603
1604    #[rstest]
1605    #[case(0, Quantity::from(1))] // Default for 0
1606    #[case(i32::MAX, Quantity::from(1))] // Default for MAX
1607    #[case(100, Quantity::from(100))]
1608    #[case(1, Quantity::from(1))]
1609    #[case(1000, Quantity::from(1000))]
1610    fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1611        assert_eq!(decode_lot_size(value), expected);
1612    }
1613
1614    #[rstest]
1615    #[case(0, None)] // None for 0
1616    #[case(1, Some(Ustr::from("Scheduled")))]
1617    #[case(2, Some(Ustr::from("Surveillance intervention")))]
1618    #[case(3, Some(Ustr::from("Market event")))]
1619    #[case(10, Some(Ustr::from("Regulatory")))]
1620    #[case(30, Some(Ustr::from("News pending")))]
1621    #[case(40, Some(Ustr::from("Order imbalance")))]
1622    #[case(50, Some(Ustr::from("LULD pause")))]
1623    #[case(60, Some(Ustr::from("Operational")))]
1624    #[case(100, Some(Ustr::from("Corporate action")))]
1625    #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1626    fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1627        assert_eq!(parse_status_reason(value).unwrap(), expected);
1628    }
1629
1630    #[rstest]
1631    #[case(999)] // Invalid code
1632    fn test_parse_status_reason_invalid(#[case] value: u16) {
1633        assert!(parse_status_reason(value).is_err());
1634    }
1635
1636    #[rstest]
1637    #[case(0, None)] // None for 0
1638    #[case(1, Some(Ustr::from("No cancel")))]
1639    #[case(2, Some(Ustr::from("Change trading session")))]
1640    #[case(3, Some(Ustr::from("Implied matching on")))]
1641    #[case(4, Some(Ustr::from("Implied matching off")))]
1642    fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1643        assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1644    }
1645
1646    #[rstest]
1647    #[case(5)] // Invalid code
1648    #[case(100)] // Invalid code
1649    fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1650        assert!(parse_status_trading_event(value).is_err());
1651    }
1652
1653    #[rstest]
1654    fn test_decode_mbo_msg() {
1655        let path = test_data_path().join("test_data.mbo.dbn.zst");
1656        let mut dbn_stream = Decoder::from_zstd_file(path)
1657            .unwrap()
1658            .decode_stream::<dbn::MboMsg>();
1659        let msg = dbn_stream.next().unwrap().unwrap();
1660
1661        let instrument_id = InstrumentId::from("ESM4.GLBX");
1662        let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1663        let delta = delta.unwrap();
1664
1665        assert_eq!(delta.instrument_id, instrument_id);
1666        assert_eq!(delta.action, BookAction::Delete);
1667        assert_eq!(delta.order.side, OrderSide::Sell);
1668        assert_eq!(delta.order.price, Price::from("3722.75"));
1669        assert_eq!(delta.order.size, Quantity::from("1"));
1670        assert_eq!(delta.order.order_id, 647_784_973_705);
1671        assert_eq!(delta.flags, 128);
1672        assert_eq!(delta.sequence, 1_170_352);
1673        assert_eq!(delta.ts_event, msg.ts_recv);
1674        assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
1675        assert_eq!(delta.ts_init, 0);
1676    }
1677
1678    #[rstest]
1679    fn test_decode_mbo_msg_clear_action() {
1680        // Create an MBO message with Clear action (action='R', side='N')
1681        let ts_recv = 1_609_160_400_000_000_000;
1682        let msg = dbn::MboMsg {
1683            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1684            order_id: 0,
1685            price: i64::MAX,
1686            size: 0,
1687            flags: dbn::FlagSet::empty(),
1688            channel_id: 0,
1689            action: 'R' as c_char,
1690            side: 'N' as c_char, // NoOrderSide for Clear
1691            ts_recv,
1692            ts_in_delta: 0,
1693            sequence: 1_000_000,
1694        };
1695
1696        let instrument_id = InstrumentId::from("ESM4.GLBX");
1697        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1698
1699        // Clear messages should produce OrderBookDelta, not TradeTick
1700        assert!(trade.is_none());
1701        let delta = delta.expect("Clear action should produce OrderBookDelta");
1702
1703        assert_eq!(delta.instrument_id, instrument_id);
1704        assert_eq!(delta.action, BookAction::Clear);
1705        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1706        assert_eq!(delta.order.size, Quantity::from("0"));
1707        assert_eq!(delta.order.order_id, 0);
1708        assert_eq!(delta.sequence, 1_000_000);
1709        assert_eq!(delta.ts_event, ts_recv);
1710        assert_eq!(delta.ts_init, 0);
1711        assert!(delta.order.price.is_undefined());
1712        assert_eq!(delta.order.price.precision, 0);
1713    }
1714
1715    #[rstest]
1716    fn test_decode_mbo_msg_price_undef_with_precision() {
1717        // Test that PRICE_UNDEF (i64::MAX) forces precision to 0 even when price_precision is non-zero
1718        let ts_recv = 1_609_160_400_000_000_000;
1719        let msg = dbn::MboMsg {
1720            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1721            order_id: 0,
1722            price: i64::MAX, // PRICE_UNDEF
1723            size: 0,
1724            flags: dbn::FlagSet::empty(),
1725            channel_id: 0,
1726            action: 'R' as c_char, // Clear
1727            side: 'N' as c_char,   // NoOrderSide
1728            ts_recv,
1729            ts_in_delta: 0,
1730            sequence: 0,
1731        };
1732
1733        let instrument_id = InstrumentId::from("ESM4.GLBX");
1734        let (delta, _) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1735        let delta = delta.unwrap();
1736
1737        assert!(delta.order.price.is_undefined());
1738        assert_eq!(delta.order.price.precision, 0);
1739        assert_eq!(delta.order.price.raw, PRICE_UNDEF);
1740    }
1741
1742    #[rstest]
1743    fn test_decode_mbo_msg_no_order_side_update() {
1744        // MBO messages with NoOrderSide are now passed through to the book
1745        // The book will resolve the side from its cache using the order_id
1746        let ts_recv = 1_609_160_400_000_000_000;
1747        let msg = dbn::MboMsg {
1748            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
1749            order_id: 123_456_789,
1750            price: 4_800_250_000_000, // $4800.25 with precision 2
1751            size: 1,
1752            flags: dbn::FlagSet::empty(),
1753            channel_id: 1,
1754            action: 'M' as c_char, // Modify/Update action
1755            side: 'N' as c_char,   // NoOrderSide
1756            ts_recv,
1757            ts_in_delta: 0,
1758            sequence: 1_000_000,
1759        };
1760
1761        let instrument_id = InstrumentId::from("ESM4.GLBX");
1762        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1763
1764        // Delta should be created with NoOrderSide (book will resolve it)
1765        assert!(delta.is_some());
1766        assert!(trade.is_none());
1767        let delta = delta.unwrap();
1768        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
1769        assert_eq!(delta.order.order_id, 123_456_789);
1770        assert_eq!(delta.action, BookAction::Update);
1771    }
1772
1773    #[rstest]
1774    fn test_decode_mbp1_msg() {
1775        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
1776        let mut dbn_stream = Decoder::from_zstd_file(path)
1777            .unwrap()
1778            .decode_stream::<dbn::Mbp1Msg>();
1779        let msg = dbn_stream.next().unwrap().unwrap();
1780
1781        let instrument_id = InstrumentId::from("ESM4.GLBX");
1782        let (maybe_quote, _) =
1783            decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1784        let quote = maybe_quote.expect("Expected valid quote");
1785
1786        assert_eq!(quote.instrument_id, instrument_id);
1787        assert_eq!(quote.bid_price, Price::from("3720.25"));
1788        assert_eq!(quote.ask_price, Price::from("3720.50"));
1789        assert_eq!(quote.bid_size, Quantity::from("24"));
1790        assert_eq!(quote.ask_size, Quantity::from("11"));
1791        assert_eq!(quote.ts_event, msg.ts_recv);
1792        assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
1793        assert_eq!(quote.ts_init, 0);
1794    }
1795
1796    #[rstest]
1797    fn test_decode_mbp1_msg_undefined_ask_skips_quote() {
1798        let ts_recv = 1_609_160_400_000_000_000;
1799        let msg = dbn::Mbp1Msg {
1800            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1801            price: 3_720_250_000_000, // Valid trade price
1802            size: 5,
1803            action: 'A' as c_char,
1804            side: 'B' as c_char,
1805            flags: dbn::FlagSet::empty(),
1806            depth: 0,
1807            ts_recv,
1808            ts_in_delta: 0,
1809            sequence: 1_170_352,
1810            levels: [dbn::BidAskPair {
1811                bid_px: 3_720_250_000_000, // Valid bid price
1812                ask_px: i64::MAX,          // Undefined ask price
1813                bid_sz: 24,
1814                ask_sz: 0,
1815                bid_ct: 1,
1816                ask_ct: 0,
1817            }],
1818        };
1819
1820        let instrument_id = InstrumentId::from("ESM4.GLBX");
1821        let (maybe_quote, _) =
1822            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1823
1824        // Quote should be None because ask price is undefined
1825        assert!(maybe_quote.is_none());
1826    }
1827
1828    #[rstest]
1829    fn test_decode_mbp1_msg_undefined_bid_skips_quote() {
1830        let ts_recv = 1_609_160_400_000_000_000;
1831        let msg = dbn::Mbp1Msg {
1832            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1833            price: 3_720_500_000_000, // Valid trade price
1834            size: 5,
1835            action: 'A' as c_char,
1836            side: 'A' as c_char,
1837            flags: dbn::FlagSet::empty(),
1838            depth: 0,
1839            ts_recv,
1840            ts_in_delta: 0,
1841            sequence: 1_170_352,
1842            levels: [dbn::BidAskPair {
1843                bid_px: i64::MAX,          // Undefined bid price
1844                ask_px: 3_720_500_000_000, // Valid ask price
1845                bid_sz: 0,
1846                ask_sz: 11,
1847                bid_ct: 0,
1848                ask_ct: 1,
1849            }],
1850        };
1851
1852        let instrument_id = InstrumentId::from("ESM4.GLBX");
1853        let (maybe_quote, _) =
1854            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
1855
1856        // Quote should be None because bid price is undefined
1857        assert!(maybe_quote.is_none());
1858    }
1859
1860    #[rstest]
1861    fn test_decode_mbp1_msg_trade_still_returned_with_undefined_prices() {
1862        let ts_recv = 1_609_160_400_000_000_000;
1863        let msg = dbn::Mbp1Msg {
1864            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
1865            price: 3_720_250_000_000, // Valid trade price
1866            size: 5,
1867            action: 'T' as c_char, // Trade action
1868            side: 'A' as c_char,
1869            flags: dbn::FlagSet::empty(),
1870            depth: 0,
1871            ts_recv,
1872            ts_in_delta: 0,
1873            sequence: 1_170_352,
1874            levels: [dbn::BidAskPair {
1875                bid_px: i64::MAX, // Undefined bid
1876                ask_px: i64::MAX, // Undefined ask
1877                bid_sz: 0,
1878                ask_sz: 0,
1879                bid_ct: 0,
1880                ask_ct: 0,
1881            }],
1882        };
1883
1884        let instrument_id = InstrumentId::from("ESM4.GLBX");
1885        let (maybe_quote, maybe_trade) =
1886            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), true).unwrap();
1887
1888        // Quote should be None because both prices are undefined
1889        assert!(maybe_quote.is_none());
1890
1891        // Trade should still be present
1892        let trade = maybe_trade.expect("Expected trade");
1893        assert_eq!(trade.instrument_id, instrument_id);
1894        assert_eq!(trade.price, Price::from("3720.25"));
1895        assert_eq!(trade.size, Quantity::from("5"));
1896    }
1897
1898    #[rstest]
1899    fn test_decode_bbo_1s_msg() {
1900        let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
1901        let mut dbn_stream = Decoder::from_zstd_file(path)
1902            .unwrap()
1903            .decode_stream::<dbn::BboMsg>();
1904        let msg = dbn_stream.next().unwrap().unwrap();
1905
1906        let instrument_id = InstrumentId::from("ESM4.GLBX");
1907        let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1908        let quote = maybe_quote.expect("Expected valid quote");
1909
1910        assert_eq!(quote.instrument_id, instrument_id);
1911        assert_eq!(quote.bid_price, Price::from("3702.25"));
1912        assert_eq!(quote.ask_price, Price::from("3702.75"));
1913        assert_eq!(quote.bid_size, Quantity::from("18"));
1914        assert_eq!(quote.ask_size, Quantity::from("13"));
1915        assert_eq!(quote.ts_event, msg.ts_recv);
1916        assert_eq!(quote.ts_event, 1609113600000000000);
1917        assert_eq!(quote.ts_init, 0);
1918    }
1919
1920    #[rstest]
1921    fn test_decode_bbo_1m_msg() {
1922        let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
1923        let mut dbn_stream = Decoder::from_zstd_file(path)
1924            .unwrap()
1925            .decode_stream::<dbn::BboMsg>();
1926        let msg = dbn_stream.next().unwrap().unwrap();
1927
1928        let instrument_id = InstrumentId::from("ESM4.GLBX");
1929        let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1930        let quote = maybe_quote.expect("Expected valid quote");
1931
1932        assert_eq!(quote.instrument_id, instrument_id);
1933        assert_eq!(quote.bid_price, Price::from("3702.25"));
1934        assert_eq!(quote.ask_price, Price::from("3702.75"));
1935        assert_eq!(quote.bid_size, Quantity::from("18"));
1936        assert_eq!(quote.ask_size, Quantity::from("13"));
1937        assert_eq!(quote.ts_event, msg.ts_recv);
1938        assert_eq!(quote.ts_event, 1609113600000000000);
1939        assert_eq!(quote.ts_init, 0);
1940    }
1941
1942    #[rstest]
1943    fn test_decode_mbp10_msg() {
1944        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
1945        let mut dbn_stream = Decoder::from_zstd_file(path)
1946            .unwrap()
1947            .decode_stream::<dbn::Mbp10Msg>();
1948        let msg = dbn_stream.next().unwrap().unwrap();
1949
1950        let instrument_id = InstrumentId::from("ESM4.GLBX");
1951        let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1952
1953        assert_eq!(depth10.instrument_id, instrument_id);
1954        assert_eq!(depth10.bids.len(), 10);
1955        assert_eq!(depth10.asks.len(), 10);
1956        assert_eq!(depth10.bid_counts.len(), 10);
1957        assert_eq!(depth10.ask_counts.len(), 10);
1958        assert_eq!(depth10.flags, 128);
1959        assert_eq!(depth10.sequence, 1_170_352);
1960        assert_eq!(depth10.ts_event, msg.ts_recv);
1961        assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
1962        assert_eq!(depth10.ts_init, 0);
1963    }
1964
1965    #[rstest]
1966    fn test_decode_trade_msg() {
1967        let path = test_data_path().join("test_data.trades.dbn.zst");
1968        let mut dbn_stream = Decoder::from_zstd_file(path)
1969            .unwrap()
1970            .decode_stream::<dbn::TradeMsg>();
1971        let msg = dbn_stream.next().unwrap().unwrap();
1972
1973        let instrument_id = InstrumentId::from("ESM4.GLBX");
1974        let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1975
1976        assert_eq!(trade.instrument_id, instrument_id);
1977        assert_eq!(trade.price, Price::from("3720.25"));
1978        assert_eq!(trade.size, Quantity::from("5"));
1979        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1980        assert_eq!(trade.trade_id.to_string(), "1170380");
1981        assert_eq!(trade.ts_event, msg.ts_recv);
1982        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
1983        assert_eq!(trade.ts_init, 0);
1984    }
1985
1986    #[rstest]
1987    fn test_decode_tbbo_msg() {
1988        let path = test_data_path().join("test_data.tbbo.dbn.zst");
1989        let mut dbn_stream = Decoder::from_zstd_file(path)
1990            .unwrap()
1991            .decode_stream::<dbn::Mbp1Msg>();
1992        let msg = dbn_stream.next().unwrap().unwrap();
1993
1994        let instrument_id = InstrumentId::from("ESM4.GLBX");
1995        let (maybe_quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1996        let quote = maybe_quote.expect("Expected valid quote");
1997
1998        assert_eq!(quote.instrument_id, instrument_id);
1999        assert_eq!(quote.bid_price, Price::from("3720.25"));
2000        assert_eq!(quote.ask_price, Price::from("3720.50"));
2001        assert_eq!(quote.bid_size, Quantity::from("26"));
2002        assert_eq!(quote.ask_size, Quantity::from("7"));
2003        assert_eq!(quote.ts_event, msg.ts_recv);
2004        assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
2005        assert_eq!(quote.ts_init, 0);
2006
2007        assert_eq!(trade.instrument_id, instrument_id);
2008        assert_eq!(trade.price, Price::from("3720.25"));
2009        assert_eq!(trade.size, Quantity::from("5"));
2010        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2011        assert_eq!(trade.trade_id.to_string(), "1170380");
2012        assert_eq!(trade.ts_event, msg.ts_recv);
2013        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2014        assert_eq!(trade.ts_init, 0);
2015    }
2016
2017    #[rstest]
2018    fn test_decode_ohlcv_msg() {
2019        let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
2020        let mut dbn_stream = Decoder::from_zstd_file(path)
2021            .unwrap()
2022            .decode_stream::<dbn::OhlcvMsg>();
2023        let msg = dbn_stream.next().unwrap().unwrap();
2024
2025        let instrument_id = InstrumentId::from("ESM4.GLBX");
2026        let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2027
2028        assert_eq!(
2029            bar.bar_type,
2030            BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
2031        );
2032        assert_eq!(bar.open, Price::from("372025.00"));
2033        assert_eq!(bar.high, Price::from("372050.00"));
2034        assert_eq!(bar.low, Price::from("372025.00"));
2035        assert_eq!(bar.close, Price::from("372050.00"));
2036        assert_eq!(bar.volume, Quantity::from("57"));
2037        assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); // timestamp_on_close=true
2038        assert_eq!(bar.ts_init, 0); // ts_init was Some(0)
2039    }
2040
2041    #[rstest]
2042    fn test_decode_definition_msg() {
2043        let path = test_data_path().join("test_data.definition.dbn.zst");
2044        let mut dbn_stream = Decoder::from_zstd_file(path)
2045            .unwrap()
2046            .decode_stream::<dbn::InstrumentDefMsg>();
2047        let msg = dbn_stream.next().unwrap().unwrap();
2048
2049        let instrument_id = InstrumentId::from("ESM4.GLBX");
2050        let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
2051
2052        assert!(result.is_ok());
2053        assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
2054    }
2055
2056    #[rstest]
2057    fn test_decode_status_msg() {
2058        let path = test_data_path().join("test_data.status.dbn.zst");
2059        let mut dbn_stream = Decoder::from_zstd_file(path)
2060            .unwrap()
2061            .decode_stream::<dbn::StatusMsg>();
2062        let msg = dbn_stream.next().unwrap().unwrap();
2063
2064        let instrument_id = InstrumentId::from("ESM4.GLBX");
2065        let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
2066
2067        assert_eq!(status.instrument_id, instrument_id);
2068        assert_eq!(status.action, MarketStatusAction::Trading);
2069        assert_eq!(status.ts_event, msg.hd.ts_event);
2070        assert_eq!(status.ts_init, 0);
2071        assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
2072        assert_eq!(status.trading_event, None);
2073        assert_eq!(status.is_trading, Some(true));
2074        assert_eq!(status.is_quoting, Some(true));
2075        assert_eq!(status.is_short_sell_restricted, None);
2076    }
2077
2078    #[rstest]
2079    fn test_decode_imbalance_msg() {
2080        let path = test_data_path().join("test_data.imbalance.dbn.zst");
2081        let mut dbn_stream = Decoder::from_zstd_file(path)
2082            .unwrap()
2083            .decode_stream::<dbn::ImbalanceMsg>();
2084        let msg = dbn_stream.next().unwrap().unwrap();
2085
2086        let instrument_id = InstrumentId::from("ESM4.GLBX");
2087        let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2088
2089        assert_eq!(imbalance.instrument_id, instrument_id);
2090        assert_eq!(imbalance.ref_price, Price::from("229.43"));
2091        assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
2092        assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
2093        assert_eq!(imbalance.paired_qty, Quantity::from("0"));
2094        assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
2095        assert_eq!(imbalance.side, OrderSide::Buy);
2096        assert_eq!(imbalance.significant_imbalance, 126);
2097        assert_eq!(imbalance.ts_event, msg.hd.ts_event);
2098        assert_eq!(imbalance.ts_recv, msg.ts_recv);
2099        assert_eq!(imbalance.ts_init, 0);
2100    }
2101
2102    #[rstest]
2103    fn test_decode_statistics_msg() {
2104        let path = test_data_path().join("test_data.statistics.dbn.zst");
2105        let mut dbn_stream = Decoder::from_zstd_file(path)
2106            .unwrap()
2107            .decode_stream::<dbn::StatMsg>();
2108        let msg = dbn_stream.next().unwrap().unwrap();
2109
2110        let instrument_id = InstrumentId::from("ESM4.GLBX");
2111        let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2112
2113        assert_eq!(statistics.instrument_id, instrument_id);
2114        assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
2115        assert_eq!(
2116            statistics.update_action,
2117            DatabentoStatisticUpdateAction::Added
2118        );
2119        assert_eq!(statistics.price, Some(Price::from("100.00")));
2120        assert_eq!(statistics.quantity, None);
2121        assert_eq!(statistics.channel_id, 13);
2122        assert_eq!(statistics.stat_flags, 255);
2123        assert_eq!(statistics.sequence, 2);
2124        assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
2125        assert_eq!(statistics.ts_in_delta, 26961);
2126        assert_eq!(statistics.ts_event, msg.hd.ts_event);
2127        assert_eq!(statistics.ts_recv, msg.ts_recv);
2128        assert_eq!(statistics.ts_init, 0);
2129    }
2130
2131    #[rstest]
2132    fn test_decode_cmbp1_msg() {
2133        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
2134        let mut dbn_stream = Decoder::from_zstd_file(path)
2135            .unwrap()
2136            .decode_stream::<dbn::Cmbp1Msg>();
2137        let msg = dbn_stream.next().unwrap().unwrap();
2138
2139        let instrument_id = InstrumentId::from("ESM4.GLBX");
2140        let (maybe_quote, trade) =
2141            decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2142        let quote = maybe_quote.expect("Expected valid quote");
2143
2144        assert_eq!(quote.instrument_id, instrument_id);
2145        assert!(quote.bid_price.raw > 0);
2146        assert!(quote.ask_price.raw > 0);
2147        assert!(quote.bid_size.raw > 0);
2148        assert!(quote.ask_size.raw > 0);
2149        assert_eq!(quote.ts_event, msg.ts_recv);
2150        assert_eq!(quote.ts_init, 0);
2151
2152        // Check if trade is present based on action
2153        if is_trade_msg(msg.action) {
2154            assert!(trade.is_some());
2155            let trade = trade.unwrap();
2156            assert_eq!(trade.instrument_id, instrument_id);
2157        } else {
2158            assert!(trade.is_none());
2159        }
2160    }
2161
2162    #[rstest]
2163    fn test_decode_cbbo_1s_msg() {
2164        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2165        let mut dbn_stream = Decoder::from_zstd_file(path)
2166            .unwrap()
2167            .decode_stream::<dbn::CbboMsg>();
2168        let msg = dbn_stream.next().unwrap().unwrap();
2169
2170        let instrument_id = InstrumentId::from("ESM4.GLBX");
2171        let maybe_quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2172        let quote = maybe_quote.expect("Expected valid quote");
2173
2174        assert_eq!(quote.instrument_id, instrument_id);
2175        assert!(quote.bid_price.raw > 0);
2176        assert!(quote.ask_price.raw > 0);
2177        assert!(quote.bid_size.raw > 0);
2178        assert!(quote.ask_size.raw > 0);
2179        assert_eq!(quote.ts_event, msg.ts_recv);
2180        assert_eq!(quote.ts_init, 0);
2181    }
2182
2183    #[rstest]
2184    fn test_decode_mbp10_msg_with_all_levels() {
2185        let mut msg = dbn::Mbp10Msg::default();
2186        for i in 0..10 {
2187            msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
2188            msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
2189            msg.levels[i].bid_sz = 10 + i as u32;
2190            msg.levels[i].ask_sz = 10 + i as u32;
2191            msg.levels[i].bid_ct = 1 + i as u32;
2192            msg.levels[i].ask_ct = 1 + i as u32;
2193        }
2194        msg.ts_recv = 1_609_160_400_000_704_060;
2195
2196        let instrument_id = InstrumentId::from("TEST.VENUE");
2197        let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
2198
2199        assert!(result.is_ok());
2200        let depth = result.unwrap();
2201        assert_eq!(depth.bids.len(), 10);
2202        assert_eq!(depth.asks.len(), 10);
2203        assert_eq!(depth.bid_counts.len(), 10);
2204        assert_eq!(depth.ask_counts.len(), 10);
2205    }
2206
2207    #[rstest]
2208    fn test_array_conversion_error_handling() {
2209        let mut bids = Vec::new();
2210        let mut asks = Vec::new();
2211
2212        // Intentionally create fewer than DEPTH10_LEN elements
2213        for i in 0..5 {
2214            bids.push(BookOrder::new(
2215                OrderSide::Buy,
2216                Price::from(format!("{}.00", 100 - i)),
2217                Quantity::from(10),
2218                i as u64,
2219            ));
2220            asks.push(BookOrder::new(
2221                OrderSide::Sell,
2222                Price::from(format!("{}.00", 101 + i)),
2223                Quantity::from(10),
2224                i as u64,
2225            ));
2226        }
2227
2228        let result: Result<[BookOrder; DEPTH10_LEN], _> =
2229            bids.try_into().map_err(|v: Vec<BookOrder>| {
2230                anyhow::anyhow!(
2231                    "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2232                    v.len()
2233                )
2234            });
2235        assert!(result.is_err());
2236        assert!(
2237            result
2238                .unwrap_err()
2239                .to_string()
2240                .contains("Expected exactly 10 bid levels, received 5")
2241        );
2242    }
2243
2244    #[rstest]
2245    fn test_decode_tcbbo_msg() {
2246        // Use cbbo-1s as base since cbbo.dbn.zst was invalid
2247        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2248        let mut dbn_stream = Decoder::from_zstd_file(path)
2249            .unwrap()
2250            .decode_stream::<dbn::CbboMsg>();
2251        let msg = dbn_stream.next().unwrap().unwrap();
2252
2253        // Simulate TCBBO by adding trade data
2254        let mut tcbbo_msg = msg.clone();
2255        tcbbo_msg.price = 3702500000000;
2256        tcbbo_msg.size = 10;
2257
2258        let instrument_id = InstrumentId::from("ESM4.GLBX");
2259        let (maybe_quote, trade) =
2260            decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2261        let quote = maybe_quote.expect("Expected valid quote");
2262
2263        assert_eq!(quote.instrument_id, instrument_id);
2264        assert!(quote.bid_price.raw > 0);
2265        assert!(quote.ask_price.raw > 0);
2266        assert!(quote.bid_size.raw > 0);
2267        assert!(quote.ask_size.raw > 0);
2268        assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2269        assert_eq!(quote.ts_init, 0);
2270
2271        assert_eq!(trade.instrument_id, instrument_id);
2272        assert_eq!(trade.price, Price::from("3702.50"));
2273        assert_eq!(trade.size, Quantity::from(10));
2274        assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2275        assert_eq!(trade.ts_init, 0);
2276    }
2277
2278    #[rstest]
2279    fn test_decode_bar_type() {
2280        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2281        let instrument_id = InstrumentId::from("ESM4.GLBX");
2282
2283        // Test 1-second bar
2284        msg.hd.rtype = 32;
2285        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2286        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2287
2288        // Test 1-minute bar
2289        msg.hd.rtype = 33;
2290        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2291        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2292
2293        // Test 1-hour bar
2294        msg.hd.rtype = 34;
2295        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2296        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2297
2298        // Test 1-day bar
2299        msg.hd.rtype = 35;
2300        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2301        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2302
2303        // Test unsupported rtype
2304        msg.hd.rtype = 99;
2305        let result = decode_bar_type(&msg, instrument_id);
2306        assert!(result.is_err());
2307    }
2308
2309    #[rstest]
2310    fn test_decode_ts_event_adjustment() {
2311        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2312
2313        // Test 1-second bar adjustment
2314        msg.hd.rtype = 32;
2315        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2316        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2317
2318        // Test 1-minute bar adjustment
2319        msg.hd.rtype = 33;
2320        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2321        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2322
2323        // Test 1-hour bar adjustment
2324        msg.hd.rtype = 34;
2325        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2326        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2327
2328        // Test 1-day bar adjustment
2329        msg.hd.rtype = 35;
2330        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2331        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2332
2333        // Test eod bar adjustment (same as 1d)
2334        msg.hd.rtype = 36;
2335        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2336        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2337
2338        // Test unsupported rtype
2339        msg.hd.rtype = 99;
2340        let result = decode_ts_event_adjustment(&msg);
2341        assert!(result.is_err());
2342    }
2343
2344    #[rstest]
2345    fn test_decode_record() {
2346        // Test with MBO message
2347        let path = test_data_path().join("test_data.mbo.dbn.zst");
2348        let decoder = Decoder::from_zstd_file(path).unwrap();
2349        let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2350        let msg = dbn_stream.next().unwrap().unwrap();
2351
2352        let record_ref = dbn::RecordRef::from(msg);
2353        let instrument_id = InstrumentId::from("ESM4.GLBX");
2354
2355        let (data1, data2) =
2356            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2357
2358        assert!(data1.is_some());
2359        assert!(data2.is_none());
2360
2361        // Test with Trade message
2362        let path = test_data_path().join("test_data.trades.dbn.zst");
2363        let decoder = Decoder::from_zstd_file(path).unwrap();
2364        let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2365        let msg = dbn_stream.next().unwrap().unwrap();
2366
2367        let record_ref = dbn::RecordRef::from(msg);
2368
2369        let (data1, data2) =
2370            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2371
2372        assert!(data1.is_some());
2373        assert!(data2.is_none());
2374        assert!(matches!(data1.unwrap(), Data::Trade(_)));
2375    }
2376}