Skip to main content

nautilus_databento/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a unified data client that combines Databento's live streaming and historical data capabilities.
17//!
18//! This module implements a data client that manages connections to multiple Databento datasets,
19//! handles live market data subscriptions, and provides access to historical data on demand.
20
use std::{
    fmt::Debug,
    path::PathBuf,
    sync::{
        Arc, Mutex, RwLock,
        atomic::{AtomicBool, Ordering},
    },
};

use ahash::AHashMap;
use databento::live::Subscription;
use indexmap::IndexMap;
use nautilus_common::{
    clients::DataClient,
    live::{runner::get_data_event_sender, runtime::get_runtime},
    messages::{
        DataEvent,
        data::{
            RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
            SubscribeInstrument, SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades,
            UnsubscribeBookDeltas, UnsubscribeInstrumentStatus, UnsubscribeQuotes,
            UnsubscribeTrades,
        },
    },
};
use nautilus_core::{MUTEX_POISONED, time::AtomicTime};
use nautilus_model::{
    enums::BarAggregation,
    identifiers::{ClientId, InstrumentId, Symbol, Venue},
    instruments::Instrument,
};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crate::{
    common::Credential,
    historical::{DatabentoHistoricalClient, RangeQueryParams},
    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
    loader::DatabentoDataLoader,
    symbology::instrument_id_to_symbol_string,
    types::PublisherId,
};
63
/// Configuration for the Databento data client.
#[derive(Clone)]
pub struct DatabentoDataClientConfig {
    /// Databento API credential (private; rendered as `<redacted>` by the `Debug` impl).
    credential: Credential,
    /// Path to publishers.json file.
    pub publishers_filepath: PathBuf,
    /// Whether to use exchange as venue for GLBX instruments.
    pub use_exchange_as_venue: bool,
    /// Whether to timestamp bars on close.
    pub bars_timestamp_on_close: bool,
    /// Reconnection timeout in minutes (None for infinite retries).
    pub reconnect_timeout_mins: Option<u64>,
    /// Optional HTTP proxy URL.
    pub http_proxy_url: Option<String>,
    /// Optional WebSocket proxy URL.
    pub ws_proxy_url: Option<String>,
}
82
83impl Debug for DatabentoDataClientConfig {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct(stringify!(DatabentoDataClientConfig))
86            .field("credential", &"<redacted>")
87            .field("publishers_filepath", &self.publishers_filepath)
88            .field("use_exchange_as_venue", &self.use_exchange_as_venue)
89            .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
90            .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
91            .field("http_proxy_url", &self.http_proxy_url)
92            .field("ws_proxy_url", &self.ws_proxy_url)
93            .finish()
94    }
95}
96
97impl DatabentoDataClientConfig {
98    /// Creates a new [`DatabentoDataClientConfig`] instance.
99    #[must_use]
100    pub fn new(
101        api_key: impl Into<String>,
102        publishers_filepath: PathBuf,
103        use_exchange_as_venue: bool,
104        bars_timestamp_on_close: bool,
105    ) -> Self {
106        Self {
107            credential: Credential::new(api_key),
108            publishers_filepath,
109            use_exchange_as_venue,
110            bars_timestamp_on_close,
111            reconnect_timeout_mins: Some(10), // Default: 10 minutes
112            http_proxy_url: None,
113            ws_proxy_url: None,
114        }
115    }
116
117    /// Returns the API key associated with this config.
118    #[must_use]
119    pub fn api_key(&self) -> &str {
120        self.credential.api_key()
121    }
122
123    /// Returns a masked version of the API key for logging purposes.
124    #[must_use]
125    pub fn api_key_masked(&self) -> String {
126        self.credential.api_key_masked()
127    }
128}
129
/// A Databento data client that combines live streaming and historical data functionality.
///
/// This client uses the existing `DatabentoFeedHandler` for live data subscriptions
/// and `DatabentoHistoricalClient` for historical data requests. It supports multiple
/// datasets simultaneously, with separate feed handlers per dataset.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct DatabentoDataClient {
    /// Client identifier.
    client_id: ClientId,
    /// Client configuration.
    config: DatabentoDataClientConfig,
    /// Connection state (flipped by `connect`/`disconnect`/`stop`, read lock-free).
    is_connected: AtomicBool,
    /// Historical client for on-demand data requests.
    historical: DatabentoHistoricalClient,
    /// Data loader for venue-to-dataset mapping.
    loader: DatabentoDataLoader,
    /// Feed handler command senders, keyed by dataset (one live handler per dataset).
    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<LiveCommand>>>>,
    /// Task handles for lifecycle management (joined on disconnect).
    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Cancellation token for graceful shutdown of all spawned tasks.
    cancellation_token: CancellationToken,
    /// Publisher to venue mapping (loaded from publishers.json at construction).
    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
    /// Symbol to venue mapping (cache shared with the feed handlers).
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// Data event sender for forwarding data to the async runner.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}
161
impl DatabentoDataClient {
    /// Creates a new [`DatabentoDataClient`] instance.
    ///
    /// Builds the historical client and data loader up front, and eagerly parses
    /// publishers.json into the publisher-to-venue map. Live feed handlers are NOT
    /// created here; they are created lazily per dataset on first subscription.
    ///
    /// # Errors
    ///
    /// Returns an error if client creation or publisher configuration loading fails.
    pub fn new(
        client_id: ClientId,
        config: DatabentoDataClientConfig,
        clock: &'static AtomicTime,
    ) -> anyhow::Result<Self> {
        let historical = DatabentoHistoricalClient::new(
            config.api_key().to_string(),
            config.publishers_filepath.clone(),
            clock,
            config.use_exchange_as_venue,
        )?;

        // Create data loader for venue-to-dataset mapping
        let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;

        // Load publisher configuration
        let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
        let publishers_vec: Vec<crate::types::DatabentoPublisher> =
            serde_json::from_str(&file_content)?;

        let publisher_venue_map = publishers_vec
            .into_iter()
            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
            .collect::<IndexMap<u16, Venue>>();

        // Channel into the async runner; data from all feed handlers funnels here.
        let data_sender = get_data_event_sender();

        Ok(Self {
            client_id,
            config,
            is_connected: AtomicBool::new(false),
            historical,
            loader,
            cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
            task_handles: Arc::new(Mutex::new(Vec::new())),
            cancellation_token: CancellationToken::new(),
            publisher_venue_map: Arc::new(publisher_venue_map),
            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
            data_sender,
        })
    }

