nautilus_databento/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a unified data client that combines Databento's live streaming and historical data capabilities.
17//!
18//! This module implements a data client that manages connections to multiple Databento datasets,
19//! handles live market data subscriptions, and provides access to historical data on demand.
20
21use std::{
22    path::PathBuf,
23    sync::{
24        Arc, Mutex, RwLock,
25        atomic::{AtomicBool, Ordering},
26    },
27};
28
29use ahash::AHashMap;
30use databento::live::Subscription;
31use indexmap::IndexMap;
32use nautilus_common::{
33    messages::{
34        DataEvent,
35        data::{
36            RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
37            SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades, UnsubscribeBookDeltas,
38            UnsubscribeInstrumentStatus, UnsubscribeQuotes, UnsubscribeTrades,
39        },
40    },
41    runner::get_data_event_sender,
42};
43use nautilus_core::time::AtomicTime;
44use nautilus_data::client::DataClient;
45use nautilus_model::{
46    enums::BarAggregation,
47    identifiers::{ClientId, Symbol, Venue},
48    instruments::Instrument,
49};
50use tokio::task::JoinHandle;
51use tokio_util::sync::CancellationToken;
52
53use crate::{
54    historical::{DatabentoHistoricalClient, RangeQueryParams},
55    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
56    loader::DatabentoDataLoader,
57    symbology::instrument_id_to_symbol_string,
58    types::PublisherId,
59};
60
/// Configuration for the Databento data client.
///
/// `Debug` is implemented by hand rather than derived so that the API key is never written
/// into logs, panic messages, or the debug output of types that embed this configuration.
#[derive(Clone)]
pub struct DatabentoDataClientConfig {
    /// Databento API key.
    pub api_key: String,
    /// Path to publishers.json file.
    pub publishers_filepath: PathBuf,
    /// Whether to use exchange as venue for GLBX instruments.
    pub use_exchange_as_venue: bool,
    /// Whether to timestamp bars on close.
    pub bars_timestamp_on_close: bool,
}

