nautilus_databento/
decode.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cmp, ffi::c_char, num::NonZeroUsize};
17
18use databento::dbn::{self};
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND};
20use nautilus_model::{
21    data::{
22        Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
23        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24    },
25    enums::{
26        AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
27        InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
28    },
29    identifiers::{InstrumentId, TradeId},
30    instruments::{
31        Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
32    },
33    types::{Currency, Price, Quantity, price::decode_raw_price_i64},
34};
35use ustr::Ustr;
36
37use super::{
38    enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
39    types::{DatabentoImbalance, DatabentoStatistics},
40};
41
/// Divisor (1e9) applied to raw fixed-point values expressed in units of 1e-9
/// (used by `decode_multiplier`).
const DATABENTO_FIXED_SCALAR: f64 = 1_000_000_000.0;

// SAFETY: Known valid value
// (`1` is non-zero, so `NonZeroUsize::new` yields `Some` and the `unwrap` cannot fail)
const STEP_ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();

/// Bar specification for 1-second bars of last prices.
const BAR_SPEC_1S: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Second,
    price_type: PriceType::Last,
};
/// Bar specification for 1-minute bars of last prices.
const BAR_SPEC_1M: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
/// Bar specification for 1-hour bars of last prices.
const BAR_SPEC_1H: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Hour,
    price_type: PriceType::Last,
};
/// Bar specification for 1-day bars of last prices.
const BAR_SPEC_1D: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Day,
    price_type: PriceType::Last,
};

