1use std::{
22 path::PathBuf,
23 sync::{
24 Arc, Mutex, RwLock,
25 atomic::{AtomicBool, Ordering},
26 },
27};
28
29use ahash::AHashMap;
30use databento::live::Subscription;
31use indexmap::IndexMap;
32use nautilus_common::{
33 messages::{
34 DataEvent,
35 data::{
36 RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
37 SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades, UnsubscribeBookDeltas,
38 UnsubscribeInstrumentStatus, UnsubscribeQuotes, UnsubscribeTrades,
39 },
40 },
41 runner::get_data_event_sender,
42};
43use nautilus_core::time::AtomicTime;
44use nautilus_data::client::DataClient;
45use nautilus_model::{
46 enums::BarAggregation,
47 identifiers::{ClientId, Symbol, Venue},
48 instruments::Instrument,
49};
50use tokio::task::JoinHandle;
51use tokio_util::sync::CancellationToken;
52
53use crate::{
54 historical::{DatabentoHistoricalClient, RangeQueryParams},
55 live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
56 loader::DatabentoDataLoader,
57 symbology::instrument_id_to_symbol_string,
58 types::PublisherId,
59};
60
61#[derive(Debug, Clone)]
63pub struct DatabentoDataClientConfig {
64 pub api_key: String,
66 pub publishers_filepath: PathBuf,
68 pub use_exchange_as_venue: bool,
70 pub bars_timestamp_on_close: bool,
72}
73
74impl DatabentoDataClientConfig {
75 #[must_use]
77 pub const fn new(
78 api_key: String,
79 publishers_filepath: PathBuf,
80 use_exchange_as_venue: bool,
81 bars_timestamp_on_close: bool,
82 ) -> Self {
83 Self {
84 api_key,
85 publishers_filepath,
86 use_exchange_as_venue,
87 bars_timestamp_on_close,
88 }
89 }
90}
91
92#[cfg_attr(feature = "python", pyo3::pyclass)]
98#[derive(Debug)]
99pub struct DatabentoDataClient {
100 client_id: ClientId,
102 config: DatabentoDataClientConfig,
104 is_connected: AtomicBool,
106 historical: DatabentoHistoricalClient,
108 loader: DatabentoDataLoader,
110 cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<LiveCommand>>>>,
112 task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
114 cancellation_token: CancellationToken,
116 publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
118 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
120 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
122}
123
124impl DatabentoDataClient {
125 pub fn new(
131 client_id: ClientId,
132 config: DatabentoDataClientConfig,
133 clock: &'static AtomicTime,
134 ) -> anyhow::Result<Self> {
135 let historical = DatabentoHistoricalClient::new(
136 config.api_key.clone(),
137 config.publishers_filepath.clone(),
138 clock,
139 config.use_exchange_as_venue,
140 )?;
141
142 let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;
144
145 let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
147 let publishers_vec: Vec<crate::types::DatabentoPublisher> =
148 serde_json::from_str(&file_content)?;
149
150 let publisher_venue_map = publishers_vec
151 .into_iter()
152 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
153 .collect::<IndexMap<u16, Venue>>();
154
155 let data_sender = get_data_event_sender();
156
157 Ok(Self {
158 client_id,
159 config,
160 is_connected: AtomicBool::new(false),
161 historical,
162 loader,
163 cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
164 task_handles: Arc::new(Mutex::new(Vec::new())),
165 cancellation_token: CancellationToken::new(),
166 publisher_venue_map: Arc::new(publisher_venue_map),
167 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
168 data_sender,
169 })
170 }
171
172 fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
178 self.loader
179 .get_dataset_for_venue(&venue)
180 .map(ToString::to_string)
181 .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
182 }
183
184 fn get_or_create_feed_handler(&self, dataset: &str) -> anyhow::Result<()> {
190 let mut channels = self.cmd_channels.lock().unwrap();
191
192 if !channels.contains_key(dataset) {
193 tracing::info!("Creating new feed handler for dataset: {dataset}");
194 let cmd_tx = self.initialize_live_feed(dataset.to_string())?;
195 channels.insert(dataset.to_string(), cmd_tx);
196
197 tracing::debug!("Feed handler created for dataset: {dataset}, channel stored");
198 }
199
200 Ok(())
201 }
202
203 fn send_command_to_dataset(&self, dataset: &str, cmd: LiveCommand) -> anyhow::Result<()> {
209 let channels = self.cmd_channels.lock().unwrap();
210 if let Some(tx) = channels.get(dataset) {
211 tx.send(cmd)
212 .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
213 } else {
214 anyhow::bail!("No feed handler found for dataset: {dataset}");
215 }
216 Ok(())
217 }
218
219 fn initialize_live_feed(
225 &self,
226 dataset: String,
227 ) -> anyhow::Result<tokio::sync::mpsc::UnboundedSender<LiveCommand>> {
228 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
229 let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);
230
231 let mut feed_handler = DatabentoFeedHandler::new(
232 self.config.api_key.clone(),
233 dataset,
234 cmd_rx,
235 msg_tx,
236 (*self.publisher_venue_map).clone(),
237 self.symbol_venue_map.clone(),
238 self.config.use_exchange_as_venue,
239 self.config.bars_timestamp_on_close,
240 );
241
242 let cancellation_token = self.cancellation_token.clone();
243
244 let feed_handle = tokio::spawn(async move {
246 tokio::select! {
247 result = feed_handler.run() => {
248 if let Err(e) = result {
249 tracing::error!("Feed handler error: {e}");
250 }
251 }
252 () = cancellation_token.cancelled() => {
253 tracing::debug!("Feed handler cancelled");
254 }
255 }
256 });
257
258 let cancellation_token = self.cancellation_token.clone();
259 let data_sender = self.data_sender.clone();
260
261 let msg_handle = tokio::spawn(async move {
263 let mut msg_rx = msg_rx;
264 loop {
265 tokio::select! {
266 msg = msg_rx.recv() => {
267 match msg {
268 Some(LiveMessage::Data(data)) => {
269 tracing::debug!("Received data: {data:?}");
270 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
271 tracing::error!("Failed to send data event: {e}");
272 }
273 }
274 Some(LiveMessage::Instrument(instrument)) => {
275 tracing::debug!("Received instrument: {}", instrument.id());
276 }
278 Some(LiveMessage::Status(status)) => {
279 tracing::debug!("Received status: {status:?}");
280 }
282 Some(LiveMessage::Imbalance(imbalance)) => {
283 tracing::debug!("Received imbalance: {imbalance:?}");
284 }
286 Some(LiveMessage::Statistics(statistics)) => {
287 tracing::debug!("Received statistics: {statistics:?}");
288 }
290 Some(LiveMessage::Error(error)) => {
291 tracing::error!("Feed handler error: {error}");
292 }
294 Some(LiveMessage::Close) => {
295 tracing::info!("Feed handler closed");
296 break;
297 }
298 None => {
299 tracing::debug!("Message channel closed");
300 break;
301 }
302 }
303 }
304 () = cancellation_token.cancelled() => {
305 tracing::debug!("Message processing cancelled");
306 break;
307 }
308 }
309 }
310 });
311
312 {
313 let mut handles = self.task_handles.lock().unwrap();
314 handles.push(feed_handle);
315 handles.push(msg_handle);
316 }
317
318 Ok(cmd_tx)
319 }
320}
321
322#[async_trait::async_trait]
323impl DataClient for DatabentoDataClient {
324 fn client_id(&self) -> ClientId {
326 self.client_id
327 }
328
329 fn venue(&self) -> Option<Venue> {
331 None
332 }
333
334 fn start(&mut self) -> anyhow::Result<()> {
340 tracing::debug!("Starting");
341 Ok(())
342 }
343
344 fn stop(&mut self) -> anyhow::Result<()> {
350 tracing::debug!("Stopping");
351
352 self.cancellation_token.cancel();
354
355 let channels = self.cmd_channels.lock().unwrap();
357 for (dataset, tx) in channels.iter() {
358 if let Err(e) = tx.send(LiveCommand::Close) {
359 tracing::error!("Failed to send close command to dataset {dataset}: {e}");
360 }
361 }
362
363 self.is_connected.store(false, Ordering::Relaxed);
364 Ok(())
365 }
366
367 fn reset(&mut self) -> anyhow::Result<()> {
368 tracing::debug!("Resetting");
369 self.is_connected.store(false, Ordering::Relaxed);
370 Ok(())
371 }
372
373 fn dispose(&mut self) -> anyhow::Result<()> {
374 tracing::debug!("Disposing");
375 self.stop()
376 }
377
378 async fn connect(&mut self) -> anyhow::Result<()> {
379 tracing::debug!("Connecting...");
380
381 self.is_connected.store(true, Ordering::Relaxed);
384
385 tracing::info!("Connected");
386 Ok(())
387 }
388
389 async fn disconnect(&mut self) -> anyhow::Result<()> {
390 tracing::debug!("Disconnecting...");
391
392 self.cancellation_token.cancel();
394
395 {
397 let channels = self.cmd_channels.lock().unwrap();
398 for (dataset, tx) in channels.iter() {
399 if let Err(e) = tx.send(LiveCommand::Close) {
400 tracing::error!("Failed to send close command to dataset {dataset}: {e}");
401 }
402 }
403 }
404
405 let handles = {
407 let mut task_handles = self.task_handles.lock().unwrap();
408 std::mem::take(&mut *task_handles)
409 };
410
411 for handle in handles {
412 if let Err(e) = handle.await
413 && !e.is_cancelled()
414 {
415 tracing::error!("Task join error: {e}");
416 }
417 }
418
419 self.is_connected.store(false, Ordering::Relaxed);
420
421 {
422 let mut channels = self.cmd_channels.lock().unwrap();
423 channels.clear();
424 }
425
426 tracing::info!("Disconnected");
427 Ok(())
428 }
429
430 fn is_connected(&self) -> bool {
432 self.is_connected.load(Ordering::Relaxed)
433 }
434
435 fn is_disconnected(&self) -> bool {
436 !self.is_connected()
437 }
438
439 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
445 tracing::debug!("Subscribe quotes: {cmd:?}");
446
447 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
448 let was_new_handler = {
449 let channels = self.cmd_channels.lock().unwrap();
450 !channels.contains_key(&dataset)
451 };
452
453 self.get_or_create_feed_handler(&dataset)?;
454
455 if was_new_handler {
457 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
458 }
459
460 let symbol = instrument_id_to_symbol_string(
461 cmd.instrument_id,
462 &mut self.symbol_venue_map.write().unwrap(),
463 );
464
465 let subscription = Subscription::builder()
466 .schema(databento::dbn::Schema::Mbp1) .symbols(symbol)
468 .build();
469
470 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
471
472 Ok(())
473 }
474
475 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
481 tracing::debug!("Subscribe trades: {cmd:?}");
482
483 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
484 let was_new_handler = {
485 let channels = self.cmd_channels.lock().unwrap();
486 !channels.contains_key(&dataset)
487 };
488
489 self.get_or_create_feed_handler(&dataset)?;
490
491 if was_new_handler {
493 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
494 }
495
496 let symbol = instrument_id_to_symbol_string(
497 cmd.instrument_id,
498 &mut self.symbol_venue_map.write().unwrap(),
499 );
500
501 let subscription = Subscription::builder()
502 .schema(databento::dbn::Schema::Trades)
503 .symbols(symbol)
504 .build();
505
506 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
507
508 Ok(())
509 }
510
511 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
517 tracing::debug!("Subscribe book deltas: {cmd:?}");
518
519 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
520 let was_new_handler = {
521 let channels = self.cmd_channels.lock().unwrap();
522 !channels.contains_key(&dataset)
523 };
524
525 self.get_or_create_feed_handler(&dataset)?;
526
527 if was_new_handler {
529 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
530 }
531
532 let symbol = instrument_id_to_symbol_string(
533 cmd.instrument_id,
534 &mut self.symbol_venue_map.write().unwrap(),
535 );
536
537 let subscription = Subscription::builder()
538 .schema(databento::dbn::Schema::Mbo) .symbols(symbol)
540 .build();
541
542 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
543
544 Ok(())
545 }
546
547 fn subscribe_instrument_status(
553 &mut self,
554 cmd: &SubscribeInstrumentStatus,
555 ) -> anyhow::Result<()> {
556 tracing::debug!("Subscribe instrument status: {cmd:?}");
557
558 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
559 let was_new_handler = {
560 let channels = self.cmd_channels.lock().unwrap();
561 !channels.contains_key(&dataset)
562 };
563
564 self.get_or_create_feed_handler(&dataset)?;
565
566 if was_new_handler {
568 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
569 }
570
571 let symbol = instrument_id_to_symbol_string(
572 cmd.instrument_id,
573 &mut self.symbol_venue_map.write().unwrap(),
574 );
575
576 let subscription = Subscription::builder()
577 .schema(databento::dbn::Schema::Status)
578 .symbols(symbol)
579 .build();
580
581 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
582
583 Ok(())
584 }
585
586 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
588 tracing::debug!("Unsubscribe quotes: {cmd:?}");
589
590 tracing::warn!(
594 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
595 cmd.instrument_id
596 );
597
598 Ok(())
599 }
600
601 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
602 tracing::debug!("Unsubscribe trades: {cmd:?}");
603
604 tracing::warn!(
608 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
609 cmd.instrument_id
610 );
611
612 Ok(())
613 }
614
615 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
616 tracing::debug!("Unsubscribe book deltas: {cmd:?}");
617
618 tracing::warn!(
622 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
623 cmd.instrument_id
624 );
625
626 Ok(())
627 }
628
629 fn unsubscribe_instrument_status(
630 &mut self,
631 cmd: &UnsubscribeInstrumentStatus,
632 ) -> anyhow::Result<()> {
633 tracing::debug!("Unsubscribe instrument status: {cmd:?}");
634
635 tracing::warn!(
639 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
640 cmd.instrument_id
641 );
642
643 Ok(())
644 }
645
646 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
648 tracing::debug!("Request instruments: {request:?}");
649
650 let historical_client = self.historical.clone();
651 let request = request.clone();
652
653 tokio::spawn(async move {
654 let symbols = vec!["ALL_SYMBOLS".to_string()]; let params = RangeQueryParams {
659 dataset: "GLBX.MDP3".to_string(), symbols,
661 start: request
662 .start
663 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
664 .into(),
665 end: request
666 .end
667 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
668 .map(Into::into),
669 limit: None,
670 price_precision: None,
671 };
672
673 match historical_client.get_range_instruments(params).await {
674 Ok(instruments) => {
675 tracing::info!("Retrieved {} instruments", instruments.len());
676 }
678 Err(e) => {
679 tracing::error!("Failed to request instruments: {e}");
680 }
681 }
682 });
683
684 Ok(())
685 }
686
687 fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
688 tracing::debug!("Request quotes: {request:?}");
689
690 let historical_client = self.historical.clone();
691 let request = request.clone();
692
693 tokio::spawn(async move {
694 let symbols = vec![instrument_id_to_symbol_string(
695 request.instrument_id,
696 &mut AHashMap::new(), )];
698
699 let params = RangeQueryParams {
700 dataset: "GLBX.MDP3".to_string(), symbols,
702 start: request
703 .start
704 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
705 .into(),
706 end: request
707 .end
708 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
709 .map(Into::into),
710 limit: request.limit.map(|l| l.get() as u64),
711 price_precision: None,
712 };
713
714 match historical_client.get_range_quotes(params, None).await {
715 Ok(quotes) => {
716 tracing::info!("Retrieved {} quotes", quotes.len());
717 }
719 Err(e) => {
720 tracing::error!("Failed to request quotes: {e}");
721 }
722 }
723 });
724
725 Ok(())
726 }
727
728 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
729 tracing::debug!("Request trades: {request:?}");
730
731 let historical_client = self.historical.clone();
732 let request = request.clone();
733
734 tokio::spawn(async move {
735 let symbols = vec![instrument_id_to_symbol_string(
736 request.instrument_id,
737 &mut AHashMap::new(), )];
739
740 let params = RangeQueryParams {
741 dataset: "GLBX.MDP3".to_string(), symbols,
743 start: request
744 .start
745 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
746 .into(),
747 end: request
748 .end
749 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
750 .map(Into::into),
751 limit: request.limit.map(|l| l.get() as u64),
752 price_precision: None,
753 };
754
755 match historical_client.get_range_trades(params).await {
756 Ok(trades) => {
757 tracing::info!("Retrieved {} trades", trades.len());
758 }
760 Err(e) => {
761 tracing::error!("Failed to request trades: {e}");
762 }
763 }
764 });
765
766 Ok(())
767 }
768
769 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
770 tracing::debug!("Request bars: {request:?}");
771
772 let historical_client = self.historical.clone();
773 let request = request.clone();
774
775 tokio::spawn(async move {
776 let symbols = vec![instrument_id_to_symbol_string(
777 request.bar_type.instrument_id(),
778 &mut AHashMap::new(), )];
780
781 let params = RangeQueryParams {
782 dataset: "GLBX.MDP3".to_string(), symbols,
784 start: request
785 .start
786 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
787 .into(),
788 end: request
789 .end
790 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
791 .map(Into::into),
792 limit: request.limit.map(|l| l.get() as u64),
793 price_precision: None,
794 };
795
796 let aggregation = match request.bar_type.spec().aggregation {
798 BarAggregation::Second => BarAggregation::Second,
799 BarAggregation::Minute => BarAggregation::Minute,
800 BarAggregation::Hour => BarAggregation::Hour,
801 BarAggregation::Day => BarAggregation::Day,
802 _ => {
803 tracing::error!(
804 "Unsupported bar aggregation: {:?}",
805 request.bar_type.spec().aggregation
806 );
807 return;
808 }
809 };
810
811 match historical_client
812 .get_range_bars(params, aggregation, true)
813 .await
814 {
815 Ok(bars) => {
816 tracing::info!("Retrieved {} bars", bars.len());
817 }
819 Err(e) => {
820 tracing::error!("Failed to request bars: {e}");
821 }
822 }
823 });
824
825 Ok(())
826 }
827}