1use std::{
17 collections::HashMap,
18 sync::{Arc, RwLock},
19};
20
21use ahash::{HashSet, HashSetExt};
22use databento::{
23 dbn::{self, PitSymbolMap, Record, SymbolIndex, VersionUpgradePolicy},
24 live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, python::to_pyruntime_err, time::get_atomic_clock_realtime};
28use nautilus_model::{
29 data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30 enums::RecordFlag,
31 identifiers::{InstrumentId, Symbol, Venue},
32 instruments::InstrumentAny,
33};
34use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
35
36use super::{
37 decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
38 types::{DatabentoImbalance, DatabentoStatistics},
39};
40use crate::{
41 decode::{decode_instrument_def_msg, decode_record},
42 types::PublisherId,
43};
44
45#[derive(Debug)]
46pub enum LiveCommand {
47 Subscribe(Subscription),
48 Start,
49 Close,
50}
51
52#[derive(Debug)]
53#[allow(clippy::large_enum_variant)] pub enum LiveMessage {
55 Data(Data),
56 Instrument(InstrumentAny),
57 Status(InstrumentStatus),
58 Imbalance(DatabentoImbalance),
59 Statistics(DatabentoStatistics),
60 Error(anyhow::Error),
61 Close,
62}
63
64#[derive(Debug)]
70pub struct DatabentoFeedHandler {
71 key: String,
72 dataset: String,
73 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
74 msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
75 publisher_venue_map: IndexMap<PublisherId, Venue>,
76 symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
77 replay: bool,
78 use_exchange_as_venue: bool,
79}
80
81impl DatabentoFeedHandler {
82 #[must_use]
84 pub const fn new(
85 key: String,
86 dataset: String,
87 rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
88 tx: tokio::sync::mpsc::Sender<LiveMessage>,
89 publisher_venue_map: IndexMap<PublisherId, Venue>,
90 symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
91 use_exchange_as_venue: bool,
92 ) -> Self {
93 Self {
94 key,
95 dataset,
96 cmd_rx: rx,
97 msg_tx: tx,
98 publisher_venue_map,
99 symbol_venue_map,
100 replay: false,
101 use_exchange_as_venue,
102 }
103 }
104
105 pub async fn run(&mut self) -> anyhow::Result<()> {
107 tracing::debug!("Running feed handler");
108 let clock = get_atomic_clock_realtime();
109 let mut symbol_map = PitSymbolMap::new();
110 let mut instrument_id_map: HashMap<u32, InstrumentId> = HashMap::new();
111
112 let mut buffering_start = None;
113 let mut buffered_deltas: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
114 let mut deltas_count = 0_u64;
115 let timeout = Duration::from_secs(5); let result = tokio::time::timeout(
118 timeout,
119 databento::LiveClient::builder()
120 .key(self.key.clone())?
121 .dataset(self.dataset.clone())
122 .upgrade_policy(VersionUpgradePolicy::UpgradeToV2)
123 .build(),
124 )
125 .await?;
126 tracing::info!("Connected");
127
128 let mut client = if let Ok(client) = result {
129 client
130 } else {
131 self.msg_tx.send(LiveMessage::Close).await?;
132 self.cmd_rx.close();
133 anyhow::bail!("Timeout connecting to LSG");
134 };
135
136 let timeout = Duration::from_millis(10);
138
139 let mut running = false;
141
142 loop {
143 if self.msg_tx.is_closed() {
144 tracing::debug!("Message channel was closed: stopping");
145 break;
146 }
147
148 match self.cmd_rx.try_recv() {
149 Ok(cmd) => {
150 tracing::debug!("Received command: {cmd:?}");
151 match cmd {
152 LiveCommand::Subscribe(sub) => {
153 if !self.replay & sub.start.is_some() {
154 self.replay = true;
155 }
156 client.subscribe(sub).await.map_err(to_pyruntime_err)?;
157 }
158 LiveCommand::Start => {
159 buffering_start = if self.replay {
160 Some(clock.get_time_ns())
161 } else {
162 None
163 };
164 client.start().await.map_err(to_pyruntime_err)?;
165 running = true;
166 tracing::debug!("Started");
167 }
168 LiveCommand::Close => {
169 self.msg_tx.send(LiveMessage::Close).await?;
170 if running {
171 client.close().await.map_err(to_pyruntime_err)?;
172 tracing::debug!("Closed inner client");
173 }
174 break;
175 }
176 }
177 }
178 Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => {
180 tracing::debug!("Disconnected");
181 break;
182 }
183 }
184
185 if !running {
186 continue;
187 }
188
189 let result = tokio::time::timeout(timeout, client.next_record()).await;
191 let record_opt = match result {
192 Ok(record_opt) => record_opt,
193 Err(_) => continue, };
195
196 let record = match record_opt {
197 Ok(Some(record)) => record,
198 Ok(None) => break, Err(e) => {
200 self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
203 break;
204 }
205 };
206
207 let ts_init = clock.get_time_ns();
208 let mut initialized_books = HashSet::new();
209
210 if let Some(msg) = record.get::<dbn::ErrorMsg>() {
212 handle_error_msg(msg);
213 } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
214 handle_system_msg(msg);
215 } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
216 instrument_id_map.remove(&msg.hd.instrument_id);
218 handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map);
219 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
220 if self.use_exchange_as_venue {
221 update_instrument_id_map_with_exchange(
222 &symbol_map,
223 &self.symbol_venue_map,
224 &mut instrument_id_map,
225 msg.hd.instrument_id,
226 msg.exchange()?,
227 );
228 }
229 let data = handle_instrument_def_msg(
230 msg,
231 &record,
232 &symbol_map,
233 &self.publisher_venue_map,
234 &self.symbol_venue_map.read().unwrap(),
235 &mut instrument_id_map,
236 ts_init,
237 )?;
238 self.send_msg(LiveMessage::Instrument(data)).await;
239 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
240 let data = handle_status_msg(
241 msg,
242 &record,
243 &symbol_map,
244 &self.publisher_venue_map,
245 &self.symbol_venue_map.read().unwrap(),
246 &mut instrument_id_map,
247 ts_init,
248 )?;
249 self.send_msg(LiveMessage::Status(data)).await;
250 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
251 let data = handle_imbalance_msg(
252 msg,
253 &record,
254 &symbol_map,
255 &self.publisher_venue_map,
256 &self.symbol_venue_map.read().unwrap(),
257 &mut instrument_id_map,
258 ts_init,
259 )?;
260 self.send_msg(LiveMessage::Imbalance(data)).await;
261 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
262 let data = handle_statistics_msg(
263 msg,
264 &record,
265 &symbol_map,
266 &self.publisher_venue_map,
267 &self.symbol_venue_map.read().unwrap(),
268 &mut instrument_id_map,
269 ts_init,
270 )?;
271 self.send_msg(LiveMessage::Statistics(data)).await;
272 } else {
273 let (mut data1, data2) = match handle_record(
274 record,
275 &symbol_map,
276 &self.publisher_venue_map,
277 &self.symbol_venue_map.read().unwrap(),
278 &mut instrument_id_map,
279 ts_init,
280 &initialized_books,
281 ) {
282 Ok(decoded) => decoded,
283 Err(e) => {
284 tracing::error!("Error decoding record: {e}");
285 continue;
286 }
287 };
288
289 if let Some(msg) = record.get::<dbn::MboMsg>() {
290 if let Some(Data::Delta(delta)) = &data1 {
292 initialized_books.insert(delta.instrument_id);
293 } else {
294 continue; }
296
297 if let Data::Delta(delta) = data1.clone().expect("MBO should decode a delta") {
298 let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
299 buffer.push(delta);
300
301 deltas_count += 1;
302 tracing::trace!(
303 "Buffering delta: {deltas_count} {} {buffering_start:?} flags={}",
304 delta.ts_event,
305 msg.flags.raw(),
306 );
307
308 if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
310 continue; }
312
313 if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
315 continue; }
317
318 if let Some(start_ns) = buffering_start {
320 if delta.ts_event <= start_ns {
321 continue; }
323 buffering_start = None;
324 }
325
326 let buffer = buffered_deltas.remove(&delta.instrument_id).unwrap();
328 let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
329 let deltas = OrderBookDeltas_API::new(deltas);
330 data1 = Some(Data::Deltas(deltas));
331 }
332 }
333
334 if let Some(data) = data1 {
335 self.send_msg(LiveMessage::Data(data)).await;
336 }
337
338 if let Some(data) = data2 {
339 self.send_msg(LiveMessage::Data(data)).await;
340 }
341 }
342 }
343
344 self.cmd_rx.close();
345 tracing::debug!("Closed command receiver");
346
347 Ok(())
348 }
349
350 async fn send_msg(&mut self, msg: LiveMessage) {
351 tracing::trace!("Sending {msg:?}");
352 match self.msg_tx.send(msg).await {
353 Ok(()) => {}
354 Err(e) => tracing::error!("Error sending message: {e}"),
355 }
356 }
357}
358
359fn handle_error_msg(msg: &dbn::ErrorMsg) {
360 tracing::error!("{msg:?}");
361}
362
363fn handle_system_msg(msg: &dbn::SystemMsg) {
364 tracing::info!("{msg:?}");
365}
366
367fn handle_symbol_mapping_msg(
368 msg: &dbn::SymbolMappingMsg,
369 symbol_map: &mut PitSymbolMap,
370 instrument_id_map: &mut HashMap<u32, InstrumentId>,
371) {
372 symbol_map
374 .on_symbol_mapping(msg)
375 .unwrap_or_else(|_| panic!("Error updating `symbol_map` with {msg:?}"));
376
377 instrument_id_map.remove(&msg.header().instrument_id);
379}
380
381fn update_instrument_id_map_with_exchange(
382 symbol_map: &PitSymbolMap,
383 symbol_venue_map: &RwLock<HashMap<Symbol, Venue>>,
384 instrument_id_map: &mut HashMap<u32, InstrumentId>,
385 raw_instrument_id: u32,
386 exchange: &str,
387) -> InstrumentId {
388 let raw_symbol = symbol_map
389 .get(raw_instrument_id)
390 .expect("Cannot resolve `raw_symbol` from `symbol_map`");
391 let symbol = Symbol::from(raw_symbol.as_str());
392 let venue = Venue::from(exchange);
393 let instrument_id = InstrumentId::new(symbol, venue);
394 symbol_venue_map
395 .write()
396 .unwrap()
397 .entry(symbol)
398 .or_insert(venue);
399 instrument_id_map.insert(raw_instrument_id, instrument_id);
400 instrument_id
401}
402
403fn update_instrument_id_map(
404 record: &dbn::RecordRef,
405 symbol_map: &PitSymbolMap,
406 publisher_venue_map: &IndexMap<PublisherId, Venue>,
407 symbol_venue_map: &HashMap<Symbol, Venue>,
408 instrument_id_map: &mut HashMap<u32, InstrumentId>,
409) -> InstrumentId {
410 let header = record.header();
411
412 if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
414 return instrument_id;
415 }
416
417 let raw_symbol = symbol_map
418 .get_for_rec(record)
419 .expect("Cannot resolve `raw_symbol` from `symbol_map`");
420
421 let symbol = Symbol::from_str_unchecked(raw_symbol);
422
423 let publisher_id = header.publisher_id;
424 let venue = match symbol_venue_map.get(&symbol) {
425 Some(venue) => venue,
426 None => publisher_venue_map
427 .get(&publisher_id)
428 .unwrap_or_else(|| panic!("No venue found for `publisher_id` {publisher_id}")),
429 };
430 let instrument_id = InstrumentId::new(symbol, *venue);
431
432 instrument_id_map.insert(header.instrument_id, instrument_id);
433 instrument_id
434}
435
436fn handle_instrument_def_msg(
437 msg: &dbn::InstrumentDefMsg,
438 record: &dbn::RecordRef,
439 symbol_map: &PitSymbolMap,
440 publisher_venue_map: &IndexMap<PublisherId, Venue>,
441 symbol_venue_map: &HashMap<Symbol, Venue>,
442 instrument_id_map: &mut HashMap<u32, InstrumentId>,
443 ts_init: UnixNanos,
444) -> anyhow::Result<InstrumentAny> {
445 let instrument_id = update_instrument_id_map(
446 record,
447 symbol_map,
448 publisher_venue_map,
449 symbol_venue_map,
450 instrument_id_map,
451 );
452
453 decode_instrument_def_msg(msg, instrument_id, ts_init)
454}
455
456fn handle_status_msg(
457 msg: &dbn::StatusMsg,
458 record: &dbn::RecordRef,
459 symbol_map: &PitSymbolMap,
460 publisher_venue_map: &IndexMap<PublisherId, Venue>,
461 symbol_venue_map: &HashMap<Symbol, Venue>,
462 instrument_id_map: &mut HashMap<u32, InstrumentId>,
463 ts_init: UnixNanos,
464) -> anyhow::Result<InstrumentStatus> {
465 let instrument_id = update_instrument_id_map(
466 record,
467 symbol_map,
468 publisher_venue_map,
469 symbol_venue_map,
470 instrument_id_map,
471 );
472
473 decode_status_msg(msg, instrument_id, ts_init)
474}
475
476fn handle_imbalance_msg(
477 msg: &dbn::ImbalanceMsg,
478 record: &dbn::RecordRef,
479 symbol_map: &PitSymbolMap,
480 publisher_venue_map: &IndexMap<PublisherId, Venue>,
481 symbol_venue_map: &HashMap<Symbol, Venue>,
482 instrument_id_map: &mut HashMap<u32, InstrumentId>,
483 ts_init: UnixNanos,
484) -> anyhow::Result<DatabentoImbalance> {
485 let instrument_id = update_instrument_id_map(
486 record,
487 symbol_map,
488 publisher_venue_map,
489 symbol_venue_map,
490 instrument_id_map,
491 );
492
493 let price_precision = 2; decode_imbalance_msg(msg, instrument_id, price_precision, ts_init)
496}
497
498fn handle_statistics_msg(
499 msg: &dbn::StatMsg,
500 record: &dbn::RecordRef,
501 symbol_map: &PitSymbolMap,
502 publisher_venue_map: &IndexMap<PublisherId, Venue>,
503 symbol_venue_map: &HashMap<Symbol, Venue>,
504 instrument_id_map: &mut HashMap<u32, InstrumentId>,
505 ts_init: UnixNanos,
506) -> anyhow::Result<DatabentoStatistics> {
507 let instrument_id = update_instrument_id_map(
508 record,
509 symbol_map,
510 publisher_venue_map,
511 symbol_venue_map,
512 instrument_id_map,
513 );
514
515 let price_precision = 2; decode_statistics_msg(msg, instrument_id, price_precision, ts_init)
518}
519
520fn handle_record(
521 record: dbn::RecordRef,
522 symbol_map: &PitSymbolMap,
523 publisher_venue_map: &IndexMap<PublisherId, Venue>,
524 symbol_venue_map: &HashMap<Symbol, Venue>,
525 instrument_id_map: &mut HashMap<u32, InstrumentId>,
526 ts_init: UnixNanos,
527 initialized_books: &HashSet<InstrumentId>,
528) -> anyhow::Result<(Option<Data>, Option<Data>)> {
529 let instrument_id = update_instrument_id_map(
530 &record,
531 symbol_map,
532 publisher_venue_map,
533 symbol_venue_map,
534 instrument_id_map,
535 );
536
537 let price_precision = 2; let include_trades = initialized_books.contains(&instrument_id);
539
540 decode_record(
541 &record,
542 instrument_id,
543 price_precision,
544 Some(ts_init),
545 include_trades,
546 )
547}