1use std::{
22 fmt::Debug,
23 path::PathBuf,
24 sync::{
25 Arc, Mutex, RwLock,
26 atomic::{AtomicBool, Ordering},
27 },
28};
29
30use ahash::AHashMap;
31use databento::live::Subscription;
32use indexmap::IndexMap;
33use nautilus_common::{
34 messages::{
35 DataEvent,
36 data::{
37 RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
38 SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades, UnsubscribeBookDeltas,
39 UnsubscribeInstrumentStatus, UnsubscribeQuotes, UnsubscribeTrades,
40 },
41 },
42 runner::get_data_event_sender,
43};
44use nautilus_core::{MUTEX_POISONED, time::AtomicTime};
45use nautilus_data::client::DataClient;
46use nautilus_model::{
47 enums::BarAggregation,
48 identifiers::{ClientId, Symbol, Venue},
49 instruments::Instrument,
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55 common::Credential,
56 historical::{DatabentoHistoricalClient, RangeQueryParams},
57 live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
58 loader::DatabentoDataLoader,
59 symbology::instrument_id_to_symbol_string,
60 types::PublisherId,
61};
62
/// Configuration for a [`DatabentoDataClient`].
///
/// The credential field is private: it is only reachable through
/// `api_key()` / `api_key_masked()`, and the manual `Debug` implementation
/// prints a placeholder instead of its value.
#[derive(Clone)]
pub struct DatabentoDataClientConfig {
    /// Databento API credential, built from the API key passed to `new`.
    credential: Credential,
    /// Path to the publishers JSON file; the data client parses it at
    /// construction to build its publisher-to-venue map.
    pub publishers_filepath: PathBuf,
    /// Forwarded to the historical client and to every live feed handler.
    /// NOTE(review): presumably selects the exchange code as the venue — confirm.
    pub use_exchange_as_venue: bool,
    /// Forwarded to every live feed handler.
    /// NOTE(review): presumably stamps bars with their close time — confirm.
    pub bars_timestamp_on_close: bool,
    /// Forwarded to every live feed handler; `new` sets it to `Some(10)`.
    pub reconnect_timeout_mins: Option<u64>,
    /// Optional HTTP proxy URL; `new` sets it to `None`.
    pub http_proxy_url: Option<String>,
    /// Optional WebSocket proxy URL; `new` sets it to `None`.
    pub ws_proxy_url: Option<String>,
}
81
82impl Debug for DatabentoDataClientConfig {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 f.debug_struct("DatabentoDataClientConfig")
85 .field("credential", &"<redacted>")
86 .field("publishers_filepath", &self.publishers_filepath)
87 .field("use_exchange_as_venue", &self.use_exchange_as_venue)
88 .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
89 .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
90 .field("http_proxy_url", &self.http_proxy_url)
91 .field("ws_proxy_url", &self.ws_proxy_url)
92 .finish()
93 }
94}
95
96impl DatabentoDataClientConfig {
97 #[must_use]
99 pub fn new(
100 api_key: impl Into<String>,
101 publishers_filepath: PathBuf,
102 use_exchange_as_venue: bool,
103 bars_timestamp_on_close: bool,
104 ) -> Self {
105 Self {
106 credential: Credential::new(api_key),
107 publishers_filepath,
108 use_exchange_as_venue,
109 bars_timestamp_on_close,
110 reconnect_timeout_mins: Some(10), http_proxy_url: None,
112 ws_proxy_url: None,
113 }
114 }
115
116 #[must_use]
118 pub fn api_key(&self) -> &str {
119 self.credential.api_key()
120 }
121
122 #[must_use]
124 pub fn api_key_masked(&self) -> String {
125 self.credential.api_key_masked()
126 }
127}
128
/// Data client that serves live subscriptions and historical requests from
/// Databento.
///
/// One feed handler is created per dataset on first subscription; commands
/// are sent to it over a per-dataset channel stored in `cmd_channels`.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct DatabentoDataClient {
    /// Identifier returned by `client_id()`.
    client_id: ClientId,
    /// Configuration this client was constructed with.
    config: DatabentoDataClientConfig,
    /// Connection flag: set by `connect`, cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Historical API client used by the `request_*` methods.
    historical: DatabentoHistoricalClient,
    /// Loader used to resolve the dataset for a venue.
    loader: DatabentoDataLoader,
    /// Dataset name -> command sender for that dataset's feed handler.
    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<LiveCommand>>>>,
    /// Join handles of the spawned feed-handler and message-forwarding tasks; awaited in `disconnect`.
    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Token cloned into every spawned task; cancelled by `stop` and `disconnect`.
    cancellation_token: CancellationToken,
    /// Publisher id -> venue, parsed from the publishers file at construction.
    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
    /// Symbol -> venue map shared with the feed handlers and passed to the
    /// symbol conversion on every subscription.
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// Sender obtained from `get_data_event_sender()`; live data messages are forwarded through it.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}
160
161impl DatabentoDataClient {
162 pub fn new(
168 client_id: ClientId,
169 config: DatabentoDataClientConfig,
170 clock: &'static AtomicTime,
171 ) -> anyhow::Result<Self> {
172 let historical = DatabentoHistoricalClient::new(
173 config.api_key().to_string(),
174 config.publishers_filepath.clone(),
175 clock,
176 config.use_exchange_as_venue,
177 )?;
178
179 let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;
181
182 let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
184 let publishers_vec: Vec<crate::types::DatabentoPublisher> =
185 serde_json::from_str(&file_content)?;
186
187 let publisher_venue_map = publishers_vec
188 .into_iter()
189 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
190 .collect::<IndexMap<u16, Venue>>();
191
192 let data_sender = get_data_event_sender();
193
194 Ok(Self {
195 client_id,
196 config,
197 is_connected: AtomicBool::new(false),
198 historical,
199 loader,
200 cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
201 task_handles: Arc::new(Mutex::new(Vec::new())),
202 cancellation_token: CancellationToken::new(),
203 publisher_venue_map: Arc::new(publisher_venue_map),
204 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
205 data_sender,
206 })
207 }
208
209 fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
215 self.loader
216 .get_dataset_for_venue(&venue)
217 .map(ToString::to_string)
218 .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
219 }
220
221 fn get_or_create_feed_handler(&self, dataset: &str) -> anyhow::Result<()> {
227 let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
228
229 if !channels.contains_key(dataset) {
230 tracing::info!("Creating new feed handler for dataset: {dataset}");
231 let cmd_tx = self.initialize_live_feed(dataset.to_string())?;
232 channels.insert(dataset.to_string(), cmd_tx);
233
234 tracing::debug!("Feed handler created for dataset: {dataset}, channel stored");
235 }
236
237 Ok(())
238 }
239
240 fn send_command_to_dataset(&self, dataset: &str, cmd: LiveCommand) -> anyhow::Result<()> {
246 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
247 if let Some(tx) = channels.get(dataset) {
248 tx.send(cmd)
249 .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
250 } else {
251 anyhow::bail!("No feed handler found for dataset: {dataset}");
252 }
253 Ok(())
254 }
255
256 fn initialize_live_feed(
262 &self,
263 dataset: String,
264 ) -> anyhow::Result<tokio::sync::mpsc::UnboundedSender<LiveCommand>> {
265 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
266 let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);
267
268 let mut feed_handler = DatabentoFeedHandler::new(
269 self.config.api_key().to_string(),
270 dataset,
271 cmd_rx,
272 msg_tx,
273 (*self.publisher_venue_map).clone(),
274 self.symbol_venue_map.clone(),
275 self.config.use_exchange_as_venue,
276 self.config.bars_timestamp_on_close,
277 self.config.reconnect_timeout_mins,
278 );
279
280 let cancellation_token = self.cancellation_token.clone();
281
282 let feed_handle = tokio::spawn(async move {
284 tokio::select! {
285 result = feed_handler.run() => {
286 if let Err(e) = result {
287 tracing::error!("Feed handler error: {e}");
288 }
289 }
290 () = cancellation_token.cancelled() => {
291 tracing::debug!("Feed handler cancelled");
292 }
293 }
294 });
295
296 let cancellation_token = self.cancellation_token.clone();
297 let data_sender = self.data_sender.clone();
298
299 let msg_handle = tokio::spawn(async move {
301 let mut msg_rx = msg_rx;
302 loop {
303 tokio::select! {
304 msg = msg_rx.recv() => {
305 match msg {
306 Some(LiveMessage::Data(data)) => {
307 tracing::debug!("Received data: {data:?}");
308 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
309 tracing::error!("Failed to send data event: {e}");
310 }
311 }
312 Some(LiveMessage::Instrument(instrument)) => {
313 tracing::debug!("Received instrument: {}", instrument.id());
314 }
316 Some(LiveMessage::Status(status)) => {
317 tracing::debug!("Received status: {status:?}");
318 }
320 Some(LiveMessage::Imbalance(imbalance)) => {
321 tracing::debug!("Received imbalance: {imbalance:?}");
322 }
324 Some(LiveMessage::Statistics(statistics)) => {
325 tracing::debug!("Received statistics: {statistics:?}");
326 }
328 Some(LiveMessage::Error(error)) => {
329 tracing::error!("Feed handler error: {error}");
330 }
332 Some(LiveMessage::Close) => {
333 tracing::info!("Feed handler closed");
334 break;
335 }
336 None => {
337 tracing::debug!("Message channel closed");
338 break;
339 }
340 }
341 }
342 () = cancellation_token.cancelled() => {
343 tracing::debug!("Message processing cancelled");
344 break;
345 }
346 }
347 }
348 });
349
350 {
351 let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
352 handles.push(feed_handle);
353 handles.push(msg_handle);
354 }
355
356 Ok(cmd_tx)
357 }
358}
359
360#[async_trait::async_trait]
361impl DataClient for DatabentoDataClient {
/// Returns the identifier this client was constructed with.
fn client_id(&self) -> ClientId {
    self.client_id
}
366
/// Always returns `None`: the client is not bound to a single venue (the
/// dataset is resolved per instrument venue when subscribing).
fn venue(&self) -> Option<Venue> {
    None
}
371
/// Logs the start request; no other work is performed here.
fn start(&mut self) -> anyhow::Result<()> {
    tracing::debug!("Starting");
    Ok(())
}
381
382 fn stop(&mut self) -> anyhow::Result<()> {
388 tracing::debug!("Stopping");
389
390 self.cancellation_token.cancel();
392
393 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
395 for (dataset, tx) in channels.iter() {
396 if let Err(e) = tx.send(LiveCommand::Close) {
397 tracing::error!("Failed to send close command to dataset {dataset}: {e}");
398 }
399 }
400
401 self.is_connected.store(false, Ordering::Relaxed);
402 Ok(())
403 }
404
/// Clears the connected flag. Channels, tasks and the cancellation token
/// are left untouched.
fn reset(&mut self) -> anyhow::Result<()> {
    tracing::debug!("Resetting");
    self.is_connected.store(false, Ordering::Relaxed);
    Ok(())
}
410
/// Disposes of the client by delegating to `stop`.
fn dispose(&mut self) -> anyhow::Result<()> {
    tracing::debug!("Disposing");
    self.stop()
}
415
/// Marks the client as connected.
///
/// No feed is opened here; feed handlers are created by the subscribe
/// methods the first time a dataset is needed.
async fn connect(&mut self) -> anyhow::Result<()> {
    tracing::debug!("Connecting...");

    self.is_connected.store(true, Ordering::Relaxed);

    tracing::info!("Connected");
    Ok(())
}
426
427 async fn disconnect(&mut self) -> anyhow::Result<()> {
428 tracing::debug!("Disconnecting...");
429
430 self.cancellation_token.cancel();
432
433 {
435 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
436 for (dataset, tx) in channels.iter() {
437 if let Err(e) = tx.send(LiveCommand::Close) {
438 tracing::error!("Failed to send close command to dataset {dataset}: {e}");
439 }
440 }
441 }
442
443 let handles = {
445 let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
446 std::mem::take(&mut *task_handles)
447 };
448
449 for handle in handles {
450 if let Err(e) = handle.await
451 && !e.is_cancelled()
452 {
453 tracing::error!("Task join error: {e}");
454 }
455 }
456
457 self.is_connected.store(false, Ordering::Relaxed);
458
459 {
460 let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
461 channels.clear();
462 }
463
464 tracing::info!("Disconnected");
465 Ok(())
466 }
467
/// Returns the current value of the connected flag.
fn is_connected(&self) -> bool {
    self.is_connected.load(Ordering::Relaxed)
}
472
/// Returns the negation of `is_connected()`.
fn is_disconnected(&self) -> bool {
    !self.is_connected()
}
476
477 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
483 tracing::debug!("Subscribe quotes: {cmd:?}");
484
485 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
486 let was_new_handler = {
487 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
488 !channels.contains_key(&dataset)
489 };
490
491 self.get_or_create_feed_handler(&dataset)?;
492
493 if was_new_handler {
495 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
496 }
497
498 let symbol = instrument_id_to_symbol_string(
499 cmd.instrument_id,
500 &mut self.symbol_venue_map.write().unwrap(),
501 );
502
503 let subscription = Subscription::builder()
504 .schema(databento::dbn::Schema::Mbp1) .symbols(symbol)
506 .build();
507
508 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
509
510 Ok(())
511 }
512
513 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
519 tracing::debug!("Subscribe trades: {cmd:?}");
520
521 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
522 let was_new_handler = {
523 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
524 !channels.contains_key(&dataset)
525 };
526
527 self.get_or_create_feed_handler(&dataset)?;
528
529 if was_new_handler {
531 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
532 }
533
534 let symbol = instrument_id_to_symbol_string(
535 cmd.instrument_id,
536 &mut self.symbol_venue_map.write().unwrap(),
537 );
538
539 let subscription = Subscription::builder()
540 .schema(databento::dbn::Schema::Trades)
541 .symbols(symbol)
542 .build();
543
544 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
545
546 Ok(())
547 }
548
549 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
555 tracing::debug!("Subscribe book deltas: {cmd:?}");
556
557 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
558 let was_new_handler = {
559 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
560 !channels.contains_key(&dataset)
561 };
562
563 self.get_or_create_feed_handler(&dataset)?;
564
565 if was_new_handler {
567 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
568 }
569
570 let symbol = instrument_id_to_symbol_string(
571 cmd.instrument_id,
572 &mut self.symbol_venue_map.write().unwrap(),
573 );
574
575 let subscription = Subscription::builder()
576 .schema(databento::dbn::Schema::Mbo) .symbols(symbol)
578 .build();
579
580 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
581
582 Ok(())
583 }
584
585 fn subscribe_instrument_status(
591 &mut self,
592 cmd: &SubscribeInstrumentStatus,
593 ) -> anyhow::Result<()> {
594 tracing::debug!("Subscribe instrument status: {cmd:?}");
595
596 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
597 let was_new_handler = {
598 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
599 !channels.contains_key(&dataset)
600 };
601
602 self.get_or_create_feed_handler(&dataset)?;
603
604 if was_new_handler {
606 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
607 }
608
609 let symbol = instrument_id_to_symbol_string(
610 cmd.instrument_id,
611 &mut self.symbol_venue_map.write().unwrap(),
612 );
613
614 let subscription = Subscription::builder()
615 .schema(databento::dbn::Schema::Status)
616 .symbols(symbol)
617 .build();
618
619 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
620
621 Ok(())
622 }
623
624 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
626 tracing::debug!("Unsubscribe quotes: {cmd:?}");
627
628 tracing::warn!(
632 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
633 cmd.instrument_id
634 );
635
636 Ok(())
637 }
638
639 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
640 tracing::debug!("Unsubscribe trades: {cmd:?}");
641
642 tracing::warn!(
646 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
647 cmd.instrument_id
648 );
649
650 Ok(())
651 }
652
653 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
654 tracing::debug!("Unsubscribe book deltas: {cmd:?}");
655
656 tracing::warn!(
660 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
661 cmd.instrument_id
662 );
663
664 Ok(())
665 }
666
667 fn unsubscribe_instrument_status(
668 &mut self,
669 cmd: &UnsubscribeInstrumentStatus,
670 ) -> anyhow::Result<()> {
671 tracing::debug!("Unsubscribe instrument status: {cmd:?}");
672
673 tracing::warn!(
677 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
678 cmd.instrument_id
679 );
680
681 Ok(())
682 }
683
684 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
686 tracing::debug!("Request instruments: {request:?}");
687
688 let historical_client = self.historical.clone();
689 let request = request.clone();
690
691 tokio::spawn(async move {
692 let symbols = vec!["ALL_SYMBOLS".to_string()]; let params = RangeQueryParams {
697 dataset: "GLBX.MDP3".to_string(), symbols,
699 start: request
700 .start
701 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
702 .into(),
703 end: request
704 .end
705 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
706 .map(Into::into),
707 limit: None,
708 price_precision: None,
709 };
710
711 match historical_client.get_range_instruments(params).await {
712 Ok(instruments) => {
713 tracing::info!("Retrieved {} instruments", instruments.len());
714 }
716 Err(e) => {
717 tracing::error!("Failed to request instruments: {e}");
718 }
719 }
720 });
721
722 Ok(())
723 }
724
725 fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
726 tracing::debug!("Request quotes: {request:?}");
727
728 let historical_client = self.historical.clone();
729 let request = request.clone();
730
731 tokio::spawn(async move {
732 let symbols = vec![instrument_id_to_symbol_string(
733 request.instrument_id,
734 &mut AHashMap::new(), )];
736
737 let params = RangeQueryParams {
738 dataset: "GLBX.MDP3".to_string(), symbols,
740 start: request
741 .start
742 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
743 .into(),
744 end: request
745 .end
746 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
747 .map(Into::into),
748 limit: request.limit.map(|l| l.get() as u64),
749 price_precision: None,
750 };
751
752 match historical_client.get_range_quotes(params, None).await {
753 Ok(quotes) => {
754 tracing::info!("Retrieved {} quotes", quotes.len());
755 }
757 Err(e) => {
758 tracing::error!("Failed to request quotes: {e}");
759 }
760 }
761 });
762
763 Ok(())
764 }
765
766 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
767 tracing::debug!("Request trades: {request:?}");
768
769 let historical_client = self.historical.clone();
770 let request = request.clone();
771
772 tokio::spawn(async move {
773 let symbols = vec![instrument_id_to_symbol_string(
774 request.instrument_id,
775 &mut AHashMap::new(), )];
777
778 let params = RangeQueryParams {
779 dataset: "GLBX.MDP3".to_string(), symbols,
781 start: request
782 .start
783 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
784 .into(),
785 end: request
786 .end
787 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
788 .map(Into::into),
789 limit: request.limit.map(|l| l.get() as u64),
790 price_precision: None,
791 };
792
793 match historical_client.get_range_trades(params).await {
794 Ok(trades) => {
795 tracing::info!("Retrieved {} trades", trades.len());
796 }
798 Err(e) => {
799 tracing::error!("Failed to request trades: {e}");
800 }
801 }
802 });
803
804 Ok(())
805 }
806
807 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
808 tracing::debug!("Request bars: {request:?}");
809
810 let historical_client = self.historical.clone();
811 let request = request.clone();
812
813 tokio::spawn(async move {
814 let symbols = vec![instrument_id_to_symbol_string(
815 request.bar_type.instrument_id(),
816 &mut AHashMap::new(), )];
818
819 let params = RangeQueryParams {
820 dataset: "GLBX.MDP3".to_string(), symbols,
822 start: request
823 .start
824 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
825 .into(),
826 end: request
827 .end
828 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
829 .map(Into::into),
830 limit: request.limit.map(|l| l.get() as u64),
831 price_precision: None,
832 };
833
834 let aggregation = match request.bar_type.spec().aggregation {
836 BarAggregation::Second => BarAggregation::Second,
837 BarAggregation::Minute => BarAggregation::Minute,
838 BarAggregation::Hour => BarAggregation::Hour,
839 BarAggregation::Day => BarAggregation::Day,
840 _ => {
841 tracing::error!(
842 "Unsupported bar aggregation: {:?}",
843 request.bar_type.spec().aggregation
844 );
845 return;
846 }
847 };
848
849 match historical_client
850 .get_range_bars(params, aggregation, true)
851 .await
852 {
853 Ok(bars) => {
854 tracing::info!("Retrieved {} bars", bars.len());
855 }
857 Err(e) => {
858 tracing::error!("Failed to request bars: {e}");
859 }
860 }
861 });
862
863 Ok(())
864 }
865}