1use std::sync::{Arc, RwLock};
17
18use ahash::{AHashMap, HashSet, HashSetExt};
19use databento::{
20 dbn::{self, PitSymbolMap, Record, SymbolIndex},
21 live::Subscription,
22};
23use indexmap::IndexMap;
24use nautilus_core::{
25 UnixNanos, consts::NAUTILUS_USER_AGENT, python::to_pyruntime_err,
26 time::get_atomic_clock_realtime,
27};
28use nautilus_model::{
29 data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30 enums::RecordFlag,
31 identifiers::{InstrumentId, Symbol, Venue},
32 instruments::InstrumentAny,
33};
34use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
35
36use super::{
37 decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
38 types::{DatabentoImbalance, DatabentoStatistics},
39};
40use crate::{
41 decode::{decode_instrument_def_msg, decode_record},
42 types::PublisherId,
43};
44
45#[derive(Debug)]
46pub enum LiveCommand {
47 Subscribe(Subscription),
48 Start,
49 Close,
50}
51
52#[derive(Debug)]
53#[allow(clippy::large_enum_variant)] pub enum LiveMessage {
55 Data(Data),
56 Instrument(InstrumentAny),
57 Status(InstrumentStatus),
58 Imbalance(DatabentoImbalance),
59 Statistics(DatabentoStatistics),
60 Error(anyhow::Error),
61 Close,
62}
63
64#[derive(Debug)]
70pub struct DatabentoFeedHandler {
71 key: String,
72 dataset: String,
73 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
74 msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
75 publisher_venue_map: IndexMap<PublisherId, Venue>,
76 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
77 replay: bool,
78 use_exchange_as_venue: bool,
79 bars_timestamp_on_close: bool,
80}
81
82impl DatabentoFeedHandler {
83 #[must_use]
85 #[allow(clippy::too_many_arguments)]
86 pub const fn new(
87 key: String,
88 dataset: String,
89 rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
90 tx: tokio::sync::mpsc::Sender<LiveMessage>,
91 publisher_venue_map: IndexMap<PublisherId, Venue>,
92 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
93 use_exchange_as_venue: bool,
94 bars_timestamp_on_close: bool,
95 ) -> Self {
96 Self {
97 key,
98 dataset,
99 cmd_rx: rx,
100 msg_tx: tx,
101 publisher_venue_map,
102 symbol_venue_map,
103 replay: false,
104 use_exchange_as_venue,
105 bars_timestamp_on_close,
106 }
107 }
108
109 #[allow(clippy::blocks_in_conditions)]
119 pub async fn run(&mut self) -> anyhow::Result<()> {
120 tracing::debug!("Running feed handler");
121 let clock = get_atomic_clock_realtime();
122 let mut symbol_map = PitSymbolMap::new();
123 let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
124
125 let mut buffering_start = None;
126 let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
127 let mut deltas_count = 0_u64;
128 let timeout = Duration::from_secs(5); let result = tokio::time::timeout(
131 timeout,
132 databento::LiveClient::builder()
133 .user_agent_extension(NAUTILUS_USER_AGENT.into())
134 .key(self.key.clone())?
135 .dataset(self.dataset.clone())
136 .build(),
137 )
138 .await?;
139
140 tracing::info!("Connected");
141
142 let mut client = if let Ok(client) = result {
143 client
144 } else {
145 self.msg_tx.send(LiveMessage::Close).await?;
146 self.cmd_rx.close();
147 anyhow::bail!("Timeout connecting to LSG");
148 };
149
150 let timeout = Duration::from_millis(10);
152
153 let mut running = false;
155
156 loop {
157 if self.msg_tx.is_closed() {
158 tracing::debug!("Message channel was closed: stopping");
159 break;
160 }
161
162 match self.cmd_rx.try_recv() {
163 Ok(cmd) => {
164 tracing::debug!("Received command: {cmd:?}");
165 match cmd {
166 LiveCommand::Subscribe(sub) => {
167 if !self.replay & sub.start.is_some() {
168 self.replay = true;
169 }
170 client.subscribe(sub).await.map_err(to_pyruntime_err)?;
171 }
172 LiveCommand::Start => {
173 buffering_start = if self.replay {
174 Some(clock.get_time_ns())
175 } else {
176 None
177 };
178 client.start().await.map_err(to_pyruntime_err)?;
179 running = true;
180 tracing::debug!("Started");
181 }
182 LiveCommand::Close => {
183 self.msg_tx.send(LiveMessage::Close).await?;
184 if running {
185 client.close().await.map_err(to_pyruntime_err)?;
186 tracing::debug!("Closed inner client");
187 }
188 break;
189 }
190 }
191 }
192 Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => {
194 tracing::debug!("Disconnected");
195 break;
196 }
197 }
198
199 if !running {
200 continue;
201 }
202
203 let result = tokio::time::timeout(timeout, client.next_record()).await;
205 let record_opt = match result {
206 Ok(record_opt) => record_opt,
207 Err(_) => continue, };
209
210 let record = match record_opt {
211 Ok(Some(record)) => record,
212 Ok(None) => break, Err(e) => {
214 self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
217 break;
218 }
219 };
220
221 let ts_init = clock.get_time_ns();
222 let mut initialized_books = HashSet::new();
223
224 if let Some(msg) = record.get::<dbn::ErrorMsg>() {
226 handle_error_msg(msg);
227 } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
228 handle_system_msg(msg);
229 } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
230 instrument_id_map.remove(&msg.hd.instrument_id);
232 handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
233 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
234 if self.use_exchange_as_venue {
235 let exchange = msg.exchange()?;
236 if !exchange.is_empty() {
237 update_instrument_id_map_with_exchange(
238 &symbol_map,
239 &self.symbol_venue_map,
240 &mut instrument_id_map,
241 msg.hd.instrument_id,
242 exchange,
243 )?;
244 }
245 }
246 let data = {
247 let sym_map = self
248 .symbol_venue_map
249 .read()
250 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
251 handle_instrument_def_msg(
252 msg,
253 &record,
254 &symbol_map,
255 &self.publisher_venue_map,
256 &sym_map,
257 &mut instrument_id_map,
258 ts_init,
259 )?
260 };
261 self.send_msg(LiveMessage::Instrument(data)).await;
262 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
263 let data = {
264 let sym_map = self
265 .symbol_venue_map
266 .read()
267 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
268 handle_status_msg(
269 msg,
270 &record,
271 &symbol_map,
272 &self.publisher_venue_map,
273 &sym_map,
274 &mut instrument_id_map,
275 ts_init,
276 )?
277 };
278 self.send_msg(LiveMessage::Status(data)).await;
279 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
280 let data = {
281 let sym_map = self
282 .symbol_venue_map
283 .read()
284 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
285 handle_imbalance_msg(
286 msg,
287 &record,
288 &symbol_map,
289 &self.publisher_venue_map,
290 &sym_map,
291 &mut instrument_id_map,
292 ts_init,
293 )?
294 };
295 self.send_msg(LiveMessage::Imbalance(data)).await;
296 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
297 let data = {
298 let sym_map = self
299 .symbol_venue_map
300 .read()
301 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
302 handle_statistics_msg(
303 msg,
304 &record,
305 &symbol_map,
306 &self.publisher_venue_map,
307 &sym_map,
308 &mut instrument_id_map,
309 ts_init,
310 )?
311 };
312 self.send_msg(LiveMessage::Statistics(data)).await;
313 } else {
314 let (mut data1, data2) = match {
316 let sym_map = self
317 .symbol_venue_map
318 .read()
319 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
320 handle_record(
321 record,
322 &symbol_map,
323 &self.publisher_venue_map,
324 &sym_map,
325 &mut instrument_id_map,
326 ts_init,
327 &initialized_books,
328 self.bars_timestamp_on_close,
329 )
330 } {
331 Ok(decoded) => decoded,
332 Err(e) => {
333 tracing::error!("Error decoding record: {e}");
334 continue;
335 }
336 };
337
338 if let Some(msg) = record.get::<dbn::MboMsg>() {
339 if let Some(Data::Delta(delta)) = &data1 {
341 initialized_books.insert(delta.instrument_id);
342 } else {
343 continue; }
345
346 if let Data::Delta(delta) = data1.clone().expect("MBO should decode a delta") {
347 let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
348 buffer.push(delta);
349
350 deltas_count += 1;
351 tracing::trace!(
352 "Buffering delta: {deltas_count} {} {buffering_start:?} flags={}",
353 delta.ts_event,
354 msg.flags.raw(),
355 );
356
357 if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
359 continue; }
361
362 if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
364 continue; }
366
367 if let Some(start_ns) = buffering_start {
369 if delta.ts_event <= start_ns {
370 continue; }
372 buffering_start = None;
373 }
374
375 let buffer =
377 buffered_deltas
378 .remove(&delta.instrument_id)
379 .ok_or_else(|| {
380 anyhow::anyhow!(
381 "Internal error: no buffered deltas for instrument {id}",
382 id = delta.instrument_id
383 )
384 })?;
385 let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
386 let deltas = OrderBookDeltas_API::new(deltas);
387 data1 = Some(Data::Deltas(deltas));
388 }
389 }
390
391 if let Some(data) = data1 {
392 self.send_msg(LiveMessage::Data(data)).await;
393 }
394
395 if let Some(data) = data2 {
396 self.send_msg(LiveMessage::Data(data)).await;
397 }
398 }
399 }
400
401 self.cmd_rx.close();
402 tracing::debug!("Closed command receiver");
403
404 Ok(())
405 }
406
407 async fn send_msg(&mut self, msg: LiveMessage) {
408 tracing::trace!("Sending {msg:?}");
409 match self.msg_tx.send(msg).await {
410 Ok(()) => {}
411 Err(e) => tracing::error!("Error sending message: {e}"),
412 }
413 }
414}
415
416fn handle_error_msg(msg: &dbn::ErrorMsg) {
417 tracing::error!("{msg:?}");
418}
419
420fn handle_system_msg(msg: &dbn::SystemMsg) {
421 tracing::info!("{msg:?}");
422}
423
424fn handle_symbol_mapping_msg(
425 msg: &dbn::SymbolMappingMsg,
426 symbol_map: &mut PitSymbolMap,
427 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
428) -> anyhow::Result<()> {
429 symbol_map
430 .on_symbol_mapping(msg)
431 .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
432 instrument_id_map.remove(&msg.header().instrument_id);
433 Ok(())
434}
435
436fn update_instrument_id_map_with_exchange(
437 symbol_map: &PitSymbolMap,
438 symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
439 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
440 raw_instrument_id: u32,
441 exchange: &str,
442) -> anyhow::Result<InstrumentId> {
443 let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
444 anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
445 })?;
446 let symbol = Symbol::from(raw_symbol.as_str());
447 let venue = Venue::from(exchange);
448 let instrument_id = InstrumentId::new(symbol, venue);
449 let mut map = symbol_venue_map
450 .write()
451 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
452 map.entry(symbol).or_insert(venue);
453 instrument_id_map.insert(raw_instrument_id, instrument_id);
454 Ok(instrument_id)
455}
456
457fn update_instrument_id_map(
458 record: &dbn::RecordRef,
459 symbol_map: &PitSymbolMap,
460 publisher_venue_map: &IndexMap<PublisherId, Venue>,
461 symbol_venue_map: &AHashMap<Symbol, Venue>,
462 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
463) -> InstrumentId {
464 let header = record.header();
465
466 if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
468 return instrument_id;
469 }
470
471 let raw_symbol = symbol_map
472 .get_for_rec(record)
473 .expect("Cannot resolve `raw_symbol` from `symbol_map`");
474
475 let symbol = Symbol::from_str_unchecked(raw_symbol);
476
477 let publisher_id = header.publisher_id;
478 let venue = match symbol_venue_map.get(&symbol) {
479 Some(venue) => venue,
480 None => publisher_venue_map
481 .get(&publisher_id)
482 .unwrap_or_else(|| panic!("No venue found for `publisher_id` {publisher_id}")),
483 };
484 let instrument_id = InstrumentId::new(symbol, *venue);
485
486 instrument_id_map.insert(header.instrument_id, instrument_id);
487 instrument_id
488}
489
490fn handle_instrument_def_msg(
491 msg: &dbn::InstrumentDefMsg,
492 record: &dbn::RecordRef,
493 symbol_map: &PitSymbolMap,
494 publisher_venue_map: &IndexMap<PublisherId, Venue>,
495 symbol_venue_map: &AHashMap<Symbol, Venue>,
496 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
497 ts_init: UnixNanos,
498) -> anyhow::Result<InstrumentAny> {
499 let instrument_id = update_instrument_id_map(
500 record,
501 symbol_map,
502 publisher_venue_map,
503 symbol_venue_map,
504 instrument_id_map,
505 );
506
507 decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
508}
509
510fn handle_status_msg(
511 msg: &dbn::StatusMsg,
512 record: &dbn::RecordRef,
513 symbol_map: &PitSymbolMap,
514 publisher_venue_map: &IndexMap<PublisherId, Venue>,
515 symbol_venue_map: &AHashMap<Symbol, Venue>,
516 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
517 ts_init: UnixNanos,
518) -> anyhow::Result<InstrumentStatus> {
519 let instrument_id = update_instrument_id_map(
520 record,
521 symbol_map,
522 publisher_venue_map,
523 symbol_venue_map,
524 instrument_id_map,
525 );
526
527 decode_status_msg(msg, instrument_id, Some(ts_init))
528}
529
530fn handle_imbalance_msg(
531 msg: &dbn::ImbalanceMsg,
532 record: &dbn::RecordRef,
533 symbol_map: &PitSymbolMap,
534 publisher_venue_map: &IndexMap<PublisherId, Venue>,
535 symbol_venue_map: &AHashMap<Symbol, Venue>,
536 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
537 ts_init: UnixNanos,
538) -> anyhow::Result<DatabentoImbalance> {
539 let instrument_id = update_instrument_id_map(
540 record,
541 symbol_map,
542 publisher_venue_map,
543 symbol_venue_map,
544 instrument_id_map,
545 );
546
547 let price_precision = 2; decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
550}
551
552fn handle_statistics_msg(
553 msg: &dbn::StatMsg,
554 record: &dbn::RecordRef,
555 symbol_map: &PitSymbolMap,
556 publisher_venue_map: &IndexMap<PublisherId, Venue>,
557 symbol_venue_map: &AHashMap<Symbol, Venue>,
558 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
559 ts_init: UnixNanos,
560) -> anyhow::Result<DatabentoStatistics> {
561 let instrument_id = update_instrument_id_map(
562 record,
563 symbol_map,
564 publisher_venue_map,
565 symbol_venue_map,
566 instrument_id_map,
567 );
568
569 let price_precision = 2; decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
572}
573
574#[allow(clippy::too_many_arguments)]
575fn handle_record(
576 record: dbn::RecordRef,
577 symbol_map: &PitSymbolMap,
578 publisher_venue_map: &IndexMap<PublisherId, Venue>,
579 symbol_venue_map: &AHashMap<Symbol, Venue>,
580 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
581 ts_init: UnixNanos,
582 initialized_books: &HashSet<InstrumentId>,
583 bars_timestamp_on_close: bool,
584) -> anyhow::Result<(Option<Data>, Option<Data>)> {
585 let instrument_id = update_instrument_id_map(
586 &record,
587 symbol_map,
588 publisher_venue_map,
589 symbol_venue_map,
590 instrument_id_map,
591 );
592
593 let price_precision = 2; let include_trades = initialized_books.contains(&instrument_id);
595
596 decode_record(
597 &record,
598 instrument_id,
599 price_precision,
600 Some(ts_init),
601 include_trades,
602 bars_timestamp_on_close,
603 )
604}