nautilus_databento/
symbology.rs1use ahash::AHashMap;
17use databento::dbn::{self, PitSymbolMap, SType};
18use dbn::{Publisher, Record};
19use indexmap::IndexMap;
20use nautilus_model::identifiers::{InstrumentId, Symbol, Venue};
21
22use super::types::PublisherId;
23
24#[derive(Debug)]
25pub struct MetadataCache {
26 metadata: dbn::Metadata,
27 date_metadata_map: AHashMap<time::Date, PitSymbolMap>,
28}
29
30impl MetadataCache {
31 #[must_use]
32 pub fn new(metadata: dbn::Metadata) -> Self {
33 Self {
34 metadata,
35 date_metadata_map: AHashMap::new(),
36 }
37 }
38
39 pub fn symbol_map_for_date(&mut self, date: time::Date) -> dbn::Result<&PitSymbolMap> {
43 if !self.date_metadata_map.contains_key(&date) {
44 let map = self.metadata.symbol_map_for_date(date)?;
45 self.date_metadata_map.insert(date, map);
46 }
47
48 self.date_metadata_map
49 .get(&date)
50 .ok_or_else(|| dbn::Error::decode(format!("metadata cache missing for date {date}")))
51 }
52}
53
54pub fn instrument_id_to_symbol_string(
55 instrument_id: InstrumentId,
56 symbol_venue_map: &mut AHashMap<Symbol, Venue>,
57) -> String {
58 symbol_venue_map
59 .entry(instrument_id.symbol)
60 .or_insert(instrument_id.venue);
61 instrument_id.symbol.to_string()
62}
63
64pub fn decode_nautilus_instrument_id(
73 record: &dbn::RecordRef,
74 metadata: &mut MetadataCache,
75 publisher_venue_map: &IndexMap<PublisherId, Venue>,
76 symbol_venue_map: &AHashMap<Symbol, Venue>,
77) -> anyhow::Result<InstrumentId> {
78 let publisher = record
79 .publisher()
80 .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
81 let publisher_id = publisher as PublisherId;
82 let venue = publisher_venue_map
83 .get(&publisher_id)
84 .ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
85 let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
86 if publisher == Publisher::GlbxMdp3Glbx
87 && let Some(venue) = symbol_venue_map.get(&instrument_id.symbol)
88 {
89 instrument_id.venue = *venue;
90 }
91
92 Ok(instrument_id)
93}
94
95pub fn get_nautilus_instrument_id_for_record(
105 record: &dbn::RecordRef,
106 metadata: &mut MetadataCache,
107 venue: Venue,
108) -> anyhow::Result<InstrumentId> {
109 let (instrument_id, nanoseconds) = if let Some(msg) = record.get::<dbn::MboMsg>() {
110 (msg.hd.instrument_id, msg.ts_recv)
111 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
112 (msg.hd.instrument_id, msg.ts_recv)
113 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
114 (msg.hd.instrument_id, msg.ts_recv)
115 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
116 (msg.hd.instrument_id, msg.ts_recv)
117 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
118 (msg.hd.instrument_id, msg.ts_recv)
119 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
120 (msg.hd.instrument_id, msg.ts_recv)
121 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
122 (msg.hd.instrument_id, msg.hd.ts_event)
123 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
124 (msg.hd.instrument_id, msg.ts_recv)
125 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
126 (msg.hd.instrument_id, msg.ts_recv)
127 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
128 (msg.hd.instrument_id, msg.ts_recv)
129 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
130 (msg.hd.instrument_id, msg.ts_recv)
131 } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
132 (msg.hd.instrument_id, msg.ts_recv)
133 } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
134 (msg.hd.instrument_id, msg.ts_recv)
135 } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
136 (msg.hd.instrument_id, msg.ts_recv)
137 } else {
138 anyhow::bail!("DBN message type is not currently supported")
139 };
140
141 let duration = time::Duration::nanoseconds(nanoseconds as i64);
142 let datetime = time::OffsetDateTime::UNIX_EPOCH
143 .checked_add(duration)
144 .ok_or_else(|| anyhow::anyhow!("Timestamp overflow for record"))?;
145 let date = datetime.date();
146 let symbol_map = metadata.symbol_map_for_date(date)?;
147 let raw_symbol = symbol_map
148 .get(instrument_id)
149 .ok_or_else(|| anyhow::anyhow!("No raw symbol found for {instrument_id}"))?;
150
151 let symbol = Symbol::from_str_unchecked(raw_symbol);
152
153 Ok(InstrumentId::new(symbol, venue))
154}
155
156#[must_use]
157pub fn infer_symbology_type(symbol: &str) -> SType {
158 if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
159 return SType::Parent;
160 }
161
162 let parts: Vec<&str> = symbol.split('.').collect();
163 if parts.len() == 3 && parts[2].chars().all(|c| c.is_ascii_digit()) {
164 return SType::Continuous;
165 }
166
167 if symbol.chars().all(|c| c.is_ascii_digit()) {
168 return SType::InstrumentId;
169 }
170
171 SType::RawSymbol
172}
173
174pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
178 if symbols.is_empty() {
179 anyhow::bail!("No symbols provided");
180 }
181 let first_symbol = symbols[0];
182 let first_stype = infer_symbology_type(first_symbol);
183
184 for symbol in symbols {
185 let next_stype = infer_symbology_type(symbol);
186 if next_stype != first_stype {
187 anyhow::bail!(
188 "Inconsistent symbology types: '{first_stype}' for {first_symbol} vs '{next_stype}' for {symbol}"
189 );
190 }
191 }
192
193 Ok(())
194}
195
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    #[rstest]
    #[case("1", "instrument_id")]
    #[case("123456789", "instrument_id")]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN 99 5617289", "raw_symbol")]
    #[case("SPY 240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: SType) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        assert!(check_consistent_symbology(&symbols).is_err());
    }

    #[rstest]
    fn test_instrument_id_to_symbol_string_updates_map() {
        let symbol = Symbol::from("TEST");
        let venue = Venue::from("XNAS");
        let instrument_id = InstrumentId::new(symbol, venue);
        let mut map: AHashMap<Symbol, Venue> = AHashMap::new();

        let sym_str = instrument_id_to_symbol_string(instrument_id, &mut map);

        assert_eq!(sym_str, "TEST");
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));

        // A second venue for the same symbol must not overwrite the first mapping.
        let other = Venue::from("XLON");
        let inst2 = InstrumentId::new(Symbol::from("TEST"), other);
        let sym_str2 = instrument_id_to_symbol_string(inst2, &mut map);
        assert_eq!(sym_str2, "TEST");

        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    // The mismatch is not in the first pair, so every element must be checked.
    #[rstest]
    #[case(vec!["AAPL", "MSFT", "ES.FUT"])]
    #[case(vec!["ES.FUT", "ES.OPT", "ESM4"])]
    fn test_check_consistent_symbology_when_inconsistent_later(#[case] symbols: Vec<&str>) {
        assert!(check_consistent_symbology(&symbols).is_err());
    }

    // Each case holds several *separate* symbols; a single comma-joined string would only
    // exercise one symbol and never test consistency across a list.
    #[rstest]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}