nautilus_databento/
symbology.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use ahash::AHashMap;
19use databento::dbn::{self, PitSymbolMap, SType};
20use dbn::{Publisher, Record};
21use indexmap::IndexMap;
22use nautilus_core::correctness::check_slice_not_empty;
23use nautilus_model::identifiers::{InstrumentId, Symbol, Venue};
24
25use super::types::PublisherId;
26
27#[derive(Debug)]
28pub struct MetadataCache {
29    metadata: dbn::Metadata,
30    date_metadata_map: AHashMap<time::Date, PitSymbolMap>,
31}
32
33impl MetadataCache {
34    #[must_use]
35    pub fn new(metadata: dbn::Metadata) -> Self {
36        Self {
37            metadata,
38            date_metadata_map: AHashMap::new(),
39        }
40    }
41
42    pub fn symbol_map_for_date(&mut self, date: time::Date) -> dbn::Result<&PitSymbolMap> {
43        Ok(self
44            .date_metadata_map
45            .entry(date)
46            .or_insert_with(|| self.metadata.symbol_map_for_date(date).unwrap()))
47    }
48}
49
50pub fn instrument_id_to_symbol_string(
51    instrument_id: InstrumentId,
52    symbol_venue_map: &mut HashMap<Symbol, Venue>,
53) -> String {
54    symbol_venue_map
55        .entry(instrument_id.symbol)
56        .or_insert(instrument_id.venue);
57    instrument_id.symbol.to_string()
58}
59
60pub fn decode_nautilus_instrument_id(
61    record: &dbn::RecordRef,
62    metadata: &mut MetadataCache,
63    publisher_venue_map: &IndexMap<PublisherId, Venue>,
64    symbol_venue_map: &HashMap<Symbol, Venue>,
65) -> anyhow::Result<InstrumentId> {
66    let publisher = record.publisher().expect("Invalid `publisher` for record");
67    let publisher_id = publisher as PublisherId;
68    let venue = publisher_venue_map
69        .get(&publisher_id)
70        .ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
71    let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
72    if publisher == Publisher::GlbxMdp3Glbx {
73        if let Some(venue) = symbol_venue_map.get(&instrument_id.symbol) {
74            instrument_id.venue = *venue;
75        }
76    }
77
78    Ok(instrument_id)
79}
80
81pub fn get_nautilus_instrument_id_for_record(
82    record: &dbn::RecordRef,
83    metadata: &mut MetadataCache,
84    venue: Venue,
85) -> anyhow::Result<InstrumentId> {
86    let (instrument_id, nanoseconds) = if let Some(msg) = record.get::<dbn::MboMsg>() {
87        (msg.hd.instrument_id, msg.ts_recv)
88    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
89        (msg.hd.instrument_id, msg.ts_recv)
90    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
91        (msg.hd.instrument_id, msg.ts_recv)
92    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
93        (msg.hd.instrument_id, msg.ts_recv)
94    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
95        (msg.hd.instrument_id, msg.ts_recv)
96    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
97        (msg.hd.instrument_id, msg.ts_recv)
98    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
99        (msg.hd.instrument_id, msg.hd.ts_event)
100    } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
101        (msg.hd.instrument_id, msg.ts_recv)
102    } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
103        (msg.hd.instrument_id, msg.ts_recv)
104    } else if let Some(msg) = record.get::<dbn::StatMsg>() {
105        (msg.hd.instrument_id, msg.ts_recv)
106    } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
107        (msg.hd.instrument_id, msg.ts_recv)
108    } else {
109        anyhow::bail!("DBN message type is not currently supported")
110    };
111
112    let duration = time::Duration::nanoseconds(nanoseconds as i64);
113    let datetime = time::OffsetDateTime::UNIX_EPOCH
114        .checked_add(duration)
115        .unwrap(); // SAFETY: Relying on correctness of record timestamps
116    let date = datetime.date();
117    let symbol_map = metadata.symbol_map_for_date(date)?;
118    let raw_symbol = symbol_map
119        .get(instrument_id)
120        .ok_or_else(|| anyhow::anyhow!("No raw symbol found for {instrument_id}"))?;
121
122    let symbol = Symbol::from_str_unchecked(raw_symbol);
123
124    Ok(InstrumentId::new(symbol, venue))
125}
126
127#[must_use]
128pub fn infer_symbology_type(symbol: &str) -> SType {
129    if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
130        return SType::Parent;
131    }
132
133    let parts: Vec<&str> = symbol.split('.').collect();
134    if parts.len() == 3 && parts[2].chars().all(|c| c.is_ascii_digit()) {
135        return SType::Continuous;
136    }
137
138    if symbol.chars().all(|c| c.is_ascii_digit()) {
139        return SType::InstrumentId;
140    }
141
142    SType::RawSymbol
143}
144
145pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
146    check_slice_not_empty(symbols, stringify!(symbols)).unwrap();
147
148    // SAFETY: We checked len so know there must be at least one symbol
149    let first_symbol = symbols.first().unwrap();
150    let first_stype = infer_symbology_type(first_symbol);
151
152    for symbol in symbols {
153        let next_stype = infer_symbology_type(symbol);
154        if next_stype != first_stype {
155            anyhow::bail!(
156                "Inconsistent symbology types: '{}' for {} vs '{}' for {}",
157                first_stype,
158                first_symbol,
159                next_stype,
160                symbol
161            );
162        }
163    }
164
165    Ok(())
166}
167
168////////////////////////////////////////////////////////////////////////////////
169// Tests
170////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    #[rstest]
    #[case("1", "instrument_id")]
    #[case("123456789", "instrument_id")]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN  99   5617289", "raw_symbol")]
    #[case("SPY   240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: SType) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    #[should_panic]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        let _ = check_consistent_symbology(&symbols);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    #[rstest]
    fn test_check_consistent_symbology_when_later_symbol_inconsistent() {
        // The mismatch is only at the last element, so every element must be checked.
        let symbols = vec!["AAPL", "MSFT", "ES.c.0"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert!(result.err().unwrap().to_string().contains("for ES.c.0"));
    }

    // Each case holds several distinct symbols. A single comma-joined string would only
    // exercise one symbol and pass vacuously.
    #[rstest]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    #[case(vec!["1", "123456789"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}