1use std::{
17 sync::{Arc, RwLock},
18 time::Duration as StdDuration,
19};
20
21use ahash::{AHashMap, HashSet, HashSetExt};
22use databento::{
23 dbn::{self, PitSymbolMap, Record, SymbolIndex},
24 live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
28use nautilus_model::{
29 data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30 enums::RecordFlag,
31 identifiers::{InstrumentId, Symbol, Venue},
32 instruments::InstrumentAny,
33};
34use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
35
36use super::{
37 decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
38 types::{DatabentoImbalance, DatabentoStatistics},
39};
40use crate::{
41 decode::{decode_instrument_def_msg, decode_record},
42 types::PublisherId,
43};
44
/// Commands accepted by a [`DatabentoFeedHandler`] on its command channel.
#[derive(Debug)]
pub enum LiveCommand {
    /// Adds a subscription to the live client. A subscription that carries a
    /// `start` switches the handler into replay mode.
    Subscribe(Subscription),
    /// Starts the live session; records are only read once this was received.
    Start,
    /// Emits [`LiveMessage::Close`], closes the inner client if it was started
    /// and stops the handler loop.
    Close,
}
51
/// Messages emitted by a [`DatabentoFeedHandler`] on its message channel.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "TODO: Optimize this (largest variant 1096 vs 80 bytes)"
)]
pub enum LiveMessage {
    /// Decoded market data, including order book deltas batched per instrument.
    Data(Data),
    /// Instrument decoded from an `InstrumentDefMsg` record.
    Instrument(InstrumentAny),
    /// Instrument status decoded from a `StatusMsg` record.
    Status(InstrumentStatus),
    /// Imbalance decoded from an `ImbalanceMsg` record.
    Imbalance(DatabentoImbalance),
    /// Statistics decoded from a `StatMsg` record.
    Statistics(DatabentoStatistics),
    /// Error returned while reading the next record; the handler stops after
    /// sending it.
    Error(anyhow::Error),
    /// The handler is shutting down (sent for a `Close` command or when the
    /// connection attempt fails).
    Close,
}
66
/// Feed handler that drives a Databento live client: it applies [`LiveCommand`]s
/// received on a command channel and forwards decoded records as [`LiveMessage`]s.
#[derive(Debug)]
pub struct DatabentoFeedHandler {
    /// API key handed to the live client builder.
    key: String,
    /// Dataset handed to the live client builder.
    dataset: String,
    /// Receiving end of the command channel.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
    /// Sending end of the message channel.
    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
    /// Venue lookup by publisher ID, used when no per-symbol venue is known.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Shared per-symbol venue lookup, consulted before `publisher_venue_map`.
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// Set once a subscription with a `start` was received; when set, `Start`
    /// records the time from which buffered deltas may be released.
    replay: bool,
    /// When `true`, an instrument definition with a non-empty exchange maps its
    /// symbol to that exchange as the venue.
    use_exchange_as_venue: bool,
    /// Forwarded unchanged to `decode_record`.
    bars_timestamp_on_close: bool,
}
92
93impl DatabentoFeedHandler {
94 #[must_use]
96 #[allow(clippy::too_many_arguments)]
97 pub const fn new(
98 key: String,
99 dataset: String,
100 rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
101 tx: tokio::sync::mpsc::Sender<LiveMessage>,
102 publisher_venue_map: IndexMap<PublisherId, Venue>,
103 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
104 use_exchange_as_venue: bool,
105 bars_timestamp_on_close: bool,
106 ) -> Self {
107 Self {
108 key,
109 dataset,
110 cmd_rx: rx,
111 msg_tx: tx,
112 publisher_venue_map,
113 symbol_venue_map,
114 replay: false,
115 use_exchange_as_venue,
116 bars_timestamp_on_close,
117 }
118 }
119
120 #[allow(clippy::blocks_in_conditions)]
129 pub async fn run(&mut self) -> anyhow::Result<()> {
130 tracing::debug!("Running feed handler");
131 let clock = get_atomic_clock_realtime();
132 let mut symbol_map = PitSymbolMap::new();
133 let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
134
135 let mut buffering_start = None;
136 let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
137 let mut initialized_books = HashSet::new();
138 let timeout = Duration::from_secs(5); let result = tokio::time::timeout(
141 timeout,
142 databento::LiveClient::builder()
143 .user_agent_extension(NAUTILUS_USER_AGENT.into())
144 .key(self.key.clone())?
145 .dataset(self.dataset.clone())
146 .build(),
147 )
148 .await?;
149
150 tracing::info!("Connected");
151
152 let mut client = match result {
153 Ok(client) => client,
154 Err(e) => {
155 self.msg_tx.send(LiveMessage::Close).await?;
156 self.cmd_rx.close();
157 anyhow::bail!("Failed to connect to Databento LSG: {e}");
158 }
159 };
160
161 let timeout = Duration::from_millis(10);
163
164 let mut running = false;
166
167 loop {
168 if self.msg_tx.is_closed() {
169 tracing::debug!("Message channel was closed: stopping");
170 break;
171 }
172
173 match self.cmd_rx.try_recv() {
174 Ok(cmd) => {
175 tracing::debug!("Received command: {cmd:?}");
176 match cmd {
177 LiveCommand::Subscribe(sub) => {
178 if !self.replay && sub.start.is_some() {
179 self.replay = true;
180 }
181 client.subscribe(sub).await?;
182 }
183 LiveCommand::Start => {
184 buffering_start = if self.replay {
185 Some(clock.get_time_ns())
186 } else {
187 None
188 };
189 client.start().await?;
190 running = true;
191 tracing::debug!("Started");
192 }
193 LiveCommand::Close => {
194 self.msg_tx.send(LiveMessage::Close).await?;
195 if running {
196 client.close().await?;
197 tracing::debug!("Closed inner client");
198 }
199 break;
200 }
201 }
202 }
203 Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => {
205 tracing::debug!("Disconnected");
206 break;
207 }
208 }
209
210 if !running {
211 continue;
212 }
213
214 let result = tokio::time::timeout(timeout, client.next_record()).await;
216 let record_opt = match result {
217 Ok(record_opt) => record_opt,
218 Err(_) => continue, };
220
221 let record = match record_opt {
222 Ok(Some(record)) => record,
223 Ok(None) => break, Err(e) => {
225 self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
228 break;
229 }
230 };
231
232 let ts_init = clock.get_time_ns();
233
234 if let Some(msg) = record.get::<dbn::ErrorMsg>() {
236 handle_error_msg(msg);
237 } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
238 handle_system_msg(msg);
239 } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
240 instrument_id_map.remove(&msg.hd.instrument_id);
242 handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
243 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
244 if self.use_exchange_as_venue {
245 let exchange = msg.exchange()?;
246 if !exchange.is_empty() {
247 update_instrument_id_map_with_exchange(
248 &symbol_map,
249 &self.symbol_venue_map,
250 &mut instrument_id_map,
251 msg.hd.instrument_id,
252 exchange,
253 )?;
254 }
255 }
256 let data = {
257 let sym_map = self.read_symbol_venue_map()?;
258 handle_instrument_def_msg(
259 msg,
260 &record,
261 &symbol_map,
262 &self.publisher_venue_map,
263 &sym_map,
264 &mut instrument_id_map,
265 ts_init,
266 )?
267 };
268 self.send_msg(LiveMessage::Instrument(data)).await;
269 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
270 let data = {
271 let sym_map = self.read_symbol_venue_map()?;
272 handle_status_msg(
273 msg,
274 &record,
275 &symbol_map,
276 &self.publisher_venue_map,
277 &sym_map,
278 &mut instrument_id_map,
279 ts_init,
280 )?
281 };
282 self.send_msg(LiveMessage::Status(data)).await;
283 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
284 let data = {
285 let sym_map = self.read_symbol_venue_map()?;
286 handle_imbalance_msg(
287 msg,
288 &record,
289 &symbol_map,
290 &self.publisher_venue_map,
291 &sym_map,
292 &mut instrument_id_map,
293 ts_init,
294 )?
295 };
296 self.send_msg(LiveMessage::Imbalance(data)).await;
297 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
298 let data = {
299 let sym_map = self.read_symbol_venue_map()?;
300 handle_statistics_msg(
301 msg,
302 &record,
303 &symbol_map,
304 &self.publisher_venue_map,
305 &sym_map,
306 &mut instrument_id_map,
307 ts_init,
308 )?
309 };
310 self.send_msg(LiveMessage::Statistics(data)).await;
311 } else {
312 let (mut data1, data2) = match {
314 let sym_map = self.read_symbol_venue_map()?;
315 handle_record(
316 record,
317 &symbol_map,
318 &self.publisher_venue_map,
319 &sym_map,
320 &mut instrument_id_map,
321 ts_init,
322 &initialized_books,
323 self.bars_timestamp_on_close,
324 )
325 } {
326 Ok(decoded) => decoded,
327 Err(e) => {
328 tracing::error!("Error decoding record: {e}");
329 continue;
330 }
331 };
332
333 if let Some(msg) = record.get::<dbn::MboMsg>() {
334 if let Some(Data::Delta(delta)) = &data1 {
336 initialized_books.insert(delta.instrument_id);
337 } else {
338 continue; }
340
341 if let Some(Data::Delta(delta)) = &data1 {
342 let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
343 buffer.push(*delta);
344
345 tracing::trace!(
346 "Buffering delta: {} {buffering_start:?} flags={}",
347 delta.ts_event,
348 msg.flags.raw(),
349 );
350
351 if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
353 continue; }
355
356 if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
358 continue; }
360
361 if let Some(start_ns) = buffering_start {
363 if delta.ts_event <= start_ns {
364 continue; }
366 buffering_start = None;
367 }
368
369 let buffer =
371 buffered_deltas
372 .remove(&delta.instrument_id)
373 .ok_or_else(|| {
374 anyhow::anyhow!(
375 "Internal error: no buffered deltas for instrument {id}",
376 id = delta.instrument_id
377 )
378 })?;
379 let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
380 let deltas = OrderBookDeltas_API::new(deltas);
381 data1 = Some(Data::Deltas(deltas));
382 }
383 }
384
385 if let Some(data) = data1 {
386 self.send_msg(LiveMessage::Data(data)).await;
387 }
388
389 if let Some(data) = data2 {
390 self.send_msg(LiveMessage::Data(data)).await;
391 }
392 }
393 }
394
395 self.cmd_rx.close();
396 tracing::debug!("Closed command receiver");
397
398 Ok(())
399 }
400
401 async fn send_msg(&mut self, msg: LiveMessage) {
403 tracing::trace!("Sending {msg:?}");
404 match self.msg_tx.send(msg).await {
405 Ok(()) => {}
406 Err(e) => tracing::error!("Error sending message: {e}"),
407 }
408 }
409
410 fn read_symbol_venue_map(
416 &self,
417 ) -> anyhow::Result<std::sync::RwLockReadGuard<'_, AHashMap<Symbol, Venue>>> {
418 const MAX_WAIT_MS: u64 = 500; const INITIAL_DELAY_MICROS: u64 = 10;
421 const MAX_DELAY_MICROS: u64 = 1000;
422
423 let deadline = std::time::Instant::now() + StdDuration::from_millis(MAX_WAIT_MS);
424 let mut delay = INITIAL_DELAY_MICROS;
425
426 loop {
427 match self.symbol_venue_map.try_read() {
428 Ok(guard) => return Ok(guard),
429 Err(std::sync::TryLockError::WouldBlock) => {
430 if std::time::Instant::now() >= deadline {
431 break;
432 }
433
434 std::thread::yield_now();
436
437 if std::time::Instant::now() < deadline {
439 let remaining = deadline - std::time::Instant::now();
440 let sleep_duration = StdDuration::from_micros(delay).min(remaining);
441 std::thread::sleep(sleep_duration);
442 delay = ((delay * 2) + delay / 4).min(MAX_DELAY_MICROS);
444 }
445 }
446 Err(std::sync::TryLockError::Poisoned(e)) => {
447 anyhow::bail!("symbol_venue_map lock poisoned: {e}");
448 }
449 }
450 }
451
452 anyhow::bail!(
453 "Failed to acquire read lock on symbol_venue_map after {MAX_WAIT_MS}ms deadline"
454 )
455 }
456}
457
/// Logs a gateway error record at error level using its `Debug` representation.
fn handle_error_msg(msg: &dbn::ErrorMsg) {
    tracing::error!("{msg:?}");
}
462
/// Logs a gateway system record at info level using its `Debug` representation.
fn handle_system_msg(msg: &dbn::SystemMsg) {
    tracing::info!("{msg:?}");
}
467
468fn handle_symbol_mapping_msg(
474 msg: &dbn::SymbolMappingMsg,
475 symbol_map: &mut PitSymbolMap,
476 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
477) -> anyhow::Result<()> {
478 symbol_map
479 .on_symbol_mapping(msg)
480 .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
481 instrument_id_map.remove(&msg.header().instrument_id);
482 Ok(())
483}
484
485fn update_instrument_id_map_with_exchange(
487 symbol_map: &PitSymbolMap,
488 symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
489 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
490 raw_instrument_id: u32,
491 exchange: &str,
492) -> anyhow::Result<InstrumentId> {
493 let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
494 anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
495 })?;
496 let symbol = Symbol::from(raw_symbol.as_str());
497 let venue = Venue::from_code(exchange)
498 .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
499 let instrument_id = InstrumentId::new(symbol, venue);
500 let mut map = symbol_venue_map
501 .write()
502 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
503 map.entry(symbol).or_insert(venue);
504 instrument_id_map.insert(raw_instrument_id, instrument_id);
505 Ok(instrument_id)
506}
507
508fn update_instrument_id_map(
509 record: &dbn::RecordRef,
510 symbol_map: &PitSymbolMap,
511 publisher_venue_map: &IndexMap<PublisherId, Venue>,
512 symbol_venue_map: &AHashMap<Symbol, Venue>,
513 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
514) -> anyhow::Result<InstrumentId> {
515 let header = record.header();
516
517 if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
519 return Ok(instrument_id);
520 }
521
522 let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
523 anyhow::anyhow!(
524 "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
525 header.instrument_id
526 )
527 })?;
528
529 let symbol = Symbol::from_str_unchecked(raw_symbol);
530
531 let publisher_id = header.publisher_id;
532 let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
533 *venue
534 } else {
535 let venue = publisher_venue_map
536 .get(&publisher_id)
537 .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
538 *venue
539 };
540 let instrument_id = InstrumentId::new(symbol, venue);
541
542 instrument_id_map.insert(header.instrument_id, instrument_id);
543 Ok(instrument_id)
544}
545
546fn handle_instrument_def_msg(
552 msg: &dbn::InstrumentDefMsg,
553 record: &dbn::RecordRef,
554 symbol_map: &PitSymbolMap,
555 publisher_venue_map: &IndexMap<PublisherId, Venue>,
556 symbol_venue_map: &AHashMap<Symbol, Venue>,
557 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
558 ts_init: UnixNanos,
559) -> anyhow::Result<InstrumentAny> {
560 let instrument_id = update_instrument_id_map(
561 record,
562 symbol_map,
563 publisher_venue_map,
564 symbol_venue_map,
565 instrument_id_map,
566 )?;
567
568 decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
569}
570
571fn handle_status_msg(
572 msg: &dbn::StatusMsg,
573 record: &dbn::RecordRef,
574 symbol_map: &PitSymbolMap,
575 publisher_venue_map: &IndexMap<PublisherId, Venue>,
576 symbol_venue_map: &AHashMap<Symbol, Venue>,
577 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
578 ts_init: UnixNanos,
579) -> anyhow::Result<InstrumentStatus> {
580 let instrument_id = update_instrument_id_map(
581 record,
582 symbol_map,
583 publisher_venue_map,
584 symbol_venue_map,
585 instrument_id_map,
586 )?;
587
588 decode_status_msg(msg, instrument_id, Some(ts_init))
589}
590
591fn handle_imbalance_msg(
592 msg: &dbn::ImbalanceMsg,
593 record: &dbn::RecordRef,
594 symbol_map: &PitSymbolMap,
595 publisher_venue_map: &IndexMap<PublisherId, Venue>,
596 symbol_venue_map: &AHashMap<Symbol, Venue>,
597 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
598 ts_init: UnixNanos,
599) -> anyhow::Result<DatabentoImbalance> {
600 let instrument_id = update_instrument_id_map(
601 record,
602 symbol_map,
603 publisher_venue_map,
604 symbol_venue_map,
605 instrument_id_map,
606 )?;
607
608 let price_precision = 2; decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
611}
612
613fn handle_statistics_msg(
614 msg: &dbn::StatMsg,
615 record: &dbn::RecordRef,
616 symbol_map: &PitSymbolMap,
617 publisher_venue_map: &IndexMap<PublisherId, Venue>,
618 symbol_venue_map: &AHashMap<Symbol, Venue>,
619 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
620 ts_init: UnixNanos,
621) -> anyhow::Result<DatabentoStatistics> {
622 let instrument_id = update_instrument_id_map(
623 record,
624 symbol_map,
625 publisher_venue_map,
626 symbol_venue_map,
627 instrument_id_map,
628 )?;
629
630 let price_precision = 2; decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
633}
634
635#[allow(clippy::too_many_arguments)]
636fn handle_record(
637 record: dbn::RecordRef,
638 symbol_map: &PitSymbolMap,
639 publisher_venue_map: &IndexMap<PublisherId, Venue>,
640 symbol_venue_map: &AHashMap<Symbol, Venue>,
641 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
642 ts_init: UnixNanos,
643 initialized_books: &HashSet<InstrumentId>,
644 bars_timestamp_on_close: bool,
645) -> anyhow::Result<(Option<Data>, Option<Data>)> {
646 let instrument_id = update_instrument_id_map(
647 &record,
648 symbol_map,
649 publisher_venue_map,
650 symbol_venue_map,
651 instrument_id_map,
652 )?;
653
654 let price_precision = 2; let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
659 || record.get::<dbn::TbboMsg>().is_some()
660 || record.get::<dbn::Cmbp1Msg>().is_some()
661 {
662 true } else {
664 initialized_books.contains(&instrument_id) };
666
667 decode_record(
668 &record,
669 instrument_id,
670 price_precision,
671 Some(ts_init),
672 include_trades,
673 bars_timestamp_on_close,
674 )
675}