use std::{
    fmt::Debug,
    path::PathBuf,
    sync::{
        Arc, Mutex, RwLock,
        atomic::{AtomicBool, Ordering},
    },
};

use ahash::AHashMap;
use databento::live::Subscription;
use indexmap::IndexMap;
use nautilus_common::{
    live::{runner::get_data_event_sender, runtime::get_runtime},
    messages::{
        DataEvent,
        data::{
            RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
            SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades, UnsubscribeBookDeltas,
            UnsubscribeInstrumentStatus, UnsubscribeQuotes, UnsubscribeTrades,
        },
    },
};
use nautilus_core::{MUTEX_POISONED, time::AtomicTime};
use nautilus_data::client::DataClient;
use nautilus_model::{
    enums::BarAggregation,
    identifiers::{ClientId, Symbol, Venue},
    instruments::Instrument,
};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crate::{
    common::Credential,
    historical::{DatabentoHistoricalClient, RangeQueryParams},
    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
    loader::DatabentoDataLoader,
    symbology::instrument_id_to_symbol_string,
    types::PublisherId,
};

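/// Configuration for the Databento data client.
///
/// Holds the API credential, the path to the publishers mapping file (publisher ID to venue),
/// and flags controlling venue mapping, bar timestamping, reconnect behaviour, and optional
/// proxy URLs. The credential is redacted from `Debug` output.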
#[derive(Clone)]
pub struct DatabentoDataClientConfig {
    credential: Credential,
    pub publishers_filepath: PathBuf,
    pub use_exchange_as_venue: bool,
    pub bars_timestamp_on_close: bool,
    pub reconnect_timeout_mins: Option<u64>,
    pub http_proxy_url: Option<String>,
    pub ws_proxy_url: Option<String>,
}

impl Debug for DatabentoDataClientConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabentoDataClientConfig")
            .field("credential", &"<redacted>")
            .field("publishers_filepath", &self.publishers_filepath)
            .field("use_exchange_as_venue", &self.use_exchange_as_venue)
            .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
            .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
            .field("http_proxy_url", &self.http_proxy_url)
            .field("ws_proxy_url", &self.ws_proxy_url)
            .finish()
    }
}

impl DatabentoDataClientConfig {
    #[must_use]
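    /// Creates a new [`DatabentoDataClientConfig`].
    ///
    /// The reconnect timeout defaults to 10 minutes and no proxy URLs are set;
    /// adjust the public fields after construction if different values are needed.
    ///
    /// # Example
    ///
    /// A minimal sketch; the API key and file path below are placeholders:
    ///
    /// ```rust,ignore
    /// use std::path::PathBuf;
    ///
    /// let config = DatabentoDataClientConfig::new(
    ///     "db-XXXXXXXX",                     // hypothetical API key
    ///     PathBuf::from("publishers.json"),  // path to the publishers mapping file
    ///     false,                             // use_exchange_as_venue
    ///     true,                              // bars_timestamp_on_close
    /// );
    /// ```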
    pub fn new(
        api_key: impl Into<String>,
        publishers_filepath: PathBuf,
        use_exchange_as_venue: bool,
        bars_timestamp_on_close: bool,
    ) -> Self {
        Self {
            credential: Credential::new(api_key),
            publishers_filepath,
            use_exchange_as_venue,
            bars_timestamp_on_close,
            reconnect_timeout_mins: Some(10),
            http_proxy_url: None,
            ws_proxy_url: None,
        }
    }

    #[must_use]
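    /// Returns the configured Databento API key.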
    pub fn api_key(&self) -> &str {
        self.credential.api_key()
    }

    #[must_use]
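    /// Returns the API key in masked form.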
    pub fn api_key_masked(&self) -> String {
        self.credential.api_key_masked()
    }
}

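/// Databento data client combining live subscriptions with historical requests.
///
/// Live data is handled by one feed handler task per Databento dataset, created lazily on
/// first subscription and controlled through per-dataset command channels. Historical requests
/// are delegated to a [`DatabentoHistoricalClient`].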
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct DatabentoDataClient {
    client_id: ClientId,
    config: DatabentoDataClientConfig,
    is_connected: AtomicBool,
    historical: DatabentoHistoricalClient,
    loader: DatabentoDataLoader,
    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<LiveCommand>>>>,
    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    cancellation_token: CancellationToken,
    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}

impl DatabentoDataClient {
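    /// Creates a new [`DatabentoDataClient`] instance.
    ///
    /// # Errors
    ///
    /// Returns an error if the historical client cannot be created, or if the publishers
    /// file cannot be read or parsed.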
    pub fn new(
        client_id: ClientId,
        config: DatabentoDataClientConfig,
        clock: &'static AtomicTime,
    ) -> anyhow::Result<Self> {
        let historical = DatabentoHistoricalClient::new(
            config.api_key().to_string(),
            config.publishers_filepath.clone(),
            clock,
            config.use_exchange_as_venue,
        )?;

        let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;

        let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
        let publishers_vec: Vec<crate::types::DatabentoPublisher> =
            serde_json::from_str(&file_content)?;

        let publisher_venue_map = publishers_vec
            .into_iter()
            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
            .collect::<IndexMap<u16, Venue>>();

        let data_sender = get_data_event_sender();

        Ok(Self {
            client_id,
            config,
            is_connected: AtomicBool::new(false),
            historical,
            loader,
            cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
            task_handles: Arc::new(Mutex::new(Vec::new())),
            cancellation_token: CancellationToken::new(),
            publisher_venue_map: Arc::new(publisher_venue_map),
            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
            data_sender,
        })
    }

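    /// Resolves the Databento dataset code for the given venue.
    ///
    /// # Errors
    ///
    /// Returns an error if no dataset is mapped to the venue.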
    fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
        self.loader
            .get_dataset_for_venue(&venue)
            .map(ToString::to_string)
            .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
    }

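    /// Ensures a live feed handler exists for the given dataset, creating one on first use.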
    fn get_or_create_feed_handler(&self, dataset: &str) -> anyhow::Result<()> {
        let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);

        if !channels.contains_key(dataset) {
            tracing::info!("Creating new feed handler for dataset: {dataset}");
            let cmd_tx = self.initialize_live_feed(dataset.to_string())?;
            channels.insert(dataset.to_string(), cmd_tx);

            tracing::debug!("Feed handler created for dataset: {dataset}, channel stored");
        }

        Ok(())
    }

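    /// Sends a [`LiveCommand`] to the feed handler for the given dataset.
    ///
    /// # Errors
    ///
    /// Returns an error if no feed handler exists for the dataset or the command channel
    /// is closed.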
    fn send_command_to_dataset(&self, dataset: &str, cmd: LiveCommand) -> anyhow::Result<()> {
        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
        if let Some(tx) = channels.get(dataset) {
            tx.send(cmd)
                .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
        } else {
            anyhow::bail!("No feed handler found for dataset: {dataset}");
        }
        Ok(())
    }

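    /// Spawns the feed handler and message-processing tasks for a dataset and returns the
    /// command channel used to control the feed.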
    fn initialize_live_feed(
        &self,
        dataset: String,
    ) -> anyhow::Result<tokio::sync::mpsc::UnboundedSender<LiveCommand>> {
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);

        let mut feed_handler = DatabentoFeedHandler::new(
            self.config.api_key().to_string(),
            dataset,
            cmd_rx,
            msg_tx,
            (*self.publisher_venue_map).clone(),
            self.symbol_venue_map.clone(),
            self.config.use_exchange_as_venue,
            self.config.bars_timestamp_on_close,
            self.config.reconnect_timeout_mins,
        );

        let cancellation_token = self.cancellation_token.clone();

        let feed_handle = get_runtime().spawn(async move {
            tokio::select! {
                result = feed_handler.run() => {
                    if let Err(e) = result {
                        tracing::error!("Feed handler error: {e}");
                    }
                }
                () = cancellation_token.cancelled() => {
                    tracing::debug!("Feed handler cancelled");
                }
            }
        });

        let cancellation_token = self.cancellation_token.clone();
        let data_sender = self.data_sender.clone();

        let msg_handle = get_runtime().spawn(async move {
            let mut msg_rx = msg_rx;
            loop {
                tokio::select! {
                    msg = msg_rx.recv() => {
                        match msg {
                            Some(LiveMessage::Data(data)) => {
                                tracing::debug!("Received data: {data:?}");
                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
                                    tracing::error!("Failed to send data event: {e}");
                                }
                            }
                            Some(LiveMessage::Instrument(instrument)) => {
                                tracing::debug!("Received instrument: {}", instrument.id());
                            }
                            Some(LiveMessage::Status(status)) => {
                                tracing::debug!("Received status: {status:?}");
                            }
                            Some(LiveMessage::Imbalance(imbalance)) => {
                                tracing::debug!("Received imbalance: {imbalance:?}");
                            }
                            Some(LiveMessage::Statistics(statistics)) => {
                                tracing::debug!("Received statistics: {statistics:?}");
                            }
                            Some(LiveMessage::SubscriptionAck(ack)) => {
                                tracing::debug!("Received subscription ack: {}", ack.message);
                            }
                            Some(LiveMessage::Error(error)) => {
                                tracing::error!("Feed handler error: {error}");
                            }
                            Some(LiveMessage::Close) => {
                                tracing::info!("Feed handler closed");
                                break;
                            }
                            None => {
                                tracing::debug!("Message channel closed");
                                break;
                            }
                        }
                    }
                    () = cancellation_token.cancelled() => {
                        tracing::debug!("Message processing cancelled");
                        break;
                    }
                }
            }
        });

        {
            let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
            handles.push(feed_handle);
            handles.push(msg_handle);
        }

        Ok(cmd_tx)
    }
}

#[async_trait::async_trait(?Send)]
impl DataClient for DatabentoDataClient {
    fn client_id(&self) -> ClientId {
        self.client_id
    }

    fn venue(&self) -> Option<Venue> {
        None
    }

    fn start(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Starting");
        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Stopping");

        self.cancellation_token.cancel();

        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
        for (dataset, tx) in channels.iter() {
            if let Err(e) = tx.send(LiveCommand::Close) {
                tracing::error!("Failed to send close command to dataset {dataset}: {e}");
            }
        }

        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }

    fn reset(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Resetting");
        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }

    fn dispose(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Disposing");
        self.stop()
    }

    async fn connect(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Connecting...");

        // Live feed connections are established lazily, per dataset, on first subscription
        self.is_connected.store(true, Ordering::Relaxed);

        tracing::info!("Connected");
        Ok(())
    }

    async fn disconnect(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Disconnecting...");

        self.cancellation_token.cancel();

        {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            for (dataset, tx) in channels.iter() {
                if let Err(e) = tx.send(LiveCommand::Close) {
                    tracing::error!("Failed to send close command to dataset {dataset}: {e}");
                }
            }
        }

        let handles = {
            let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
            std::mem::take(&mut *task_handles)
        };

        for handle in handles {
            if let Err(e) = handle.await
                && !e.is_cancelled()
            {
                tracing::error!("Task join error: {e}");
            }
        }

        self.is_connected.store(false, Ordering::Relaxed);

        {
            let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            channels.clear();
        }

        tracing::info!("Disconnected");
        Ok(())
    }

    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }

    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }

    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
        tracing::debug!("Subscribe quotes: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset)?;

        if was_new_handler {
            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
        }

        let symbol = instrument_id_to_symbol_string(
            cmd.instrument_id,
            &mut self.symbol_venue_map.write().unwrap(),
        );

        // MBP-1 (top-of-book) records carry the best bid/ask used for quote updates
        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Mbp1)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;

        Ok(())
    }

    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
        tracing::debug!("Subscribe trades: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset)?;

        if was_new_handler {
            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
        }

        let symbol = instrument_id_to_symbol_string(
            cmd.instrument_id,
            &mut self.symbol_venue_map.write().unwrap(),
        );

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Trades)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;

        Ok(())
    }

    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
        tracing::debug!("Subscribe book deltas: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset)?;

        if was_new_handler {
            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
        }

        let symbol = instrument_id_to_symbol_string(
            cmd.instrument_id,
            &mut self.symbol_venue_map.write().unwrap(),
        );

        // MBO (market-by-order) records provide the full-depth deltas for order book updates
        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Mbo)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;

        Ok(())
    }

    fn subscribe_instrument_status(
        &mut self,
        cmd: &SubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        tracing::debug!("Subscribe instrument status: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset)?;

        if was_new_handler {
            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
        }

        let symbol = instrument_id_to_symbol_string(
            cmd.instrument_id,
            &mut self.symbol_venue_map.write().unwrap(),
        );

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Status)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;

        Ok(())
    }

    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
        tracing::debug!("Unsubscribe quotes: {cmd:?}");

        tracing::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
        tracing::debug!("Unsubscribe trades: {cmd:?}");

        tracing::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
        tracing::debug!("Unsubscribe book deltas: {cmd:?}");

        tracing::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

    fn unsubscribe_instrument_status(
        &mut self,
        cmd: &UnsubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        tracing::debug!("Unsubscribe instrument status: {cmd:?}");

        tracing::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

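    // NOTE: The request handlers below spawn the historical query on the shared runtime and
    // currently only log the retrieved data counts; responses are not yet routed back as
    // data events.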
    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
        tracing::debug!("Request instruments: {request:?}");

        let historical_client = self.historical.clone();
        let request = request.clone();

        get_runtime().spawn(async move {
            // "ALL_SYMBOLS" requests the full instrument universe for the dataset
            let symbols = vec!["ALL_SYMBOLS".to_string()];
            let params = RangeQueryParams {
                // NOTE: Dataset is currently hardcoded to CME Globex (GLBX.MDP3)
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: None,
                price_precision: None,
            };

            match historical_client.get_range_instruments(params).await {
                Ok(instruments) => {
                    tracing::info!("Retrieved {} instruments", instruments.len());
                }
                Err(e) => {
                    tracing::error!("Failed to request instruments: {e}");
                }
            }
        });

        Ok(())
    }

    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
        tracing::debug!("Request quotes: {request:?}");

        let historical_client = self.historical.clone();
        let request = request.clone();

        get_runtime().spawn(async move {
            let symbols = vec![instrument_id_to_symbol_string(
                request.instrument_id,
                &mut AHashMap::new(),
            )];

            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: request.limit.map(|l| l.get() as u64),
                price_precision: None,
            };

            match historical_client.get_range_quotes(params, None).await {
                Ok(quotes) => {
                    tracing::info!("Retrieved {} quotes", quotes.len());
                }
                Err(e) => {
                    tracing::error!("Failed to request quotes: {e}");
                }
            }
        });

        Ok(())
    }

    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
        tracing::debug!("Request trades: {request:?}");

        let historical_client = self.historical.clone();
        let request = request.clone();

        get_runtime().spawn(async move {
            let symbols = vec![instrument_id_to_symbol_string(
                request.instrument_id,
                &mut AHashMap::new(),
            )];

            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: request.limit.map(|l| l.get() as u64),
                price_precision: None,
            };

            match historical_client.get_range_trades(params).await {
                Ok(trades) => {
                    tracing::info!("Retrieved {} trades", trades.len());
                }
                Err(e) => {
                    tracing::error!("Failed to request trades: {e}");
                }
            }
        });

        Ok(())
    }

    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
        tracing::debug!("Request bars: {request:?}");

        let historical_client = self.historical.clone();
        let request = request.clone();

        get_runtime().spawn(async move {
            let symbols = vec![instrument_id_to_symbol_string(
                request.bar_type.instrument_id(),
                &mut AHashMap::new(),
            )];

            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: request.limit.map(|l| l.get() as u64),
                price_precision: None,
            };

            // Only time-based aggregations are supported here
            let aggregation = match request.bar_type.spec().aggregation {
                BarAggregation::Second => BarAggregation::Second,
                BarAggregation::Minute => BarAggregation::Minute,
                BarAggregation::Hour => BarAggregation::Hour,
                BarAggregation::Day => BarAggregation::Day,
                _ => {
                    tracing::error!(
                        "Unsupported bar aggregation: {:?}",
                        request.bar_type.spec().aggregation
                    );
                    return;
                }
            };

            match historical_client
                .get_range_bars(params, aggregation, true)
                .await
            {
                Ok(bars) => {
                    tracing::info!("Retrieved {} bars", bars.len());
                }
                Err(e) => {
                    tracing::error!("Failed to request bars: {e}");
                }
            }
        });

        Ok(())
    }
}
868}