nautilus_databento/
symbology.rs1use ahash::AHashMap;
17use databento::dbn::{self, PitSymbolMap, SType};
18use dbn::{Publisher, Record};
19use indexmap::IndexMap;
20use nautilus_model::identifiers::{InstrumentId, Symbol, Venue};
21
22use super::types::PublisherId;
23
24#[derive(Debug)]
25pub struct MetadataCache {
26 metadata: dbn::Metadata,
27 date_metadata_map: AHashMap<time::Date, PitSymbolMap>,
28}
29
30impl MetadataCache {
31 #[must_use]
32 pub fn new(metadata: dbn::Metadata) -> Self {
33 Self {
34 metadata,
35 date_metadata_map: AHashMap::new(),
36 }
37 }
38
39 pub fn symbol_map_for_date(&mut self, date: time::Date) -> dbn::Result<&PitSymbolMap> {
43 if !self.date_metadata_map.contains_key(&date) {
44 let map = self.metadata.symbol_map_for_date(date)?;
45 self.date_metadata_map.insert(date, map);
46 }
47
48 self.date_metadata_map
49 .get(&date)
50 .ok_or_else(|| dbn::Error::decode(format!("metadata cache missing for date {date}")))
51 }
52}
53
54pub fn instrument_id_to_symbol_string(
55 instrument_id: InstrumentId,
56 symbol_venue_map: &mut AHashMap<Symbol, Venue>,
57) -> String {
58 symbol_venue_map
59 .entry(instrument_id.symbol)
60 .or_insert(instrument_id.venue);
61 instrument_id.symbol.to_string()
62}
63
64pub fn decode_nautilus_instrument_id(
73 record: &dbn::RecordRef,
74 metadata: &mut MetadataCache,
75 publisher_venue_map: &IndexMap<PublisherId, Venue>,
76 symbol_venue_map: &AHashMap<Symbol, Venue>,
77) -> anyhow::Result<InstrumentId> {
78 let publisher = record
79 .publisher()
80 .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
81 let publisher_id = publisher as PublisherId;
82 let venue = publisher_venue_map
83 .get(&publisher_id)
84 .ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
85 let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
86 if publisher == Publisher::GlbxMdp3Glbx
87 && let Some(venue) = symbol_venue_map.get(&instrument_id.symbol)
88 {
89 instrument_id.venue = *venue;
90 }
91
92 Ok(instrument_id)
93}
94
95pub fn get_nautilus_instrument_id_for_record(
105 record: &dbn::RecordRef,
106 metadata: &mut MetadataCache,
107 venue: Venue,
108) -> anyhow::Result<InstrumentId> {
109 let (instrument_id, nanoseconds) = if let Some(msg) = record.get::<dbn::MboMsg>() {
110 (msg.hd.instrument_id, msg.ts_recv)
111 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
112 (msg.hd.instrument_id, msg.ts_recv)
113 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
114 (msg.hd.instrument_id, msg.ts_recv)
115 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
116 (msg.hd.instrument_id, msg.ts_recv)
117 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
118 (msg.hd.instrument_id, msg.ts_recv)
119 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
120 (msg.hd.instrument_id, msg.ts_recv)
121 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
122 (msg.hd.instrument_id, msg.hd.ts_event)
123 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
124 (msg.hd.instrument_id, msg.ts_recv)
125 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
126 (msg.hd.instrument_id, msg.ts_recv)
127 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
128 (msg.hd.instrument_id, msg.ts_recv)
129 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
130 (msg.hd.instrument_id, msg.ts_recv)
131 } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
132 (msg.hd.instrument_id, msg.ts_recv)
133 } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
134 (msg.hd.instrument_id, msg.ts_recv)
135 } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
136 (msg.hd.instrument_id, msg.ts_recv)
137 } else {
138 anyhow::bail!("DBN message type is not currently supported")
139 };
140
141 let duration = time::Duration::nanoseconds(nanoseconds as i64);
142 let datetime = time::OffsetDateTime::UNIX_EPOCH
143 .checked_add(duration)
144 .ok_or_else(|| anyhow::anyhow!("Timestamp overflow for record"))?;
145 let date = datetime.date();
146 let symbol_map = metadata.symbol_map_for_date(date)?;
147 let raw_symbol = symbol_map
148 .get(instrument_id)
149 .ok_or_else(|| anyhow::anyhow!("No raw symbol found for {instrument_id}"))?;
150
151 let symbol = Symbol::from_str_unchecked(raw_symbol);
152
153 Ok(InstrumentId::new(symbol, venue))
154}
155
156#[must_use]
157pub fn infer_symbology_type(symbol: &str) -> SType {
158 if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
159 return SType::Parent;
160 }
161
162 let parts: Vec<&str> = symbol.split('.').collect();
163 if parts.len() == 3 && parts[2].chars().all(|c| c.is_ascii_digit()) {
164 return SType::Continuous;
165 }
166
167 if symbol.chars().all(|c| c.is_ascii_digit()) {
168 return SType::InstrumentId;
169 }
170
171 SType::RawSymbol
172}
173
174pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
178 if symbols.is_empty() {
179 anyhow::bail!("No symbols provided");
180 }
181 let first_symbol = symbols[0];
182 let first_stype = infer_symbology_type(first_symbol);
183
184 for symbol in symbols {
185 let next_stype = infer_symbology_type(symbol);
186 if next_stype != first_stype {
187 anyhow::bail!(
188 "Inconsistent symbology types: '{}' for {} vs '{}' for {}",
189 first_stype,
190 first_symbol,
191 next_stype,
192 symbol
193 );
194 }
195 }
196
197 Ok(())
198}
199
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    #[rstest]
    #[case("1", "instrument_id")]
    #[case("123456789", "instrument_id")]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN 99 5617289", "raw_symbol")]
    #[case("SPY 240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: SType) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        assert!(check_consistent_symbology(&symbols).is_err());
    }

    #[rstest]
    fn test_instrument_id_to_symbol_string_updates_map() {
        let symbol = Symbol::from("TEST");
        let venue = Venue::from("XNAS");
        let instrument_id = InstrumentId::new(symbol, venue);
        let mut map: AHashMap<Symbol, Venue> = AHashMap::new();

        // First call registers the venue for the symbol.
        let sym_str = instrument_id_to_symbol_string(instrument_id, &mut map);
        assert_eq!(sym_str, "TEST");
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));

        // A second call with a different venue for the same symbol must not overwrite it.
        let other = Venue::from("XLON");
        let inst2 = InstrumentId::new(Symbol::from("TEST"), other);
        let sym_str2 = instrument_id_to_symbol_string(inst2, &mut map);
        assert_eq!(sym_str2, "TEST");

        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    // Each symbol must be a separate element: a single comma-joined string is one symbol and
    // would make the consistency check pass trivially without comparing anything.
    #[rstest]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}