nautilus_databento/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a unified data client that combines Databento's live streaming and historical data capabilities.
17//!
18//! This module implements a data client that manages connections to multiple Databento datasets,
19//! handles live market data subscriptions, and provides access to historical data on demand.
20
21use std::{
22    fmt::Debug,
23    path::PathBuf,
24    sync::{
25        Arc, Mutex, RwLock,
26        atomic::{AtomicBool, Ordering},
27    },
28};
29
30use ahash::AHashMap;
31use databento::live::Subscription;
32use indexmap::IndexMap;
33use nautilus_common::{
34    messages::{
35        DataEvent,
36        data::{
37            RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
38            SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades, UnsubscribeBookDeltas,
39            UnsubscribeInstrumentStatus, UnsubscribeQuotes, UnsubscribeTrades,
40        },
41    },
42    runner::get_data_event_sender,
43};
44use nautilus_core::{MUTEX_POISONED, time::AtomicTime};
45use nautilus_data::client::DataClient;
46use nautilus_model::{
47    enums::BarAggregation,
48    identifiers::{ClientId, Symbol, Venue},
49    instruments::Instrument,
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55    common::Credential,
56    historical::{DatabentoHistoricalClient, RangeQueryParams},
57    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
58    loader::DatabentoDataLoader,
59    symbology::instrument_id_to_symbol_string,
60    types::PublisherId,
61};
62
63/// Configuration for the Databento data client.
64#[derive(Clone)]
65pub struct DatabentoDataClientConfig {
66    /// Databento API credential.
67    credential: Credential,
68    /// Path to publishers.json file.
69    pub publishers_filepath: PathBuf,
70    /// Whether to use exchange as venue for GLBX instruments.
71    pub use_exchange_as_venue: bool,
72    /// Whether to timestamp bars on close.
73    pub bars_timestamp_on_close: bool,
74    /// Reconnection timeout in minutes (None for infinite retries).
75    pub reconnect_timeout_mins: Option<u64>,
76    /// Optional HTTP proxy URL.
77    pub http_proxy_url: Option<String>,
78    /// Optional WebSocket proxy URL.
79    pub ws_proxy_url: Option<String>,
80}
81
82impl Debug for DatabentoDataClientConfig {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        f.debug_struct("DatabentoDataClientConfig")
85            .field("credential", &"<redacted>")
86            .field("publishers_filepath", &self.publishers_filepath)
87            .field("use_exchange_as_venue", &self.use_exchange_as_venue)
88            .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
89            .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
90            .field("http_proxy_url", &self.http_proxy_url)
91            .field("ws_proxy_url", &self.ws_proxy_url)
92            .finish()
93    }
94}
95
96impl DatabentoDataClientConfig {
97    /// Creates a new [`DatabentoDataClientConfig`] instance.
98    #[must_use]
99    pub fn new(
100        api_key: impl Into<String>,
101        publishers_filepath: PathBuf,
102        use_exchange_as_venue: bool,
103        bars_timestamp_on_close: bool,
104    ) -> Self {
105        Self {
106            credential: Credential::new(api_key),
107            publishers_filepath,
108            use_exchange_as_venue,
109            bars_timestamp_on_close,
110            reconnect_timeout_mins: Some(10), // Default: 10 minutes
111            http_proxy_url: None,
112            ws_proxy_url: None,
113        }
114    }
115
116    /// Returns the API key associated with this config.
117    #[must_use]
118    pub fn api_key(&self) -> &str {
119        self.credential.api_key()
120    }
121
122    /// Returns a masked version of the API key for logging purposes.
123    #[must_use]
124    pub fn api_key_masked(&self) -> String {
125        self.credential.api_key_masked()
126    }
127}
128
129/// A Databento data client that combines live streaming and historical data functionality.
130///
131/// This client uses the existing `DatabentoFeedHandler` for live data subscriptions
132/// and `DatabentoHistoricalClient` for historical data requests. It supports multiple
133/// datasets simultaneously, with separate feed handlers per dataset.
134#[cfg_attr(feature = "python", pyo3::pyclass)]
135#[derive(Debug)]
136pub struct DatabentoDataClient {
137    /// Client identifier.
138    client_id: ClientId,
139    /// Client configuration.
140    config: DatabentoDataClientConfig,
141    /// Connection state.
142    is_connected: AtomicBool,
143    /// Historical client for on-demand data requests.
144    historical: DatabentoHistoricalClient,
145    /// Data loader for venue-to-dataset mapping.
146    loader: DatabentoDataLoader,
147    /// Feed handler command senders per dataset.
148    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<LiveCommand>>>>,
149    /// Task handles for lifecycle management.
150    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
151    /// Cancellation token for graceful shutdown.
152    cancellation_token: CancellationToken,
153    /// Publisher to venue mapping.
154    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
155    /// Symbol to venue mapping (for caching).
156    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
157    /// Data event sender for forwarding data to the async runner.
158    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
159}
160
161impl DatabentoDataClient {
162    /// Creates a new [`DatabentoDataClient`] instance.
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if client creation or publisher configuration loading fails.
167    pub fn new(
168        client_id: ClientId,
169        config: DatabentoDataClientConfig,
170        clock: &'static AtomicTime,
171    ) -> anyhow::Result<Self> {
172        let historical = DatabentoHistoricalClient::new(
173            config.api_key().to_string(),
174            config.publishers_filepath.clone(),
175            clock,
176            config.use_exchange_as_venue,
177        )?;
178
179        // Create data loader for venue-to-dataset mapping
180        let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;
181
182        // Load publisher configuration
183        let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
184        let publishers_vec: Vec<crate::types::DatabentoPublisher> =
185            serde_json::from_str(&file_content)?;
186
187        let publisher_venue_map = publishers_vec
188            .into_iter()
189            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
190            .collect::<IndexMap<u16, Venue>>();
191
192        let data_sender = get_data_event_sender();
193
194        Ok(Self {
195            client_id,
196            config,
197            is_connected: AtomicBool::new(false),
198            historical,
199            loader,
200            cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
201            task_handles: Arc::new(Mutex::new(Vec::new())),
202            cancellation_token: CancellationToken::new(),
203            publisher_venue_map: Arc::new(publisher_venue_map),
204            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
205            data_sender,
206        })
207    }
208
209    /// Gets the dataset for a given venue using the data loader.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the venue-to-dataset mapping cannot be found.
214    fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
215        self.loader
216            .get_dataset_for_venue(&venue)
217            .map(ToString::to_string)
218            .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
219    }
220
221    /// Gets or creates a feed handler for the specified dataset.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the feed handler cannot be created.
226    fn get_or_create_feed_handler(&self, dataset: &str) -> anyhow::Result<()> {
227        let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
228
229        if !channels.contains_key(dataset) {
230            tracing::info!("Creating new feed handler for dataset: {dataset}");
231            let cmd_tx = self.initialize_live_feed(dataset.to_string())?;
232            channels.insert(dataset.to_string(), cmd_tx);
233
234            tracing::debug!("Feed handler created for dataset: {dataset}, channel stored");
235        }
236
237        Ok(())
238    }
239
240    /// Sends a command to a specific dataset's feed handler.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if the command cannot be sent.
245    fn send_command_to_dataset(&self, dataset: &str, cmd: LiveCommand) -> anyhow::Result<()> {
246        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
247        if let Some(tx) = channels.get(dataset) {
248            tx.send(cmd)
249                .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
250        } else {
251            anyhow::bail!("No feed handler found for dataset: {dataset}");
252        }
253        Ok(())
254    }
255
256    /// Initializes the live feed handler for streaming data.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the feed handler cannot be started.
261    fn initialize_live_feed(
262        &self,
263        dataset: String,
264    ) -> anyhow::Result<tokio::sync::mpsc::UnboundedSender<LiveCommand>> {
265        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
266        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);
267
268        let mut feed_handler = DatabentoFeedHandler::new(
269            self.config.api_key().to_string(),
270            dataset,
271            cmd_rx,
272            msg_tx,
273            (*self.publisher_venue_map).clone(),
274            self.symbol_venue_map.clone(),
275            self.config.use_exchange_as_venue,
276            self.config.bars_timestamp_on_close,
277            self.config.reconnect_timeout_mins,
278        );
279
280        let cancellation_token = self.cancellation_token.clone();
281
282        // Spawn the feed handler task with cancellation support
283        let feed_handle = tokio::spawn(async move {
284            tokio::select! {
285                result = feed_handler.run() => {
286                    if let Err(e) = result {
287                        tracing::error!("Feed handler error: {e}");
288                    }
289                }
290                () = cancellation_token.cancelled() => {
291                    tracing::debug!("Feed handler cancelled");
292                }
293            }
294        });
295
296        let cancellation_token = self.cancellation_token.clone();
297        let data_sender = self.data_sender.clone();
298
299        // Spawn message processing task with cancellation support
300        let msg_handle = tokio::spawn(async move {
301            let mut msg_rx = msg_rx;
302            loop {
303                tokio::select! {
304                    msg = msg_rx.recv() => {
305                        match msg {
306                            Some(LiveMessage::Data(data)) => {
307                                tracing::debug!("Received data: {data:?}");
308                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
309                                    tracing::error!("Failed to send data event: {e}");
310                                }
311                            }
312                            Some(LiveMessage::Instrument(instrument)) => {
313                                tracing::debug!("Received instrument: {}", instrument.id());
314                                // TODO: Forward to cache or instrument manager
315                            }
316                            Some(LiveMessage::Status(status)) => {
317                                tracing::debug!("Received status: {status:?}");
318                                // TODO: Forward to appropriate handler
319                            }
320                            Some(LiveMessage::Imbalance(imbalance)) => {
321                                tracing::debug!("Received imbalance: {imbalance:?}");
322                                // TODO: Forward to appropriate handler
323                            }
324                            Some(LiveMessage::Statistics(statistics)) => {
325                                tracing::debug!("Received statistics: {statistics:?}");
326                                // TODO: Forward to appropriate handler
327                            }
328                            Some(LiveMessage::Error(error)) => {
329                                tracing::error!("Feed handler error: {error}");
330                                // TODO: Handle error appropriately
331                            }
332                            Some(LiveMessage::Close) => {
333                                tracing::info!("Feed handler closed");
334                                break;
335                            }
336                            None => {
337                                tracing::debug!("Message channel closed");
338                                break;
339                            }
340                        }
341                    }
342                    () = cancellation_token.cancelled() => {
343                        tracing::debug!("Message processing cancelled");
344                        break;
345                    }
346                }
347            }
348        });
349
350        {
351            let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
352            handles.push(feed_handle);
353            handles.push(msg_handle);
354        }
355
356        Ok(cmd_tx)
357    }
358}
359
360#[async_trait::async_trait]
361impl DataClient for DatabentoDataClient {
362    /// Returns the client identifier.
363    fn client_id(&self) -> ClientId {
364        self.client_id
365    }
366
367    /// Returns the venue associated with this client (None for multi-venue clients).
368    fn venue(&self) -> Option<Venue> {
369        None
370    }
371
372    /// Starts the data client.
373    ///
374    /// # Errors
375    ///
376    /// Returns an error if the client fails to start.
377    fn start(&mut self) -> anyhow::Result<()> {
378        tracing::debug!("Starting");
379        Ok(())
380    }
381
382    /// Stops the data client and cancels all active subscriptions.
383    ///
384    /// # Errors
385    ///
386    /// Returns an error if the client fails to stop cleanly.
387    fn stop(&mut self) -> anyhow::Result<()> {
388        tracing::debug!("Stopping");
389
390        // Signal cancellation to all running tasks
391        self.cancellation_token.cancel();
392
393        // Send close command to all active feed handlers
394        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
395        for (dataset, tx) in channels.iter() {
396            if let Err(e) = tx.send(LiveCommand::Close) {
397                tracing::error!("Failed to send close command to dataset {dataset}: {e}");
398            }
399        }
400
401        self.is_connected.store(false, Ordering::Relaxed);
402        Ok(())
403    }
404
405    fn reset(&mut self) -> anyhow::Result<()> {
406        tracing::debug!("Resetting");
407        self.is_connected.store(false, Ordering::Relaxed);
408        Ok(())
409    }
410
411    fn dispose(&mut self) -> anyhow::Result<()> {
412        tracing::debug!("Disposing");
413        self.stop()
414    }
415
416    async fn connect(&mut self) -> anyhow::Result<()> {
417        tracing::debug!("Connecting...");
418
419        // Connection will happen lazily when subscriptions are made
420        // No need to create feed handlers upfront since we don't know which datasets will be needed
421        self.is_connected.store(true, Ordering::Relaxed);
422
423        tracing::info!("Connected");
424        Ok(())
425    }
426
427    async fn disconnect(&mut self) -> anyhow::Result<()> {
428        tracing::debug!("Disconnecting...");
429
430        // Signal cancellation to all running tasks
431        self.cancellation_token.cancel();
432
433        // Send close command to all active feed handlers
434        {
435            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
436            for (dataset, tx) in channels.iter() {
437                if let Err(e) = tx.send(LiveCommand::Close) {
438                    tracing::error!("Failed to send close command to dataset {dataset}: {e}");
439                }
440            }
441        }
442
443        // Wait for all spawned tasks to complete
444        let handles = {
445            let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
446            std::mem::take(&mut *task_handles)
447        };
448
449        for handle in handles {
450            if let Err(e) = handle.await
451                && !e.is_cancelled()
452            {
453                tracing::error!("Task join error: {e}");
454            }
455        }
456
457        self.is_connected.store(false, Ordering::Relaxed);
458
459        {
460            let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
461            channels.clear();
462        }
463
464        tracing::info!("Disconnected");
465        Ok(())
466    }
467
468    /// Returns whether the client is currently connected.
469    fn is_connected(&self) -> bool {
470        self.is_connected.load(Ordering::Relaxed)
471    }
472
473    fn is_disconnected(&self) -> bool {
474        !self.is_connected()
475    }
476
477    /// Subscribes to quote tick data for the specified instruments.
478    ///
479    /// # Errors
480    ///
481    /// Returns an error if the subscription request fails.
482    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
483        tracing::debug!("Subscribe quotes: {cmd:?}");
484
485        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
486        let was_new_handler = {
487            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
488            !channels.contains_key(&dataset)
489        };
490
491        self.get_or_create_feed_handler(&dataset)?;
492
493        // Start the feed handler if it was newly created
494        if was_new_handler {
495            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
496        }
497
498        let symbol = instrument_id_to_symbol_string(
499            cmd.instrument_id,
500            &mut self.symbol_venue_map.write().unwrap(),
501        );
502
503        let subscription = Subscription::builder()
504            .schema(databento::dbn::Schema::Mbp1) // Market by price level 1 for quotes
505            .symbols(symbol)
506            .build();
507
508        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
509
510        Ok(())
511    }
512
513    /// Subscribes to trade tick data for the specified instruments.
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if the subscription request fails.
518    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
519        tracing::debug!("Subscribe trades: {cmd:?}");
520
521        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
522        let was_new_handler = {
523            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
524            !channels.contains_key(&dataset)
525        };
526
527        self.get_or_create_feed_handler(&dataset)?;
528
529        // Start the feed handler if it was newly created
530        if was_new_handler {
531            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
532        }
533
534        let symbol = instrument_id_to_symbol_string(
535            cmd.instrument_id,
536            &mut self.symbol_venue_map.write().unwrap(),
537        );
538
539        let subscription = Subscription::builder()
540            .schema(databento::dbn::Schema::Trades)
541            .symbols(symbol)
542            .build();
543
544        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
545
546        Ok(())
547    }
548
549    /// Subscribes to order book delta updates for the specified instruments.
550    ///
551    /// # Errors
552    ///
553    /// Returns an error if the subscription request fails.
554    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
555        tracing::debug!("Subscribe book deltas: {cmd:?}");
556
557        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
558        let was_new_handler = {
559            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
560            !channels.contains_key(&dataset)
561        };
562
563        self.get_or_create_feed_handler(&dataset)?;
564
565        // Start the feed handler if it was newly created
566        if was_new_handler {
567            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
568        }
569
570        let symbol = instrument_id_to_symbol_string(
571            cmd.instrument_id,
572            &mut self.symbol_venue_map.write().unwrap(),
573        );
574
575        let subscription = Subscription::builder()
576            .schema(databento::dbn::Schema::Mbo) // Market by order for book deltas
577            .symbols(symbol)
578            .build();
579
580        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
581
582        Ok(())
583    }
584
585    /// Subscribes to instrument status updates for the specified instruments.
586    ///
587    /// # Errors
588    ///
589    /// Returns an error if the subscription request fails.
590    fn subscribe_instrument_status(
591        &mut self,
592        cmd: &SubscribeInstrumentStatus,
593    ) -> anyhow::Result<()> {
594        tracing::debug!("Subscribe instrument status: {cmd:?}");
595
596        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
597        let was_new_handler = {
598            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
599            !channels.contains_key(&dataset)
600        };
601
602        self.get_or_create_feed_handler(&dataset)?;
603
604        // Start the feed handler if it was newly created
605        if was_new_handler {
606            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
607        }
608
609        let symbol = instrument_id_to_symbol_string(
610            cmd.instrument_id,
611            &mut self.symbol_venue_map.write().unwrap(),
612        );
613
614        let subscription = Subscription::builder()
615            .schema(databento::dbn::Schema::Status)
616            .symbols(symbol)
617            .build();
618
619        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
620
621        Ok(())
622    }
623
624    // Unsubscribe methods
625    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
626        tracing::debug!("Unsubscribe quotes: {cmd:?}");
627
628        // Note: Databento live API doesn't support granular unsubscribing.
629        // The feed handler manages subscriptions and can handle reconnections
630        // with the appropriate subscription state.
631        tracing::warn!(
632            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
633            cmd.instrument_id
634        );
635
636        Ok(())
637    }
638
639    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
640        tracing::debug!("Unsubscribe trades: {cmd:?}");
641
642        // Note: Databento live API doesn't support granular unsubscribing.
643        // The feed handler manages subscriptions and can handle reconnections
644        // with the appropriate subscription state.
645        tracing::warn!(
646            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
647            cmd.instrument_id
648        );
649
650        Ok(())
651    }
652
653    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
654        tracing::debug!("Unsubscribe book deltas: {cmd:?}");
655
656        // Note: Databento live API doesn't support granular unsubscribing.
657        // The feed handler manages subscriptions and can handle reconnections
658        // with the appropriate subscription state.
659        tracing::warn!(
660            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
661            cmd.instrument_id
662        );
663
664        Ok(())
665    }
666
667    fn unsubscribe_instrument_status(
668        &mut self,
669        cmd: &UnsubscribeInstrumentStatus,
670    ) -> anyhow::Result<()> {
671        tracing::debug!("Unsubscribe instrument status: {cmd:?}");
672
673        // Note: Databento live API doesn't support granular unsubscribing.
674        // The feed handler manages subscriptions and can handle reconnections
675        // with the appropriate subscription state.
676        tracing::warn!(
677            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
678            cmd.instrument_id
679        );
680
681        Ok(())
682    }
683
684    // Historical data request methods using the historical client
685    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
686        tracing::debug!("Request instruments: {request:?}");
687
688        let historical_client = self.historical.clone();
689        let request = request.clone();
690
691        tokio::spawn(async move {
692            // Convert request to historical query parameters
693            // For now, use a default symbol set or derive from venue
694            let symbols = vec!["ALL_SYMBOLS".to_string()]; // TODO: Improve symbol handling
695
696            let params = RangeQueryParams {
697                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
698                symbols,
699                start: request
700                    .start
701                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
702                    .into(),
703                end: request
704                    .end
705                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
706                    .map(Into::into),
707                limit: None,
708                price_precision: None,
709            };
710
711            match historical_client.get_range_instruments(params).await {
712                Ok(instruments) => {
713                    tracing::info!("Retrieved {} instruments", instruments.len());
714                    // TODO: Send instruments to message bus or cache
715                }
716                Err(e) => {
717                    tracing::error!("Failed to request instruments: {e}");
718                }
719            }
720        });
721
722        Ok(())
723    }
724
725    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
726        tracing::debug!("Request quotes: {request:?}");
727
728        let historical_client = self.historical.clone();
729        let request = request.clone();
730
731        tokio::spawn(async move {
732            let symbols = vec![instrument_id_to_symbol_string(
733                request.instrument_id,
734                &mut AHashMap::new(), // TODO: Use proper symbol map
735            )];
736
737            let params = RangeQueryParams {
738                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
739                symbols,
740                start: request
741                    .start
742                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
743                    .into(),
744                end: request
745                    .end
746                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
747                    .map(Into::into),
748                limit: request.limit.map(|l| l.get() as u64),
749                price_precision: None,
750            };
751
752            match historical_client.get_range_quotes(params, None).await {
753                Ok(quotes) => {
754                    tracing::info!("Retrieved {} quotes", quotes.len());
755                    // TODO: Send quotes to message bus
756                }
757                Err(e) => {
758                    tracing::error!("Failed to request quotes: {e}");
759                }
760            }
761        });
762
763        Ok(())
764    }
765
766    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
767        tracing::debug!("Request trades: {request:?}");
768
769        let historical_client = self.historical.clone();
770        let request = request.clone();
771
772        tokio::spawn(async move {
773            let symbols = vec![instrument_id_to_symbol_string(
774                request.instrument_id,
775                &mut AHashMap::new(), // TODO: Use proper symbol map
776            )];
777
778            let params = RangeQueryParams {
779                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
780                symbols,
781                start: request
782                    .start
783                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
784                    .into(),
785                end: request
786                    .end
787                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
788                    .map(Into::into),
789                limit: request.limit.map(|l| l.get() as u64),
790                price_precision: None,
791            };
792
793            match historical_client.get_range_trades(params).await {
794                Ok(trades) => {
795                    tracing::info!("Retrieved {} trades", trades.len());
796                    // TODO: Send trades to message bus
797                }
798                Err(e) => {
799                    tracing::error!("Failed to request trades: {e}");
800                }
801            }
802        });
803
804        Ok(())
805    }
806
807    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
808        tracing::debug!("Request bars: {request:?}");
809
810        let historical_client = self.historical.clone();
811        let request = request.clone();
812
813        tokio::spawn(async move {
814            let symbols = vec![instrument_id_to_symbol_string(
815                request.bar_type.instrument_id(),
816                &mut AHashMap::new(), // TODO: Use proper symbol map
817            )];
818
819            let params = RangeQueryParams {
820                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
821                symbols,
822                start: request
823                    .start
824                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
825                    .into(),
826                end: request
827                    .end
828                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
829                    .map(Into::into),
830                limit: request.limit.map(|l| l.get() as u64),
831                price_precision: None,
832            };
833
834            // Map bar aggregation from the request
835            let aggregation = match request.bar_type.spec().aggregation {
836                BarAggregation::Second => BarAggregation::Second,
837                BarAggregation::Minute => BarAggregation::Minute,
838                BarAggregation::Hour => BarAggregation::Hour,
839                BarAggregation::Day => BarAggregation::Day,
840                _ => {
841                    tracing::error!(
842                        "Unsupported bar aggregation: {:?}",
843                        request.bar_type.spec().aggregation
844                    );
845                    return;
846                }
847            };
848
849            match historical_client
850                .get_range_bars(params, aggregation, true)
851                .await
852            {
853                Ok(bars) => {
854                    tracing::info!("Retrieved {} bars", bars.len());
855                    // TODO: Send bars to message bus
856                }
857                Err(e) => {
858                    tracing::error!("Failed to request bars: {e}");
859                }
860            }
861        });
862
863        Ok(())
864    }
865}