1use std::{
17 sync::{Arc, RwLock},
18 time::Duration as StdDuration,
19};
20
21use ahash::{AHashMap, HashSet, HashSetExt};
22use databento::{
23 dbn::{self, PitSymbolMap, Record, SymbolIndex},
24 live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
28use nautilus_model::{
29 data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30 enums::RecordFlag,
31 identifiers::{InstrumentId, Symbol, Venue},
32 instruments::InstrumentAny,
33};
34use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
35
36use super::{
37 decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
38 types::{DatabentoImbalance, DatabentoStatistics},
39};
40use crate::{
41 decode::{decode_instrument_def_msg, decode_record},
42 types::PublisherId,
43};
44
45#[derive(Debug)]
46pub enum LiveCommand {
47 Subscribe(Subscription),
48 Start,
49 Close,
50}
51
52#[derive(Debug)]
53#[allow(clippy::large_enum_variant)] pub enum LiveMessage {
55 Data(Data),
56 Instrument(InstrumentAny),
57 Status(InstrumentStatus),
58 Imbalance(DatabentoImbalance),
59 Statistics(DatabentoStatistics),
60 Error(anyhow::Error),
61 Close,
62}
63
64#[derive(Debug)]
78pub struct DatabentoFeedHandler {
79 key: String,
80 dataset: String,
81 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
82 msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
83 publisher_venue_map: IndexMap<PublisherId, Venue>,
84 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
85 replay: bool,
86 use_exchange_as_venue: bool,
87 bars_timestamp_on_close: bool,
88}
89
90impl DatabentoFeedHandler {
91 #[must_use]
93 #[allow(clippy::too_many_arguments)]
94 pub const fn new(
95 key: String,
96 dataset: String,
97 rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
98 tx: tokio::sync::mpsc::Sender<LiveMessage>,
99 publisher_venue_map: IndexMap<PublisherId, Venue>,
100 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
101 use_exchange_as_venue: bool,
102 bars_timestamp_on_close: bool,
103 ) -> Self {
104 Self {
105 key,
106 dataset,
107 cmd_rx: rx,
108 msg_tx: tx,
109 publisher_venue_map,
110 symbol_venue_map,
111 replay: false,
112 use_exchange_as_venue,
113 bars_timestamp_on_close,
114 }
115 }
116
117 #[allow(clippy::blocks_in_conditions)]
126 pub async fn run(&mut self) -> anyhow::Result<()> {
127 tracing::debug!("Running feed handler");
128 let clock = get_atomic_clock_realtime();
129 let mut symbol_map = PitSymbolMap::new();
130 let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
131
132 let mut buffering_start = None;
133 let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
134 let mut initialized_books = HashSet::new();
135 let timeout = Duration::from_secs(5); let result = tokio::time::timeout(
138 timeout,
139 databento::LiveClient::builder()
140 .user_agent_extension(NAUTILUS_USER_AGENT.into())
141 .key(self.key.clone())?
142 .dataset(self.dataset.clone())
143 .build(),
144 )
145 .await?;
146
147 tracing::info!("Connected");
148
149 let mut client = match result {
150 Ok(client) => client,
151 Err(e) => {
152 self.msg_tx.send(LiveMessage::Close).await?;
153 self.cmd_rx.close();
154 anyhow::bail!("Failed to connect to Databento LSG: {e}");
155 }
156 };
157
158 let timeout = Duration::from_millis(10);
160
161 let mut running = false;
163
164 loop {
165 if self.msg_tx.is_closed() {
166 tracing::debug!("Message channel was closed: stopping");
167 break;
168 }
169
170 match self.cmd_rx.try_recv() {
171 Ok(cmd) => {
172 tracing::debug!("Received command: {cmd:?}");
173 match cmd {
174 LiveCommand::Subscribe(sub) => {
175 if !self.replay && sub.start.is_some() {
176 self.replay = true;
177 }
178 client.subscribe(sub).await?;
179 }
180 LiveCommand::Start => {
181 buffering_start = if self.replay {
182 Some(clock.get_time_ns())
183 } else {
184 None
185 };
186 client.start().await?;
187 running = true;
188 tracing::debug!("Started");
189 }
190 LiveCommand::Close => {
191 self.msg_tx.send(LiveMessage::Close).await?;
192 if running {
193 client.close().await?;
194 tracing::debug!("Closed inner client");
195 }
196 break;
197 }
198 }
199 }
200 Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => {
202 tracing::debug!("Disconnected");
203 break;
204 }
205 }
206
207 if !running {
208 continue;
209 }
210
211 let result = tokio::time::timeout(timeout, client.next_record()).await;
213 let record_opt = match result {
214 Ok(record_opt) => record_opt,
215 Err(_) => continue, };
217
218 let record = match record_opt {
219 Ok(Some(record)) => record,
220 Ok(None) => break, Err(e) => {
222 self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
225 break;
226 }
227 };
228
229 let ts_init = clock.get_time_ns();
230
231 if let Some(msg) = record.get::<dbn::ErrorMsg>() {
233 handle_error_msg(msg);
234 } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
235 handle_system_msg(msg);
236 } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
237 instrument_id_map.remove(&msg.hd.instrument_id);
239 handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
240 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
241 if self.use_exchange_as_venue {
242 let exchange = msg.exchange()?;
243 if !exchange.is_empty() {
244 update_instrument_id_map_with_exchange(
245 &symbol_map,
246 &self.symbol_venue_map,
247 &mut instrument_id_map,
248 msg.hd.instrument_id,
249 exchange,
250 )?;
251 }
252 }
253 let data = {
254 let sym_map = self.read_symbol_venue_map()?;
255 handle_instrument_def_msg(
256 msg,
257 &record,
258 &symbol_map,
259 &self.publisher_venue_map,
260 &sym_map,
261 &mut instrument_id_map,
262 ts_init,
263 )?
264 };
265 self.send_msg(LiveMessage::Instrument(data)).await;
266 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
267 let data = {
268 let sym_map = self.read_symbol_venue_map()?;
269 handle_status_msg(
270 msg,
271 &record,
272 &symbol_map,
273 &self.publisher_venue_map,
274 &sym_map,
275 &mut instrument_id_map,
276 ts_init,
277 )?
278 };
279 self.send_msg(LiveMessage::Status(data)).await;
280 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
281 let data = {
282 let sym_map = self.read_symbol_venue_map()?;
283 handle_imbalance_msg(
284 msg,
285 &record,
286 &symbol_map,
287 &self.publisher_venue_map,
288 &sym_map,
289 &mut instrument_id_map,
290 ts_init,
291 )?
292 };
293 self.send_msg(LiveMessage::Imbalance(data)).await;
294 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
295 let data = {
296 let sym_map = self.read_symbol_venue_map()?;
297 handle_statistics_msg(
298 msg,
299 &record,
300 &symbol_map,
301 &self.publisher_venue_map,
302 &sym_map,
303 &mut instrument_id_map,
304 ts_init,
305 )?
306 };
307 self.send_msg(LiveMessage::Statistics(data)).await;
308 } else {
309 let (mut data1, data2) = match {
311 let sym_map = self.read_symbol_venue_map()?;
312 handle_record(
313 record,
314 &symbol_map,
315 &self.publisher_venue_map,
316 &sym_map,
317 &mut instrument_id_map,
318 ts_init,
319 &initialized_books,
320 self.bars_timestamp_on_close,
321 )
322 } {
323 Ok(decoded) => decoded,
324 Err(e) => {
325 tracing::error!("Error decoding record: {e}");
326 continue;
327 }
328 };
329
330 if let Some(msg) = record.get::<dbn::MboMsg>() {
331 if let Some(Data::Delta(delta)) = &data1 {
333 initialized_books.insert(delta.instrument_id);
334 } else {
335 continue; }
337
338 if let Some(Data::Delta(delta)) = &data1 {
339 let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
340 buffer.push(*delta);
341
342 tracing::trace!(
343 "Buffering delta: {} {buffering_start:?} flags={}",
344 delta.ts_event,
345 msg.flags.raw(),
346 );
347
348 if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
350 continue; }
352
353 if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
355 continue; }
357
358 if let Some(start_ns) = buffering_start {
360 if delta.ts_event <= start_ns {
361 continue; }
363 buffering_start = None;
364 }
365
366 let buffer =
368 buffered_deltas
369 .remove(&delta.instrument_id)
370 .ok_or_else(|| {
371 anyhow::anyhow!(
372 "Internal error: no buffered deltas for instrument {id}",
373 id = delta.instrument_id
374 )
375 })?;
376 let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
377 let deltas = OrderBookDeltas_API::new(deltas);
378 data1 = Some(Data::Deltas(deltas));
379 }
380 }
381
382 if let Some(data) = data1 {
383 self.send_msg(LiveMessage::Data(data)).await;
384 }
385
386 if let Some(data) = data2 {
387 self.send_msg(LiveMessage::Data(data)).await;
388 }
389 }
390 }
391
392 self.cmd_rx.close();
393 tracing::debug!("Closed command receiver");
394
395 Ok(())
396 }
397
398 async fn send_msg(&mut self, msg: LiveMessage) {
400 tracing::trace!("Sending {msg:?}");
401 match self.msg_tx.send(msg).await {
402 Ok(()) => {}
403 Err(e) => tracing::error!("Error sending message: {e}"),
404 }
405 }
406
407 fn read_symbol_venue_map(
413 &self,
414 ) -> anyhow::Result<std::sync::RwLockReadGuard<'_, AHashMap<Symbol, Venue>>> {
415 const MAX_WAIT_MS: u64 = 500; const INITIAL_DELAY_MICROS: u64 = 10;
418 const MAX_DELAY_MICROS: u64 = 1000;
419
420 let deadline = std::time::Instant::now() + StdDuration::from_millis(MAX_WAIT_MS);
421 let mut delay = INITIAL_DELAY_MICROS;
422
423 loop {
424 match self.symbol_venue_map.try_read() {
425 Ok(guard) => return Ok(guard),
426 Err(std::sync::TryLockError::WouldBlock) => {
427 if std::time::Instant::now() >= deadline {
428 break;
429 }
430
431 std::thread::yield_now();
433
434 if std::time::Instant::now() < deadline {
436 let remaining = deadline - std::time::Instant::now();
437 let sleep_duration = StdDuration::from_micros(delay).min(remaining);
438 std::thread::sleep(sleep_duration);
439 delay = ((delay * 2) + delay / 4).min(MAX_DELAY_MICROS);
441 }
442 }
443 Err(std::sync::TryLockError::Poisoned(e)) => {
444 anyhow::bail!("symbol_venue_map lock poisoned: {e}");
445 }
446 }
447 }
448
449 anyhow::bail!(
450 "Failed to acquire read lock on symbol_venue_map after {MAX_WAIT_MS}ms deadline"
451 )
452 }
453}
454
455fn handle_error_msg(msg: &dbn::ErrorMsg) {
457 tracing::error!("{msg:?}");
458}
459
460fn handle_system_msg(msg: &dbn::SystemMsg) {
462 tracing::info!("{msg:?}");
463}
464
465fn handle_symbol_mapping_msg(
471 msg: &dbn::SymbolMappingMsg,
472 symbol_map: &mut PitSymbolMap,
473 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
474) -> anyhow::Result<()> {
475 symbol_map
476 .on_symbol_mapping(msg)
477 .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
478 instrument_id_map.remove(&msg.header().instrument_id);
479 Ok(())
480}
481
482fn update_instrument_id_map_with_exchange(
484 symbol_map: &PitSymbolMap,
485 symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
486 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
487 raw_instrument_id: u32,
488 exchange: &str,
489) -> anyhow::Result<InstrumentId> {
490 let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
491 anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
492 })?;
493 let symbol = Symbol::from(raw_symbol.as_str());
494 let venue = Venue::from_code(exchange)
495 .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
496 let instrument_id = InstrumentId::new(symbol, venue);
497 let mut map = symbol_venue_map
498 .write()
499 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
500 map.entry(symbol).or_insert(venue);
501 instrument_id_map.insert(raw_instrument_id, instrument_id);
502 Ok(instrument_id)
503}
504
505fn update_instrument_id_map(
506 record: &dbn::RecordRef,
507 symbol_map: &PitSymbolMap,
508 publisher_venue_map: &IndexMap<PublisherId, Venue>,
509 symbol_venue_map: &AHashMap<Symbol, Venue>,
510 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
511) -> anyhow::Result<InstrumentId> {
512 let header = record.header();
513
514 if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
516 return Ok(instrument_id);
517 }
518
519 let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
520 anyhow::anyhow!(
521 "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
522 header.instrument_id
523 )
524 })?;
525
526 let symbol = Symbol::from_str_unchecked(raw_symbol);
527
528 let publisher_id = header.publisher_id;
529 let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
530 *venue
531 } else {
532 let venue = publisher_venue_map
533 .get(&publisher_id)
534 .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
535 *venue
536 };
537 let instrument_id = InstrumentId::new(symbol, venue);
538
539 instrument_id_map.insert(header.instrument_id, instrument_id);
540 Ok(instrument_id)
541}
542
543fn handle_instrument_def_msg(
549 msg: &dbn::InstrumentDefMsg,
550 record: &dbn::RecordRef,
551 symbol_map: &PitSymbolMap,
552 publisher_venue_map: &IndexMap<PublisherId, Venue>,
553 symbol_venue_map: &AHashMap<Symbol, Venue>,
554 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
555 ts_init: UnixNanos,
556) -> anyhow::Result<InstrumentAny> {
557 let instrument_id = update_instrument_id_map(
558 record,
559 symbol_map,
560 publisher_venue_map,
561 symbol_venue_map,
562 instrument_id_map,
563 )?;
564
565 decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
566}
567
568fn handle_status_msg(
569 msg: &dbn::StatusMsg,
570 record: &dbn::RecordRef,
571 symbol_map: &PitSymbolMap,
572 publisher_venue_map: &IndexMap<PublisherId, Venue>,
573 symbol_venue_map: &AHashMap<Symbol, Venue>,
574 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
575 ts_init: UnixNanos,
576) -> anyhow::Result<InstrumentStatus> {
577 let instrument_id = update_instrument_id_map(
578 record,
579 symbol_map,
580 publisher_venue_map,
581 symbol_venue_map,
582 instrument_id_map,
583 )?;
584
585 decode_status_msg(msg, instrument_id, Some(ts_init))
586}
587
588fn handle_imbalance_msg(
589 msg: &dbn::ImbalanceMsg,
590 record: &dbn::RecordRef,
591 symbol_map: &PitSymbolMap,
592 publisher_venue_map: &IndexMap<PublisherId, Venue>,
593 symbol_venue_map: &AHashMap<Symbol, Venue>,
594 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
595 ts_init: UnixNanos,
596) -> anyhow::Result<DatabentoImbalance> {
597 let instrument_id = update_instrument_id_map(
598 record,
599 symbol_map,
600 publisher_venue_map,
601 symbol_venue_map,
602 instrument_id_map,
603 )?;
604
605 let price_precision = 2; decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
608}
609
610fn handle_statistics_msg(
611 msg: &dbn::StatMsg,
612 record: &dbn::RecordRef,
613 symbol_map: &PitSymbolMap,
614 publisher_venue_map: &IndexMap<PublisherId, Venue>,
615 symbol_venue_map: &AHashMap<Symbol, Venue>,
616 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
617 ts_init: UnixNanos,
618) -> anyhow::Result<DatabentoStatistics> {
619 let instrument_id = update_instrument_id_map(
620 record,
621 symbol_map,
622 publisher_venue_map,
623 symbol_venue_map,
624 instrument_id_map,
625 )?;
626
627 let price_precision = 2; decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
630}
631
632#[allow(clippy::too_many_arguments)]
633fn handle_record(
634 record: dbn::RecordRef,
635 symbol_map: &PitSymbolMap,
636 publisher_venue_map: &IndexMap<PublisherId, Venue>,
637 symbol_venue_map: &AHashMap<Symbol, Venue>,
638 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
639 ts_init: UnixNanos,
640 initialized_books: &HashSet<InstrumentId>,
641 bars_timestamp_on_close: bool,
642) -> anyhow::Result<(Option<Data>, Option<Data>)> {
643 let instrument_id = update_instrument_id_map(
644 &record,
645 symbol_map,
646 publisher_venue_map,
647 symbol_venue_map,
648 instrument_id_map,
649 )?;
650
651 let price_precision = 2; let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
656 || record.get::<dbn::TbboMsg>().is_some()
657 || record.get::<dbn::Cmbp1Msg>().is_some()
658 {
659 true } else {
661 initialized_books.contains(&instrument_id) };
663
664 decode_record(
665 &record,
666 instrument_id,
667 price_precision,
668 Some(ts_init),
669 include_trades,
670 bars_timestamp_on_close,
671 )
672}