// Bar durations in nanoseconds, returned by `decode_ts_event_adjustment`
// for the matching OHLCV record type.
const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
72
/// Maps a `Y`/`N` flag character to an optional boolean.
///
/// Returns `Some(true)` for `'Y'`, `Some(false)` for `'N'`, and `None` for any other character.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    let flag = c as u8;
    if flag == b'Y' {
        Some(true)
    } else if flag == b'N' {
        Some(false)
    } else {
        None
    }
}
81
82#[must_use]
83pub const fn parse_order_side(c: c_char) -> OrderSide {
84    match c as u8 as char {
85        'A' => OrderSide::Sell,
86        'B' => OrderSide::Buy,
87        _ => OrderSide::NoOrderSide,
88    }
89}
90
91#[must_use]
92pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
93    match c as u8 as char {
94        'A' => AggressorSide::Seller,
95        'B' => AggressorSide::Buyer,
96        _ => AggressorSide::NoAggressor,
97    }
98}
99
100pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
101    match c as u8 as char {
102        'A' => Ok(BookAction::Add),
103        'C' => Ok(BookAction::Delete),
104        'F' => Ok(BookAction::Update),
105        'M' => Ok(BookAction::Update),
106        'R' => Ok(BookAction::Clear),
107        invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
108    }
109}
110
111pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
112    match c as u8 as char {
113        'C' => Ok(OptionKind::Call),
114        'P' => Ok(OptionKind::Put),
115        invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
116    }
117}
118
119fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
120    match value {
121        Ok(value) if !value.is_empty() => {
122            Currency::try_from_str(value).unwrap_or_else(Currency::USD)
123        }
124        Ok(_) => Currency::USD(),
125        Err(e) => {
126            log::error!("Error parsing currency: {e}");
127            Currency::USD()
128        }
129    }
130}
131
132pub fn parse_cfi_iso10926(
133    value: &str,
134) -> anyhow::Result<(Option<AssetClass>, Option<InstrumentClass>)> {
135    let chars: Vec<char> = value.chars().collect();
136    if chars.len() < 3 {
137        anyhow::bail!("Value string is too short");
138    }
139
140    // TODO: A proper CFI parser would be useful: https://en.wikipedia.org/wiki/ISO_10962
141    let cfi_category = chars[0];
142    let cfi_group = chars[1];
143    let cfi_attribute1 = chars[2];
144    // let cfi_attribute2 = value[3];
145    // let cfi_attribute3 = value[4];
146    // let cfi_attribute4 = value[5];
147
148    let mut asset_class = match cfi_category {
149        'D' => Some(AssetClass::Debt),
150        'E' => Some(AssetClass::Equity),
151        'S' => None,
152        _ => None,
153    };
154
155    let instrument_class = match cfi_group {
156        'I' => Some(InstrumentClass::Future),
157        _ => None,
158    };
159
160    if cfi_attribute1 == 'I' {
161        asset_class = Some(AssetClass::Index);
162    }
163
164    Ok((asset_class, instrument_class))
165}
166
167// https://databento.com/docs/schemas-and-data-formats/status#types-of-status-reasons
168pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
169    let value_str = match value {
170        0 => return Ok(None),
171        1 => "Scheduled",
172        2 => "Surveillance intervention",
173        3 => "Market event",
174        4 => "Instrument activation",
175        5 => "Instrument expiration",
176        6 => "Recovery in process",
177        10 => "Regulatory",
178        11 => "Administrative",
179        12 => "Non-compliance",
180        13 => "Filings not current",
181        14 => "SEC trading suspension",
182        15 => "New issue",
183        16 => "Issue available",
184        17 => "Issues reviewed",
185        18 => "Filing requirements satisfied",
186        30 => "News pending",
187        31 => "News released",
188        32 => "News and resumption times",
189        33 => "News not forthcoming",
190        40 => "Order imbalance",
191        50 => "LULD pause",
192        60 => "Operational",
193        70 => "Additional information requested",
194        80 => "Merger effective",
195        90 => "ETF",
196        100 => "Corporate action",
197        110 => "New Security offering",
198        120 => "Market wide halt level 1",
199        121 => "Market wide halt level 2",
200        122 => "Market wide halt level 3",
201        123 => "Market wide halt carryover",
202        124 => "Market wide halt resumption",
203        130 => "Quotation not available",
204        invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
205    };
206
207    Ok(Some(Ustr::from(value_str)))
208}
209
210pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
211    let value_str = match value {
212        0 => return Ok(None),
213        1 => "No cancel",
214        2 => "Change trading session",
215        3 => "Implied matching on",
216        4 => "Implied matching off",
217        _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
218    };
219
220    Ok(Some(Ustr::from(value_str)))
221}
222
223/// Decodes a price from the given value, expressed in units of 1e-9.
224#[must_use]
225pub fn decode_price(value: i64, precision: u8) -> Price {
226    Price::from_raw(decode_raw_price_i64(value), precision)
227}
228
229/// Decodes a quantity from the given value, expressed in standard whole-number units.
230#[must_use]
231pub fn decode_quantity(value: u64) -> Quantity {
232    Quantity::from(value)
233}
234
235/// Decodes a minimum price increment from the given value, expressed in units of 1e-9.
236#[must_use]
237pub fn decode_price_increment(value: i64, precision: u8) -> Price {
238    match value {
239        0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
240        _ => decode_price(value, precision),
241    }
242}
243
244/// Decodes a price from the given optional value, expressed in units of 1e-9.
245#[must_use]
246pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
247    match value {
248        i64::MAX => None,
249        _ => Some(decode_price(value, precision)),
250    }
251}
252
253/// Decodes a quantity from the given optional value, expressed in standard whole-number units.
254#[must_use]
255pub fn decode_optional_quantity(value: i32) -> Option<Quantity> {
256    match value {
257        i32::MAX => None,
258        _ => Some(Quantity::from(value)),
259    }
260}
261
262/// Decodes a multiplier from the given value, expressed in units of 1e-9.
263#[must_use]
264pub fn decode_multiplier(value: i64) -> Quantity {
265    match value {
266        0 | i64::MAX => Quantity::from(1),
267        _ => Quantity::from(format!("{}", value as f64 / DATABENTO_FIXED_SCALAR)),
268    }
269}
270
271/// Decodes a lot size from the given value, expressed in standard whole-number units.
272#[must_use]
273pub fn decode_lot_size(value: i32) -> Quantity {
274    match value {
275        0 | i32::MAX => Quantity::from(1),
276        value => Quantity::from(value),
277    }
278}
279
280pub fn decode_equity_v1(
281    msg: &dbn::compat::InstrumentDefMsgV1,
282    instrument_id: InstrumentId,
283    ts_init: UnixNanos,
284) -> anyhow::Result<Equity> {
285    let currency = parse_currency_or_usd_default(msg.currency());
286    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
287    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
288    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
289
290    Equity::new_checked(
291        instrument_id,
292        instrument_id.symbol,
293        None, // No ISIN available yet
294        currency,
295        price_increment.precision,
296        price_increment,
297        Some(lot_size),
298        None, // TBD
299        None, // TBD
300        None, // TBD
301        None, // TBD
302        None, // TBD
303        None, // TBD
304        None, // TBD
305        None, // TBD
306        ts_event,
307        ts_init,
308    )
309}
310
311pub fn decode_futures_contract_v1(
312    msg: &dbn::compat::InstrumentDefMsgV1,
313    instrument_id: InstrumentId,
314    ts_init: UnixNanos,
315) -> anyhow::Result<FuturesContract> {
316    let currency = parse_currency_or_usd_default(msg.currency());
317    let exchange = Ustr::from(msg.exchange()?);
318    let underlying = Ustr::from(msg.asset()?);
319    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
320    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
321    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
322    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
323    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
324
325    FuturesContract::new_checked(
326        instrument_id,
327        instrument_id.symbol,
328        asset_class.unwrap_or(AssetClass::Commodity),
329        Some(exchange),
330        underlying,
331        msg.activation.into(),
332        msg.expiration.into(),
333        currency,
334        price_increment.precision,
335        price_increment,
336        multiplier,
337        lot_size,
338        None, // TBD
339        None, // TBD
340        None, // TBD
341        None, // TBD
342        None, // TBD
343        None, // TBD
344        None, // TBD
345        None, // TBD
346        ts_event,
347        ts_init,
348    )
349}
350
351pub fn decode_futures_spread_v1(
352    msg: &dbn::compat::InstrumentDefMsgV1,
353    instrument_id: InstrumentId,
354    ts_init: UnixNanos,
355) -> anyhow::Result<FuturesSpread> {
356    let exchange = Ustr::from(msg.exchange()?);
357    let underlying = Ustr::from(msg.asset()?);
358    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
359    let strategy_type = Ustr::from(msg.secsubtype()?);
360    let currency = parse_currency_or_usd_default(msg.currency());
361    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
362    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
363    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
364    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
365
366    FuturesSpread::new_checked(
367        instrument_id,
368        instrument_id.symbol,
369        asset_class.unwrap_or(AssetClass::Commodity),
370        Some(exchange),
371        underlying,
372        strategy_type,
373        msg.activation.into(),
374        msg.expiration.into(),
375        currency,
376        price_increment.precision,
377        price_increment,
378        multiplier,
379        lot_size,
380        None, // TBD
381        None, // TBD
382        None, // TBD
383        None, // TBD
384        None, // TBD
385        None, // TBD
386        None, // TBD
387        None, // TBD
388        ts_event,
389        ts_init,
390    )
391}
392
393pub fn decode_option_contract_v1(
394    msg: &dbn::compat::InstrumentDefMsgV1,
395    instrument_id: InstrumentId,
396    ts_init: UnixNanos,
397) -> anyhow::Result<OptionContract> {
398    let currency = parse_currency_or_usd_default(msg.currency());
399    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
400    let exchange = Ustr::from(msg.exchange()?);
401    let underlying = Ustr::from(msg.underlying()?);
402    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
403        Some(AssetClass::Equity)
404    } else {
405        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
406        asset_class
407    };
408    let option_kind = parse_option_kind(msg.instrument_class)?;
409    let strike_price = Price::from_raw(
410        decode_raw_price_i64(msg.strike_price),
411        strike_price_currency.precision,
412    );
413    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
414    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
415    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
416    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
417
418    OptionContract::new_checked(
419        instrument_id,
420        instrument_id.symbol,
421        asset_class_opt.unwrap_or(AssetClass::Commodity),
422        Some(exchange),
423        underlying,
424        option_kind,
425        strike_price,
426        currency,
427        msg.activation.into(),
428        msg.expiration.into(),
429        price_increment.precision,
430        price_increment,
431        multiplier,
432        lot_size,
433        None, // TBD
434        None, // TBD
435        None, // TBD
436        None, // TBD
437        None, // TBD
438        None, // TBD
439        None, // TBD
440        None, // TBD
441        ts_event,
442        ts_init,
443    )
444}
445
446pub fn decode_option_spread_v1(
447    msg: &dbn::compat::InstrumentDefMsgV1,
448    instrument_id: InstrumentId,
449    ts_init: UnixNanos,
450) -> anyhow::Result<OptionSpread> {
451    let currency = parse_currency_or_usd_default(msg.currency());
452    let exchange = Ustr::from(msg.exchange()?);
453    let underlying = Ustr::from(msg.underlying()?);
454    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
455        Some(AssetClass::Equity)
456    } else {
457        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
458        asset_class
459    };
460    let strategy_type = Ustr::from(msg.secsubtype()?);
461    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
462    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
463    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
464    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
465
466    OptionSpread::new_checked(
467        instrument_id,
468        instrument_id.symbol,
469        asset_class_opt.unwrap_or(AssetClass::Commodity),
470        Some(exchange),
471        underlying,
472        strategy_type,
473        msg.activation.into(),
474        msg.expiration.into(),
475        currency,
476        price_increment.precision,
477        price_increment,
478        multiplier,
479        lot_size,
480        None, // TBD
481        None, // TBD
482        None, // TBD
483        None, // TBD
484        None, // TBD
485        None, // TBD
486        None, // TBD
487        None, // TBD
488        ts_event,
489        ts_init,
490    )
491}
492
493#[must_use]
494fn is_trade_msg(order_side: OrderSide, action: c_char) -> bool {
495    order_side == OrderSide::NoOrderSide || action as u8 as char == 'T'
496}
497
498pub fn decode_mbo_msg(
499    msg: &dbn::MboMsg,
500    instrument_id: InstrumentId,
501    price_precision: u8,
502    ts_init: UnixNanos,
503    include_trades: bool,
504) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
505    let side = parse_order_side(msg.side);
506    if is_trade_msg(side, msg.action) {
507        if include_trades {
508            let trade = TradeTick::new(
509                instrument_id,
510                Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
511                Quantity::from(msg.size),
512                parse_aggressor_side(msg.side),
513                TradeId::new(itoa::Buffer::new().format(msg.sequence)),
514                msg.ts_recv.into(),
515                ts_init,
516            );
517            return Ok((None, Some(trade)));
518        }
519
520        return Ok((None, None));
521    }
522
523    let order = BookOrder::new(
524        side,
525        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
526        Quantity::from(msg.size),
527        msg.order_id,
528    );
529
530    let delta = OrderBookDelta::new(
531        instrument_id,
532        parse_book_action(msg.action)?,
533        order,
534        msg.flags.raw(),
535        msg.sequence.into(),
536        msg.ts_recv.into(),
537        ts_init,
538    );
539
540    Ok((Some(delta), None))
541}
542
543pub fn decode_trade_msg(
544    msg: &dbn::TradeMsg,
545    instrument_id: InstrumentId,
546    price_precision: u8,
547    ts_init: UnixNanos,
548) -> anyhow::Result<TradeTick> {
549    let trade = TradeTick::new(
550        instrument_id,
551        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
552        Quantity::from(msg.size),
553        parse_aggressor_side(msg.side),
554        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
555        msg.ts_recv.into(),
556        ts_init,
557    );
558
559    Ok(trade)
560}
561
562pub fn decode_tbbo_msg(
563    msg: &dbn::TbboMsg,
564    instrument_id: InstrumentId,
565    price_precision: u8,
566    ts_init: UnixNanos,
567) -> anyhow::Result<(QuoteTick, TradeTick)> {
568    let top_level = &msg.levels[0];
569    let quote = QuoteTick::new(
570        instrument_id,
571        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
572        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
573        Quantity::from(top_level.bid_sz),
574        Quantity::from(top_level.ask_sz),
575        msg.ts_recv.into(),
576        ts_init,
577    );
578
579    let trade = TradeTick::new(
580        instrument_id,
581        Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
582        Quantity::from(msg.size),
583        parse_aggressor_side(msg.side),
584        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
585        msg.ts_recv.into(),
586        ts_init,
587    );
588
589    Ok((quote, trade))
590}
591
592pub fn decode_mbp1_msg(
593    msg: &dbn::Mbp1Msg,
594    instrument_id: InstrumentId,
595    price_precision: u8,
596    ts_init: UnixNanos,
597    include_trades: bool,
598) -> anyhow::Result<(QuoteTick, Option<TradeTick>)> {
599    let top_level = &msg.levels[0];
600    let quote = QuoteTick::new(
601        instrument_id,
602        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
603        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
604        Quantity::from(top_level.bid_sz),
605        Quantity::from(top_level.ask_sz),
606        msg.ts_recv.into(),
607        ts_init,
608    );
609
610    let maybe_trade = if include_trades && msg.action as u8 as char == 'T' {
611        Some(TradeTick::new(
612            instrument_id,
613            Price::from_raw(decode_raw_price_i64(msg.price), price_precision),
614            Quantity::from(msg.size),
615            parse_aggressor_side(msg.side),
616            TradeId::new(itoa::Buffer::new().format(msg.sequence)),
617            msg.ts_recv.into(),
618            ts_init,
619        ))
620    } else {
621        None
622    };
623
624    Ok((quote, maybe_trade))
625}
626
627pub fn decode_bbo_msg(
628    msg: &dbn::BboMsg,
629    instrument_id: InstrumentId,
630    price_precision: u8,
631    ts_init: UnixNanos,
632) -> anyhow::Result<QuoteTick> {
633    let top_level = &msg.levels[0];
634    let quote = QuoteTick::new(
635        instrument_id,
636        Price::from_raw(decode_raw_price_i64(top_level.bid_px), price_precision),
637        Price::from_raw(decode_raw_price_i64(top_level.ask_px), price_precision),
638        Quantity::from(top_level.bid_sz),
639        Quantity::from(top_level.ask_sz),
640        msg.ts_recv.into(),
641        ts_init,
642    );
643
644    Ok(quote)
645}
646
647pub fn decode_mbp10_msg(
648    msg: &dbn::Mbp10Msg,
649    instrument_id: InstrumentId,
650    price_precision: u8,
651    ts_init: UnixNanos,
652) -> anyhow::Result<OrderBookDepth10> {
653    let mut bids = Vec::with_capacity(DEPTH10_LEN);
654    let mut asks = Vec::with_capacity(DEPTH10_LEN);
655    let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
656    let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
657
658    for level in &msg.levels {
659        let bid_order = BookOrder::new(
660            OrderSide::Buy,
661            Price::from_raw(decode_raw_price_i64(level.bid_px), price_precision),
662            Quantity::from(level.bid_sz),
663            0,
664        );
665
666        let ask_order = BookOrder::new(
667            OrderSide::Sell,
668            Price::from_raw(decode_raw_price_i64(level.ask_px), price_precision),
669            Quantity::from(level.ask_sz),
670            0,
671        );
672
673        bids.push(bid_order);
674        asks.push(ask_order);
675        bid_counts.push(level.bid_ct);
676        ask_counts.push(level.ask_ct);
677    }
678
679    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().expect("`bids` length != 10");
680    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().expect("`asks` length != 10");
681    let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().expect("`bid_counts` length != 10");
682    let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().expect("`ask_counts` length != 10");
683
684    let depth = OrderBookDepth10::new(
685        instrument_id,
686        bids,
687        asks,
688        bid_counts,
689        ask_counts,
690        msg.flags.raw(),
691        msg.sequence.into(),
692        msg.ts_recv.into(),
693        ts_init,
694    );
695
696    Ok(depth)
697}
698
699pub fn decode_bar_type(
700    msg: &dbn::OhlcvMsg,
701    instrument_id: InstrumentId,
702) -> anyhow::Result<BarType> {
703    let bar_type = match msg.hd.rtype {
704        32 => {
705            // ohlcv-1s
706            BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
707        }
708        33 => {
709            //  ohlcv-1m
710            BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
711        }
712        34 => {
713            // ohlcv-1h
714            BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
715        }
716        35 => {
717            // ohlcv-1d
718            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
719        }
720        _ => anyhow::bail!(
721            "`rtype` is not a supported bar aggregation, was {}",
722            msg.hd.rtype
723        ),
724    };
725
726    Ok(bar_type)
727}
728
729pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
730    let adjustment = match msg.hd.rtype {
731        32 => {
732            // ohlcv-1s
733            BAR_CLOSE_ADJUSTMENT_1S
734        }
735        33 => {
736            //  ohlcv-1m
737            BAR_CLOSE_ADJUSTMENT_1M
738        }
739        34 => {
740            //  ohlcv-1h
741            BAR_CLOSE_ADJUSTMENT_1H
742        }
743        35 => {
744            // ohlcv-1d
745            BAR_CLOSE_ADJUSTMENT_1D
746        }
747        _ => anyhow::bail!(
748            "`rtype` is not a supported bar aggregation, was {}",
749            msg.hd.rtype
750        ),
751    };
752
753    Ok(adjustment.into())
754}
755
756pub fn decode_ohlcv_msg(
757    msg: &dbn::OhlcvMsg,
758    instrument_id: InstrumentId,
759    price_precision: u8,
760    ts_init: UnixNanos,
761) -> anyhow::Result<Bar> {
762    let bar_type = decode_bar_type(msg, instrument_id)?;
763    let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
764
765    // Adjust `ts_event` from open to close of bar
766    let ts_event = UnixNanos::from(msg.hd.ts_event);
767    let ts_init = cmp::max(ts_init, ts_event) + ts_event_adjustment;
768
769    let bar = Bar::new(
770        bar_type,
771        Price::from_raw(decode_raw_price_i64(msg.open), price_precision),
772        Price::from_raw(decode_raw_price_i64(msg.high), price_precision),
773        Price::from_raw(decode_raw_price_i64(msg.low), price_precision),
774        Price::from_raw(decode_raw_price_i64(msg.close), price_precision),
775        Quantity::from(msg.volume),
776        ts_event,
777        ts_init,
778    );
779
780    Ok(bar)
781}
782
783pub fn decode_status_msg(
784    msg: &dbn::StatusMsg,
785    instrument_id: InstrumentId,
786    ts_init: UnixNanos,
787) -> anyhow::Result<InstrumentStatus> {
788    let status = InstrumentStatus::new(
789        instrument_id,
790        MarketStatusAction::from_u16(msg.action).expect("Invalid `MarketStatusAction`"),
791        msg.hd.ts_event.into(),
792        ts_init,
793        parse_status_reason(msg.reason)?,
794        parse_status_trading_event(msg.trading_event)?,
795        parse_optional_bool(msg.is_trading),
796        parse_optional_bool(msg.is_quoting),
797        parse_optional_bool(msg.is_short_sell_restricted),
798    );
799
800    Ok(status)
801}
802
803pub fn decode_record(
804    record: &dbn::RecordRef,
805    instrument_id: InstrumentId,
806    price_precision: u8,
807    ts_init: Option<UnixNanos>,
808    include_trades: bool,
809) -> anyhow::Result<(Option<Data>, Option<Data>)> {
810    // We don't handle `TbboMsg` here as Nautilus separates this schema
811    // into quotes and trades when loading, and the live client will
812    // never subscribe to `tbbo`.
813    let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
814        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
815        let result = decode_mbo_msg(msg, instrument_id, price_precision, ts_init, include_trades)?;
816        match result {
817            (Some(delta), None) => (Some(Data::Delta(delta)), None),
818            (None, Some(trade)) => (Some(Data::Trade(trade)), None),
819            (None, None) => (None, None),
820            _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
821        }
822    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
823        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
824        let trade = decode_trade_msg(msg, instrument_id, price_precision, ts_init)?;
825        (Some(Data::Trade(trade)), None)
826    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
827        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
828        let result = decode_mbp1_msg(msg, instrument_id, price_precision, ts_init, include_trades)?;
829        match result {
830            (quote, None) => (Some(Data::Quote(quote)), None),
831            (quote, Some(trade)) => (Some(Data::Quote(quote)), Some(Data::Trade(trade))),
832        }
833    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
834        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
835        let quote = decode_bbo_msg(msg, instrument_id, price_precision, ts_init)?;
836        (Some(Data::Quote(quote)), None)
837    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
838        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
839        let quote = decode_bbo_msg(msg, instrument_id, price_precision, ts_init)?;
840        (Some(Data::Quote(quote)), None)
841    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
842        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
843        let depth = decode_mbp10_msg(msg, instrument_id, price_precision, ts_init)?;
844        (Some(Data::from(depth)), None)
845    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
846        let ts_init = determine_timestamp(ts_init, msg.hd.ts_event.into());
847        let bar = decode_ohlcv_msg(msg, instrument_id, price_precision, ts_init)?;
848        (Some(Data::Bar(bar)), None)
849    } else {
850        anyhow::bail!("DBN message type is not currently supported")
851    };
852
853    Ok(result)
854}
855
856const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
857    match ts_init {
858        Some(ts_init) => ts_init,
859        None => msg_timestamp,
860    }
861}
862
863pub fn decode_instrument_def_msg_v1(
864    msg: &dbn::compat::InstrumentDefMsgV1,
865    instrument_id: InstrumentId,
866    ts_init: UnixNanos,
867) -> anyhow::Result<InstrumentAny> {
868    match msg.instrument_class as u8 as char {
869        'K' => Ok(InstrumentAny::Equity(decode_equity_v1(
870            msg,
871            instrument_id,
872            ts_init,
873        )?)),
874        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract_v1(
875            msg,
876            instrument_id,
877            ts_init,
878        )?)),
879        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread_v1(
880            msg,
881            instrument_id,
882            ts_init,
883        )?)),
884        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract_v1(
885            msg,
886            instrument_id,
887            ts_init,
888        )?)),
889        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread_v1(
890            msg,
891            instrument_id,
892            ts_init,
893        )?)),
894        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
895        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
896        _ => anyhow::bail!(
897            "Unsupported `instrument_class` '{}'",
898            msg.instrument_class as u8 as char
899        ),
900    }
901}
902
903pub fn decode_instrument_def_msg(
904    msg: &dbn::InstrumentDefMsg,
905    instrument_id: InstrumentId,
906    ts_init: UnixNanos,
907) -> anyhow::Result<InstrumentAny> {
908    match msg.instrument_class as u8 as char {
909        'K' => Ok(InstrumentAny::Equity(decode_equity(
910            msg,
911            instrument_id,
912            ts_init,
913        )?)),
914        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
915            msg,
916            instrument_id,
917            ts_init,
918        )?)),
919        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
920            msg,
921            instrument_id,
922            ts_init,
923        )?)),
924        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
925            msg,
926            instrument_id,
927            ts_init,
928        )?)),
929        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
930            msg,
931            instrument_id,
932            ts_init,
933        )?)),
934        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
935        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
936        _ => anyhow::bail!(
937            "Unsupported `instrument_class` '{}'",
938            msg.instrument_class as u8 as char
939        ),
940    }
941}
942
943pub fn decode_equity(
944    msg: &dbn::InstrumentDefMsg,
945    instrument_id: InstrumentId,
946    ts_init: UnixNanos,
947) -> anyhow::Result<Equity> {
948    let currency = parse_currency_or_usd_default(msg.currency());
949    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
950    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
951    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
952
953    Ok(Equity::new(
954        instrument_id,
955        instrument_id.symbol,
956        None, // No ISIN available yet
957        currency,
958        price_increment.precision,
959        price_increment,
960        Some(lot_size),
961        None, // TBD
962        None, // TBD
963        None, // TBD
964        None, // TBD
965        None, // TBD
966        None, // TBD
967        None, // TBD
968        None, // TBD
969        ts_event,
970        ts_init,
971    ))
972}
973
974pub fn decode_futures_contract(
975    msg: &dbn::InstrumentDefMsg,
976    instrument_id: InstrumentId,
977    ts_init: UnixNanos,
978) -> anyhow::Result<FuturesContract> {
979    let currency = parse_currency_or_usd_default(msg.currency());
980    let exchange = Ustr::from(msg.exchange()?);
981    let underlying = Ustr::from(msg.asset()?);
982    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
983    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
984    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
985    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
986    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
987
988    FuturesContract::new_checked(
989        instrument_id,
990        instrument_id.symbol,
991        asset_class.unwrap_or(AssetClass::Commodity),
992        Some(exchange),
993        underlying,
994        msg.activation.into(),
995        msg.expiration.into(),
996        currency,
997        price_increment.precision,
998        price_increment,
999        multiplier,
1000        lot_size,
1001        None, // TBD
1002        None, // TBD
1003        None, // TBD
1004        None, // TBD
1005        None, // TBD
1006        None, // TBD
1007        None, // TBD
1008        None, // TBD
1009        ts_event,
1010        ts_init,
1011    )
1012}
1013
1014pub fn decode_futures_spread(
1015    msg: &dbn::InstrumentDefMsg,
1016    instrument_id: InstrumentId,
1017    ts_init: UnixNanos,
1018) -> anyhow::Result<FuturesSpread> {
1019    let exchange = Ustr::from(msg.exchange()?);
1020    let underlying = Ustr::from(msg.asset()?);
1021    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1022    let strategy_type = Ustr::from(msg.secsubtype()?);
1023    let currency = parse_currency_or_usd_default(msg.currency());
1024    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1025    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
1026    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1027    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1028
1029    FuturesSpread::new_checked(
1030        instrument_id,
1031        instrument_id.symbol,
1032        asset_class.unwrap_or(AssetClass::Commodity),
1033        Some(exchange),
1034        underlying,
1035        strategy_type,
1036        msg.activation.into(),
1037        msg.expiration.into(),
1038        currency,
1039        price_increment.precision,
1040        price_increment,
1041        multiplier,
1042        lot_size,
1043        None, // TBD
1044        None, // TBD
1045        None, // TBD
1046        None, // TBD
1047        None, // TBD
1048        None, // TBD
1049        None, // TBD
1050        None, // TBD
1051        ts_event,
1052        ts_init,
1053    )
1054}
1055
1056pub fn decode_option_contract(
1057    msg: &dbn::InstrumentDefMsg,
1058    instrument_id: InstrumentId,
1059    ts_init: UnixNanos,
1060) -> anyhow::Result<OptionContract> {
1061    let currency = parse_currency_or_usd_default(msg.currency());
1062    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1063    let exchange = Ustr::from(msg.exchange()?);
1064    let underlying = Ustr::from(msg.underlying()?);
1065    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1066        Some(AssetClass::Equity)
1067    } else {
1068        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1069        asset_class
1070    };
1071    let option_kind = parse_option_kind(msg.instrument_class)?;
1072    let strike_price = Price::from_raw(
1073        decode_raw_price_i64(msg.strike_price),
1074        strike_price_currency.precision,
1075    );
1076    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1077    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
1078    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1079    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1080
1081    OptionContract::new_checked(
1082        instrument_id,
1083        instrument_id.symbol,
1084        asset_class_opt.unwrap_or(AssetClass::Commodity),
1085        Some(exchange),
1086        underlying,
1087        option_kind,
1088        strike_price,
1089        currency,
1090        msg.activation.into(),
1091        msg.expiration.into(),
1092        price_increment.precision,
1093        price_increment,
1094        multiplier,
1095        lot_size,
1096        None, // TBD
1097        None, // TBD
1098        None, // TBD
1099        None, // TBD
1100        None, // TBD
1101        None, // TBD
1102        None, // TBD
1103        None, // TBD
1104        ts_event,
1105        ts_init,
1106    )
1107}
1108
1109pub fn decode_option_spread(
1110    msg: &dbn::InstrumentDefMsg,
1111    instrument_id: InstrumentId,
1112    ts_init: UnixNanos,
1113) -> anyhow::Result<OptionSpread> {
1114    let exchange = Ustr::from(msg.exchange()?);
1115    let underlying = Ustr::from(msg.underlying()?);
1116    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1117        Some(AssetClass::Equity)
1118    } else {
1119        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?)?;
1120        asset_class
1121    };
1122    let strategy_type = Ustr::from(msg.secsubtype()?);
1123    let currency = parse_currency_or_usd_default(msg.currency());
1124    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1125    let multiplier = decode_multiplier(msg.unit_of_measure_qty);
1126    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1127    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1128
1129    OptionSpread::new_checked(
1130        instrument_id,
1131        instrument_id.symbol,
1132        asset_class_opt.unwrap_or(AssetClass::Commodity),
1133        Some(exchange),
1134        underlying,
1135        strategy_type,
1136        msg.activation.into(),
1137        msg.expiration.into(),
1138        currency,
1139        price_increment.precision,
1140        price_increment,
1141        multiplier,
1142        lot_size,
1143        None, // TBD
1144        None, // TBD
1145        None, // TBD
1146        None, // TBD
1147        None, // TBD
1148        None, // TBD
1149        None, // TBD
1150        None, // TBD
1151        ts_event,
1152        ts_init,
1153    )
1154}
1155
1156pub fn decode_imbalance_msg(
1157    msg: &dbn::ImbalanceMsg,
1158    instrument_id: InstrumentId,
1159    price_precision: u8,
1160    ts_init: UnixNanos,
1161) -> anyhow::Result<DatabentoImbalance> {
1162    DatabentoImbalance::new(
1163        instrument_id,
1164        Price::from_raw(decode_raw_price_i64(msg.ref_price), price_precision),
1165        Price::from_raw(
1166            decode_raw_price_i64(msg.cont_book_clr_price),
1167            price_precision,
1168        ),
1169        Price::from_raw(
1170            decode_raw_price_i64(msg.auct_interest_clr_price),
1171            price_precision,
1172        ),
1173        Quantity::new(f64::from(msg.paired_qty), 0),
1174        Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1175        parse_order_side(msg.side),
1176        msg.significant_imbalance as c_char,
1177        msg.hd.ts_event.into(),
1178        msg.ts_recv.into(),
1179        ts_init,
1180    )
1181}
1182
1183pub fn decode_statistics_msg(
1184    msg: &dbn::StatMsg,
1185    instrument_id: InstrumentId,
1186    price_precision: u8,
1187    ts_init: UnixNanos,
1188) -> anyhow::Result<DatabentoStatistics> {
1189    let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1190        .expect("Invalid value for `stat_type`");
1191    let update_action = DatabentoStatisticUpdateAction::from_u8(msg.update_action)
1192        .expect("Invalid value for `update_action`");
1193
1194    DatabentoStatistics::new(
1195        instrument_id,
1196        stat_type,
1197        update_action,
1198        decode_optional_price(msg.price, price_precision),
1199        decode_optional_quantity(msg.quantity),
1200        msg.channel_id,
1201        msg.stat_flags,
1202        msg.sequence,
1203        msg.ts_ref.into(),
1204        msg.ts_in_delta,
1205        msg.hd.ts_event.into(),
1206        msg.ts_recv.into(),
1207        ts_init,
1208    )
1209}
1210
1211////////////////////////////////////////////////////////////////////////////////
1212// Tests
1213////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use databento::dbn::decode::{DecodeStream, dbn::Decoder};
    use fallible_streaming_iterator::FallibleStreamingIterator;
    use rstest::*;

    use super::*;

    /// Returns the `test_data` directory located under the crate manifest directory.
    fn test_data_path() -> PathBuf {
        let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
        manifest_dir.join("test_data")
    }

    #[rstest]
    #[case('Y' as c_char, Some(true))]
    #[case('N' as c_char, Some(false))]
    #[case('X' as c_char, None)]
    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
        let actual = parse_optional_bool(input);
        assert_eq!(actual, expected);
    }

    #[rstest]
    #[case('A' as c_char, OrderSide::Sell)]
    #[case('B' as c_char, OrderSide::Buy)]
    #[case('X' as c_char, OrderSide::NoOrderSide)]
    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
        let actual = parse_order_side(input);
        assert_eq!(actual, expected);
    }

    #[rstest]
    #[case('A' as c_char, AggressorSide::Seller)]
    #[case('B' as c_char, AggressorSide::Buyer)]
    #[case('X' as c_char, AggressorSide::NoAggressor)]
    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
        let actual = parse_aggressor_side(input);
        assert_eq!(actual, expected);
    }

    #[rstest]
    #[case('A' as c_char, Ok(BookAction::Add))]
    #[case('C' as c_char, Ok(BookAction::Delete))]
    #[case('F' as c_char, Ok(BookAction::Update))]
    #[case('M' as c_char, Ok(BookAction::Update))]
    #[case('R' as c_char, Ok(BookAction::Clear))]
    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
        // Compare on the rendered error message so both sides share one `Result` type.
        let actual = parse_book_action(input).map_err(|e| e.to_string());
        assert_eq!(actual, expected.map_err(String::from));
    }

    #[rstest]
    #[case('C' as c_char, Ok(OptionKind::Call))]
    #[case('P' as c_char, Ok(OptionKind::Put))]
    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
        // Compare on the rendered error message so both sides share one `Result` type.
        let actual = parse_option_kind(input).map_err(|e| e.to_string());
        assert_eq!(actual, expected.map_err(String::from));
    }

    #[rstest]
    #[case(Ok("USD"), Currency::USD())]
    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
    #[case(Ok(""), Currency::USD())]
    #[case(Err("Error"), Currency::USD())]
    fn test_parse_currency_or_usd_default(
        #[case] input: Result<&str, &'static str>, // Errors are given as `&'static str`
        #[case] expected: Currency,
    ) {
        let input = input.map_err(std::io::Error::other);
        let actual = parse_currency_or_usd_default(input);
        assert_eq!(actual, expected);
    }

    #[rstest]
    #[case("DII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
    #[case("EII", Ok((Some(AssetClass::Index), Some(InstrumentClass::Future))))]
    #[case("EIA", Ok((Some(AssetClass::Equity), Some(InstrumentClass::Future))))]
    #[case("XXX", Ok((None, None)))]
    #[case("D", Err("Value string is too short"))]
    fn test_parse_cfi_iso10926(
        #[case] input: &str,
        #[case] expected: Result<(Option<AssetClass>, Option<InstrumentClass>), &'static str>,
    ) {
        // Compare on the rendered error message so both sides share one `Result` type.
        let actual = parse_cfi_iso10926(input).map_err(|e| e.to_string());
        assert_eq!(actual, expected.map_err(String::from));
    }

    #[rstest]
    #[case(0, 2, Price::new(0.01, 2))] // 0 yields the default increment
    #[case(i64::MAX, 2, Price::new(0.01, 2))] // i64::MAX yields the default increment
    #[case(1000000, 2, Price::from_raw(decode_raw_price_i64(1000000), 2))] // Arbitrary valid price
    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
        assert_eq!(decode_price_increment(value, precision), expected);
    }

    #[rstest]
    #[case(i64::MAX, 2, None)] // i64::MAX decodes to None
    #[case(0, 2, Some(Price::from_raw(0, 2)))] // 0 is a valid price here
    #[case(1000000, 2, Some(Price::from_raw(decode_raw_price_i64(1000000), 2)))] // Arbitrary valid price
    fn test_decode_optional_price(
        #[case] value: i64,
        #[case] precision: u8,
        #[case] expected: Option<Price>,
    ) {
        assert_eq!(decode_optional_price(value, precision), expected);
    }

    #[rstest]
    #[case(i32::MAX, None)] // i32::MAX decodes to None
    #[case(0, Some(Quantity::new(0.0, 0)))] // 0 is a valid quantity
    #[case(10, Some(Quantity::new(10.0, 0)))] // Arbitrary valid quantity
    fn test_decode_optional_quantity(#[case] value: i32, #[case] expected: Option<Quantity>) {
        assert_eq!(decode_optional_quantity(value), expected);
    }

    #[rstest]
    fn test_decode_mbo_msg() {
        let fixture = test_data_path().join("test_data.mbo.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::MboMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let (maybe_delta, _) =
            decode_mbo_msg(record, instrument_id, 2, 0.into(), false).unwrap();
        let delta = maybe_delta.unwrap();

        assert_eq!(delta.instrument_id, instrument_id);
        assert_eq!(delta.action, BookAction::Delete);
        assert_eq!(delta.order.side, OrderSide::Sell);
        assert_eq!(delta.order.price, Price::from("3722.75"));
        assert_eq!(delta.order.size, Quantity::from("1"));
        assert_eq!(delta.order.order_id, 647_784_973_705);
        assert_eq!(delta.flags, 128);
        assert_eq!(delta.sequence, 1_170_352);
        assert_eq!(delta.ts_event, record.ts_recv);
        assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
        assert_eq!(delta.ts_init, 0);
    }

    #[rstest]
    fn test_decode_mbp1_msg() {
        let fixture = test_data_path().join("test_data.mbp-1.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::Mbp1Msg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let (quote, _) = decode_mbp1_msg(record, instrument_id, 2, 0.into(), false).unwrap();

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("3720.25"));
        assert_eq!(quote.ask_price, Price::from("3720.50"));
        assert_eq!(quote.bid_size, Quantity::from("24"));
        assert_eq!(quote.ask_size, Quantity::from("11"));
        assert_eq!(quote.ts_event, record.ts_recv);
        assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
        assert_eq!(quote.ts_init, 0);
    }

    #[rstest]
    fn test_decode_bbo_1s_msg() {
        let fixture = test_data_path().join("test_data.bbo-1s.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::BboMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let quote = decode_bbo_msg(record, instrument_id, 2, 0.into()).unwrap();

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("5199.50"));
        assert_eq!(quote.ask_price, Price::from("5199.75"));
        assert_eq!(quote.bid_size, Quantity::from("26"));
        assert_eq!(quote.ask_size, Quantity::from("23"));
        assert_eq!(quote.ts_event, record.ts_recv);
        assert_eq!(quote.ts_event, 1715248801000000000);
        assert_eq!(quote.ts_init, 0);
    }

    #[rstest]
    fn test_decode_bbo_1m_msg() {
        let fixture = test_data_path().join("test_data.bbo-1m.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::BboMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let quote = decode_bbo_msg(record, instrument_id, 2, 0.into()).unwrap();

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("5199.50"));
        assert_eq!(quote.ask_price, Price::from("5199.75"));
        assert_eq!(quote.bid_size, Quantity::from("33"));
        assert_eq!(quote.ask_size, Quantity::from("17"));
        assert_eq!(quote.ts_event, record.ts_recv);
        assert_eq!(quote.ts_event, 1715248800000000000);
        assert_eq!(quote.ts_init, 0);
    }

    #[rstest]
    fn test_decode_mbp10_msg() {
        let fixture = test_data_path().join("test_data.mbp-10.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::Mbp10Msg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let depth10 = decode_mbp10_msg(record, instrument_id, 2, 0.into()).unwrap();

        assert_eq!(depth10.instrument_id, instrument_id);
        assert_eq!(depth10.bids.len(), 10);
        assert_eq!(depth10.asks.len(), 10);
        assert_eq!(depth10.bid_counts.len(), 10);
        assert_eq!(depth10.ask_counts.len(), 10);
        assert_eq!(depth10.flags, 128);
        assert_eq!(depth10.sequence, 1_170_352);
        assert_eq!(depth10.ts_event, record.ts_recv);
        assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
        assert_eq!(depth10.ts_init, 0);
    }

    #[rstest]
    fn test_decode_trade_msg() {
        let fixture = test_data_path().join("test_data.trades.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::TradeMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let trade = decode_trade_msg(record, instrument_id, 2, 0.into()).unwrap();

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("3720.25"));
        assert_eq!(trade.size, Quantity::from("5"));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.trade_id.to_string(), "1170380");
        assert_eq!(trade.ts_event, record.ts_recv);
        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
        assert_eq!(trade.ts_init, 0);
    }

    #[rstest]
    fn test_decode_tbbo_msg() {
        let fixture = test_data_path().join("test_data.tbbo.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::Mbp1Msg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let (quote, trade) = decode_tbbo_msg(record, instrument_id, 2, 0.into()).unwrap();

        // Quote half of the pair
        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("3720.25"));
        assert_eq!(quote.ask_price, Price::from("3720.50"));
        assert_eq!(quote.bid_size, Quantity::from("26"));
        assert_eq!(quote.ask_size, Quantity::from("7"));
        assert_eq!(quote.ts_event, record.ts_recv);
        assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
        assert_eq!(quote.ts_init, 0);

        // Trade half of the pair
        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("3720.25"));
        assert_eq!(trade.size, Quantity::from("5"));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.trade_id.to_string(), "1170380");
        assert_eq!(trade.ts_event, record.ts_recv);
        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
        assert_eq!(trade.ts_init, 0);
    }

    #[ignore] // TODO: Requires updated test data
    #[rstest]
    fn test_decode_ohlcv_msg() {
        let fixture = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::OhlcvMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bar = decode_ohlcv_msg(record, instrument_id, 2, 0.into()).unwrap();

        let expected_bar_type = BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL");
        assert_eq!(bar.bar_type, expected_bar_type);
        assert_eq!(bar.open, Price::from("3720.25"));
        assert_eq!(bar.high, Price::from("3720.50"));
        assert_eq!(bar.low, Price::from("3720.25"));
        assert_eq!(bar.close, Price::from("3720.50"));
        assert_eq!(bar.ts_event, 1_609_160_400_000_000_000);
        assert_eq!(bar.ts_init, 1_609_160_401_000_000_000); // Adjusted to open + interval
    }

    #[rstest]
    fn test_decode_definition_msg() {
        let fixture = test_data_path().join("test_data.definition.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::InstrumentDefMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let decoded = decode_instrument_def_msg(record, instrument_id, 0.into());

        assert!(decoded.is_ok());
        assert_eq!(decoded.unwrap().multiplier(), Quantity::from(1));
    }

    #[rstest]
    fn test_decode_definition_v1_msg() {
        let fixture = test_data_path().join("test_data.definition.v1.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::compat::InstrumentDefMsgV1>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let decoded = decode_instrument_def_msg_v1(record, instrument_id, 0.into());

        assert!(decoded.is_ok());
        assert_eq!(decoded.unwrap().multiplier(), Quantity::from(1));
    }

    #[rstest]
    fn test_decode_status_msg() {
        let fixture = test_data_path().join("test_data.status.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::StatusMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let status = decode_status_msg(record, instrument_id, 0.into()).unwrap();

        assert_eq!(status.instrument_id, instrument_id);
        assert_eq!(status.action, MarketStatusAction::Trading);
        assert_eq!(status.ts_event, record.hd.ts_event);
        assert_eq!(status.ts_init, 0);
        assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
        assert_eq!(status.trading_event, None);
        assert_eq!(status.is_trading, Some(true));
        assert_eq!(status.is_quoting, Some(true));
        assert_eq!(status.is_short_sell_restricted, None);
    }

    #[rstest]
    fn test_decode_imbalance_msg() {
        let fixture = test_data_path().join("test_data.imbalance.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::ImbalanceMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let imbalance = decode_imbalance_msg(record, instrument_id, 2, 0.into()).unwrap();

        assert_eq!(imbalance.instrument_id, instrument_id);
        assert_eq!(imbalance.ref_price, Price::from("229.43"));
        assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
        assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
        assert_eq!(imbalance.paired_qty, Quantity::from("0"));
        assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
        assert_eq!(imbalance.side, OrderSide::Buy);
        assert_eq!(imbalance.significant_imbalance, 126);
        assert_eq!(imbalance.ts_event, record.hd.ts_event);
        assert_eq!(imbalance.ts_recv, record.ts_recv);
        assert_eq!(imbalance.ts_init, 0);
    }

    #[rstest]
    fn test_decode_statistics_msg() {
        let fixture = test_data_path().join("test_data.statistics.dbn.zst");
        let mut records = Decoder::from_zstd_file(fixture)
            .unwrap()
            .decode_stream::<dbn::StatMsg>();
        let record = records.next().unwrap().unwrap();

        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let statistics = decode_statistics_msg(record, instrument_id, 2, 0.into()).unwrap();

        assert_eq!(statistics.instrument_id, instrument_id);
        assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
        let expected_action = DatabentoStatisticUpdateAction::Added;
        assert_eq!(statistics.update_action, expected_action);
        assert_eq!(statistics.price, Some(Price::from("100.00")));
        assert_eq!(statistics.quantity, None);
        assert_eq!(statistics.channel_id, 13);
        assert_eq!(statistics.stat_flags, 255);
        assert_eq!(statistics.sequence, 2);
        assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
        assert_eq!(statistics.ts_in_delta, 26961);
        assert_eq!(statistics.ts_event, record.hd.ts_event);
        assert_eq!(statistics.ts_recv, record.ts_recv);
        assert_eq!(statistics.ts_init, 0);
    }
}