    /// Gets the dataset for a given venue using the data loader.
    ///
    /// # Errors
    ///
    /// Returns an error if the venue-to-dataset mapping cannot be found.
    fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
        self.loader
            .get_dataset_for_venue(&venue)
            .map(ToString::to_string)
            .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
    }

    /// Gets or creates a feed handler for the specified dataset.
    ///
    /// Idempotent: if a command channel already exists for `dataset` this is a no-op.
    /// The channel map lock is held across creation so concurrent callers cannot
    /// spawn duplicate handlers for the same dataset.
    ///
    /// # Errors
    ///
    /// Returns an error if the feed handler cannot be created.
    fn get_or_create_feed_handler(&self, dataset: &str) -> anyhow::Result<()> {
        let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);

        if !channels.contains_key(dataset) {
            log::info!("Creating new feed handler for dataset: {dataset}");
            let cmd_tx = self.initialize_live_feed(dataset.to_string())?;
            channels.insert(dataset.to_string(), cmd_tx);

            log::debug!("Feed handler created for dataset: {dataset}, channel stored");
        }

        Ok(())
    }

    /// Sends a command to a specific dataset's feed handler.
    ///
    /// # Errors
    ///
    /// Returns an error if no handler exists for `dataset` or the command cannot be sent.
    fn send_command_to_dataset(&self, dataset: &str, cmd: LiveCommand) -> anyhow::Result<()> {
        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
        if let Some(tx) = channels.get(dataset) {
            tx.send(cmd)
                .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
        } else {
            anyhow::bail!("No feed handler found for dataset: {dataset}");
        }
        Ok(())
    }

    /// Initializes the live feed handler for streaming data.
    ///
    /// Spawns two tasks on the shared runtime: the feed handler itself, and a message
    /// pump that forwards `LiveMessage`s to the data event channel. Both tasks exit
    /// when `cancellation_token` fires; their handles are stored for joining on
    /// disconnect. Returns the command channel for the new handler.
    ///
    /// # Errors
    ///
    /// Returns an error if the feed handler cannot be started.
    fn initialize_live_feed(
        &self,
        dataset: String,
    ) -> anyhow::Result<tokio::sync::mpsc::UnboundedSender<LiveCommand>> {
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        // Bounded message channel: backpressure between handler and message pump.
        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);

        let mut feed_handler = DatabentoFeedHandler::new(
            self.config.api_key().to_string(),
            dataset,
            cmd_rx,
            msg_tx,
            (*self.publisher_venue_map).clone(),
            self.symbol_venue_map.clone(),
            self.config.use_exchange_as_venue,
            self.config.bars_timestamp_on_close,
            self.config.reconnect_timeout_mins,
        );

        let cancellation_token = self.cancellation_token.clone();

        // Spawn the feed handler task with cancellation support
        let feed_handle = get_runtime().spawn(async move {
            tokio::select! {
                result = feed_handler.run() => {
                    if let Err(e) = result {
                        log::error!("Feed handler error: {e}");
                    }
                }
                () = cancellation_token.cancelled() => {
                    log::debug!("Feed handler cancelled");
                }
            }
        });

        let cancellation_token = self.cancellation_token.clone();
        let data_sender = self.data_sender.clone();

        // Spawn message processing task with cancellation support
        let msg_handle = get_runtime().spawn(async move {
            let mut msg_rx = msg_rx;
            loop {
                tokio::select! {
                    msg = msg_rx.recv() => {
                        match msg {
                            Some(LiveMessage::Data(data)) => {
                                log::debug!("Received data: {data:?}");
                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
                                    log::error!("Failed to send data event: {e}");
                                }
                            }
                            Some(LiveMessage::Instrument(instrument)) => {
                                log::info!("Received instrument definition: {}", instrument.id());
                                if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
                                    log::error!("Failed to send instrument: {e}");
                                }
                            }
                            Some(LiveMessage::Status(status)) => {
                                log::debug!("Received status: {status:?}");
                                // TODO: Forward to appropriate handler
                            }
                            Some(LiveMessage::Imbalance(imbalance)) => {
                                log::debug!("Received imbalance: {imbalance:?}");
                                // TODO: Forward to appropriate handler
                            }
                            Some(LiveMessage::Statistics(statistics)) => {
                                log::debug!("Received statistics: {statistics:?}");
                                // TODO: Forward to appropriate handler
                            }
                            Some(LiveMessage::SubscriptionAck(ack)) => {
                                log::debug!("Received subscription ack: {}", ack.message);
                            }
                            Some(LiveMessage::Error(error)) => {
                                log::error!("Feed handler error: {error}");
                                // TODO: Handle error appropriately
                            }
                            Some(LiveMessage::Close) => {
                                log::info!("Feed handler closed");
                                break;
                            }
                            None => {
                                log::debug!("Message channel closed");
                                break;
                            }
                        }
                    }
                    () = cancellation_token.cancelled() => {
                        log::debug!("Message processing cancelled");
                        break;
                    }
                }
            }
        });

        // Record both task handles so `disconnect` can join them.
        {
            let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
            handles.push(feed_handle);
            handles.push(msg_handle);
        }

        Ok(cmd_tx)
    }
}
365
366#[async_trait::async_trait(?Send)]
367impl DataClient for DatabentoDataClient {
368    /// Returns the client identifier.
369    fn client_id(&self) -> ClientId {
370        self.client_id
371    }
372
373    /// Returns the venue associated with this client (None for multi-venue clients).
374    fn venue(&self) -> Option<Venue> {
375        None
376    }
377
378    /// Starts the data client.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if the client fails to start.
383    fn start(&mut self) -> anyhow::Result<()> {
384        log::debug!("Starting");
385        Ok(())
386    }
387
388    /// Stops the data client and cancels all active subscriptions.
389    ///
390    /// # Errors
391    ///
392    /// Returns an error if the client fails to stop cleanly.
393    fn stop(&mut self) -> anyhow::Result<()> {
394        log::debug!("Stopping");
395
396        // Signal cancellation to all running tasks
397        self.cancellation_token.cancel();
398
399        // Send close command to all active feed handlers
400        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
401        for (dataset, tx) in channels.iter() {
402            if let Err(e) = tx.send(LiveCommand::Close) {
403                log::error!("Failed to send close command to dataset {dataset}: {e}");
404            }
405        }
406
407        self.is_connected.store(false, Ordering::Relaxed);
408        Ok(())
409    }
410
411    fn reset(&mut self) -> anyhow::Result<()> {
412        log::debug!("Resetting");
413        self.is_connected.store(false, Ordering::Relaxed);
414        Ok(())
415    }
416
417    fn dispose(&mut self) -> anyhow::Result<()> {
418        log::debug!("Disposing");
419        self.stop()
420    }
421
422    async fn connect(&mut self) -> anyhow::Result<()> {
423        log::debug!("Connecting...");
424
425        // Connection will happen lazily when subscriptions are made
426        // No need to create feed handlers upfront since we don't know which datasets will be needed
427        self.is_connected.store(true, Ordering::Relaxed);
428
429        log::info!("Connected");
430        Ok(())
431    }
432
433    async fn disconnect(&mut self) -> anyhow::Result<()> {
434        log::debug!("Disconnecting...");
435
436        // Signal cancellation to all running tasks
437        self.cancellation_token.cancel();
438
439        // Send close command to all active feed handlers
440        {
441            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
442            for (dataset, tx) in channels.iter() {
443                if let Err(e) = tx.send(LiveCommand::Close) {
444                    log::error!("Failed to send close command to dataset {dataset}: {e}");
445                }
446            }
447        }
448
449        // Wait for all spawned tasks to complete
450        let handles = {
451            let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
452            std::mem::take(&mut *task_handles)
453        };
454
455        for handle in handles {
456            if let Err(e) = handle.await
457                && !e.is_cancelled()
458            {
459                log::error!("Task join error: {e}");
460            }
461        }
462
463        self.is_connected.store(false, Ordering::Relaxed);
464
465        {
466            let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
467            channels.clear();
468        }
469
470        log::info!("Disconnected");
471        Ok(())
472    }
473
474    /// Returns whether the client is currently connected.
475    fn is_connected(&self) -> bool {
476        self.is_connected.load(Ordering::Relaxed)
477    }
478
479    fn is_disconnected(&self) -> bool {
480        !self.is_connected()
481    }
482
483    /// Subscribes to instrument definition data for the specified instrument.
484    ///
485    /// # Errors
486    ///
487    /// Returns an error if the subscription request fails.
488    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
489        log::debug!("Subscribe instrument: {cmd:?}");
490
491        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
492        let was_new_handler = {
493            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
494            !channels.contains_key(&dataset)
495        };
496
497        self.get_or_create_feed_handler(&dataset)?;
498
499        // Start the feed handler if it was newly created
500        if was_new_handler {
501            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
502        }
503
504        let symbol = instrument_id_to_symbol_string(
505            cmd.instrument_id,
506            &mut self.symbol_venue_map.write().unwrap(),
507        );
508
509        let subscription = Subscription::builder()
510            .schema(databento::dbn::Schema::Definition)
511            .symbols(symbol)
512            .build();
513
514        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
515
516        Ok(())
517    }
518
519    /// Subscribes to quote tick data for the specified instruments.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the subscription request fails.
524    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
525        log::debug!("Subscribe quotes: {cmd:?}");
526
527        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
528        let was_new_handler = {
529            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
530            !channels.contains_key(&dataset)
531        };
532
533        self.get_or_create_feed_handler(&dataset)?;
534
535        // Start the feed handler if it was newly created
536        if was_new_handler {
537            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
538        }
539
540        let symbol = instrument_id_to_symbol_string(
541            cmd.instrument_id,
542            &mut self.symbol_venue_map.write().unwrap(),
543        );
544
545        let subscription = Subscription::builder()
546            .schema(databento::dbn::Schema::Mbp1) // Market by price level 1 for quotes
547            .symbols(symbol)
548            .build();
549
550        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
551
552        Ok(())
553    }
554
555    /// Subscribes to trade tick data for the specified instruments.
556    ///
557    /// # Errors
558    ///
559    /// Returns an error if the subscription request fails.
560    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
561        log::debug!("Subscribe trades: {cmd:?}");
562
563        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
564        let was_new_handler = {
565            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
566            !channels.contains_key(&dataset)
567        };
568
569        self.get_or_create_feed_handler(&dataset)?;
570
571        // Start the feed handler if it was newly created
572        if was_new_handler {
573            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
574        }
575
576        let symbol = instrument_id_to_symbol_string(
577            cmd.instrument_id,
578            &mut self.symbol_venue_map.write().unwrap(),
579        );
580
581        let subscription = Subscription::builder()
582            .schema(databento::dbn::Schema::Trades)
583            .symbols(symbol)
584            .build();
585
586        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
587
588        Ok(())
589    }
590
591    /// Subscribes to order book delta updates for the specified instruments.
592    ///
593    /// # Errors
594    ///
595    /// Returns an error if the subscription request fails.
596    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
597        log::debug!("Subscribe book deltas: {cmd:?}");
598
599        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
600        let was_new_handler = {
601            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
602            !channels.contains_key(&dataset)
603        };
604
605        self.get_or_create_feed_handler(&dataset)?;
606
607        // Start the feed handler if it was newly created
608        if was_new_handler {
609            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
610        }
611
612        let symbol = instrument_id_to_symbol_string(
613            cmd.instrument_id,
614            &mut self.symbol_venue_map.write().unwrap(),
615        );
616
617        let subscription = Subscription::builder()
618            .schema(databento::dbn::Schema::Mbo) // Market by order for book deltas
619            .symbols(symbol)
620            .build();
621
622        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
623
624        Ok(())
625    }
626
627    /// Subscribes to instrument status updates for the specified instruments.
628    ///
629    /// # Errors
630    ///
631    /// Returns an error if the subscription request fails.
632    fn subscribe_instrument_status(
633        &mut self,
634        cmd: &SubscribeInstrumentStatus,
635    ) -> anyhow::Result<()> {
636        log::debug!("Subscribe instrument status: {cmd:?}");
637
638        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
639        let was_new_handler = {
640            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
641            !channels.contains_key(&dataset)
642        };
643
644        self.get_or_create_feed_handler(&dataset)?;
645
646        // Start the feed handler if it was newly created
647        if was_new_handler {
648            self.send_command_to_dataset(&dataset, LiveCommand::Start)?;
649        }
650
651        let symbol = instrument_id_to_symbol_string(
652            cmd.instrument_id,
653            &mut self.symbol_venue_map.write().unwrap(),
654        );
655
656        let subscription = Subscription::builder()
657            .schema(databento::dbn::Schema::Status)
658            .symbols(symbol)
659            .build();
660
661        self.send_command_to_dataset(&dataset, LiveCommand::Subscribe(subscription))?;
662
663        Ok(())
664    }
665
666    // Unsubscribe methods
667    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
668        log::debug!("Unsubscribe quotes: {cmd:?}");
669
670        // Note: Databento live API doesn't support granular unsubscribing.
671        // The feed handler manages subscriptions and can handle reconnections
672        // with the appropriate subscription state.
673        log::warn!(
674            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
675            cmd.instrument_id
676        );
677
678        Ok(())
679    }
680
681    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
682        log::debug!("Unsubscribe trades: {cmd:?}");
683
684        // Note: Databento live API doesn't support granular unsubscribing.
685        // The feed handler manages subscriptions and can handle reconnections
686        // with the appropriate subscription state.
687        log::warn!(
688            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
689            cmd.instrument_id
690        );
691
692        Ok(())
693    }
694
695    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
696        log::debug!("Unsubscribe book deltas: {cmd:?}");
697
698        // Note: Databento live API doesn't support granular unsubscribing.
699        // The feed handler manages subscriptions and can handle reconnections
700        // with the appropriate subscription state.
701        log::warn!(
702            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
703            cmd.instrument_id
704        );
705
706        Ok(())
707    }
708
709    fn unsubscribe_instrument_status(
710        &mut self,
711        cmd: &UnsubscribeInstrumentStatus,
712    ) -> anyhow::Result<()> {
713        log::debug!("Unsubscribe instrument status: {cmd:?}");
714
715        // Note: Databento live API doesn't support granular unsubscribing.
716        // The feed handler manages subscriptions and can handle reconnections
717        // with the appropriate subscription state.
718        log::warn!(
719            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
720            cmd.instrument_id
721        );
722
723        Ok(())
724    }
725
726    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
727        log::debug!("Request instruments: {request:?}");
728
729        let historical_client = self.historical.clone();
730        let data_sender = self.data_sender.clone();
731
732        get_runtime().spawn(async move {
733            let symbols = vec!["ALL_SYMBOLS".to_string()]; // TODO: Improve symbol handling
734
735            let params = RangeQueryParams {
736                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
737                symbols,
738                start: request
739                    .start
740                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
741                    .into(),
742                end: request
743                    .end
744                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
745                    .map(Into::into),
746                limit: None,
747                price_precision: None,
748            };
749
750            match historical_client.get_range_instruments(params).await {
751                Ok(instruments) => {
752                    log::info!("Retrieved {} instruments", instruments.len());
753                    for instrument in instruments {
754                        if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
755                            log::error!("Failed to send instrument: {e}");
756                        }
757                    }
758                }
759                Err(e) => {
760                    log::error!("Failed to request instruments: {e}");
761                }
762            }
763        });
764
765        Ok(())
766    }
767
768    fn request_quotes(&self, request: RequestQuotes) -> anyhow::Result<()> {
769        log::debug!("Request quotes: {request:?}");
770
771        let historical_client = self.historical.clone();
772
773        get_runtime().spawn(async move {
774            let symbols = vec![instrument_id_to_symbol_string(
775                request.instrument_id,
776                &mut AHashMap::new(), // TODO: Use proper symbol map
777            )];
778
779            let params = RangeQueryParams {
780                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
781                symbols,
782                start: request
783                    .start
784                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
785                    .into(),
786                end: request
787                    .end
788                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
789                    .map(Into::into),
790                limit: request.limit.map(|l| l.get() as u64),
791                price_precision: None,
792            };
793
794            match historical_client.get_range_quotes(params, None).await {
795                Ok(quotes) => {
796                    log::info!("Retrieved {} quotes", quotes.len());
797                    // TODO: Send quotes to message bus
798                }
799                Err(e) => {
800                    log::error!("Failed to request quotes: {e}");
801                }
802            }
803        });
804
805        Ok(())
806    }
807
808    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
809        log::debug!("Request trades: {request:?}");
810
811        let historical_client = self.historical.clone();
812
813        get_runtime().spawn(async move {
814            let symbols = vec![instrument_id_to_symbol_string(
815                request.instrument_id,
816                &mut AHashMap::new(), // TODO: Use proper symbol map
817            )];
818
819            let params = RangeQueryParams {
820                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
821                symbols,
822                start: request
823                    .start
824                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
825                    .into(),
826                end: request
827                    .end
828                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
829                    .map(Into::into),
830                limit: request.limit.map(|l| l.get() as u64),
831                price_precision: None,
832            };
833
834            match historical_client.get_range_trades(params).await {
835                Ok(trades) => {
836                    log::info!("Retrieved {} trades", trades.len());
837                    // TODO: Send trades to message bus
838                }
839                Err(e) => {
840                    log::error!("Failed to request trades: {e}");
841                }
842            }
843        });
844
845        Ok(())
846    }
847
848    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
849        log::debug!("Request bars: {request:?}");
850
851        let historical_client = self.historical.clone();
852
853        get_runtime().spawn(async move {
854            let symbols = vec![instrument_id_to_symbol_string(
855                request.bar_type.instrument_id(),
856                &mut AHashMap::new(), // TODO: Use proper symbol map
857            )];
858
859            let params = RangeQueryParams {
860                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
861                symbols,
862                start: request
863                    .start
864                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
865                    .into(),
866                end: request
867                    .end
868                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
869                    .map(Into::into),
870                limit: request.limit.map(|l| l.get() as u64),
871                price_precision: None,
872            };
873
874            // Map bar aggregation from the request
875            let aggregation = match request.bar_type.spec().aggregation {
876                BarAggregation::Second => BarAggregation::Second,
877                BarAggregation::Minute => BarAggregation::Minute,
878                BarAggregation::Hour => BarAggregation::Hour,
879                BarAggregation::Day => BarAggregation::Day,
880                _ => {
881                    log::error!(
882                        "Unsupported bar aggregation: {:?}",
883                        request.bar_type.spec().aggregation
884                    );
885                    return;
886                }
887            };
888
889            match historical_client
890                .get_range_bars(params, aggregation, true)
891                .await
892            {
893                Ok(bars) => {
894                    log::info!("Retrieved {} bars", bars.len());
895                    // TODO: Send bars to message bus
896                }
897                Err(e) => {
898                    log::error!("Failed to request bars: {e}");
899                }
900            }
901        });
902
903        Ok(())
904    }
905}