1use std::{
17 sync::{Arc, RwLock},
18 time::Duration as StdDuration,
19};
20
21use ahash::{AHashMap, HashSet, HashSetExt};
22use databento::{
23 dbn::{self, PitSymbolMap, Record, SymbolIndex},
24 live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
28use nautilus_model::{
29 data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30 enums::RecordFlag,
31 identifiers::{InstrumentId, Symbol, Venue},
32 instruments::InstrumentAny,
33};
34use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
35
36use super::{
37 decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
38 types::{DatabentoImbalance, DatabentoStatistics},
39};
40use crate::{
41 decode::{decode_instrument_def_msg, decode_record},
42 types::PublisherId,
43};
44
/// Commands sent to a [`DatabentoFeedHandler`] over its command channel.
#[derive(Debug)]
pub enum LiveCommand {
    /// Adds a subscription on the live client. A subscription carrying a
    /// `start` time switches the handler into replay mode.
    Subscribe(Subscription),
    /// Starts the live session; after this the handler begins reading records.
    Start,
    /// Sends [`LiveMessage::Close`] to the consumer, closes the client if the
    /// session was started, and stops the handler loop.
    Close,
}
51
/// Messages emitted by a [`DatabentoFeedHandler`] to its consumer.
#[derive(Debug)]
// Payload sizes differ widely (e.g. `InstrumentAny` vs the unit `Close`); the
// lint is silenced rather than boxing variants, presumably to avoid a heap
// allocation per message. NOTE(review): confirm this is still the intent.
#[allow(clippy::large_enum_variant)]
pub enum LiveMessage {
    /// Decoded market data (including batched order book deltas).
    Data(Data),
    /// An instrument decoded from an instrument definition record.
    Instrument(InstrumentAny),
    /// An instrument status update decoded from a status record.
    Status(InstrumentStatus),
    /// An imbalance decoded from an imbalance record.
    Imbalance(DatabentoImbalance),
    /// Statistics decoded from a statistics record.
    Statistics(DatabentoStatistics),
    /// An error from the live record stream; the handler stops after sending it.
    Error(anyhow::Error),
    /// The feed has closed (on a `Close` command or a failed connection).
    Close,
}
63
/// Drives a Databento live client: applies [`LiveCommand`]s and forwards
/// decoded records to the consumer as [`LiveMessage`]s.
#[derive(Debug)]
pub struct DatabentoFeedHandler {
    /// Databento API key passed to the live client builder.
    key: String,
    /// Dataset the live client connects to.
    dataset: String,
    /// Incoming commands (subscribe / start / close).
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
    /// Outgoing messages to the consumer.
    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
    /// Fallback venue per publisher ID, used when a symbol has no explicit venue.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Shared symbol → venue overrides; written when `use_exchange_as_venue` is set.
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// Set once any subscription requests a historical `start` time.
    replay: bool,
    /// Whether instrument definitions use their exchange code as the venue.
    use_exchange_as_venue: bool,
    /// Passed through to bar decoding to control bar timestamping.
    bars_timestamp_on_close: bool,
}
89
90impl DatabentoFeedHandler {
91 #[must_use]
93 #[allow(clippy::too_many_arguments)]
94 pub const fn new(
95 key: String,
96 dataset: String,
97 rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
98 tx: tokio::sync::mpsc::Sender<LiveMessage>,
99 publisher_venue_map: IndexMap<PublisherId, Venue>,
100 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
101 use_exchange_as_venue: bool,
102 bars_timestamp_on_close: bool,
103 ) -> Self {
104 Self {
105 key,
106 dataset,
107 cmd_rx: rx,
108 msg_tx: tx,
109 publisher_venue_map,
110 symbol_venue_map,
111 replay: false,
112 use_exchange_as_venue,
113 bars_timestamp_on_close,
114 }
115 }
116
117 #[allow(clippy::blocks_in_conditions)]
126 pub async fn run(&mut self) -> anyhow::Result<()> {
127 tracing::debug!("Running feed handler");
128 let clock = get_atomic_clock_realtime();
129 let mut symbol_map = PitSymbolMap::new();
130 let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
131
132 let mut buffering_start = None;
133 let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
134 let mut initialized_books = HashSet::new();
135 let timeout = Duration::from_secs(5); let result = tokio::time::timeout(
138 timeout,
139 databento::LiveClient::builder()
140 .user_agent_extension(NAUTILUS_USER_AGENT.into())
141 .key(self.key.clone())?
142 .dataset(self.dataset.clone())
143 .build(),
144 )
145 .await?;
146
147 tracing::info!("Connected");
148
149 let mut client = match result {
150 Ok(client) => client,
151 Err(e) => {
152 self.msg_tx.send(LiveMessage::Close).await?;
153 self.cmd_rx.close();
154 anyhow::bail!("Failed to connect to Databento LSG: {e}");
155 }
156 };
157
158 let timeout = Duration::from_millis(10);
160
161 let mut running = false;
163
164 loop {
165 if self.msg_tx.is_closed() {
166 tracing::debug!("Message channel was closed: stopping");
167 break;
168 }
169
170 match self.cmd_rx.try_recv() {
171 Ok(cmd) => {
172 tracing::debug!("Received command: {cmd:?}");
173 match cmd {
174 LiveCommand::Subscribe(sub) => {
175 if !self.replay && sub.start.is_some() {
176 self.replay = true;
177 }
178 client.subscribe(sub).await?;
179 }
180 LiveCommand::Start => {
181 buffering_start = if self.replay {
182 Some(clock.get_time_ns())
183 } else {
184 None
185 };
186 client.start().await?;
187 running = true;
188 tracing::debug!("Started");
189 }
190 LiveCommand::Close => {
191 self.msg_tx.send(LiveMessage::Close).await?;
192 if running {
193 client.close().await?;
194 tracing::debug!("Closed inner client");
195 }
196 break;
197 }
198 }
199 }
200 Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => {
202 tracing::debug!("Disconnected");
203 break;
204 }
205 }
206
207 if !running {
208 continue;
209 }
210
211 let result = tokio::time::timeout(timeout, client.next_record()).await;
213 let record_opt = match result {
214 Ok(record_opt) => record_opt,
215 Err(_) => continue, };
217
218 let record = match record_opt {
219 Ok(Some(record)) => record,
220 Ok(None) => break, Err(e) => {
222 self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
225 break;
226 }
227 };
228
229 let ts_init = clock.get_time_ns();
230
231 if let Some(msg) = record.get::<dbn::ErrorMsg>() {
233 handle_error_msg(msg);
234 } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
235 handle_system_msg(msg);
236 } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
237 instrument_id_map.remove(&msg.hd.instrument_id);
239 handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
240 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
241 if self.use_exchange_as_venue {
242 let exchange = msg.exchange()?;
243 if !exchange.is_empty() {
244 update_instrument_id_map_with_exchange(
245 &symbol_map,
246 &self.symbol_venue_map,
247 &mut instrument_id_map,
248 msg.hd.instrument_id,
249 exchange,
250 )?;
251 }
252 }
253 let data = {
254 let sym_map = self.read_symbol_venue_map()?;
255 handle_instrument_def_msg(
256 msg,
257 &record,
258 &symbol_map,
259 &self.publisher_venue_map,
260 &sym_map,
261 &mut instrument_id_map,
262 ts_init,
263 )?
264 };
265 self.send_msg(LiveMessage::Instrument(data)).await;
266 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
267 let data = {
268 let sym_map = self.read_symbol_venue_map()?;
269 handle_status_msg(
270 msg,
271 &record,
272 &symbol_map,
273 &self.publisher_venue_map,
274 &sym_map,
275 &mut instrument_id_map,
276 ts_init,
277 )?
278 };
279 self.send_msg(LiveMessage::Status(data)).await;
280 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
281 let data = {
282 let sym_map = self.read_symbol_venue_map()?;
283 handle_imbalance_msg(
284 msg,
285 &record,
286 &symbol_map,
287 &self.publisher_venue_map,
288 &sym_map,
289 &mut instrument_id_map,
290 ts_init,
291 )?
292 };
293 self.send_msg(LiveMessage::Imbalance(data)).await;
294 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
295 let data = {
296 let sym_map = self.read_symbol_venue_map()?;
297 handle_statistics_msg(
298 msg,
299 &record,
300 &symbol_map,
301 &self.publisher_venue_map,
302 &sym_map,
303 &mut instrument_id_map,
304 ts_init,
305 )?
306 };
307 self.send_msg(LiveMessage::Statistics(data)).await;
308 } else {
309 let (mut data1, data2) = match {
311 let sym_map = self.read_symbol_venue_map()?;
312 handle_record(
313 record,
314 &symbol_map,
315 &self.publisher_venue_map,
316 &sym_map,
317 &mut instrument_id_map,
318 ts_init,
319 &initialized_books,
320 self.bars_timestamp_on_close,
321 )
322 } {
323 Ok(decoded) => decoded,
324 Err(e) => {
325 tracing::error!("Error decoding record: {e}");
326 continue;
327 }
328 };
329
330 if let Some(msg) = record.get::<dbn::MboMsg>() {
331 if let Some(Data::Delta(delta)) = &data1 {
333 initialized_books.insert(delta.instrument_id);
334 } else {
335 continue; }
337
338 if let Some(Data::Delta(delta)) = &data1 {
339 let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
340 buffer.push(*delta);
341
342 tracing::trace!(
343 "Buffering delta: {} {buffering_start:?} flags={}",
344 delta.ts_event,
345 msg.flags.raw(),
346 );
347
348 if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
350 continue; }
352
353 if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
355 continue; }
357
358 if let Some(start_ns) = buffering_start {
360 if delta.ts_event <= start_ns {
361 continue; }
363 buffering_start = None;
364 }
365
366 let buffer =
368 buffered_deltas
369 .remove(&delta.instrument_id)
370 .ok_or_else(|| {
371 anyhow::anyhow!(
372 "Internal error: no buffered deltas for instrument {id}",
373 id = delta.instrument_id
374 )
375 })?;
376 let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
377 let deltas = OrderBookDeltas_API::new(deltas);
378 data1 = Some(Data::Deltas(deltas));
379 }
380 }
381
382 if let Some(data) = data1 {
383 self.send_msg(LiveMessage::Data(data)).await;
384 }
385
386 if let Some(data) = data2 {
387 self.send_msg(LiveMessage::Data(data)).await;
388 }
389 }
390 }
391
392 self.cmd_rx.close();
393 tracing::debug!("Closed command receiver");
394
395 Ok(())
396 }
397
398 async fn send_msg(&mut self, msg: LiveMessage) {
400 tracing::trace!("Sending {msg:?}");
401 match self.msg_tx.send(msg).await {
402 Ok(()) => {}
403 Err(e) => tracing::error!("Error sending message: {e}"),
404 }
405 }
406
407 fn read_symbol_venue_map(
413 &self,
414 ) -> anyhow::Result<std::sync::RwLockReadGuard<'_, AHashMap<Symbol, Venue>>> {
415 const MAX_WAIT_MS: u64 = 500; const INITIAL_DELAY_MICROS: u64 = 10;
418 const MAX_DELAY_MICROS: u64 = 1000;
419
420 let deadline = std::time::Instant::now() + StdDuration::from_millis(MAX_WAIT_MS);
421 let mut delay = INITIAL_DELAY_MICROS;
422
423 loop {
424 match self.symbol_venue_map.try_read() {
425 Ok(guard) => return Ok(guard),
426 Err(std::sync::TryLockError::WouldBlock) => {
427 if std::time::Instant::now() >= deadline {
428 break;
429 }
430 std::thread::yield_now();
432
433 if std::time::Instant::now() < deadline {
435 let remaining = deadline - std::time::Instant::now();
436 let sleep_duration = StdDuration::from_micros(delay).min(remaining);
437 std::thread::sleep(sleep_duration);
438 delay = ((delay * 2) + delay / 4).min(MAX_DELAY_MICROS);
440 }
441 }
442 Err(std::sync::TryLockError::Poisoned(e)) => {
443 return Err(anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"));
444 }
445 }
446 }
447
448 Err(anyhow::anyhow!(
449 "Failed to acquire read lock on symbol_venue_map after {MAX_WAIT_MS}ms deadline"
450 ))
451 }
452}
453
454fn handle_error_msg(msg: &dbn::ErrorMsg) {
456 tracing::error!("{msg:?}");
457}
458
459fn handle_system_msg(msg: &dbn::SystemMsg) {
461 tracing::info!("{msg:?}");
462}
463
464fn handle_symbol_mapping_msg(
470 msg: &dbn::SymbolMappingMsg,
471 symbol_map: &mut PitSymbolMap,
472 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
473) -> anyhow::Result<()> {
474 symbol_map
475 .on_symbol_mapping(msg)
476 .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
477 instrument_id_map.remove(&msg.header().instrument_id);
478 Ok(())
479}
480
481fn update_instrument_id_map_with_exchange(
483 symbol_map: &PitSymbolMap,
484 symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
485 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
486 raw_instrument_id: u32,
487 exchange: &str,
488) -> anyhow::Result<InstrumentId> {
489 let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
490 anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
491 })?;
492 let symbol = Symbol::from(raw_symbol.as_str());
493 let venue = Venue::from_code(exchange)
494 .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
495 let instrument_id = InstrumentId::new(symbol, venue);
496 let mut map = symbol_venue_map
497 .write()
498 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
499 map.entry(symbol).or_insert(venue);
500 instrument_id_map.insert(raw_instrument_id, instrument_id);
501 Ok(instrument_id)
502}
503
504fn update_instrument_id_map(
505 record: &dbn::RecordRef,
506 symbol_map: &PitSymbolMap,
507 publisher_venue_map: &IndexMap<PublisherId, Venue>,
508 symbol_venue_map: &AHashMap<Symbol, Venue>,
509 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
510) -> anyhow::Result<InstrumentId> {
511 let header = record.header();
512
513 if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
515 return Ok(instrument_id);
516 }
517
518 let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
519 anyhow::anyhow!(
520 "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
521 header.instrument_id
522 )
523 })?;
524
525 let symbol = Symbol::from_str_unchecked(raw_symbol);
526
527 let publisher_id = header.publisher_id;
528 let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
529 *venue
530 } else {
531 let venue = publisher_venue_map
532 .get(&publisher_id)
533 .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
534 *venue
535 };
536 let instrument_id = InstrumentId::new(symbol, venue);
537
538 instrument_id_map.insert(header.instrument_id, instrument_id);
539 Ok(instrument_id)
540}
541
542fn handle_instrument_def_msg(
548 msg: &dbn::InstrumentDefMsg,
549 record: &dbn::RecordRef,
550 symbol_map: &PitSymbolMap,
551 publisher_venue_map: &IndexMap<PublisherId, Venue>,
552 symbol_venue_map: &AHashMap<Symbol, Venue>,
553 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
554 ts_init: UnixNanos,
555) -> anyhow::Result<InstrumentAny> {
556 let instrument_id = update_instrument_id_map(
557 record,
558 symbol_map,
559 publisher_venue_map,
560 symbol_venue_map,
561 instrument_id_map,
562 )?;
563
564 decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
565}
566
567fn handle_status_msg(
568 msg: &dbn::StatusMsg,
569 record: &dbn::RecordRef,
570 symbol_map: &PitSymbolMap,
571 publisher_venue_map: &IndexMap<PublisherId, Venue>,
572 symbol_venue_map: &AHashMap<Symbol, Venue>,
573 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
574 ts_init: UnixNanos,
575) -> anyhow::Result<InstrumentStatus> {
576 let instrument_id = update_instrument_id_map(
577 record,
578 symbol_map,
579 publisher_venue_map,
580 symbol_venue_map,
581 instrument_id_map,
582 )?;
583
584 decode_status_msg(msg, instrument_id, Some(ts_init))
585}
586
587fn handle_imbalance_msg(
588 msg: &dbn::ImbalanceMsg,
589 record: &dbn::RecordRef,
590 symbol_map: &PitSymbolMap,
591 publisher_venue_map: &IndexMap<PublisherId, Venue>,
592 symbol_venue_map: &AHashMap<Symbol, Venue>,
593 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
594 ts_init: UnixNanos,
595) -> anyhow::Result<DatabentoImbalance> {
596 let instrument_id = update_instrument_id_map(
597 record,
598 symbol_map,
599 publisher_venue_map,
600 symbol_venue_map,
601 instrument_id_map,
602 )?;
603
604 let price_precision = 2; decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
607}
608
609fn handle_statistics_msg(
610 msg: &dbn::StatMsg,
611 record: &dbn::RecordRef,
612 symbol_map: &PitSymbolMap,
613 publisher_venue_map: &IndexMap<PublisherId, Venue>,
614 symbol_venue_map: &AHashMap<Symbol, Venue>,
615 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
616 ts_init: UnixNanos,
617) -> anyhow::Result<DatabentoStatistics> {
618 let instrument_id = update_instrument_id_map(
619 record,
620 symbol_map,
621 publisher_venue_map,
622 symbol_venue_map,
623 instrument_id_map,
624 )?;
625
626 let price_precision = 2; decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
629}
630
631#[allow(clippy::too_many_arguments)]
632fn handle_record(
633 record: dbn::RecordRef,
634 symbol_map: &PitSymbolMap,
635 publisher_venue_map: &IndexMap<PublisherId, Venue>,
636 symbol_venue_map: &AHashMap<Symbol, Venue>,
637 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
638 ts_init: UnixNanos,
639 initialized_books: &HashSet<InstrumentId>,
640 bars_timestamp_on_close: bool,
641) -> anyhow::Result<(Option<Data>, Option<Data>)> {
642 let instrument_id = update_instrument_id_map(
643 &record,
644 symbol_map,
645 publisher_venue_map,
646 symbol_venue_map,
647 instrument_id_map,
648 )?;
649
650 let price_precision = 2; let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
655 || record.get::<dbn::TbboMsg>().is_some()
656 || record.get::<dbn::Cmbp1Msg>().is_some()
657 {
658 true } else {
660 initialized_books.contains(&instrument_id) };
662
663 decode_record(
664 &record,
665 instrument_id,
666 price_precision,
667 Some(ts_init),
668 include_trades,
669 bars_timestamp_on_close,
670 )
671}