1use std::{
22 fmt::Debug,
23 path::PathBuf,
24 sync::{
25 Arc, Mutex, RwLock,
26 atomic::{AtomicBool, Ordering},
27 },
28};
29
30use ahash::AHashMap;
31use databento::live::Subscription;
32use indexmap::IndexMap;
33use nautilus_common::{
34 clients::DataClient,
35 live::{runner::get_data_event_sender, runtime::get_runtime},
36 messages::{
37 DataEvent,
38 data::{
39 RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
40 SubscribeInstrument, SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades,
41 UnsubscribeBookDeltas, UnsubscribeInstrumentStatus, UnsubscribeQuotes,
42 UnsubscribeTrades,
43 },
44 },
45};
46use nautilus_core::{MUTEX_POISONED, time::AtomicTime};
47use nautilus_model::{
48 enums::BarAggregation,
49 identifiers::{ClientId, Symbol, Venue},
50 instruments::Instrument,
51};
52use tokio::task::JoinHandle;
53use tokio_util::sync::CancellationToken;
54
55use crate::{
56 common::Credential,
57 historical::{DatabentoHistoricalClient, RangeQueryParams},
58 live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
59 loader::DatabentoDataLoader,
60 symbology::instrument_id_to_symbol_string,
61 types::PublisherId,
62};
63
64#[derive(Clone)]
66pub struct DatabentoDataClientConfig {
67 credential: Credential,
69 pub publishers_filepath: PathBuf,
71 pub use_exchange_as_venue: bool,
73 pub bars_timestamp_on_close: bool,
75 pub reconnect_timeout_mins: Option<u64>,
77 pub http_proxy_url: Option<String>,
79 pub ws_proxy_url: Option<String>,
81}
82
83impl Debug for DatabentoDataClientConfig {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct(stringify!(DatabentoDataClientConfig))
86 .field("credential", &"<redacted>")
87 .field("publishers_filepath", &self.publishers_filepath)
88 .field("use_exchange_as_venue", &self.use_exchange_as_venue)
89 .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
90 .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
91 .field("http_proxy_url", &self.http_proxy_url)
92 .field("ws_proxy_url", &self.ws_proxy_url)
93 .finish()
94 }
95}
96
97impl DatabentoDataClientConfig {
98 #[must_use]
100 pub fn new(
101 api_key: impl Into<String>,
102 publishers_filepath: PathBuf,
103 use_exchange_as_venue: bool,
104 bars_timestamp_on_close: bool,
105 ) -> Self {
106 Self {
107 credential: Credential::new(api_key),
108 publishers_filepath,
109 use_exchange_as_venue,
110 bars_timestamp_on_close,
111 reconnect_timeout_mins: Some(10), http_proxy_url: None,
113 ws_proxy_url: None,
114 }
115 }
116
117 #[must_use]
119 pub fn api_key(&self) -> &str {
120 self.credential.api_key()
121 }
122
123 #[must_use]
125 pub fn api_key_masked(&self) -> String {
126 self.credential.api_key_masked()
127 }
128}
129
130#[cfg_attr(feature = "python", pyo3::pyclass)]
136#[derive(Debug)]
137pub struct DatabentoDataClient {
138 client_id: ClientId,
140 config: DatabentoDataClientConfig,
142 is_connected: AtomicBool,
144 historical: DatabentoHistoricalClient,
146 loader: DatabentoDataLoader,
148 cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<LiveCommand>>>>,
150 task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
152 cancellation_token: CancellationToken,
154 publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
156 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
158 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
160}
161
162impl DatabentoDataClient {
163 pub fn new(
169 client_id: ClientId,
170 config: DatabentoDataClientConfig,
171 clock: &'static AtomicTime,
172 ) -> anyhow::Result<Self> {
173 let historical = DatabentoHistoricalClient::new(
174 config.api_key().to_string(),
175 config.publishers_filepath.clone(),
176 clock,
177 config.use_exchange_as_venue,
178 )?;
179
180 let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;
182
183 let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
185 let publishers_vec: Vec<crate::types::DatabentoPublisher> =
186 serde_json::from_str(&file_content)?;
187
188 let publisher_venue_map = publishers_vec
189 .into_iter()
190 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
191 .collect::<IndexMap<u16, Venue>>();
192
193 let data_sender = get_data_event_sender();
194
195 Ok(Self {
196 client_id,
197 config,
198 is_connected: AtomicBool::new(false),
199 historical,
200 loader,
201 cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
202 task_handles: Arc::new(Mutex::new(Vec::new())),
203 cancellation_token: CancellationToken::new(),
204 publisher_venue_map: Arc::new(publisher_venue_map),
205 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
206 data_sender,
207 })
208 }
209
210 fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
216 self.loader
217 .get_dataset_for_venue(&venue)
218 .map(ToString::to_string)
219 .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
220 }
221
222 fn get_or_create_feed_handler(&self, dataset: &str) -> anyhow::Result<()> {
228 let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
229
230 if !channels.contains_key(dataset) {
231 log::info!("Creating new feed handler for dataset: {dataset}");
232 let cmd_tx = self.initialize_live_feed(dataset.to_string())?;
233 channels.insert(dataset.to_string(), cmd_tx);
234
235 log::debug!("Feed handler created for dataset: {dataset}, channel stored");
236 }
237
238 Ok(())
239 }
240
241 fn send_command_to_dataset(&self, dataset: &str, cmd: LiveCommand) -> anyhow::Result<()> {
247 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
248 if let Some(tx) = channels.get(dataset) {
249 tx.send(cmd)
250 .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
251 } else {
252 anyhow::bail!("No feed handler found for dataset: {dataset}");
253 }
254 Ok(())
255 }
256
257 fn initialize_live_feed(
263 &self,
264 dataset: String,
265 ) -> anyhow::Result<tokio::sync::mpsc::UnboundedSender<LiveCommand>> {
266 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
267 let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);
268
269 let mut feed_handler = DatabentoFeedHandler::new(
270 self.config.api_key().to_string(),
271 dataset,
272 cmd_rx,
273 msg_tx,
274 (*self.publisher_venue_map).clone(),
275 self.symbol_venue_map.clone(),
276 self.config.use_exchange_as_venue,
277 self.config.bars_timestamp_on_close,
278 self.config.reconnect_timeout_mins,
279 );
280
281 let cancellation_token = self.cancellation_token.clone();
282
283 let feed_handle = get_runtime().spawn(async move {
285 tokio::select! {
286 result = feed_handler.run() => {
287 if let Err(e) = result {
288 log::error!("Feed handler error: {e}");
289 }
290 }
291 () = cancellation_token.cancelled() => {
292 log::debug!("Feed handler cancelled");
293 }
294 }
295 });
296
297 let cancellation_token = self.cancellation_token.clone();
298 let data_sender = self.data_sender.clone();
299
300 let msg_handle = get_runtime().spawn(async move {
302 let mut msg_rx = msg_rx;
303 loop {
304 tokio::select! {
305 msg = msg_rx.recv() => {
306 match msg {
307 Some(LiveMessage::Data(data)) => {
308 log::debug!("Received data: {data:?}");
309 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
310 log::error!("Failed to send data event: {e}");
311 }
312 }
313 Some(LiveMessage::Instrument(instrument)) => {
314 log::info!("Received instrument definition: {}", instrument.id());
315 if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
316 log::error!("Failed to send instrument: {e}");
317 }
318 }
319 Some(LiveMessage::Status(status)) => {
320 log::debug!("Received status: {status:?}");
321 }
323 Some(LiveMessage::Imbalance(imbalance)) => {
324 log::debug!("Received imbalance: {imbalance:?}");
325 }
327 Some(LiveMessage::Statistics(statistics)) => {
328 log::debug!("Received statistics: {statistics:?}");
329 }
331 Some(LiveMessage::SubscriptionAck(ack)) => {
332 log::debug!("Received subscription ack: {}", ack.message);
333 }
334 Some(LiveMessage::Error(error)) => {
335 log::error!("Feed handler error: {error}");
336 }
338 Some(LiveMessage::Close) => {
339 log::info!("Feed handler closed");
340 break;
341 }
342 None => {
343 log::debug!("Message channel closed");
344 break;
345 }
346 }
347 }
348 () = cancellation_token.cancelled() => {
349 log::debug!("Message processing cancelled");
350 break;
351 }
352 }
353 }
354 });
355
356 {
357 let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
358 handles.push(feed_handle);
359 handles.push(msg_handle);
360 }
361
362 Ok(cmd_tx)
363 }
364}
365
366#[async_trait::async_trait(?Send)]
367impl DataClient for DatabentoDataClient {
368 fn client_id(&self) -> ClientId {
370 self.client_id
371 }
372
373 fn venue(&self) -> Option<Venue> {
375 None
376 }
377
378 fn start(&mut self) -> anyhow::Result<()> {
384 log::debug!("Starting");
385 Ok(())
386 }
387
388 fn stop(&mut self) -> anyhow::Result<()> {
394 log::debug!("Stopping");
395
396 self.cancellation_token.cancel();
398
399 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
401 for (dataset, tx) in channels.iter() {
402 if let Err(e) = tx.send(LiveCommand::Close) {
403 log::error!("Failed to send close command to dataset {dataset}: {e}");
404 }
405 }
406
407 self.is_connected.store(false, Ordering::Relaxed);
408 Ok(())
409 }
410
411 fn reset(&mut self) -> anyhow::Result<()> {
412 log::debug!("Resetting");
413 self.is_connected.store(false, Ordering::Relaxed);
414 Ok(())
415 }
416
417 fn dispose(&mut self) -> anyhow::Result<()> {
418 log::debug!("Disposing");
419 self.stop()
420 }
421
422 async fn connect(&mut self) -> anyhow::Result<()> {
423 log::debug!("Connecting...");
424
425 self.is_connected.store(true, Ordering::Relaxed);
428
429 log::info!("Connected");
430 Ok(())
431 }
432
433 async fn disconnect(&mut self) -> anyhow::Result<()> {
434 log::debug!("Disconnecting...");
435
436 self.cancellation_token.cancel();
438
439 {
441 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
442 for (dataset, tx) in channels.iter() {
443 if let Err(e) = tx.send(LiveCommand::Close) {
444 log::error!("Failed to send close command to dataset {dataset}: {e}");
445 }
446 }
447 }
448
449 let handles = {
451 let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
452 std::mem::take(&mut *task_handles)
453 };
454
455 for handle in handles {
456 if let Err(e) = handle.await
457 && !e.is_cancelled()
458 {
459 log::error!("Task join error: {e}");
460 }
461 }
462
463 self.is_connected.store(false, Ordering::Relaxed);
464
465 {
466 let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
467 channels.clear();
468 }
469
470 log::info!("Disconnected");
471 Ok(())
472 }
473
474 fn is_connected(&self) -> bool {
476 self.is_connected.load(Ordering::Relaxed)
477 }
478
479 fn is_disconnected(&self) -> bool {
480 !self.is_connected()
481 }
482
483 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
489 log::debug!("Subscribe instrument: {cmd:?}");
490
491 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
492 let was_new_handler = {
493 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
494 !channels.contains_key(&dataset)
495 };
496
497 self.get_or_create_feed_handler(&dataset)?;
498
499 if was_new_handler {
501 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
502 }
503
504 let symbol = instrument_id_to_symbol_string(
505 cmd.instrument_id,
506 &mut self.symbol_venue_map.write().unwrap(),
507 );
508
509 let subscription = Subscription::builder()
510 .schema(databento::dbn::Schema::Definition)
511 .symbols(symbol)
512 .build();
513
514 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
515
516 Ok(())
517 }
518
519 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
525 log::debug!("Subscribe quotes: {cmd:?}");
526
527 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
528 let was_new_handler = {
529 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
530 !channels.contains_key(&dataset)
531 };
532
533 self.get_or_create_feed_handler(&dataset)?;
534
535 if was_new_handler {
537 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
538 }
539
540 let symbol = instrument_id_to_symbol_string(
541 cmd.instrument_id,
542 &mut self.symbol_venue_map.write().unwrap(),
543 );
544
545 let subscription = Subscription::builder()
546 .schema(databento::dbn::Schema::Mbp1) .symbols(symbol)
548 .build();
549
550 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
551
552 Ok(())
553 }
554
555 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
561 log::debug!("Subscribe trades: {cmd:?}");
562
563 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
564 let was_new_handler = {
565 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
566 !channels.contains_key(&dataset)
567 };
568
569 self.get_or_create_feed_handler(&dataset)?;
570
571 if was_new_handler {
573 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
574 }
575
576 let symbol = instrument_id_to_symbol_string(
577 cmd.instrument_id,
578 &mut self.symbol_venue_map.write().unwrap(),
579 );
580
581 let subscription = Subscription::builder()
582 .schema(databento::dbn::Schema::Trades)
583 .symbols(symbol)
584 .build();
585
586 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
587
588 Ok(())
589 }
590
591 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
597 log::debug!("Subscribe book deltas: {cmd:?}");
598
599 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
600 let was_new_handler = {
601 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
602 !channels.contains_key(&dataset)
603 };
604
605 self.get_or_create_feed_handler(&dataset)?;
606
607 if was_new_handler {
609 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
610 }
611
612 let symbol = instrument_id_to_symbol_string(
613 cmd.instrument_id,
614 &mut self.symbol_venue_map.write().unwrap(),
615 );
616
617 let subscription = Subscription::builder()
618 .schema(databento::dbn::Schema::Mbo) .symbols(symbol)
620 .build();
621
622 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
623
624 Ok(())
625 }
626
627 fn subscribe_instrument_status(
633 &mut self,
634 cmd: &SubscribeInstrumentStatus,
635 ) -> anyhow::Result<()> {
636 log::debug!("Subscribe instrument status: {cmd:?}");
637
638 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
639 let was_new_handler = {
640 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
641 !channels.contains_key(&dataset)
642 };
643
644 self.get_or_create_feed_handler(&dataset)?;
645
646 if was_new_handler {
648 self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
649 }
650
651 let symbol = instrument_id_to_symbol_string(
652 cmd.instrument_id,
653 &mut self.symbol_venue_map.write().unwrap(),
654 );
655
656 let subscription = Subscription::builder()
657 .schema(databento::dbn::Schema::Status)
658 .symbols(symbol)
659 .build();
660
661 self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
662
663 Ok(())
664 }
665
666 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
668 log::debug!("Unsubscribe quotes: {cmd:?}");
669
670 log::warn!(
674 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
675 cmd.instrument_id
676 );
677
678 Ok(())
679 }
680
681 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
682 log::debug!("Unsubscribe trades: {cmd:?}");
683
684 log::warn!(
688 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
689 cmd.instrument_id
690 );
691
692 Ok(())
693 }
694
695 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
696 log::debug!("Unsubscribe book deltas: {cmd:?}");
697
698 log::warn!(
702 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
703 cmd.instrument_id
704 );
705
706 Ok(())
707 }
708
709 fn unsubscribe_instrument_status(
710 &mut self,
711 cmd: &UnsubscribeInstrumentStatus,
712 ) -> anyhow::Result<()> {
713 log::debug!("Unsubscribe instrument status: {cmd:?}");
714
715 log::warn!(
719 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
720 cmd.instrument_id
721 );
722
723 Ok(())
724 }
725
726 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
727 log::debug!("Request instruments: {request:?}");
728
729 let historical_client = self.historical.clone();
730 let data_sender = self.data_sender.clone();
731
732 get_runtime().spawn(async move {
733 let symbols = vec!["ALL_SYMBOLS".to_string()]; let params = RangeQueryParams {
736 dataset: "GLBX.MDP3".to_string(), symbols,
738 start: request
739 .start
740 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
741 .into(),
742 end: request
743 .end
744 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
745 .map(Into::into),
746 limit: None,
747 price_precision: None,
748 };
749
750 match historical_client.get_range_instruments(params).await {
751 Ok(instruments) => {
752 log::info!("Retrieved {} instruments", instruments.len());
753 for instrument in instruments {
754 if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
755 log::error!("Failed to send instrument: {e}");
756 }
757 }
758 }
759 Err(e) => {
760 log::error!("Failed to request instruments: {e}");
761 }
762 }
763 });
764
765 Ok(())
766 }
767
768 fn request_quotes(&self, request: RequestQuotes) -> anyhow::Result<()> {
769 log::debug!("Request quotes: {request:?}");
770
771 let historical_client = self.historical.clone();
772
773 get_runtime().spawn(async move {
774 let symbols = vec![instrument_id_to_symbol_string(
775 request.instrument_id,
776 &mut AHashMap::new(), )];
778
779 let params = RangeQueryParams {
780 dataset: "GLBX.MDP3".to_string(), symbols,
782 start: request
783 .start
784 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
785 .into(),
786 end: request
787 .end
788 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
789 .map(Into::into),
790 limit: request.limit.map(|l| l.get() as u64),
791 price_precision: None,
792 };
793
794 match historical_client.get_range_quotes(params, None).await {
795 Ok(quotes) => {
796 log::info!("Retrieved {} quotes", quotes.len());
797 }
799 Err(e) => {
800 log::error!("Failed to request quotes: {e}");
801 }
802 }
803 });
804
805 Ok(())
806 }
807
808 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
809 log::debug!("Request trades: {request:?}");
810
811 let historical_client = self.historical.clone();
812
813 get_runtime().spawn(async move {
814 let symbols = vec![instrument_id_to_symbol_string(
815 request.instrument_id,
816 &mut AHashMap::new(), )];
818
819 let params = RangeQueryParams {
820 dataset: "GLBX.MDP3".to_string(), symbols,
822 start: request
823 .start
824 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
825 .into(),
826 end: request
827 .end
828 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
829 .map(Into::into),
830 limit: request.limit.map(|l| l.get() as u64),
831 price_precision: None,
832 };
833
834 match historical_client.get_range_trades(params).await {
835 Ok(trades) => {
836 log::info!("Retrieved {} trades", trades.len());
837 }
839 Err(e) => {
840 log::error!("Failed to request trades: {e}");
841 }
842 }
843 });
844
845 Ok(())
846 }
847
848 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
849 log::debug!("Request bars: {request:?}");
850
851 let historical_client = self.historical.clone();
852
853 get_runtime().spawn(async move {
854 let symbols = vec![instrument_id_to_symbol_string(
855 request.bar_type.instrument_id(),
856 &mut AHashMap::new(), )];
858
859 let params = RangeQueryParams {
860 dataset: "GLBX.MDP3".to_string(), symbols,
862 start: request
863 .start
864 .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
865 .into(),
866 end: request
867 .end
868 .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
869 .map(Into::into),
870 limit: request.limit.map(|l| l.get() as u64),
871 price_precision: None,
872 };
873
874 let aggregation = match request.bar_type.spec().aggregation {
876 BarAggregation::Second => BarAggregation::Second,
877 BarAggregation::Minute => BarAggregation::Minute,
878 BarAggregation::Hour => BarAggregation::Hour,
879 BarAggregation::Day => BarAggregation::Day,
880 _ => {
881 log::error!(
882 "Unsupported bar aggregation: {:?}",
883 request.bar_type.spec().aggregation
884 );
885 return;
886 }
887 };
888
889 match historical_client
890 .get_range_bars(params, aggregation, true)
891 .await
892 {
893 Ok(bars) => {
894 log::info!("Retrieved {} bars", bars.len());
895 }
897 Err(e) => {
898 log::error!("Failed to request bars: {e}");
899 }
900 }
901 });
902
903 Ok(())
904 }
905}