nautilus_databento/
symbology.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::AHashMap;
17use databento::dbn::{self, PitSymbolMap, SType};
18use dbn::{Publisher, Record};
19use indexmap::IndexMap;
20use nautilus_model::identifiers::{InstrumentId, Symbol, Venue};
21
22use super::types::PublisherId;
23
24#[derive(Debug)]
25pub struct MetadataCache {
26    metadata: dbn::Metadata,
27    date_metadata_map: AHashMap<time::Date, PitSymbolMap>,
28}
29
30impl MetadataCache {
31    #[must_use]
32    pub fn new(metadata: dbn::Metadata) -> Self {
33        Self {
34            metadata,
35            date_metadata_map: AHashMap::new(),
36        }
37    }
38
39    /// # Errors
40    ///
41    /// Returns an error if metadata for the given `date` cannot be retrieved.
42    pub fn symbol_map_for_date(&mut self, date: time::Date) -> dbn::Result<&PitSymbolMap> {
43        if !self.date_metadata_map.contains_key(&date) {
44            let map = self.metadata.symbol_map_for_date(date)?;
45            self.date_metadata_map.insert(date, map);
46        }
47
48        self.date_metadata_map
49            .get(&date)
50            .ok_or_else(|| dbn::Error::decode(format!("metadata cache missing for date {date}")))
51    }
52}
53
54pub fn instrument_id_to_symbol_string(
55    instrument_id: InstrumentId,
56    symbol_venue_map: &mut AHashMap<Symbol, Venue>,
57) -> String {
58    symbol_venue_map
59        .entry(instrument_id.symbol)
60        .or_insert(instrument_id.venue);
61    instrument_id.symbol.to_string()
62}
63
64/// Decodes a Databento record into a Nautilus `InstrumentId`.
65///
66/// # Errors
67///
68/// Returns an error if:
69/// - The publisher cannot be extracted from the record.
70/// - The publisher ID is not found in the venue map.
71/// - The underlying instrument ID mapping fails.
72pub fn decode_nautilus_instrument_id(
73    record: &dbn::RecordRef,
74    metadata: &mut MetadataCache,
75    publisher_venue_map: &IndexMap<PublisherId, Venue>,
76    symbol_venue_map: &AHashMap<Symbol, Venue>,
77) -> anyhow::Result<InstrumentId> {
78    let publisher = record
79        .publisher()
80        .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
81    let publisher_id = publisher as PublisherId;
82    let venue = publisher_venue_map
83        .get(&publisher_id)
84        .ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
85    let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
86    if publisher == Publisher::GlbxMdp3Glbx
87        && let Some(venue) = symbol_venue_map.get(&instrument_id.symbol)
88    {
89        instrument_id.venue = *venue;
90    }
91
92    Ok(instrument_id)
93}
94
95/// Gets the Nautilus `InstrumentId` for a Databento record.
96///
97/// # Errors
98///
99/// Returns an error if:
100/// - The record type is not supported.
101/// - Timestamp overflow occurs when calculating the date.
102/// - Symbol metadata lookup fails.
103/// - No raw symbol is found for the instrument ID.
104pub fn get_nautilus_instrument_id_for_record(
105    record: &dbn::RecordRef,
106    metadata: &mut MetadataCache,
107    venue: Venue,
108) -> anyhow::Result<InstrumentId> {
109    let (instrument_id, nanoseconds) = if let Some(msg) = record.get::<dbn::MboMsg>() {
110        (msg.hd.instrument_id, msg.ts_recv)
111    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
112        (msg.hd.instrument_id, msg.ts_recv)
113    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
114        (msg.hd.instrument_id, msg.ts_recv)
115    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
116        (msg.hd.instrument_id, msg.ts_recv)
117    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
118        (msg.hd.instrument_id, msg.ts_recv)
119    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
120        (msg.hd.instrument_id, msg.ts_recv)
121    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
122        (msg.hd.instrument_id, msg.hd.ts_event)
123    } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
124        (msg.hd.instrument_id, msg.ts_recv)
125    } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
126        (msg.hd.instrument_id, msg.ts_recv)
127    } else if let Some(msg) = record.get::<dbn::StatMsg>() {
128        (msg.hd.instrument_id, msg.ts_recv)
129    } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
130        (msg.hd.instrument_id, msg.ts_recv)
131    } else {
132        anyhow::bail!("DBN message type is not currently supported")
133    };
134
135    let duration = time::Duration::nanoseconds(nanoseconds as i64);
136    let datetime = time::OffsetDateTime::UNIX_EPOCH
137        .checked_add(duration)
138        .ok_or_else(|| anyhow::anyhow!("Timestamp overflow for record"))?;
139    let date = datetime.date();
140    let symbol_map = metadata.symbol_map_for_date(date)?;
141    let raw_symbol = symbol_map
142        .get(instrument_id)
143        .ok_or_else(|| anyhow::anyhow!("No raw symbol found for {instrument_id}"))?;
144
145    let symbol = Symbol::from_str_unchecked(raw_symbol);
146
147    Ok(InstrumentId::new(symbol, venue))
148}
149
150#[must_use]
151pub fn infer_symbology_type(symbol: &str) -> SType {
152    if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
153        return SType::Parent;
154    }
155
156    let parts: Vec<&str> = symbol.split('.').collect();
157    if parts.len() == 3 && parts[2].chars().all(|c| c.is_ascii_digit()) {
158        return SType::Continuous;
159    }
160
161    if symbol.chars().all(|c| c.is_ascii_digit()) {
162        return SType::InstrumentId;
163    }
164
165    SType::RawSymbol
166}
167
168/// # Errors
169///
170/// Returns an error if `symbols` is empty or symbols have inconsistent symbology types.
171pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
172    if symbols.is_empty() {
173        anyhow::bail!("No symbols provided");
174    }
175    let first_symbol = symbols[0];
176    let first_stype = infer_symbology_type(first_symbol);
177
178    for symbol in symbols {
179        let next_stype = infer_symbology_type(symbol);
180        if next_stype != first_stype {
181            anyhow::bail!(
182                "Inconsistent symbology types: '{}' for {} vs '{}' for {}",
183                first_stype,
184                first_symbol,
185                next_stype,
186                symbol
187            );
188        }
189    }
190
191    Ok(())
192}
193
194////////////////////////////////////////////////////////////////////////////////
195// Tests
196////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    #[rstest]
    #[case("1", "instrument_id")]
    #[case("123456789", "instrument_id")]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN  99   5617289", "raw_symbol")]
    #[case("SPY   240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: SType) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        assert!(check_consistent_symbology(&symbols).is_err());
    }

    #[rstest]
    fn test_instrument_id_to_symbol_string_updates_map() {
        let symbol = Symbol::from("TEST");
        let venue = Venue::from("XNAS");
        let instrument_id = InstrumentId::new(symbol, venue);
        let mut map: AHashMap<Symbol, Venue> = AHashMap::new();

        // First call should insert the mapping
        let sym_str = instrument_id_to_symbol_string(instrument_id, &mut map);
        assert_eq!(sym_str, "TEST");
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));

        // Call again with same symbol but different venue should not override existing
        let other = Venue::from("XLON");
        let inst2 = InstrumentId::new(Symbol::from("TEST"), other);
        let sym_str2 = instrument_id_to_symbol_string(inst2, &mut map);
        assert_eq!(sym_str2, "TEST");

        // Venue remains the original
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    #[rstest]
    fn test_check_consistent_symbology_reports_later_mismatch() {
        // The mismatch is not in second position, so every symbol must actually be checked.
        let symbols = vec!["AAPL", "MSFT", "ES.c.0"];
        let result = check_consistent_symbology(&symbols);
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for AAPL vs 'continuous' for ES.c.0"
        );
    }

    // Each case lists separate symbols; a single comma-joined string would only ever
    // exercise one symbol and could never detect an inconsistency.
    #[rstest]
    #[case(vec!["AAPL"])]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    #[case(vec!["1", "123456789"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}