nautilus_databento/
symbology.rs
1use std::collections::HashMap;
17
18use ahash::AHashMap;
19use databento::dbn::{self, PitSymbolMap, SType};
20use dbn::{Publisher, Record};
21use indexmap::IndexMap;
22use nautilus_core::correctness::check_slice_not_empty;
23use nautilus_model::identifiers::{InstrumentId, Symbol, Venue};
24
25use super::types::PublisherId;
26
27#[derive(Debug)]
28pub struct MetadataCache {
29 metadata: dbn::Metadata,
30 date_metadata_map: AHashMap<time::Date, PitSymbolMap>,
31}
32
33impl MetadataCache {
34 #[must_use]
35 pub fn new(metadata: dbn::Metadata) -> Self {
36 Self {
37 metadata,
38 date_metadata_map: AHashMap::new(),
39 }
40 }
41
42 pub fn symbol_map_for_date(&mut self, date: time::Date) -> dbn::Result<&PitSymbolMap> {
43 Ok(self
44 .date_metadata_map
45 .entry(date)
46 .or_insert_with(|| self.metadata.symbol_map_for_date(date).unwrap()))
47 }
48}
49
50pub fn instrument_id_to_symbol_string(
51 instrument_id: InstrumentId,
52 symbol_venue_map: &mut HashMap<Symbol, Venue>,
53) -> String {
54 symbol_venue_map
55 .entry(instrument_id.symbol)
56 .or_insert(instrument_id.venue);
57 instrument_id.symbol.to_string()
58}
59
60pub fn decode_nautilus_instrument_id(
61 record: &dbn::RecordRef,
62 metadata: &mut MetadataCache,
63 publisher_venue_map: &IndexMap<PublisherId, Venue>,
64 symbol_venue_map: &HashMap<Symbol, Venue>,
65) -> anyhow::Result<InstrumentId> {
66 let publisher = record.publisher().expect("Invalid `publisher` for record");
67 let publisher_id = publisher as PublisherId;
68 let venue = publisher_venue_map
69 .get(&publisher_id)
70 .ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
71 let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
72 if publisher == Publisher::GlbxMdp3Glbx {
73 if let Some(venue) = symbol_venue_map.get(&instrument_id.symbol) {
74 instrument_id.venue = *venue;
75 }
76 }
77
78 Ok(instrument_id)
79}
80
81pub fn get_nautilus_instrument_id_for_record(
82 record: &dbn::RecordRef,
83 metadata: &mut MetadataCache,
84 venue: Venue,
85) -> anyhow::Result<InstrumentId> {
86 let (instrument_id, nanoseconds) = if let Some(msg) = record.get::<dbn::MboMsg>() {
87 (msg.hd.instrument_id, msg.ts_recv)
88 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
89 (msg.hd.instrument_id, msg.ts_recv)
90 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
91 (msg.hd.instrument_id, msg.ts_recv)
92 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
93 (msg.hd.instrument_id, msg.ts_recv)
94 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
95 (msg.hd.instrument_id, msg.ts_recv)
96 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
97 (msg.hd.instrument_id, msg.ts_recv)
98 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
99 (msg.hd.instrument_id, msg.hd.ts_event)
100 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
101 (msg.hd.instrument_id, msg.ts_recv)
102 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
103 (msg.hd.instrument_id, msg.ts_recv)
104 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
105 (msg.hd.instrument_id, msg.ts_recv)
106 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
107 (msg.hd.instrument_id, msg.ts_recv)
108 } else {
109 anyhow::bail!("DBN message type is not currently supported")
110 };
111
112 let duration = time::Duration::nanoseconds(nanoseconds as i64);
113 let datetime = time::OffsetDateTime::UNIX_EPOCH
114 .checked_add(duration)
115 .unwrap(); let date = datetime.date();
117 let symbol_map = metadata.symbol_map_for_date(date)?;
118 let raw_symbol = symbol_map
119 .get(instrument_id)
120 .ok_or_else(|| anyhow::anyhow!("No raw symbol found for {instrument_id}"))?;
121
122 let symbol = Symbol::from_str_unchecked(raw_symbol);
123
124 Ok(InstrumentId::new(symbol, venue))
125}
126
127#[must_use]
128pub fn infer_symbology_type(symbol: &str) -> SType {
129 if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
130 return SType::Parent;
131 }
132
133 let parts: Vec<&str> = symbol.split('.').collect();
134 if parts.len() == 3 && parts[2].chars().all(|c| c.is_ascii_digit()) {
135 return SType::Continuous;
136 }
137
138 if symbol.chars().all(|c| c.is_ascii_digit()) {
139 return SType::InstrumentId;
140 }
141
142 SType::RawSymbol
143}
144
145pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
146 check_slice_not_empty(symbols, stringify!(symbols)).unwrap();
147
148 let first_symbol = symbols.first().unwrap();
150 let first_stype = infer_symbology_type(first_symbol);
151
152 for symbol in symbols {
153 let next_stype = infer_symbology_type(symbol);
154 if next_stype != first_stype {
155 anyhow::bail!(
156 "Inconsistent symbology types: '{}' for {} vs '{}' for {}",
157 first_stype,
158 first_symbol,
159 next_stype,
160 symbol
161 );
162 }
163 }
164
165 Ok(())
166}
167
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    #[rstest]
    #[case("1", "instrument_id")]
    #[case("123456789", "instrument_id")]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN  99   5617289", "raw_symbol")]
    #[case("SPY   240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: SType) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    #[should_panic]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        let _ = check_consistent_symbology(&symbols);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    // Each case holds two separate symbols of the same type, so the comparison
    // across symbols is actually exercised (a single comma-joined string is
    // just one symbol and passes trivially).
    #[rstest]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}