nautilus_databento/
symbology.rs1use ahash::AHashMap;
17use databento::dbn::{self, PitSymbolMap, SType};
18use dbn::{Publisher, Record};
19use indexmap::IndexMap;
20use nautilus_model::identifiers::{InstrumentId, Symbol, Venue};
21
22use super::types::PublisherId;
23
24#[derive(Debug)]
25pub struct MetadataCache {
26 metadata: dbn::Metadata,
27 date_metadata_map: AHashMap<time::Date, PitSymbolMap>,
28}
29
30impl MetadataCache {
31 #[must_use]
32 pub fn new(metadata: dbn::Metadata) -> Self {
33 Self {
34 metadata,
35 date_metadata_map: AHashMap::new(),
36 }
37 }
38
39 pub fn symbol_map_for_date(&mut self, date: time::Date) -> dbn::Result<&PitSymbolMap> {
43 if !self.date_metadata_map.contains_key(&date) {
44 let map = self.metadata.symbol_map_for_date(date)?;
45 self.date_metadata_map.insert(date, map);
46 }
47
48 self.date_metadata_map
49 .get(&date)
50 .ok_or_else(|| dbn::Error::decode(format!("metadata cache missing for date {date}")))
51 }
52}
53
54pub fn instrument_id_to_symbol_string(
55 instrument_id: InstrumentId,
56 symbol_venue_map: &mut AHashMap<Symbol, Venue>,
57) -> String {
58 symbol_venue_map
59 .entry(instrument_id.symbol)
60 .or_insert(instrument_id.venue);
61 instrument_id.symbol.to_string()
62}
63
64pub fn decode_nautilus_instrument_id(
73 record: &dbn::RecordRef,
74 metadata: &mut MetadataCache,
75 publisher_venue_map: &IndexMap<PublisherId, Venue>,
76 symbol_venue_map: &AHashMap<Symbol, Venue>,
77) -> anyhow::Result<InstrumentId> {
78 let publisher = record
79 .publisher()
80 .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
81 let publisher_id = publisher as PublisherId;
82 let venue = publisher_venue_map
83 .get(&publisher_id)
84 .ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
85 let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
86 if publisher == Publisher::GlbxMdp3Glbx
87 && let Some(venue) = symbol_venue_map.get(&instrument_id.symbol)
88 {
89 instrument_id.venue = *venue;
90 }
91
92 Ok(instrument_id)
93}
94
95pub fn get_nautilus_instrument_id_for_record(
105 record: &dbn::RecordRef,
106 metadata: &mut MetadataCache,
107 venue: Venue,
108) -> anyhow::Result<InstrumentId> {
109 let (instrument_id, nanoseconds) = if let Some(msg) = record.get::<dbn::MboMsg>() {
110 (msg.hd.instrument_id, msg.ts_recv)
111 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
112 (msg.hd.instrument_id, msg.ts_recv)
113 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
114 (msg.hd.instrument_id, msg.ts_recv)
115 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
116 (msg.hd.instrument_id, msg.ts_recv)
117 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
118 (msg.hd.instrument_id, msg.ts_recv)
119 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
120 (msg.hd.instrument_id, msg.ts_recv)
121 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
122 (msg.hd.instrument_id, msg.hd.ts_event)
123 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
124 (msg.hd.instrument_id, msg.ts_recv)
125 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
126 (msg.hd.instrument_id, msg.ts_recv)
127 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
128 (msg.hd.instrument_id, msg.ts_recv)
129 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
130 (msg.hd.instrument_id, msg.ts_recv)
131 } else {
132 anyhow::bail!("DBN message type is not currently supported")
133 };
134
135 let duration = time::Duration::nanoseconds(nanoseconds as i64);
136 let datetime = time::OffsetDateTime::UNIX_EPOCH
137 .checked_add(duration)
138 .ok_or_else(|| anyhow::anyhow!("Timestamp overflow for record"))?;
139 let date = datetime.date();
140 let symbol_map = metadata.symbol_map_for_date(date)?;
141 let raw_symbol = symbol_map
142 .get(instrument_id)
143 .ok_or_else(|| anyhow::anyhow!("No raw symbol found for {instrument_id}"))?;
144
145 let symbol = Symbol::from_str_unchecked(raw_symbol);
146
147 Ok(InstrumentId::new(symbol, venue))
148}
149
150#[must_use]
151pub fn infer_symbology_type(symbol: &str) -> SType {
152 if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
153 return SType::Parent;
154 }
155
156 let parts: Vec<&str> = symbol.split('.').collect();
157 if parts.len() == 3 && parts[2].chars().all(|c| c.is_ascii_digit()) {
158 return SType::Continuous;
159 }
160
161 if symbol.chars().all(|c| c.is_ascii_digit()) {
162 return SType::InstrumentId;
163 }
164
165 SType::RawSymbol
166}
167
168pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
172 if symbols.is_empty() {
173 anyhow::bail!("No symbols provided");
174 }
175 let first_symbol = symbols[0];
176 let first_stype = infer_symbology_type(first_symbol);
177
178 for symbol in symbols {
179 let next_stype = infer_symbology_type(symbol);
180 if next_stype != first_stype {
181 anyhow::bail!(
182 "Inconsistent symbology types: '{}' for {} vs '{}' for {}",
183 first_stype,
184 first_symbol,
185 next_stype,
186 symbol
187 );
188 }
189 }
190
191 Ok(())
192}
193
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    #[rstest]
    #[case("1", "instrument_id")]
    #[case("123456789", "instrument_id")]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN 99 5617289", "raw_symbol")]
    #[case("SPY 240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: SType) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        assert!(check_consistent_symbology(&symbols).is_err());
    }

    #[rstest]
    fn test_instrument_id_to_symbol_string_updates_map() {
        let symbol = Symbol::from("TEST");
        let venue = Venue::from("XNAS");
        let instrument_id = InstrumentId::new(symbol, venue);
        let mut map: AHashMap<Symbol, Venue> = AHashMap::new();

        // First call records the symbol's venue.
        let sym_str = instrument_id_to_symbol_string(instrument_id, &mut map);
        assert_eq!(sym_str, "TEST");
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));

        // Second call with the same symbol but a different venue.
        let other = Venue::from("XLON");
        let inst2 = InstrumentId::new(Symbol::from("TEST"), other);
        let sym_str2 = instrument_id_to_symbol_string(inst2, &mut map);
        assert_eq!(sym_str2, "TEST");

        // The venue recorded first must be retained.
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    // Each case must hold several distinct symbols: a single comma-joined string
    // is one symbol and would make the consistency check pass vacuously.
    #[rstest]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}