nautilus_databento/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Databento data client implementation leveraging existing live and historical clients.
17
18use std::{
19    path::PathBuf,
20    sync::{
21        Arc, Mutex, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use databento::live::Subscription;
28use indexmap::IndexMap;
29use nautilus_common::{
30    messages::{
31        DataEvent,
32        data::{
33            RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
34            SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades, UnsubscribeBookDeltas,
35            UnsubscribeInstrumentStatus, UnsubscribeQuotes, UnsubscribeTrades,
36        },
37    },
38    runner::get_data_event_sender,
39};
40use nautilus_core::time::AtomicTime;
41use nautilus_data::client::DataClient;
42use nautilus_model::{
43    enums::BarAggregation,
44    identifiers::{ClientId, Symbol, Venue},
45    instruments::Instrument,
46};
47use tokio::task::JoinHandle;
48use tokio_util::sync::CancellationToken;
49
50use crate::{
51    historical::{DatabentoHistoricalClient, RangeQueryParams},
52    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
53    loader::DatabentoDataLoader,
54    symbology::instrument_id_to_symbol_string,
55    types::PublisherId,
56};
57
/// Settings used to build and run the Databento data client.
#[derive(Debug, Clone)]
pub struct DatabentoDataClientConfig {
    /// API key for Databento.
    pub api_key: String,
    /// Filesystem path of the `publishers.json` file.
    pub publishers_filepath: PathBuf,
    /// If `true`, the exchange is used as the venue for GLBX instruments.
    pub use_exchange_as_venue: bool,
    /// If `true`, bars are timestamped on close.
    pub bars_timestamp_on_close: bool,
}
70
71impl DatabentoDataClientConfig {
72    /// Creates a new [`DatabentoDataClientConfig`] instance.
73    #[must_use]
74    pub const fn new(
75        api_key: String,
76        publishers_filepath: PathBuf,
77        use_exchange_as_venue: bool,
78        bars_timestamp_on_close: bool,
79    ) -> Self {
80        Self {
81            api_key,
82            publishers_filepath,
83            use_exchange_as_venue,
84            bars_timestamp_on_close,
85        }
86    }
87}
88
89/// A Databento data client that combines live streaming and historical data functionality.
90///
91/// This client uses the existing `DatabentoFeedHandler` for live data subscriptions
92/// and `DatabentoHistoricalClient` for historical data requests. It supports multiple
93/// datasets simultaneously, with separate feed handlers per dataset.
94#[cfg_attr(feature = "python", pyo3::pyclass)]
95#[derive(Debug)]
96pub struct DatabentoDataClient {
97    /// Client identifier.
98    client_id: ClientId,
99    /// Client configuration.
100    config: DatabentoDataClientConfig,
101    /// Connection state.
102    is_connected: AtomicBool,
103    /// Historical client for on-demand data requests.
104    historical: DatabentoHistoricalClient,
105    /// Data loader for venue-to-dataset mapping.
106    loader: DatabentoDataLoader,
107    /// Feed handler command senders per dataset.
108    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<LiveCommand>>>>,
109    /// Task handles for lifecycle management.
110    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
111    /// Cancellation token for graceful shutdown.
112    cancellation_token: CancellationToken,
113    /// Publisher to venue mapping.
114    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
115    /// Symbol to venue mapping (for caching).
116    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
117    /// Data event sender for forwarding data to the async runner.
118    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
119}
120
121impl DatabentoDataClient {
122    /// Creates a new [`DatabentoDataClient`] instance.
123    ///
124    /// # Errors
125    ///
126    /// Returns an error if client creation or publisher configuration loading fails.
127    pub fn new(
128        client_id: ClientId,
129        config: DatabentoDataClientConfig,
130        clock: &'static AtomicTime,
131    ) -> anyhow::Result<Self> {
132        let historical = DatabentoHistoricalClient::new(
133            config.api_key.clone(),
134            config.publishers_filepath.clone(),
135            clock,
136            config.use_exchange_as_venue,
137        )?;
138
139        // Create data loader for venue-to-dataset mapping
140        let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;
141
142        // Load publisher configuration
143        let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
144        let publishers_vec: Vec<crate::types::DatabentoPublisher> =
145            serde_json::from_str(&file_content)?;
146
147        let publisher_venue_map = publishers_vec
148            .into_iter()
149            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
150            .collect::<IndexMap<u16, Venue>>();
151
152        let data_sender = get_data_event_sender();
153
154        Ok(Self {
155            client_id,
156            config,
157            is_connected: AtomicBool::new(false),
158            historical,
159            loader,
160            cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
161            task_handles: Arc::new(Mutex::new(Vec::new())),
162            cancellation_token: CancellationToken::new(),
163            publisher_venue_map: Arc::new(publisher_venue_map),
164            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
165            data_sender,
166        })
167    }
168
169    /// Gets the dataset for a given venue using the data loader.
170    fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
171        self.loader
172            .get_dataset_for_venue(&venue)
173            .map(ToString::to_string)
174            .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
175    }
176
177    /// Gets or creates a feed handler for the specified dataset.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if the feed handler cannot be created.
182    fn get_or_create_feed_handler(&self, dataset: &str) -> anyhow::Result<()> {
183        let mut channels = self.cmd_channels.lock().unwrap();
184
185        if !channels.contains_key(dataset) {
186            tracing::info!("Creating new feed handler for dataset: {dataset}");
187            let cmd_tx = self.initialize_live_feed(dataset.to_string())?;
188            channels.insert(dataset.to_string(), cmd_tx);
189
190            tracing::debug!("Feed handler created for dataset: {dataset}, channel stored");
191        }
192
193        Ok(())
194    }
195
196    /// Sends a command to a specific dataset's feed handler.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the command cannot be sent.
201    fn send_command_to_dataset(&self, dataset: &str, cmd: LiveCommand) -> anyhow::Result<()> {
202        let channels = self.cmd_channels.lock().unwrap();
203        if let Some(tx) = channels.get(dataset) {
204            tx.send(cmd)
205                .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
206        } else {
207            anyhow::bail!("No feed handler found for dataset: {dataset}");
208        }
209        Ok(())
210    }
211
212    /// Initializes the live feed handler for streaming data.
213    ///
214    /// # Errors
215    ///
216    /// Returns an error if the feed handler cannot be started.
217    fn initialize_live_feed(
218        &self,
219        dataset: String,
220    ) -> anyhow::Result<tokio::sync::mpsc::UnboundedSender<LiveCommand>> {
221        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
222        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);
223
224        let mut feed_handler = DatabentoFeedHandler::new(
225            self.config.api_key.clone(),
226            dataset,
227            cmd_rx,
228            msg_tx,
229            (*self.publisher_venue_map).clone(),
230            self.symbol_venue_map.clone(),
231            self.config.use_exchange_as_venue,
232            self.config.bars_timestamp_on_close,
233        );
234
235        let cancellation_token = self.cancellation_token.clone();
236
237        // Spawn the feed handler task with cancellation support
238        let feed_handle = tokio::spawn(async move {
239            tokio::select! {
240                result = feed_handler.run() => {
241                    if let Err(e) = result {
242                        tracing::error!("Feed handler error: {e}");
243                    }
244                }
245                () = cancellation_token.cancelled() => {
246                    tracing::info!("Feed handler cancelled");
247                }
248            }
249        });
250
251        let cancellation_token = self.cancellation_token.clone();
252        let data_sender = self.data_sender.clone();
253
254        // Spawn message processing task with cancellation support
255        let msg_handle = tokio::spawn(async move {
256            let mut msg_rx = msg_rx;
257            loop {
258                tokio::select! {
259                    msg = msg_rx.recv() => {
260                        match msg {
261                            Some(LiveMessage::Data(data)) => {
262                                tracing::debug!("Received data: {data:?}");
263                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
264                                    tracing::error!("Failed to send data event: {e}");
265                                }
266                            }
267                            Some(LiveMessage::Instrument(instrument)) => {
268                                tracing::debug!("Received instrument: {}", instrument.id());
269                                // TODO: Forward to cache or instrument manager
270                            }
271                            Some(LiveMessage::Status(status)) => {
272                                tracing::debug!("Received status: {status:?}");
273                                // TODO: Forward to appropriate handler
274                            }
275                            Some(LiveMessage::Imbalance(imbalance)) => {
276                                tracing::debug!("Received imbalance: {imbalance:?}");
277                                // TODO: Forward to appropriate handler
278                            }
279                            Some(LiveMessage::Statistics(statistics)) => {
280                                tracing::debug!("Received statistics: {statistics:?}");
281                                // TODO: Forward to appropriate handler
282                            }
283                            Some(LiveMessage::Error(error)) => {
284                                tracing::error!("Feed handler error: {error}");
285                                // TODO: Handle error appropriately
286                            }
287                            Some(LiveMessage::Close) => {
288                                tracing::info!("Feed handler closed");
289                                break;
290                            }
291                            None => {
292                                tracing::debug!("Message channel closed");
293                                break;
294                            }
295                        }
296                    }
297                    () = cancellation_token.cancelled() => {
298                        tracing::info!("Message processing cancelled");
299                        break;
300                    }
301                }
302            }
303        });
304
305        {
306            let mut handles = self.task_handles.lock().unwrap();
307            handles.push(feed_handle);
308            handles.push(msg_handle);
309        }
310
311        Ok(cmd_tx)
312    }
313}
314
315#[async_trait::async_trait]
316impl DataClient for DatabentoDataClient {
317    fn client_id(&self) -> ClientId {
318        self.client_id
319    }
320
321    fn venue(&self) -> Option<Venue> {
322        None
323    }
324
325    fn start(&mut self) -> anyhow::Result<()> {
326        tracing::debug!("Starting Databento data client");
327        Ok(())
328    }
329
330    fn stop(&mut self) -> anyhow::Result<()> {
331        tracing::debug!("Stopping Databento data client");
332
333        // Signal cancellation to all running tasks
334        self.cancellation_token.cancel();
335
336        // Send close command to all active feed handlers
337        let channels = self.cmd_channels.lock().unwrap();
338        for (dataset, tx) in channels.iter() {
339            if let Err(e) = tx.send(LiveCommand::Close) {
340                tracing::error!("Failed to send close command to dataset {dataset}: {e}");
341            }
342        }
343
344        self.is_connected.store(false, Ordering::Relaxed);
345        Ok(())
346    }
347
348    fn reset(&mut self) -> anyhow::Result<()> {
349        tracing::debug!("Resetting Databento data client");
350        self.is_connected.store(false, Ordering::Relaxed);
351        Ok(())
352    }
353
354    fn dispose(&mut self) -> anyhow::Result<()> {
355        tracing::debug!("Disposing Databento data client");
356        self.stop()
357    }
358
359    async fn connect(&mut self) -> anyhow::Result<()> {
360        tracing::debug!("Connecting Databento data client");
361
362        // Connection will happen lazily when subscriptions are made
363        // No need to create feed handlers upfront since we don't know which datasets will be needed
364        self.is_connected.store(true, Ordering::Relaxed);
365
366        tracing::info!("Databento data client connected");
367        Ok(())
368    }
369
370    async fn disconnect(&mut self) -> anyhow::Result<()> {
371        tracing::debug!("Disconnecting Databento data client");
372
373        // Signal cancellation to all running tasks
374        self.cancellation_token.cancel();
375
376        // Send close command to all active feed handlers
377        {
378            let channels = self.cmd_channels.lock().unwrap();
379            for (dataset, tx) in channels.iter() {
380                if let Err(e) = tx.send(LiveCommand::Close) {
381                    tracing::error!("Failed to send close command to dataset {dataset}: {e}");
382                }
383            }
384        }
385
386        // Wait for all spawned tasks to complete
387        let handles = {
388            let mut task_handles = self.task_handles.lock().unwrap();
389            std::mem::take(&mut *task_handles)
390        };
391
392        for handle in handles {
393            if let Err(e) = handle.await
394                && !e.is_cancelled()
395            {
396                tracing::error!("Task join error: {e}");
397            }
398        }
399
400        self.is_connected.store(false, Ordering::Relaxed);
401
402        {
403            let mut channels = self.cmd_channels.lock().unwrap();
404            channels.clear();
405        }
406
407        tracing::info!("Databento data client disconnected");
408        Ok(())
409    }
410
411    fn is_connected(&self) -> bool {
412        self.is_connected.load(Ordering::Relaxed)
413    }
414
415    fn is_disconnected(&self) -> bool {
416        !self.is_connected()
417    }
418
419    // Live subscription methods using the feed handler
420    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
421        tracing::debug!("Subscribe quotes: {cmd:?}");
422
423        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
424        let was_new_handler = {
425            let channels = self.cmd_channels.lock().unwrap();
426            !channels.contains_key(&dataset)
427        };
428
429        self.get_or_create_feed_handler(&dataset)?;
430
431        // Start the feed handler if it was newly created
432        if was_new_handler {
433            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
434        }
435
436        let symbol = instrument_id_to_symbol_string(
437            cmd.instrument_id,
438            &mut self.symbol_venue_map.write().unwrap(),
439        );
440
441        let subscription = Subscription::builder()
442            .schema(databento::dbn::Schema::Mbp1) // Market by price level 1 for quotes
443            .symbols(symbol)
444            .build();
445
446        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
447
448        Ok(())
449    }
450
451    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
452        tracing::debug!("Subscribe trades: {cmd:?}");
453
454        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
455        let was_new_handler = {
456            let channels = self.cmd_channels.lock().unwrap();
457            !channels.contains_key(&dataset)
458        };
459
460        self.get_or_create_feed_handler(&dataset)?;
461
462        // Start the feed handler if it was newly created
463        if was_new_handler {
464            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
465        }
466
467        let symbol = instrument_id_to_symbol_string(
468            cmd.instrument_id,
469            &mut self.symbol_venue_map.write().unwrap(),
470        );
471
472        let subscription = Subscription::builder()
473            .schema(databento::dbn::Schema::Trades)
474            .symbols(symbol)
475            .build();
476
477        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
478
479        Ok(())
480    }
481
482    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
483        tracing::debug!("Subscribe book deltas: {cmd:?}");
484
485        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
486        let was_new_handler = {
487            let channels = self.cmd_channels.lock().unwrap();
488            !channels.contains_key(&dataset)
489        };
490
491        self.get_or_create_feed_handler(&dataset)?;
492
493        // Start the feed handler if it was newly created
494        if was_new_handler {
495            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
496        }
497
498        let symbol = instrument_id_to_symbol_string(
499            cmd.instrument_id,
500            &mut self.symbol_venue_map.write().unwrap(),
501        );
502
503        let subscription = Subscription::builder()
504            .schema(databento::dbn::Schema::Mbo) // Market by order for book deltas
505            .symbols(symbol)
506            .build();
507
508        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
509
510        Ok(())
511    }
512
513    fn subscribe_instrument_status(
514        &mut self,
515        cmd: &SubscribeInstrumentStatus,
516    ) -> anyhow::Result<()> {
517        tracing::debug!("Subscribe instrument status: {cmd:?}");
518
519        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
520        let was_new_handler = {
521            let channels = self.cmd_channels.lock().unwrap();
522            !channels.contains_key(&dataset)
523        };
524
525        self.get_or_create_feed_handler(&dataset)?;
526
527        // Start the feed handler if it was newly created
528        if was_new_handler {
529            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
530        }
531
532        let symbol = instrument_id_to_symbol_string(
533            cmd.instrument_id,
534            &mut self.symbol_venue_map.write().unwrap(),
535        );
536
537        let subscription = Subscription::builder()
538            .schema(databento::dbn::Schema::Status)
539            .symbols(symbol)
540            .build();
541
542        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
543
544        Ok(())
545    }
546
547    // Unsubscribe methods
548    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
549        tracing::debug!("Unsubscribe quotes: {cmd:?}");
550
551        // Note: Databento live API doesn't support granular unsubscribing.
552        // The feed handler manages subscriptions and can handle reconnections
553        // with the appropriate subscription state.
554        tracing::warn!(
555            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
556            cmd.instrument_id
557        );
558
559        Ok(())
560    }
561
562    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
563        tracing::debug!("Unsubscribe trades: {cmd:?}");
564
565        // Note: Databento live API doesn't support granular unsubscribing.
566        // The feed handler manages subscriptions and can handle reconnections
567        // with the appropriate subscription state.
568        tracing::warn!(
569            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
570            cmd.instrument_id
571        );
572
573        Ok(())
574    }
575
576    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
577        tracing::debug!("Unsubscribe book deltas: {cmd:?}");
578
579        // Note: Databento live API doesn't support granular unsubscribing.
580        // The feed handler manages subscriptions and can handle reconnections
581        // with the appropriate subscription state.
582        tracing::warn!(
583            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
584            cmd.instrument_id
585        );
586
587        Ok(())
588    }
589
590    fn unsubscribe_instrument_status(
591        &mut self,
592        cmd: &UnsubscribeInstrumentStatus,
593    ) -> anyhow::Result<()> {
594        tracing::debug!("Unsubscribe instrument status: {cmd:?}");
595
596        // Note: Databento live API doesn't support granular unsubscribing.
597        // The feed handler manages subscriptions and can handle reconnections
598        // with the appropriate subscription state.
599        tracing::warn!(
600            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
601            cmd.instrument_id
602        );
603
604        Ok(())
605    }
606
607    // Historical data request methods using the historical client
608    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
609        tracing::debug!("Request instruments: {request:?}");
610
611        let historical_client = self.historical.clone();
612        let request = request.clone();
613
614        tokio::spawn(async move {
615            // Convert request to historical query parameters
616            // For now, use a default symbol set or derive from venue
617            let symbols = vec!["ALL_SYMBOLS".to_string()]; // TODO: Improve symbol handling
618
619            let params = RangeQueryParams {
620                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
621                symbols,
622                start: request
623                    .start
624                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
625                    .into(),
626                end: request
627                    .end
628                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
629                    .map(Into::into),
630                limit: None,
631                price_precision: None,
632            };
633
634            match historical_client.get_range_instruments(params).await {
635                Ok(instruments) => {
636                    tracing::info!("Retrieved {} instruments", instruments.len());
637                    // TODO: Send instruments to message bus or cache
638                }
639                Err(e) => {
640                    tracing::error!("Failed to request instruments: {e}");
641                }
642            }
643        });
644
645        Ok(())
646    }
647
648    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
649        tracing::debug!("Request quotes: {request:?}");
650
651        let historical_client = self.historical.clone();
652        let request = request.clone();
653
654        tokio::spawn(async move {
655            let symbols = vec![instrument_id_to_symbol_string(
656                request.instrument_id,
657                &mut AHashMap::new(), // TODO: Use proper symbol map
658            )];
659
660            let params = RangeQueryParams {
661                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
662                symbols,
663                start: request
664                    .start
665                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
666                    .into(),
667                end: request
668                    .end
669                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
670                    .map(Into::into),
671                limit: request.limit.map(|l| l.get() as u64),
672                price_precision: None,
673            };
674
675            match historical_client.get_range_quotes(params, None).await {
676                Ok(quotes) => {
677                    tracing::info!("Retrieved {} quotes", quotes.len());
678                    // TODO: Send quotes to message bus
679                }
680                Err(e) => {
681                    tracing::error!("Failed to request quotes: {e}");
682                }
683            }
684        });
685
686        Ok(())
687    }
688
689    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
690        tracing::debug!("Request trades: {request:?}");
691
692        let historical_client = self.historical.clone();
693        let request = request.clone();
694
695        tokio::spawn(async move {
696            let symbols = vec![instrument_id_to_symbol_string(
697                request.instrument_id,
698                &mut AHashMap::new(), // TODO: Use proper symbol map
699            )];
700
701            let params = RangeQueryParams {
702                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
703                symbols,
704                start: request
705                    .start
706                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
707                    .into(),
708                end: request
709                    .end
710                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
711                    .map(Into::into),
712                limit: request.limit.map(|l| l.get() as u64),
713                price_precision: None,
714            };
715
716            match historical_client.get_range_trades(params).await {
717                Ok(trades) => {
718                    tracing::info!("Retrieved {} trades", trades.len());
719                    // TODO: Send trades to message bus
720                }
721                Err(e) => {
722                    tracing::error!("Failed to request trades: {e}");
723                }
724            }
725        });
726
727        Ok(())
728    }
729
730    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
731        tracing::debug!("Request bars: {request:?}");
732
733        let historical_client = self.historical.clone();
734        let request = request.clone();
735
736        tokio::spawn(async move {
737            let symbols = vec![instrument_id_to_symbol_string(
738                request.bar_type.instrument_id(),
739                &mut AHashMap::new(), // TODO: Use proper symbol map
740            )];
741
742            let params = RangeQueryParams {
743                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
744                symbols,
745                start: request
746                    .start
747                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
748                    .into(),
749                end: request
750                    .end
751                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
752                    .map(Into::into),
753                limit: request.limit.map(|l| l.get() as u64),
754                price_precision: None,
755            };
756
757            // Map bar aggregation from the request
758            let aggregation = match request.bar_type.spec().aggregation {
759                BarAggregation::Second => BarAggregation::Second,
760                BarAggregation::Minute => BarAggregation::Minute,
761                BarAggregation::Hour => BarAggregation::Hour,
762                BarAggregation::Day => BarAggregation::Day,
763                _ => {
764                    tracing::error!(
765                        "Unsupported bar aggregation: {:?}",
766                        request.bar_type.spec().aggregation
767                    );
768                    return;
769                }
770            };
771
772            match historical_client
773                .get_range_bars(params, aggregation, true)
774                .await
775            {
776                Ok(bars) => {
777                    tracing::info!("Retrieved {} bars", bars.len());
778                    // TODO: Send bars to message bus
779                }
780                Err(e) => {
781                    tracing::error!("Failed to request bars: {e}");
782                }
783            }
784        });
785
786        Ok(())
787    }
788}