nautilus_databento/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a unified data client that combines Databento's live streaming and historical data capabilities.
17//!
18//! This module implements a data client that manages connections to multiple Databento datasets,
19//! handles live market data subscriptions, and provides access to historical data on demand.
20
21use std::{
22    fmt::Debug,
23    path::PathBuf,
24    sync::{
25        Arc, Mutex, RwLock,
26        atomic::{AtomicBool, Ordering},
27    },
28};
29
30use ahash::AHashMap;
31use databento::live::Subscription;
32use indexmap::IndexMap;
33use nautilus_common::{
34    live::{runner::get_data_event_sender, runtime::get_runtime},
35    messages::{
36        DataEvent,
37        data::{
38            RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
39            SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades, UnsubscribeBookDeltas,
40            UnsubscribeInstrumentStatus, UnsubscribeQuotes, UnsubscribeTrades,
41        },
42    },
43};
44use nautilus_core::{MUTEX_POISONED, time::AtomicTime};
45use nautilus_data::client::DataClient;
46use nautilus_model::{
47    enums::BarAggregation,
48    identifiers::{ClientId, Symbol, Venue},
49    instruments::Instrument,
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55    common::Credential,
56    historical::{DatabentoHistoricalClient, RangeQueryParams},
57    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
58    loader::DatabentoDataLoader,
59    symbology::instrument_id_to_symbol_string,
60    types::PublisherId,
61};
62
63/// Configuration for the Databento data client.
64#[derive(Clone)]
65pub struct DatabentoDataClientConfig {
66    /// Databento API credential.
67    credential: Credential,
68    /// Path to publishers.json file.
69    pub publishers_filepath: PathBuf,
70    /// Whether to use exchange as venue for GLBX instruments.
71    pub use_exchange_as_venue: bool,
72    /// Whether to timestamp bars on close.
73    pub bars_timestamp_on_close: bool,
74    /// Reconnection timeout in minutes (None for infinite retries).
75    pub reconnect_timeout_mins: Option<u64>,
76    /// Optional HTTP proxy URL.
77    pub http_proxy_url: Option<String>,
78    /// Optional WebSocket proxy URL.
79    pub ws_proxy_url: Option<String>,
80}
81
82impl Debug for DatabentoDataClientConfig {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        f.debug_struct("DatabentoDataClientConfig")
85            .field("credential", &"<redacted>")
86            .field("publishers_filepath", &self.publishers_filepath)
87            .field("use_exchange_as_venue", &self.use_exchange_as_venue)
88            .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
89            .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
90            .field("http_proxy_url", &self.http_proxy_url)
91            .field("ws_proxy_url", &self.ws_proxy_url)
92            .finish()
93    }
94}
95
96impl DatabentoDataClientConfig {
97    /// Creates a new [`DatabentoDataClientConfig`] instance.
98    #[must_use]
99    pub fn new(
100        api_key: impl Into<String>,
101        publishers_filepath: PathBuf,
102        use_exchange_as_venue: bool,
103        bars_timestamp_on_close: bool,
104    ) -> Self {
105        Self {
106            credential: Credential::new(api_key),
107            publishers_filepath,
108            use_exchange_as_venue,
109            bars_timestamp_on_close,
110            reconnect_timeout_mins: Some(10), // Default: 10 minutes
111            http_proxy_url: None,
112            ws_proxy_url: None,
113        }
114    }
115
116    /// Returns the API key associated with this config.
117    #[must_use]
118    pub fn api_key(&self) -> &str {
119        self.credential.api_key()
120    }
121
122    /// Returns a masked version of the API key for logging purposes.
123    #[must_use]
124    pub fn api_key_masked(&self) -> String {
125        self.credential.api_key_masked()
126    }
127}
128
129/// A Databento data client that combines live streaming and historical data functionality.
130///
131/// This client uses the existing `DatabentoFeedHandler` for live data subscriptions
132/// and `DatabentoHistoricalClient` for historical data requests. It supports multiple
133/// datasets simultaneously, with separate feed handlers per dataset.
134#[cfg_attr(feature = "python", pyo3::pyclass)]
135#[derive(Debug)]
136pub struct DatabentoDataClient {
137    /// Client identifier.
138    client_id: ClientId,
139    /// Client configuration.
140    config: DatabentoDataClientConfig,
141    /// Connection state.
142    is_connected: AtomicBool,
143    /// Historical client for on-demand data requests.
144    historical: DatabentoHistoricalClient,
145    /// Data loader for venue-to-dataset mapping.
146    loader: DatabentoDataLoader,
147    /// Feed handler command senders per dataset.
148    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<LiveCommand>>>>,
149    /// Task handles for lifecycle management.
150    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
151    /// Cancellation token for graceful shutdown.
152    cancellation_token: CancellationToken,
153    /// Publisher to venue mapping.
154    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
155    /// Symbol to venue mapping (for caching).
156    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
157    /// Data event sender for forwarding data to the async runner.
158    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
159}
160
161impl DatabentoDataClient {
162    /// Creates a new [`DatabentoDataClient`] instance.
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if client creation or publisher configuration loading fails.
167    pub fn new(
168        client_id: ClientId,
169        config: DatabentoDataClientConfig,
170        clock: &'static AtomicTime,
171    ) -> anyhow::Result<Self> {
172        let historical = DatabentoHistoricalClient::new(
173            config.api_key().to_string(),
174            config.publishers_filepath.clone(),
175            clock,
176            config.use_exchange_as_venue,
177        )?;
178
179        // Create data loader for venue-to-dataset mapping
180        let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;
181
182        // Load publisher configuration
183        let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
184        let publishers_vec: Vec<crate::types::DatabentoPublisher> =
185            serde_json::from_str(&file_content)?;
186
187        let publisher_venue_map = publishers_vec
188            .into_iter()
189            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
190            .collect::<IndexMap<u16, Venue>>();
191
192        let data_sender = get_data_event_sender();
193
194        Ok(Self {
195            client_id,
196            config,
197            is_connected: AtomicBool::new(false),
198            historical,
199            loader,
200            cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
201            task_handles: Arc::new(Mutex::new(Vec::new())),
202            cancellation_token: CancellationToken::new(),
203            publisher_venue_map: Arc::new(publisher_venue_map),
204            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
205            data_sender,
206        })
207    }
208
209    /// Gets the dataset for a given venue using the data loader.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the venue-to-dataset mapping cannot be found.
214    fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
215        self.loader
216            .get_dataset_for_venue(&venue)
217            .map(ToString::to_string)
218            .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
219    }
220
221    /// Gets or creates a feed handler for the specified dataset.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the feed handler cannot be created.
226    fn get_or_create_feed_handler(&self, dataset: &str) -> anyhow::Result<()> {
227        let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
228
229        if !channels.contains_key(dataset) {
230            tracing::info!("Creating new feed handler for dataset: {dataset}");
231            let cmd_tx = self.initialize_live_feed(dataset.to_string())?;
232            channels.insert(dataset.to_string(), cmd_tx);
233
234            tracing::debug!("Feed handler created for dataset: {dataset}, channel stored");
235        }
236
237        Ok(())
238    }
239
240    /// Sends a command to a specific dataset's feed handler.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if the command cannot be sent.
245    fn send_command_to_dataset(&self, dataset: &str, cmd: LiveCommand) -> anyhow::Result<()> {
246        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
247        if let Some(tx) = channels.get(dataset) {
248            tx.send(cmd)
249                .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
250        } else {
251            anyhow::bail!("No feed handler found for dataset: {dataset}");
252        }
253        Ok(())
254    }
255
256    /// Initializes the live feed handler for streaming data.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the feed handler cannot be started.
261    fn initialize_live_feed(
262        &self,
263        dataset: String,
264    ) -> anyhow::Result<tokio::sync::mpsc::UnboundedSender<LiveCommand>> {
265        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
266        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);
267
268        let mut feed_handler = DatabentoFeedHandler::new(
269            self.config.api_key().to_string(),
270            dataset,
271            cmd_rx,
272            msg_tx,
273            (*self.publisher_venue_map).clone(),
274            self.symbol_venue_map.clone(),
275            self.config.use_exchange_as_venue,
276            self.config.bars_timestamp_on_close,
277            self.config.reconnect_timeout_mins,
278        );
279
280        let cancellation_token = self.cancellation_token.clone();
281
282        // Spawn the feed handler task with cancellation support
283        let feed_handle = get_runtime().spawn(async move {
284            tokio::select! {
285                result = feed_handler.run() => {
286                    if let Err(e) = result {
287                        tracing::error!("Feed handler error: {e}");
288                    }
289                }
290                () = cancellation_token.cancelled() => {
291                    tracing::debug!("Feed handler cancelled");
292                }
293            }
294        });
295
296        let cancellation_token = self.cancellation_token.clone();
297        let data_sender = self.data_sender.clone();
298
299        // Spawn message processing task with cancellation support
300        let msg_handle = get_runtime().spawn(async move {
301            let mut msg_rx = msg_rx;
302            loop {
303                tokio::select! {
304                    msg = msg_rx.recv() => {
305                        match msg {
306                            Some(LiveMessage::Data(data)) => {
307                                tracing::debug!("Received data: {data:?}");
308                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
309                                    tracing::error!("Failed to send data event: {e}");
310                                }
311                            }
312                            Some(LiveMessage::Instrument(instrument)) => {
313                                tracing::debug!("Received instrument: {}", instrument.id());
314                                // TODO: Forward to cache or instrument manager
315                            }
316                            Some(LiveMessage::Status(status)) => {
317                                tracing::debug!("Received status: {status:?}");
318                                // TODO: Forward to appropriate handler
319                            }
320                            Some(LiveMessage::Imbalance(imbalance)) => {
321                                tracing::debug!("Received imbalance: {imbalance:?}");
322                                // TODO: Forward to appropriate handler
323                            }
324                            Some(LiveMessage::Statistics(statistics)) => {
325                                tracing::debug!("Received statistics: {statistics:?}");
326                                // TODO: Forward to appropriate handler
327                            }
328                            Some(LiveMessage::SubscriptionAck(ack)) => {
329                                tracing::debug!("Received subscription ack: {}", ack.message);
330                            }
331                            Some(LiveMessage::Error(error)) => {
332                                tracing::error!("Feed handler error: {error}");
333                                // TODO: Handle error appropriately
334                            }
335                            Some(LiveMessage::Close) => {
336                                tracing::info!("Feed handler closed");
337                                break;
338                            }
339                            None => {
340                                tracing::debug!("Message channel closed");
341                                break;
342                            }
343                        }
344                    }
345                    () = cancellation_token.cancelled() => {
346                        tracing::debug!("Message processing cancelled");
347                        break;
348                    }
349                }
350            }
351        });
352
353        {
354            let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
355            handles.push(feed_handle);
356            handles.push(msg_handle);
357        }
358
359        Ok(cmd_tx)
360    }
361}
362
363#[async_trait::async_trait(?Send)]
364impl DataClient for DatabentoDataClient {
365    /// Returns the client identifier.
366    fn client_id(&self) -> ClientId {
367        self.client_id
368    }
369
370    /// Returns the venue associated with this client (None for multi-venue clients).
371    fn venue(&self) -> Option<Venue> {
372        None
373    }
374
375    /// Starts the data client.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if the client fails to start.
380    fn start(&mut self) -> anyhow::Result<()> {
381        tracing::debug!("Starting");
382        Ok(())
383    }
384
385    /// Stops the data client and cancels all active subscriptions.
386    ///
387    /// # Errors
388    ///
389    /// Returns an error if the client fails to stop cleanly.
390    fn stop(&mut self) -> anyhow::Result<()> {
391        tracing::debug!("Stopping");
392
393        // Signal cancellation to all running tasks
394        self.cancellation_token.cancel();
395
396        // Send close command to all active feed handlers
397        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
398        for (dataset, tx) in channels.iter() {
399            if let Err(e) = tx.send(LiveCommand::Close) {
400                tracing::error!("Failed to send close command to dataset {dataset}: {e}");
401            }
402        }
403
404        self.is_connected.store(false, Ordering::Relaxed);
405        Ok(())
406    }
407
408    fn reset(&mut self) -> anyhow::Result<()> {
409        tracing::debug!("Resetting");
410        self.is_connected.store(false, Ordering::Relaxed);
411        Ok(())
412    }
413
414    fn dispose(&mut self) -> anyhow::Result<()> {
415        tracing::debug!("Disposing");
416        self.stop()
417    }
418
419    async fn connect(&mut self) -> anyhow::Result<()> {
420        tracing::debug!("Connecting...");
421
422        // Connection will happen lazily when subscriptions are made
423        // No need to create feed handlers upfront since we don't know which datasets will be needed
424        self.is_connected.store(true, Ordering::Relaxed);
425
426        tracing::info!("Connected");
427        Ok(())
428    }
429
430    async fn disconnect(&mut self) -> anyhow::Result<()> {
431        tracing::debug!("Disconnecting...");
432
433        // Signal cancellation to all running tasks
434        self.cancellation_token.cancel();
435
436        // Send close command to all active feed handlers
437        {
438            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
439            for (dataset, tx) in channels.iter() {
440                if let Err(e) = tx.send(LiveCommand::Close) {
441                    tracing::error!("Failed to send close command to dataset {dataset}: {e}");
442                }
443            }
444        }
445
446        // Wait for all spawned tasks to complete
447        let handles = {
448            let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
449            std::mem::take(&mut *task_handles)
450        };
451
452        for handle in handles {
453            if let Err(e) = handle.await
454                && !e.is_cancelled()
455            {
456                tracing::error!("Task join error: {e}");
457            }
458        }
459
460        self.is_connected.store(false, Ordering::Relaxed);
461
462        {
463            let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
464            channels.clear();
465        }
466
467        tracing::info!("Disconnected");
468        Ok(())
469    }
470
471    /// Returns whether the client is currently connected.
472    fn is_connected(&self) -> bool {
473        self.is_connected.load(Ordering::Relaxed)
474    }
475
476    fn is_disconnected(&self) -> bool {
477        !self.is_connected()
478    }
479
480    /// Subscribes to quote tick data for the specified instruments.
481    ///
482    /// # Errors
483    ///
484    /// Returns an error if the subscription request fails.
485    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
486        tracing::debug!("Subscribe quotes: {cmd:?}");
487
488        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
489        let was_new_handler = {
490            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
491            !channels.contains_key(&dataset)
492        };
493
494        self.get_or_create_feed_handler(&dataset)?;
495
496        // Start the feed handler if it was newly created
497        if was_new_handler {
498            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
499        }
500
501        let symbol = instrument_id_to_symbol_string(
502            cmd.instrument_id,
503            &mut self.symbol_venue_map.write().unwrap(),
504        );
505
506        let subscription = Subscription::builder()
507            .schema(databento::dbn::Schema::Mbp1) // Market by price level 1 for quotes
508            .symbols(symbol)
509            .build();
510
511        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
512
513        Ok(())
514    }
515
516    /// Subscribes to trade tick data for the specified instruments.
517    ///
518    /// # Errors
519    ///
520    /// Returns an error if the subscription request fails.
521    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
522        tracing::debug!("Subscribe trades: {cmd:?}");
523
524        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
525        let was_new_handler = {
526            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
527            !channels.contains_key(&dataset)
528        };
529
530        self.get_or_create_feed_handler(&dataset)?;
531
532        // Start the feed handler if it was newly created
533        if was_new_handler {
534            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
535        }
536
537        let symbol = instrument_id_to_symbol_string(
538            cmd.instrument_id,
539            &mut self.symbol_venue_map.write().unwrap(),
540        );
541
542        let subscription = Subscription::builder()
543            .schema(databento::dbn::Schema::Trades)
544            .symbols(symbol)
545            .build();
546
547        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
548
549        Ok(())
550    }
551
552    /// Subscribes to order book delta updates for the specified instruments.
553    ///
554    /// # Errors
555    ///
556    /// Returns an error if the subscription request fails.
557    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
558        tracing::debug!("Subscribe book deltas: {cmd:?}");
559
560        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
561        let was_new_handler = {
562            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
563            !channels.contains_key(&dataset)
564        };
565
566        self.get_or_create_feed_handler(&dataset)?;
567
568        // Start the feed handler if it was newly created
569        if was_new_handler {
570            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
571        }
572
573        let symbol = instrument_id_to_symbol_string(
574            cmd.instrument_id,
575            &mut self.symbol_venue_map.write().unwrap(),
576        );
577
578        let subscription = Subscription::builder()
579            .schema(databento::dbn::Schema::Mbo) // Market by order for book deltas
580            .symbols(symbol)
581            .build();
582
583        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
584
585        Ok(())
586    }
587
588    /// Subscribes to instrument status updates for the specified instruments.
589    ///
590    /// # Errors
591    ///
592    /// Returns an error if the subscription request fails.
593    fn subscribe_instrument_status(
594        &mut self,
595        cmd: &SubscribeInstrumentStatus,
596    ) -> anyhow::Result<()> {
597        tracing::debug!("Subscribe instrument status: {cmd:?}");
598
599        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
600        let was_new_handler = {
601            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
602            !channels.contains_key(&dataset)
603        };
604
605        self.get_or_create_feed_handler(&dataset)?;
606
607        // Start the feed handler if it was newly created
608        if was_new_handler {
609            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
610        }
611
612        let symbol = instrument_id_to_symbol_string(
613            cmd.instrument_id,
614            &mut self.symbol_venue_map.write().unwrap(),
615        );
616
617        let subscription = Subscription::builder()
618            .schema(databento::dbn::Schema::Status)
619            .symbols(symbol)
620            .build();
621
622        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
623
624        Ok(())
625    }
626
627    // Unsubscribe methods
628    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
629        tracing::debug!("Unsubscribe quotes: {cmd:?}");
630
631        // Note: Databento live API doesn't support granular unsubscribing.
632        // The feed handler manages subscriptions and can handle reconnections
633        // with the appropriate subscription state.
634        tracing::warn!(
635            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
636            cmd.instrument_id
637        );
638
639        Ok(())
640    }
641
642    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
643        tracing::debug!("Unsubscribe trades: {cmd:?}");
644
645        // Note: Databento live API doesn't support granular unsubscribing.
646        // The feed handler manages subscriptions and can handle reconnections
647        // with the appropriate subscription state.
648        tracing::warn!(
649            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
650            cmd.instrument_id
651        );
652
653        Ok(())
654    }
655
656    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
657        tracing::debug!("Unsubscribe book deltas: {cmd:?}");
658
659        // Note: Databento live API doesn't support granular unsubscribing.
660        // The feed handler manages subscriptions and can handle reconnections
661        // with the appropriate subscription state.
662        tracing::warn!(
663            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
664            cmd.instrument_id
665        );
666
667        Ok(())
668    }
669
670    fn unsubscribe_instrument_status(
671        &mut self,
672        cmd: &UnsubscribeInstrumentStatus,
673    ) -> anyhow::Result<()> {
674        tracing::debug!("Unsubscribe instrument status: {cmd:?}");
675
676        // Note: Databento live API doesn't support granular unsubscribing.
677        // The feed handler manages subscriptions and can handle reconnections
678        // with the appropriate subscription state.
679        tracing::warn!(
680            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
681            cmd.instrument_id
682        );
683
684        Ok(())
685    }
686
687    // Historical data request methods using the historical client
688    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
689        tracing::debug!("Request instruments: {request:?}");
690
691        let historical_client = self.historical.clone();
692        let request = request.clone();
693
694        get_runtime().spawn(async move {
695            // Convert request to historical query parameters
696            // For now, use a default symbol set or derive from venue
697            let symbols = vec!["ALL_SYMBOLS".to_string()]; // TODO: Improve symbol handling
698
699            let params = RangeQueryParams {
700                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
701                symbols,
702                start: request
703                    .start
704                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
705                    .into(),
706                end: request
707                    .end
708                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
709                    .map(Into::into),
710                limit: None,
711                price_precision: None,
712            };
713
714            match historical_client.get_range_instruments(params).await {
715                Ok(instruments) => {
716                    tracing::info!("Retrieved {} instruments", instruments.len());
717                    // TODO: Send instruments to message bus or cache
718                }
719                Err(e) => {
720                    tracing::error!("Failed to request instruments: {e}");
721                }
722            }
723        });
724
725        Ok(())
726    }
727
728    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
729        tracing::debug!("Request quotes: {request:?}");
730
731        let historical_client = self.historical.clone();
732        let request = request.clone();
733
734        get_runtime().spawn(async move {
735            let symbols = vec![instrument_id_to_symbol_string(
736                request.instrument_id,
737                &mut AHashMap::new(), // TODO: Use proper symbol map
738            )];
739
740            let params = RangeQueryParams {
741                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
742                symbols,
743                start: request
744                    .start
745                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
746                    .into(),
747                end: request
748                    .end
749                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
750                    .map(Into::into),
751                limit: request.limit.map(|l| l.get() as u64),
752                price_precision: None,
753            };
754
755            match historical_client.get_range_quotes(params, None).await {
756                Ok(quotes) => {
757                    tracing::info!("Retrieved {} quotes", quotes.len());
758                    // TODO: Send quotes to message bus
759                }
760                Err(e) => {
761                    tracing::error!("Failed to request quotes: {e}");
762                }
763            }
764        });
765
766        Ok(())
767    }
768
769    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
770        tracing::debug!("Request trades: {request:?}");
771
772        let historical_client = self.historical.clone();
773        let request = request.clone();
774
775        get_runtime().spawn(async move {
776            let symbols = vec![instrument_id_to_symbol_string(
777                request.instrument_id,
778                &mut AHashMap::new(), // TODO: Use proper symbol map
779            )];
780
781            let params = RangeQueryParams {
782                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
783                symbols,
784                start: request
785                    .start
786                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
787                    .into(),
788                end: request
789                    .end
790                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
791                    .map(Into::into),
792                limit: request.limit.map(|l| l.get() as u64),
793                price_precision: None,
794            };
795
796            match historical_client.get_range_trades(params).await {
797                Ok(trades) => {
798                    tracing::info!("Retrieved {} trades", trades.len());
799                    // TODO: Send trades to message bus
800                }
801                Err(e) => {
802                    tracing::error!("Failed to request trades: {e}");
803                }
804            }
805        });
806
807        Ok(())
808    }
809
810    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
811        tracing::debug!("Request bars: {request:?}");
812
813        let historical_client = self.historical.clone();
814        let request = request.clone();
815
816        get_runtime().spawn(async move {
817            let symbols = vec![instrument_id_to_symbol_string(
818                request.bar_type.instrument_id(),
819                &mut AHashMap::new(), // TODO: Use proper symbol map
820            )];
821
822            let params = RangeQueryParams {
823                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
824                symbols,
825                start: request
826                    .start
827                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
828                    .into(),
829                end: request
830                    .end
831                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
832                    .map(Into::into),
833                limit: request.limit.map(|l| l.get() as u64),
834                price_precision: None,
835            };
836
837            // Map bar aggregation from the request
838            let aggregation = match request.bar_type.spec().aggregation {
839                BarAggregation::Second => BarAggregation::Second,
840                BarAggregation::Minute => BarAggregation::Minute,
841                BarAggregation::Hour => BarAggregation::Hour,
842                BarAggregation::Day => BarAggregation::Day,
843                _ => {
844                    tracing::error!(
845                        "Unsupported bar aggregation: {:?}",
846                        request.bar_type.spec().aggregation
847                    );
848                    return;
849                }
850            };
851
852            match historical_client
853                .get_range_bars(params, aggregation, true)
854                .await
855            {
856                Ok(bars) => {
857                    tracing::info!("Retrieved {} bars", bars.len());
858                    // TODO: Send bars to message bus
859                }
860                Err(e) => {
861                    tracing::error!("Failed to request bars: {e}");
862                }
863            }
864        });
865
866        Ok(())
867    }
868}