nautilus_databento/
symbology.rs1use ahash::AHashMap;
17use databento::dbn::{self, PitSymbolMap, SType};
18use dbn::{Publisher, Record};
19use indexmap::IndexMap;
20use nautilus_model::identifiers::{InstrumentId, Symbol, Venue};
21
22use super::types::PublisherId;
23
24#[derive(Debug)]
25pub struct MetadataCache {
26 metadata: dbn::Metadata,
27 date_metadata_map: AHashMap<time::Date, PitSymbolMap>,
28}
29
30impl MetadataCache {
31 #[must_use]
32 pub fn new(metadata: dbn::Metadata) -> Self {
33 Self {
34 metadata,
35 date_metadata_map: AHashMap::new(),
36 }
37 }
38
39 pub fn symbol_map_for_date(&mut self, date: time::Date) -> dbn::Result<&PitSymbolMap> {
43 if !self.date_metadata_map.contains_key(&date) {
44 let map = self.metadata.symbol_map_for_date(date)?;
45 self.date_metadata_map.insert(date, map);
46 }
47
48 self.date_metadata_map
49 .get(&date)
50 .ok_or_else(|| dbn::Error::decode(format!("metadata cache missing for date {date}")))
51 }
52}
53
54pub fn instrument_id_to_symbol_string(
55 instrument_id: InstrumentId,
56 symbol_venue_map: &mut AHashMap<Symbol, Venue>,
57) -> String {
58 symbol_venue_map
59 .entry(instrument_id.symbol)
60 .or_insert(instrument_id.venue);
61 instrument_id.symbol.to_string()
62}
63
64pub fn decode_nautilus_instrument_id(
72 record: &dbn::RecordRef,
73 metadata: &mut MetadataCache,
74 publisher_venue_map: &IndexMap<PublisherId, Venue>,
75 symbol_venue_map: &AHashMap<Symbol, Venue>,
76) -> anyhow::Result<InstrumentId> {
77 let publisher = record
78 .publisher()
79 .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
80 let publisher_id = publisher as PublisherId;
81 let venue = publisher_venue_map
82 .get(&publisher_id)
83 .ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
84 let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
85 if publisher == Publisher::GlbxMdp3Glbx
86 && let Some(venue) = symbol_venue_map.get(&instrument_id.symbol)
87 {
88 instrument_id.venue = *venue;
89 }
90
91 Ok(instrument_id)
92}
93
94pub fn get_nautilus_instrument_id_for_record(
102 record: &dbn::RecordRef,
103 metadata: &mut MetadataCache,
104 venue: Venue,
105) -> anyhow::Result<InstrumentId> {
106 let (instrument_id, nanoseconds) = if let Some(msg) = record.get::<dbn::MboMsg>() {
107 (msg.hd.instrument_id, msg.ts_recv)
108 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
109 (msg.hd.instrument_id, msg.ts_recv)
110 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
111 (msg.hd.instrument_id, msg.ts_recv)
112 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
113 (msg.hd.instrument_id, msg.ts_recv)
114 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
115 (msg.hd.instrument_id, msg.ts_recv)
116 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
117 (msg.hd.instrument_id, msg.ts_recv)
118 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
119 (msg.hd.instrument_id, msg.hd.ts_event)
120 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
121 (msg.hd.instrument_id, msg.ts_recv)
122 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
123 (msg.hd.instrument_id, msg.ts_recv)
124 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
125 (msg.hd.instrument_id, msg.ts_recv)
126 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
127 (msg.hd.instrument_id, msg.ts_recv)
128 } else {
129 anyhow::bail!("DBN message type is not currently supported")
130 };
131
132 let duration = time::Duration::nanoseconds(nanoseconds as i64);
133 let datetime = time::OffsetDateTime::UNIX_EPOCH
134 .checked_add(duration)
135 .ok_or_else(|| anyhow::anyhow!("Timestamp overflow for record"))?;
136 let date = datetime.date();
137 let symbol_map = metadata.symbol_map_for_date(date)?;
138 let raw_symbol = symbol_map
139 .get(instrument_id)
140 .ok_or_else(|| anyhow::anyhow!("No raw symbol found for {instrument_id}"))?;
141
142 let symbol = Symbol::from_str_unchecked(raw_symbol);
143
144 Ok(InstrumentId::new(symbol, venue))
145}
146
147#[must_use]
148pub fn infer_symbology_type(symbol: &str) -> SType {
149 if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
150 return SType::Parent;
151 }
152
153 let parts: Vec<&str> = symbol.split('.').collect();
154 if parts.len() == 3 && parts[2].chars().all(|c| c.is_ascii_digit()) {
155 return SType::Continuous;
156 }
157
158 if symbol.chars().all(|c| c.is_ascii_digit()) {
159 return SType::InstrumentId;
160 }
161
162 SType::RawSymbol
163}
164
165pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
169 if symbols.is_empty() {
170 anyhow::bail!("No symbols provided");
171 }
172 let first_symbol = symbols[0];
173 let first_stype = infer_symbology_type(first_symbol);
174
175 for symbol in symbols {
176 let next_stype = infer_symbology_type(symbol);
177 if next_stype != first_stype {
178 anyhow::bail!(
179 "Inconsistent symbology types: '{}' for {} vs '{}' for {}",
180 first_stype,
181 first_symbol,
182 next_stype,
183 symbol
184 );
185 }
186 }
187
188 Ok(())
189}
190
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    // Expected values are given as strings and converted to `SType` by rstest.
    #[rstest]
    #[case("1", "instrument_id")]
    #[case("123456789", "instrument_id")]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN 99 5617289", "raw_symbol")]
    #[case("SPY 240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: SType) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        assert!(check_consistent_symbology(&symbols).is_err());
    }

    #[rstest]
    fn test_instrument_id_to_symbol_string_updates_map() {
        use nautilus_model::identifiers::Venue;
        let symbol = Symbol::from("TEST");
        let venue = Venue::from("XNAS");
        let instrument_id = InstrumentId::new(symbol, venue);
        let mut map: AHashMap<Symbol, Venue> = AHashMap::new();

        // The first call records the symbol -> venue mapping.
        let sym_str = instrument_id_to_symbol_string(instrument_id, &mut map);
        assert_eq!(sym_str, "TEST");
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));

        // A second call with the same symbol but a different venue...
        let other = Venue::from("XLON");
        let inst2 = InstrumentId::new(Symbol::from("TEST"), other);
        let sym_str2 = instrument_id_to_symbol_string(inst2, &mut map);
        assert_eq!(sym_str2, "TEST");

        // ...must not overwrite the venue recorded first.
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    // Each case holds two distinct symbols of the same symbology type. A single
    // comma-joined string would be one symbol and make the check pass trivially.
    #[rstest]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}