nautilus_databento/
symbology.rs1use std::collections::HashMap;
17
18use databento::dbn::{self, SType};
19use dbn::{Publisher, Record};
20use indexmap::IndexMap;
21use nautilus_core::correctness::check_slice_not_empty;
22use nautilus_model::identifiers::{InstrumentId, Symbol, Venue};
23
24use super::types::PublisherId;
25
26pub fn instrument_id_to_symbol_string(
27 instrument_id: InstrumentId,
28 symbol_venue_map: &mut HashMap<Symbol, Venue>,
29) -> String {
30 let venue = instrument_id.venue;
31 if venue == Venue::CBCM()
32 || venue == Venue::NYUM()
33 || venue == Venue::XCBT()
34 || venue == Venue::XCEC()
35 || venue == Venue::XCME()
36 || venue == Venue::XFXS()
37 || venue == Venue::XNYM()
38 {
39 symbol_venue_map.insert(instrument_id.symbol, venue);
40 }
41
42 instrument_id.symbol.to_string()
43}
44
45pub fn decode_nautilus_instrument_id(
46 record: &dbn::RecordRef,
47 metadata: &dbn::Metadata,
48 publisher_venue_map: &IndexMap<PublisherId, Venue>,
49 symbol_venue_map: &HashMap<Symbol, Venue>,
50) -> anyhow::Result<InstrumentId> {
51 let publisher = record.publisher().expect("Invalid `publisher` for record");
52 let publisher_id = publisher as PublisherId;
53 let venue = publisher_venue_map
54 .get(&publisher_id)
55 .ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
56 let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
57 if publisher == Publisher::GlbxMdp3Glbx {
58 if let Some(venue) = symbol_venue_map.get(&instrument_id.symbol) {
59 instrument_id.venue = *venue;
60 }
61 }
62
63 Ok(instrument_id)
64}
65
66pub fn get_nautilus_instrument_id_for_record(
67 record: &dbn::RecordRef,
68 metadata: &dbn::Metadata,
69 venue: Venue,
70) -> anyhow::Result<InstrumentId> {
71 let (instrument_id, nanoseconds) = if let Some(msg) = record.get::<dbn::MboMsg>() {
72 (msg.hd.instrument_id, msg.ts_recv)
73 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
74 (msg.hd.instrument_id, msg.ts_recv)
75 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
76 (msg.hd.instrument_id, msg.ts_recv)
77 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
78 (msg.hd.instrument_id, msg.ts_recv)
79 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
80 (msg.hd.instrument_id, msg.ts_recv)
81 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
82 (msg.hd.instrument_id, msg.ts_recv)
83 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
84 (msg.hd.instrument_id, msg.hd.ts_event)
85 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
86 (msg.hd.instrument_id, msg.ts_recv)
87 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
88 (msg.hd.instrument_id, msg.ts_recv)
89 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
90 (msg.hd.instrument_id, msg.ts_recv)
91 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
92 (msg.hd.instrument_id, msg.ts_recv)
93 } else {
94 anyhow::bail!("DBN message type is not currently supported")
95 };
96
97 let duration = time::Duration::nanoseconds(nanoseconds as i64);
98 let datetime = time::OffsetDateTime::UNIX_EPOCH
99 .checked_add(duration)
100 .unwrap(); let date = datetime.date();
102 let symbol_map = metadata.symbol_map_for_date(date)?;
103 let raw_symbol = symbol_map
104 .get(instrument_id)
105 .ok_or_else(|| anyhow::anyhow!("No raw symbol found for {instrument_id}"))?;
106
107 let symbol = Symbol::from_str_unchecked(raw_symbol);
108
109 Ok(InstrumentId::new(symbol, venue))
110}
111
112#[must_use]
113pub fn infer_symbology_type(symbol: &str) -> SType {
114 if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
115 return SType::Parent;
116 }
117
118 let parts: Vec<&str> = symbol.split('.').collect();
119 if parts.len() == 3 && parts[2].chars().all(|c| c.is_ascii_digit()) {
120 return SType::Continuous;
121 }
122
123 if symbol.chars().all(|c| c.is_ascii_digit()) {
124 return SType::InstrumentId;
125 }
126
127 SType::RawSymbol
128}
129
130pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
131 check_slice_not_empty(symbols, stringify!(symbols)).unwrap();
132
133 let first_symbol = symbols.first().unwrap();
135 let first_stype = infer_symbology_type(first_symbol);
136
137 for symbol in symbols {
138 let next_stype = infer_symbology_type(symbol);
139 if next_stype != first_stype {
140 return Err(anyhow::anyhow!(
141 "Inconsistent symbology types: '{}' for {} vs '{}' for {}",
142 first_stype,
143 first_symbol,
144 next_stype,
145 symbol
146 ));
147 }
148 }
149
150 Ok(())
151}
152
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    // Expected values are given as strings and converted to `SType` by rstest.
    #[rstest]
    #[case("1", "instrument_id")]
    #[case("123456789", "instrument_id")]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN 99 5617289", "raw_symbol")]
    #[case("SPY 240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: SType) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    #[should_panic]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        let _ = check_consistent_symbology(&symbols);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    // Each case holds several separate symbols: a single comma-joined string
    // would be a one-element slice, which is trivially consistent and would
    // not exercise the comparison at all.
    #[rstest]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}