nautilus_databento/
symbology.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::AHashMap;
17use databento::dbn::{self, PitSymbolMap, SType};
18use dbn::{Publisher, Record};
19use indexmap::IndexMap;
20use nautilus_model::identifiers::{InstrumentId, Symbol, Venue};
21
22use super::types::PublisherId;
23
24#[derive(Debug)]
25pub struct MetadataCache {
26    metadata: dbn::Metadata,
27    date_metadata_map: AHashMap<time::Date, PitSymbolMap>,
28}
29
30impl MetadataCache {
31    #[must_use]
32    pub fn new(metadata: dbn::Metadata) -> Self {
33        Self {
34            metadata,
35            date_metadata_map: AHashMap::new(),
36        }
37    }
38
39    /// # Errors
40    ///
41    /// Returns an error if metadata for the given `date` cannot be retrieved.
42    pub fn symbol_map_for_date(&mut self, date: time::Date) -> dbn::Result<&PitSymbolMap> {
43        if !self.date_metadata_map.contains_key(&date) {
44            let map = self.metadata.symbol_map_for_date(date)?;
45            self.date_metadata_map.insert(date, map);
46        }
47
48        self.date_metadata_map
49            .get(&date)
50            .ok_or_else(|| dbn::Error::decode(format!("metadata cache missing for date {date}")))
51    }
52}
53
54pub fn instrument_id_to_symbol_string(
55    instrument_id: InstrumentId,
56    symbol_venue_map: &mut AHashMap<Symbol, Venue>,
57) -> String {
58    symbol_venue_map
59        .entry(instrument_id.symbol)
60        .or_insert(instrument_id.venue);
61    instrument_id.symbol.to_string()
62}
63
64/// # Errors
65///
66/// Returns an error if mapping record to `InstrumentId` fails.
67///
68/// # Panics
69///
70/// Panics if the raw symbol from metadata cannot be converted into a `Symbol`.
71pub fn decode_nautilus_instrument_id(
72    record: &dbn::RecordRef,
73    metadata: &mut MetadataCache,
74    publisher_venue_map: &IndexMap<PublisherId, Venue>,
75    symbol_venue_map: &AHashMap<Symbol, Venue>,
76) -> anyhow::Result<InstrumentId> {
77    let publisher = record
78        .publisher()
79        .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
80    let publisher_id = publisher as PublisherId;
81    let venue = publisher_venue_map
82        .get(&publisher_id)
83        .ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
84    let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
85    if publisher == Publisher::GlbxMdp3Glbx
86        && let Some(venue) = symbol_venue_map.get(&instrument_id.symbol)
87    {
88        instrument_id.venue = *venue;
89    }
90
91    Ok(instrument_id)
92}
93
94/// # Errors
95///
96/// Returns an error if mapping record to `InstrumentId` fails or timestamp overflow occurs.
97///
98/// # Panics
99///
100/// Panics if the raw symbol from metadata cannot be converted into a `Symbol`.
101pub fn get_nautilus_instrument_id_for_record(
102    record: &dbn::RecordRef,
103    metadata: &mut MetadataCache,
104    venue: Venue,
105) -> anyhow::Result<InstrumentId> {
106    let (instrument_id, nanoseconds) = if let Some(msg) = record.get::<dbn::MboMsg>() {
107        (msg.hd.instrument_id, msg.ts_recv)
108    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
109        (msg.hd.instrument_id, msg.ts_recv)
110    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
111        (msg.hd.instrument_id, msg.ts_recv)
112    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
113        (msg.hd.instrument_id, msg.ts_recv)
114    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
115        (msg.hd.instrument_id, msg.ts_recv)
116    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
117        (msg.hd.instrument_id, msg.ts_recv)
118    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
119        (msg.hd.instrument_id, msg.hd.ts_event)
120    } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
121        (msg.hd.instrument_id, msg.ts_recv)
122    } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
123        (msg.hd.instrument_id, msg.ts_recv)
124    } else if let Some(msg) = record.get::<dbn::StatMsg>() {
125        (msg.hd.instrument_id, msg.ts_recv)
126    } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
127        (msg.hd.instrument_id, msg.ts_recv)
128    } else {
129        anyhow::bail!("DBN message type is not currently supported")
130    };
131
132    let duration = time::Duration::nanoseconds(nanoseconds as i64);
133    let datetime = time::OffsetDateTime::UNIX_EPOCH
134        .checked_add(duration)
135        .ok_or_else(|| anyhow::anyhow!("Timestamp overflow for record"))?;
136    let date = datetime.date();
137    let symbol_map = metadata.symbol_map_for_date(date)?;
138    let raw_symbol = symbol_map
139        .get(instrument_id)
140        .ok_or_else(|| anyhow::anyhow!("No raw symbol found for {instrument_id}"))?;
141
142    let symbol = Symbol::from_str_unchecked(raw_symbol);
143
144    Ok(InstrumentId::new(symbol, venue))
145}
146
147#[must_use]
148pub fn infer_symbology_type(symbol: &str) -> SType {
149    if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
150        return SType::Parent;
151    }
152
153    let parts: Vec<&str> = symbol.split('.').collect();
154    if parts.len() == 3 && parts[2].chars().all(|c| c.is_ascii_digit()) {
155        return SType::Continuous;
156    }
157
158    if symbol.chars().all(|c| c.is_ascii_digit()) {
159        return SType::InstrumentId;
160    }
161
162    SType::RawSymbol
163}
164
165/// # Errors
166///
167/// Returns an error if `symbols` is empty or symbols have inconsistent symbology types.
168pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
169    if symbols.is_empty() {
170        anyhow::bail!("No symbols provided");
171    }
172    let first_symbol = symbols[0];
173    let first_stype = infer_symbology_type(first_symbol);
174
175    for symbol in symbols {
176        let next_stype = infer_symbology_type(symbol);
177        if next_stype != first_stype {
178            anyhow::bail!(
179                "Inconsistent symbology types: '{}' for {} vs '{}' for {}",
180                first_stype,
181                first_symbol,
182                next_stype,
183                symbol
184            );
185        }
186    }
187
188    Ok(())
189}
190
191////////////////////////////////////////////////////////////////////////////////
192// Tests
193////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    // Case values are string literals converted to the parameter types by `rstest`
    #[rstest]
    #[case("1", "instrument_id")]
    #[case("123456789", "instrument_id")]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN  99   5617289", "raw_symbol")]
    #[case("SPY   240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: SType) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        assert!(check_consistent_symbology(&symbols).is_err());
    }

    #[rstest]
    fn test_instrument_id_to_symbol_string_updates_map() {
        use nautilus_model::identifiers::Venue;
        let symbol = Symbol::from("TEST");
        let venue = Venue::from("XNAS");
        let instrument_id = InstrumentId::new(symbol, venue);
        let mut map: AHashMap<Symbol, Venue> = AHashMap::new();

        // First call should insert the mapping
        let sym_str = instrument_id_to_symbol_string(instrument_id, &mut map);
        assert_eq!(sym_str, "TEST");
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));

        // Call again with same symbol but different venue should not override existing
        let other = Venue::from("XLON");
        let inst2 = InstrumentId::new(Symbol::from("TEST"), other);
        let sym_str2 = instrument_id_to_symbol_string(inst2, &mut map);
        assert_eq!(sym_str2, "TEST");

        // Venue remains the original
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    // Each case is a list of separate symbols; a single comma-joined string would only
    // ever exercise the one-element path and pass trivially.
    #[rstest]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}
267}