impl std::fmt::Debug for DatabentoDataClientConfig {
    /// Formats every field except `api_key`, which is replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabentoDataClientConfig")
            .field("api_key", &"<redacted>")
            .field("publishers_filepath", &self.publishers_filepath)
            .field("use_exchange_as_venue", &self.use_exchange_as_venue)
            .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
            .finish()
    }
}
73
74impl DatabentoDataClientConfig {
75    /// Creates a new [`DatabentoDataClientConfig`] instance.
76    #[must_use]
77    pub const fn new(
78        api_key: String,
79        publishers_filepath: PathBuf,
80        use_exchange_as_venue: bool,
81        bars_timestamp_on_close: bool,
82    ) -> Self {
83        Self {
84            api_key,
85            publishers_filepath,
86            use_exchange_as_venue,
87            bars_timestamp_on_close,
88        }
89    }
90}
91
92/// A Databento data client that combines live streaming and historical data functionality.
93///
94/// This client uses the existing `DatabentoFeedHandler` for live data subscriptions
95/// and `DatabentoHistoricalClient` for historical data requests. It supports multiple
96/// datasets simultaneously, with separate feed handlers per dataset.
97#[cfg_attr(feature = "python", pyo3::pyclass)]
98#[derive(Debug)]
99pub struct DatabentoDataClient {
100    /// Client identifier.
101    client_id: ClientId,
102    /// Client configuration.
103    config: DatabentoDataClientConfig,
104    /// Connection state.
105    is_connected: AtomicBool,
106    /// Historical client for on-demand data requests.
107    historical: DatabentoHistoricalClient,
108    /// Data loader for venue-to-dataset mapping.
109    loader: DatabentoDataLoader,
110    /// Feed handler command senders per dataset.
111    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<LiveCommand>>>>,
112    /// Task handles for lifecycle management.
113    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
114    /// Cancellation token for graceful shutdown.
115    cancellation_token: CancellationToken,
116    /// Publisher to venue mapping.
117    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
118    /// Symbol to venue mapping (for caching).
119    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
120    /// Data event sender for forwarding data to the async runner.
121    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
122}
123
124impl DatabentoDataClient {
125    /// Creates a new [`DatabentoDataClient`] instance.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if client creation or publisher configuration loading fails.
130    pub fn new(
131        client_id: ClientId,
132        config: DatabentoDataClientConfig,
133        clock: &'static AtomicTime,
134    ) -> anyhow::Result<Self> {
135        let historical = DatabentoHistoricalClient::new(
136            config.api_key.clone(),
137            config.publishers_filepath.clone(),
138            clock,
139            config.use_exchange_as_venue,
140        )?;
141
142        // Create data loader for venue-to-dataset mapping
143        let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;
144
145        // Load publisher configuration
146        let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
147        let publishers_vec: Vec<crate::types::DatabentoPublisher> =
148            serde_json::from_str(&file_content)?;
149
150        let publisher_venue_map = publishers_vec
151            .into_iter()
152            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
153            .collect::<IndexMap<u16, Venue>>();
154
155        let data_sender = get_data_event_sender();
156
157        Ok(Self {
158            client_id,
159            config,
160            is_connected: AtomicBool::new(false),
161            historical,
162            loader,
163            cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
164            task_handles: Arc::new(Mutex::new(Vec::new())),
165            cancellation_token: CancellationToken::new(),
166            publisher_venue_map: Arc::new(publisher_venue_map),
167            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
168            data_sender,
169        })
170    }
171
172    /// Gets the dataset for a given venue using the data loader.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if the venue-to-dataset mapping cannot be found.
177    fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
178        self.loader
179            .get_dataset_for_venue(&venue)
180            .map(ToString::to_string)
181            .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
182    }
183
184    /// Gets or creates a feed handler for the specified dataset.
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if the feed handler cannot be created.
189    fn get_or_create_feed_handler(&self, dataset: &str) -> anyhow::Result<()> {
190        let mut channels = self.cmd_channels.lock().unwrap();
191
192        if !channels.contains_key(dataset) {
193            tracing::info!("Creating new feed handler for dataset: {dataset}");
194            let cmd_tx = self.initialize_live_feed(dataset.to_string())?;
195            channels.insert(dataset.to_string(), cmd_tx);
196
197            tracing::debug!("Feed handler created for dataset: {dataset}, channel stored");
198        }
199
200        Ok(())
201    }
202
203    /// Sends a command to a specific dataset's feed handler.
204    ///
205    /// # Errors
206    ///
207    /// Returns an error if the command cannot be sent.
208    fn send_command_to_dataset(&self, dataset: &str, cmd: LiveCommand) -> anyhow::Result<()> {
209        let channels = self.cmd_channels.lock().unwrap();
210        if let Some(tx) = channels.get(dataset) {
211            tx.send(cmd)
212                .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
213        } else {
214            anyhow::bail!("No feed handler found for dataset: {dataset}");
215        }
216        Ok(())
217    }
218
219    /// Initializes the live feed handler for streaming data.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if the feed handler cannot be started.
224    fn initialize_live_feed(
225        &self,
226        dataset: String,
227    ) -> anyhow::Result<tokio::sync::mpsc::UnboundedSender<LiveCommand>> {
228        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
229        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);
230
231        let mut feed_handler = DatabentoFeedHandler::new(
232            self.config.api_key.clone(),
233            dataset,
234            cmd_rx,
235            msg_tx,
236            (*self.publisher_venue_map).clone(),
237            self.symbol_venue_map.clone(),
238            self.config.use_exchange_as_venue,
239            self.config.bars_timestamp_on_close,
240        );
241
242        let cancellation_token = self.cancellation_token.clone();
243
244        // Spawn the feed handler task with cancellation support
245        let feed_handle = tokio::spawn(async move {
246            tokio::select! {
247                result = feed_handler.run() => {
248                    if let Err(e) = result {
249                        tracing::error!("Feed handler error: {e}");
250                    }
251                }
252                () = cancellation_token.cancelled() => {
253                    tracing::debug!("Feed handler cancelled");
254                }
255            }
256        });
257
258        let cancellation_token = self.cancellation_token.clone();
259        let data_sender = self.data_sender.clone();
260
261        // Spawn message processing task with cancellation support
262        let msg_handle = tokio::spawn(async move {
263            let mut msg_rx = msg_rx;
264            loop {
265                tokio::select! {
266                    msg = msg_rx.recv() => {
267                        match msg {
268                            Some(LiveMessage::Data(data)) => {
269                                tracing::debug!("Received data: {data:?}");
270                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
271                                    tracing::error!("Failed to send data event: {e}");
272                                }
273                            }
274                            Some(LiveMessage::Instrument(instrument)) => {
275                                tracing::debug!("Received instrument: {}", instrument.id());
276                                // TODO: Forward to cache or instrument manager
277                            }
278                            Some(LiveMessage::Status(status)) => {
279                                tracing::debug!("Received status: {status:?}");
280                                // TODO: Forward to appropriate handler
281                            }
282                            Some(LiveMessage::Imbalance(imbalance)) => {
283                                tracing::debug!("Received imbalance: {imbalance:?}");
284                                // TODO: Forward to appropriate handler
285                            }
286                            Some(LiveMessage::Statistics(statistics)) => {
287                                tracing::debug!("Received statistics: {statistics:?}");
288                                // TODO: Forward to appropriate handler
289                            }
290                            Some(LiveMessage::Error(error)) => {
291                                tracing::error!("Feed handler error: {error}");
292                                // TODO: Handle error appropriately
293                            }
294                            Some(LiveMessage::Close) => {
295                                tracing::info!("Feed handler closed");
296                                break;
297                            }
298                            None => {
299                                tracing::debug!("Message channel closed");
300                                break;
301                            }
302                        }
303                    }
304                    () = cancellation_token.cancelled() => {
305                        tracing::debug!("Message processing cancelled");
306                        break;
307                    }
308                }
309            }
310        });
311
312        {
313            let mut handles = self.task_handles.lock().unwrap();
314            handles.push(feed_handle);
315            handles.push(msg_handle);
316        }
317
318        Ok(cmd_tx)
319    }
320}
321
322#[async_trait::async_trait]
323impl DataClient for DatabentoDataClient {
324    /// Returns the client identifier.
325    fn client_id(&self) -> ClientId {
326        self.client_id
327    }
328
329    /// Returns the venue associated with this client (None for multi-venue clients).
330    fn venue(&self) -> Option<Venue> {
331        None
332    }
333
334    /// Starts the data client.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if the client fails to start.
339    fn start(&mut self) -> anyhow::Result<()> {
340        tracing::debug!("Starting");
341        Ok(())
342    }
343
344    /// Stops the data client and cancels all active subscriptions.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if the client fails to stop cleanly.
349    fn stop(&mut self) -> anyhow::Result<()> {
350        tracing::debug!("Stopping");
351
352        // Signal cancellation to all running tasks
353        self.cancellation_token.cancel();
354
355        // Send close command to all active feed handlers
356        let channels = self.cmd_channels.lock().unwrap();
357        for (dataset, tx) in channels.iter() {
358            if let Err(e) = tx.send(LiveCommand::Close) {
359                tracing::error!("Failed to send close command to dataset {dataset}: {e}");
360            }
361        }
362
363        self.is_connected.store(false, Ordering::Relaxed);
364        Ok(())
365    }
366
367    fn reset(&mut self) -> anyhow::Result<()> {
368        tracing::debug!("Resetting");
369        self.is_connected.store(false, Ordering::Relaxed);
370        Ok(())
371    }
372
373    fn dispose(&mut self) -> anyhow::Result<()> {
374        tracing::debug!("Disposing");
375        self.stop()
376    }
377
378    async fn connect(&mut self) -> anyhow::Result<()> {
379        tracing::debug!("Connecting...");
380
381        // Connection will happen lazily when subscriptions are made
382        // No need to create feed handlers upfront since we don't know which datasets will be needed
383        self.is_connected.store(true, Ordering::Relaxed);
384
385        tracing::info!("Connected");
386        Ok(())
387    }
388
389    async fn disconnect(&mut self) -> anyhow::Result<()> {
390        tracing::debug!("Disconnecting...");
391
392        // Signal cancellation to all running tasks
393        self.cancellation_token.cancel();
394
395        // Send close command to all active feed handlers
396        {
397            let channels = self.cmd_channels.lock().unwrap();
398            for (dataset, tx) in channels.iter() {
399                if let Err(e) = tx.send(LiveCommand::Close) {
400                    tracing::error!("Failed to send close command to dataset {dataset}: {e}");
401                }
402            }
403        }
404
405        // Wait for all spawned tasks to complete
406        let handles = {
407            let mut task_handles = self.task_handles.lock().unwrap();
408            std::mem::take(&mut *task_handles)
409        };
410
411        for handle in handles {
412            if let Err(e) = handle.await
413                && !e.is_cancelled()
414            {
415                tracing::error!("Task join error: {e}");
416            }
417        }
418
419        self.is_connected.store(false, Ordering::Relaxed);
420
421        {
422            let mut channels = self.cmd_channels.lock().unwrap();
423            channels.clear();
424        }
425
426        tracing::info!("Disconnected");
427        Ok(())
428    }
429
430    /// Returns whether the client is currently connected.
431    fn is_connected(&self) -> bool {
432        self.is_connected.load(Ordering::Relaxed)
433    }
434
435    fn is_disconnected(&self) -> bool {
436        !self.is_connected()
437    }
438
439    /// Subscribes to quote tick data for the specified instruments.
440    ///
441    /// # Errors
442    ///
443    /// Returns an error if the subscription request fails.
444    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
445        tracing::debug!("Subscribe quotes: {cmd:?}");
446
447        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
448        let was_new_handler = {
449            let channels = self.cmd_channels.lock().unwrap();
450            !channels.contains_key(&dataset)
451        };
452
453        self.get_or_create_feed_handler(&dataset)?;
454
455        // Start the feed handler if it was newly created
456        if was_new_handler {
457            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
458        }
459
460        let symbol = instrument_id_to_symbol_string(
461            cmd.instrument_id,
462            &mut self.symbol_venue_map.write().unwrap(),
463        );
464
465        let subscription = Subscription::builder()
466            .schema(databento::dbn::Schema::Mbp1) // Market by price level 1 for quotes
467            .symbols(symbol)
468            .build();
469
470        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
471
472        Ok(())
473    }
474
475    /// Subscribes to trade tick data for the specified instruments.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if the subscription request fails.
480    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
481        tracing::debug!("Subscribe trades: {cmd:?}");
482
483        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
484        let was_new_handler = {
485            let channels = self.cmd_channels.lock().unwrap();
486            !channels.contains_key(&dataset)
487        };
488
489        self.get_or_create_feed_handler(&dataset)?;
490
491        // Start the feed handler if it was newly created
492        if was_new_handler {
493            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
494        }
495
496        let symbol = instrument_id_to_symbol_string(
497            cmd.instrument_id,
498            &mut self.symbol_venue_map.write().unwrap(),
499        );
500
501        let subscription = Subscription::builder()
502            .schema(databento::dbn::Schema::Trades)
503            .symbols(symbol)
504            .build();
505
506        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
507
508        Ok(())
509    }
510
511    /// Subscribes to order book delta updates for the specified instruments.
512    ///
513    /// # Errors
514    ///
515    /// Returns an error if the subscription request fails.
516    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
517        tracing::debug!("Subscribe book deltas: {cmd:?}");
518
519        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
520        let was_new_handler = {
521            let channels = self.cmd_channels.lock().unwrap();
522            !channels.contains_key(&dataset)
523        };
524
525        self.get_or_create_feed_handler(&dataset)?;
526
527        // Start the feed handler if it was newly created
528        if was_new_handler {
529            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
530        }
531
532        let symbol = instrument_id_to_symbol_string(
533            cmd.instrument_id,
534            &mut self.symbol_venue_map.write().unwrap(),
535        );
536
537        let subscription = Subscription::builder()
538            .schema(databento::dbn::Schema::Mbo) // Market by order for book deltas
539            .symbols(symbol)
540            .build();
541
542        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
543
544        Ok(())
545    }
546
547    /// Subscribes to instrument status updates for the specified instruments.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error if the subscription request fails.
552    fn subscribe_instrument_status(
553        &mut self,
554        cmd: &SubscribeInstrumentStatus,
555    ) -> anyhow::Result<()> {
556        tracing::debug!("Subscribe instrument status: {cmd:?}");
557
558        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
559        let was_new_handler = {
560            let channels = self.cmd_channels.lock().unwrap();
561            !channels.contains_key(&dataset)
562        };
563
564        self.get_or_create_feed_handler(&dataset)?;
565
566        // Start the feed handler if it was newly created
567        if was_new_handler {
568            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
569        }
570
571        let symbol = instrument_id_to_symbol_string(
572            cmd.instrument_id,
573            &mut self.symbol_venue_map.write().unwrap(),
574        );
575
576        let subscription = Subscription::builder()
577            .schema(databento::dbn::Schema::Status)
578            .symbols(symbol)
579            .build();
580
581        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
582
583        Ok(())
584    }
585
586    // Unsubscribe methods
587    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
588        tracing::debug!("Unsubscribe quotes: {cmd:?}");
589
590        // Note: Databento live API doesn't support granular unsubscribing.
591        // The feed handler manages subscriptions and can handle reconnections
592        // with the appropriate subscription state.
593        tracing::warn!(
594            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
595            cmd.instrument_id
596        );
597
598        Ok(())
599    }
600
601    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
602        tracing::debug!("Unsubscribe trades: {cmd:?}");
603
604        // Note: Databento live API doesn't support granular unsubscribing.
605        // The feed handler manages subscriptions and can handle reconnections
606        // with the appropriate subscription state.
607        tracing::warn!(
608            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
609            cmd.instrument_id
610        );
611
612        Ok(())
613    }
614
615    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
616        tracing::debug!("Unsubscribe book deltas: {cmd:?}");
617
618        // Note: Databento live API doesn't support granular unsubscribing.
619        // The feed handler manages subscriptions and can handle reconnections
620        // with the appropriate subscription state.
621        tracing::warn!(
622            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
623            cmd.instrument_id
624        );
625
626        Ok(())
627    }
628
629    fn unsubscribe_instrument_status(
630        &mut self,
631        cmd: &UnsubscribeInstrumentStatus,
632    ) -> anyhow::Result<()> {
633        tracing::debug!("Unsubscribe instrument status: {cmd:?}");
634
635        // Note: Databento live API doesn't support granular unsubscribing.
636        // The feed handler manages subscriptions and can handle reconnections
637        // with the appropriate subscription state.
638        tracing::warn!(
639            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
640            cmd.instrument_id
641        );
642
643        Ok(())
644    }
645
646    // Historical data request methods using the historical client
647    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
648        tracing::debug!("Request instruments: {request:?}");
649
650        let historical_client = self.historical.clone();
651        let request = request.clone();
652
653        tokio::spawn(async move {
654            // Convert request to historical query parameters
655            // For now, use a default symbol set or derive from venue
656            let symbols = vec!["ALL_SYMBOLS".to_string()]; // TODO: Improve symbol handling
657
658            let params = RangeQueryParams {
659                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
660                symbols,
661                start: request
662                    .start
663                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
664                    .into(),
665                end: request
666                    .end
667                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
668                    .map(Into::into),
669                limit: None,
670                price_precision: None,
671            };
672
673            match historical_client.get_range_instruments(params).await {
674                Ok(instruments) => {
675                    tracing::info!("Retrieved {} instruments", instruments.len());
676                    // TODO: Send instruments to message bus or cache
677                }
678                Err(e) => {
679                    tracing::error!("Failed to request instruments: {e}");
680                }
681            }
682        });
683
684        Ok(())
685    }
686
687    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
688        tracing::debug!("Request quotes: {request:?}");
689
690        let historical_client = self.historical.clone();
691        let request = request.clone();
692
693        tokio::spawn(async move {
694            let symbols = vec![instrument_id_to_symbol_string(
695                request.instrument_id,
696                &mut AHashMap::new(), // TODO: Use proper symbol map
697            )];
698
699            let params = RangeQueryParams {
700                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
701                symbols,
702                start: request
703                    .start
704                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
705                    .into(),
706                end: request
707                    .end
708                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
709                    .map(Into::into),
710                limit: request.limit.map(|l| l.get() as u64),
711                price_precision: None,
712            };
713
714            match historical_client.get_range_quotes(params, None).await {
715                Ok(quotes) => {
716                    tracing::info!("Retrieved {} quotes", quotes.len());
717                    // TODO: Send quotes to message bus
718                }
719                Err(e) => {
720                    tracing::error!("Failed to request quotes: {e}");
721                }
722            }
723        });
724
725        Ok(())
726    }
727
728    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
729        tracing::debug!("Request trades: {request:?}");
730
731        let historical_client = self.historical.clone();
732        let request = request.clone();
733
734        tokio::spawn(async move {
735            let symbols = vec![instrument_id_to_symbol_string(
736                request.instrument_id,
737                &mut AHashMap::new(), // TODO: Use proper symbol map
738            )];
739
740            let params = RangeQueryParams {
741                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
742                symbols,
743                start: request
744                    .start
745                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
746                    .into(),
747                end: request
748                    .end
749                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
750                    .map(Into::into),
751                limit: request.limit.map(|l| l.get() as u64),
752                price_precision: None,
753            };
754
755            match historical_client.get_range_trades(params).await {
756                Ok(trades) => {
757                    tracing::info!("Retrieved {} trades", trades.len());
758                    // TODO: Send trades to message bus
759                }
760                Err(e) => {
761                    tracing::error!("Failed to request trades: {e}");
762                }
763            }
764        });
765
766        Ok(())
767    }
768
769    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
770        tracing::debug!("Request bars: {request:?}");
771
772        let historical_client = self.historical.clone();
773        let request = request.clone();
774
775        tokio::spawn(async move {
776            let symbols = vec![instrument_id_to_symbol_string(
777                request.bar_type.instrument_id(),
778                &mut AHashMap::new(), // TODO: Use proper symbol map
779            )];
780
781            let params = RangeQueryParams {
782                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
783                symbols,
784                start: request
785                    .start
786                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
787                    .into(),
788                end: request
789                    .end
790                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
791                    .map(Into::into),
792                limit: request.limit.map(|l| l.get() as u64),
793                price_precision: None,
794            };
795
796            // Map bar aggregation from the request
797            let aggregation = match request.bar_type.spec().aggregation {
798                BarAggregation::Second => BarAggregation::Second,
799                BarAggregation::Minute => BarAggregation::Minute,
800                BarAggregation::Hour => BarAggregation::Hour,
801                BarAggregation::Day => BarAggregation::Day,
802                _ => {
803                    tracing::error!(
804                        "Unsupported bar aggregation: {:?}",
805                        request.bar_type.spec().aggregation
806                    );
807                    return;
808                }
809            };
810
811            match historical_client
812                .get_range_bars(params, aggregation, true)
813                .await
814            {
815                Ok(bars) => {
816                    tracing::info!("Retrieved {} bars", bars.len());
817                    // TODO: Send bars to message bus
818                }
819                Err(e) => {
820                    tracing::error!("Failed to request bars: {e}");
821                }
822            }
823        });
824
825        Ok(())
826    }
827}