use std::{
    path::PathBuf,
    sync::{
        Arc, Mutex, RwLock,
        atomic::{AtomicBool, Ordering},
    },
};

use ahash::AHashMap;
use databento::live::Subscription;
use indexmap::IndexMap;
use nautilus_common::{
    messages::{
        DataEvent,
        data::{
            RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
            SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades, UnsubscribeBookDeltas,
            UnsubscribeInstrumentStatus, UnsubscribeQuotes, UnsubscribeTrades,
        },
    },
    runner::get_data_event_sender,
};
use nautilus_core::time::AtomicTime;
use nautilus_data::client::DataClient;
use nautilus_model::{
    enums::BarAggregation,
    identifiers::{ClientId, Symbol, Venue},
    instruments::Instrument,
};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crate::{
    historical::{DatabentoHistoricalClient, RangeQueryParams},
    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
    loader::DatabentoDataLoader,
    symbology::instrument_id_to_symbol_string,
    types::PublisherId,
};

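/// Configuration for the [`DatabentoDataClient`].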
#[derive(Debug, Clone)]
pub struct DatabentoDataClientConfig {
    /// The Databento API key.
    pub api_key: String,
    /// Path to the publishers JSON file used to map publisher IDs to venues.
    pub publishers_filepath: PathBuf,
    /// If the exchange should be used as the venue for instruments.
    pub use_exchange_as_venue: bool,
    /// If bars should be timestamped on the close of the aggregation interval.
    pub bars_timestamp_on_close: bool,
}

impl DatabentoDataClientConfig {
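    /// Creates a new [`DatabentoDataClientConfig`] instance.
    ///
    /// A minimal construction sketch (the key and path below are placeholders):
    ///
    /// ```ignore
    /// let config = DatabentoDataClientConfig::new(
    ///     "db-api-key".to_string(),
    ///     PathBuf::from("publishers.json"),
    ///     true,
    ///     true,
    /// );
    /// ```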
    #[must_use]
    pub const fn new(
        api_key: String,
        publishers_filepath: PathBuf,
        use_exchange_as_venue: bool,
        bars_timestamp_on_close: bool,
    ) -> Self {
        Self {
            api_key,
            publishers_filepath,
            use_exchange_as_venue,
            bars_timestamp_on_close,
        }
    }
}

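/// Provides a Databento data client for live market data subscriptions and
/// historical data requests.
///
/// One live feed handler is maintained per Databento dataset.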
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct DatabentoDataClient {
    client_id: ClientId,
    config: DatabentoDataClientConfig,
    is_connected: AtomicBool,
    historical: DatabentoHistoricalClient,
    loader: DatabentoDataLoader,
    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<LiveCommand>>>>,
    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    cancellation_token: CancellationToken,
    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}

impl DatabentoDataClient {
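    /// Creates a new [`DatabentoDataClient`] instance.
    ///
    /// # Errors
    ///
    /// Returns an error if the publishers file cannot be read or parsed, or if the
    /// underlying historical client or data loader cannot be constructed.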
    pub fn new(
        client_id: ClientId,
        config: DatabentoDataClientConfig,
        clock: &'static AtomicTime,
    ) -> anyhow::Result<Self> {
        let historical = DatabentoHistoricalClient::new(
            config.api_key.clone(),
            config.publishers_filepath.clone(),
            clock,
            config.use_exchange_as_venue,
        )?;

        let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;

        let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
        let publishers_vec: Vec<crate::types::DatabentoPublisher> =
            serde_json::from_str(&file_content)?;

        let publisher_venue_map = publishers_vec
            .into_iter()
            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
            .collect::<IndexMap<u16, Venue>>();

        let data_sender = get_data_event_sender();

        Ok(Self {
            client_id,
            config,
            is_connected: AtomicBool::new(false),
            historical,
            loader,
            cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
            task_handles: Arc::new(Mutex::new(Vec::new())),
            cancellation_token: CancellationToken::new(),
            publisher_venue_map: Arc::new(publisher_venue_map),
            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
            data_sender,
        })
    }

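    /// Resolves the Databento dataset code for the given venue.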
    fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
        self.loader
            .get_dataset_for_venue(&venue)
            .map(ToString::to_string)
            .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
    }

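    /// Ensures a live feed handler exists for the given dataset, creating one on first use.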
    fn get_or_create_feed_handler(&self, dataset: &str) -> anyhow::Result<()> {
        let mut channels = self.cmd_channels.lock().unwrap();

        if !channels.contains_key(dataset) {
            tracing::info!("Creating new feed handler for dataset: {dataset}");
            let cmd_tx = self.initialize_live_feed(dataset.to_string())?;
            channels.insert(dataset.to_string(), cmd_tx);

            tracing::debug!("Feed handler created for dataset: {dataset}, channel stored");
        }

        Ok(())
    }

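    /// Sends a command to the feed handler for the given dataset, failing if none exists.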
    fn send_command_to_dataset(&self, dataset: &str, cmd: LiveCommand) -> anyhow::Result<()> {
        let channels = self.cmd_channels.lock().unwrap();
        if let Some(tx) = channels.get(dataset) {
            tx.send(cmd)
                .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
        } else {
            anyhow::bail!("No feed handler found for dataset: {dataset}");
        }
        Ok(())
    }

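    /// Spawns the feed handler and message processing tasks for the given dataset,
    /// returning the command channel used to control the live feed.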
    fn initialize_live_feed(
        &self,
        dataset: String,
    ) -> anyhow::Result<tokio::sync::mpsc::UnboundedSender<LiveCommand>> {
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);

        let mut feed_handler = DatabentoFeedHandler::new(
            self.config.api_key.clone(),
            dataset,
            cmd_rx,
            msg_tx,
            (*self.publisher_venue_map).clone(),
            self.symbol_venue_map.clone(),
            self.config.use_exchange_as_venue,
            self.config.bars_timestamp_on_close,
        );

        let cancellation_token = self.cancellation_token.clone();

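        // Run the feed handler until it completes or the client is cancelled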
        let feed_handle = tokio::spawn(async move {
            tokio::select! {
                result = feed_handler.run() => {
                    if let Err(e) = result {
                        tracing::error!("Feed handler error: {e}");
                    }
                }
                () = cancellation_token.cancelled() => {
                    tracing::info!("Feed handler cancelled");
                }
            }
        });

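        // Clone the token and sender for the message processing task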
        let cancellation_token = self.cancellation_token.clone();
        let data_sender = self.data_sender.clone();

        let msg_handle = tokio::spawn(async move {
            let mut msg_rx = msg_rx;
            loop {
                tokio::select! {
                    msg = msg_rx.recv() => {
                        match msg {
                            Some(LiveMessage::Data(data)) => {
                                tracing::debug!("Received data: {data:?}");
                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
                                    tracing::error!("Failed to send data event: {e}");
                                }
                            }
                            Some(LiveMessage::Instrument(instrument)) => {
                                tracing::debug!("Received instrument: {}", instrument.id());
                            }
                            Some(LiveMessage::Status(status)) => {
                                tracing::debug!("Received status: {status:?}");
                            }
                            Some(LiveMessage::Imbalance(imbalance)) => {
                                tracing::debug!("Received imbalance: {imbalance:?}");
                            }
                            Some(LiveMessage::Statistics(statistics)) => {
                                tracing::debug!("Received statistics: {statistics:?}");
                            }
                            Some(LiveMessage::Error(error)) => {
                                tracing::error!("Feed handler error: {error}");
                            }
                            Some(LiveMessage::Close) => {
                                tracing::info!("Feed handler closed");
                                break;
                            }
                            None => {
                                tracing::debug!("Message channel closed");
                                break;
                            }
                        }
                    }
                    () = cancellation_token.cancelled() => {
                        tracing::info!("Message processing cancelled");
                        break;
                    }
                }
            }
        });

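        // Track task handles so they can be awaited on disconnect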
        {
            let mut handles = self.task_handles.lock().unwrap();
            handles.push(feed_handle);
            handles.push(msg_handle);
        }

        Ok(cmd_tx)
    }
}

#[async_trait::async_trait]
impl DataClient for DatabentoDataClient {
    fn client_id(&self) -> ClientId {
        self.client_id
    }

    fn venue(&self) -> Option<Venue> {
        None
    }

    fn start(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Starting Databento data client");
        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Stopping Databento data client");

        // Signal all spawned tasks to stop
        self.cancellation_token.cancel();

        // Ask each dataset feed handler to close its session
        let channels = self.cmd_channels.lock().unwrap();
        for (dataset, tx) in channels.iter() {
            if let Err(e) = tx.send(LiveCommand::Close) {
                tracing::error!("Failed to send close command to dataset {dataset}: {e}");
            }
        }

        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }

    fn reset(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Resetting Databento data client");
        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }

    fn dispose(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Disposing Databento data client");
        self.stop()
    }

    async fn connect(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Connecting Databento data client");

        self.is_connected.store(true, Ordering::Relaxed);

        tracing::info!("Databento data client connected");
        Ok(())
    }

    async fn disconnect(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Disconnecting Databento data client");

        // Signal all spawned tasks to stop
        self.cancellation_token.cancel();

        // Ask each dataset feed handler to close its session
        {
            let channels = self.cmd_channels.lock().unwrap();
            for (dataset, tx) in channels.iter() {
                if let Err(e) = tx.send(LiveCommand::Close) {
                    tracing::error!("Failed to send close command to dataset {dataset}: {e}");
                }
            }
        }

        // Take ownership of the task handles and await their completion
        let handles = {
            let mut task_handles = self.task_handles.lock().unwrap();
            std::mem::take(&mut *task_handles)
        };

        for handle in handles {
            if let Err(e) = handle.await
                && !e.is_cancelled()
            {
                tracing::error!("Task join error: {e}");
            }
        }

        self.is_connected.store(false, Ordering::Relaxed);

        {
            let mut channels = self.cmd_channels.lock().unwrap();
            channels.clear();
        }

        tracing::info!("Databento data client disconnected");
        Ok(())
    }

    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }

    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }

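    // Quotes are sourced from the Databento MBP-1 (top-of-book) schema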
    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
        tracing::debug!("Subscribe quotes: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().unwrap();
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset)?;

        // Only start the feed handler if it was newly created
        if was_new_handler {
            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
        }

        let symbol = instrument_id_to_symbol_string(
            cmd.instrument_id,
            &mut self.symbol_venue_map.write().unwrap(),
        );

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Mbp1)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;

        Ok(())
    }

    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
        tracing::debug!("Subscribe trades: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().unwrap();
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset)?;

        if was_new_handler {
            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
        }

        let symbol = instrument_id_to_symbol_string(
            cmd.instrument_id,
            &mut self.symbol_venue_map.write().unwrap(),
        );

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Trades)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;

        Ok(())
    }

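    // Book deltas are sourced from the Databento MBO (market-by-order) schema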
    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
        tracing::debug!("Subscribe book deltas: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().unwrap();
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset)?;

        if was_new_handler {
            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
        }

        let symbol = instrument_id_to_symbol_string(
            cmd.instrument_id,
            &mut self.symbol_venue_map.write().unwrap(),
        );

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Mbo)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;

        Ok(())
    }

    fn subscribe_instrument_status(
        &mut self,
        cmd: &SubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        tracing::debug!("Subscribe instrument status: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().unwrap();
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset)?;

        if was_new_handler {
            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
        }

        let symbol = instrument_id_to_symbol_string(
            cmd.instrument_id,
            &mut self.symbol_venue_map.write().unwrap(),
        );

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Status)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;

        Ok(())
    }

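    // Databento live sessions do not support unsubscribing from individual streams,
    // so the handlers below only log a warning.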
    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
        tracing::debug!("Unsubscribe quotes: {cmd:?}");

        tracing::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
        tracing::debug!("Unsubscribe trades: {cmd:?}");

        tracing::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
        tracing::debug!("Unsubscribe book deltas: {cmd:?}");

        tracing::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

    fn unsubscribe_instrument_status(
        &mut self,
        cmd: &UnsubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        tracing::debug!("Unsubscribe instrument status: {cmd:?}");

        tracing::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

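    // Historical data requests run on spawned tasks; retrieved data is currently only logged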
    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
        tracing::debug!("Request instruments: {request:?}");

        let historical_client = self.historical.clone();
        let request = request.clone();

        tokio::spawn(async move {
            // NOTE: Requests all symbols from a hardcoded dataset
            let symbols = vec!["ALL_SYMBOLS".to_string()];
            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: None,
                price_precision: None,
            };

            match historical_client.get_range_instruments(params).await {
                Ok(instruments) => {
                    tracing::info!("Retrieved {} instruments", instruments.len());
                }
                Err(e) => {
                    tracing::error!("Failed to request instruments: {e}");
                }
            }
        });

        Ok(())
    }

    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
        tracing::debug!("Request quotes: {request:?}");

        let historical_client = self.historical.clone();
        let request = request.clone();

        tokio::spawn(async move {
            // NOTE: Symbology uses a temporary, empty symbol-venue map
            let symbols = vec![instrument_id_to_symbol_string(
                request.instrument_id,
                &mut AHashMap::new(),
            )];

            // NOTE: The dataset is currently hardcoded
            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: request.limit.map(|l| l.get() as u64),
                price_precision: None,
            };

            match historical_client.get_range_quotes(params, None).await {
                Ok(quotes) => {
                    tracing::info!("Retrieved {} quotes", quotes.len());
                }
                Err(e) => {
                    tracing::error!("Failed to request quotes: {e}");
                }
            }
        });

        Ok(())
    }

    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
        tracing::debug!("Request trades: {request:?}");

        let historical_client = self.historical.clone();
        let request = request.clone();

        tokio::spawn(async move {
            let symbols = vec![instrument_id_to_symbol_string(
                request.instrument_id,
                &mut AHashMap::new(),
            )];

            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: request.limit.map(|l| l.get() as u64),
                price_precision: None,
            };

            match historical_client.get_range_trades(params).await {
                Ok(trades) => {
                    tracing::info!("Retrieved {} trades", trades.len());
                }
                Err(e) => {
                    tracing::error!("Failed to request trades: {e}");
                }
            }
        });

        Ok(())
    }

    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
        tracing::debug!("Request bars: {request:?}");

        let historical_client = self.historical.clone();
        let request = request.clone();

        tokio::spawn(async move {
            let symbols = vec![instrument_id_to_symbol_string(
                request.bar_type.instrument_id(),
                &mut AHashMap::new(),
            )];

            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: request.limit.map(|l| l.get() as u64),
                price_precision: None,
            };

            // Only second, minute, hour, and day aggregations are supported
            let aggregation = match request.bar_type.spec().aggregation {
                BarAggregation::Second => BarAggregation::Second,
                BarAggregation::Minute => BarAggregation::Minute,
                BarAggregation::Hour => BarAggregation::Hour,
                BarAggregation::Day => BarAggregation::Day,
                _ => {
                    tracing::error!(
                        "Unsupported bar aggregation: {:?}",
                        request.bar_type.spec().aggregation
                    );
                    return;
                }
            };

            match historical_client
                .get_range_bars(params, aggregation, true)
                .await
            {
                Ok(bars) => {
                    tracing::info!("Retrieved {} bars", bars.len());
                }
                Err(e) => {
                    tracing::error!("Failed to request bars: {e}");
                }
            }
        });

        Ok(())
    }
}