nautilus_databento/
historical.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core Databento historical client for both Rust and Python usage.
17
18use std::{
19    fs,
20    num::NonZeroU64,
21    path::PathBuf,
22    str::FromStr,
23    sync::{Arc, RwLock},
24};
25
26use ahash::AHashMap;
27use databento::{
28    dbn::{self, decode::DbnMetadata},
29    historical::timeseries::GetRangeParams,
30};
31use indexmap::IndexMap;
32use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
33use nautilus_model::{
34    data::{Bar, Data, InstrumentStatus, QuoteTick, TradeTick},
35    enums::BarAggregation,
36    identifiers::{InstrumentId, Symbol, Venue},
37    instruments::InstrumentAny,
38    types::Currency,
39};
40use tokio::sync::Mutex;
41
42use crate::{
43    common::get_date_time_range,
44    decode::{
45        decode_imbalance_msg, decode_instrument_def_msg, decode_record, decode_statistics_msg,
46        decode_status_msg,
47    },
48    symbology::{
49        MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
50        infer_symbology_type, instrument_id_to_symbol_string,
51    },
52    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
53};
54
55/// Core Databento historical client for fetching historical market data.
56///
57/// This client provides both synchronous and asynchronous interfaces for fetching
58/// various types of historical market data from Databento.
59#[derive(Debug, Clone)]
60pub struct DatabentoHistoricalClient {
61    pub key: String,
62    clock: &'static AtomicTime,
63    inner: Arc<Mutex<databento::HistoricalClient>>,
64    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
65    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
66    use_exchange_as_venue: bool,
67}
68
69/// Parameters for range queries to Databento historical API.
70#[derive(Debug)]
71pub struct RangeQueryParams {
72    pub dataset: String,
73    pub symbols: Vec<String>,
74    pub start: UnixNanos,
75    pub end: Option<UnixNanos>,
76    pub limit: Option<u64>,
77    pub price_precision: Option<u8>,
78}
79
/// Result containing dataset date range information.
///
/// Both bounds are kept as the string rendering of the values returned by the
/// Databento metadata API.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatasetRange {
    /// Start of the available range, as a string.
    pub start: String,
    /// End of the available range, as a string.
    pub end: String,
}
86
87impl DatabentoHistoricalClient {
88    /// Creates a new [`DatabentoHistoricalClient`] instance.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if client creation or publisher loading fails.
93    pub fn new(
94        key: String,
95        publishers_filepath: PathBuf,
96        clock: &'static AtomicTime,
97        use_exchange_as_venue: bool,
98    ) -> anyhow::Result<Self> {
99        let client = databento::HistoricalClient::builder()
100            .user_agent_extension(NAUTILUS_USER_AGENT.into())
101            .key(key.clone())
102            .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
103            .build()
104            .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
105
106        let file_content = fs::read_to_string(publishers_filepath)?;
107        let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
108
109        let publisher_venue_map = publishers_vec
110            .into_iter()
111            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
112            .collect::<IndexMap<u16, Venue>>();
113
114        Ok(Self {
115            clock,
116            inner: Arc::new(Mutex::new(client)),
117            publisher_venue_map: Arc::new(publisher_venue_map),
118            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
119            key,
120            use_exchange_as_venue,
121        })
122    }
123
124    /// Gets the date range for a specific dataset.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if the API request fails.
129    pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
130        let mut client = self.inner.lock().await;
131        let response = client
132            .metadata()
133            .get_dataset_range(dataset)
134            .await
135            .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
136
137        Ok(DatasetRange {
138            start: response.start.to_string(),
139            end: response.end.to_string(),
140        })
141    }
142
143    /// Fetches instrument definitions for the given parameters.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the API request or data processing fails.
148    pub async fn get_range_instruments(
149        &self,
150        params: RangeQueryParams,
151    ) -> anyhow::Result<Vec<InstrumentAny>> {
152        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
153        check_consistent_symbology(&symbols)?;
154
155        let first_symbol = params
156            .symbols
157            .first()
158            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
159        let stype_in = infer_symbology_type(first_symbol);
160        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
161        let time_range = get_date_time_range(params.start, end)?;
162
163        let range_params = GetRangeParams::builder()
164            .dataset(params.dataset)
165            .date_time_range(time_range)
166            .symbols(symbols)
167            .stype_in(stype_in)
168            .schema(dbn::Schema::Definition)
169            .limit(params.limit.and_then(NonZeroU64::new))
170            .build();
171
172        let mut client = self.inner.lock().await;
173        let mut decoder = client
174            .timeseries()
175            .get_range(&range_params)
176            .await
177            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
178
179        let metadata = decoder.metadata().clone();
180        let mut metadata_cache = MetadataCache::new(metadata);
181        let mut instruments = Vec::new();
182
183        while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
184            let record = dbn::RecordRef::from(msg);
185            let sym_map = self
186                .symbol_venue_map
187                .read()
188                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
189            let mut instrument_id = decode_nautilus_instrument_id(
190                &record,
191                &mut metadata_cache,
192                &self.publisher_venue_map,
193                &sym_map,
194            )?;
195
196            if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
197                let exchange = msg
198                    .exchange()
199                    .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
200                let venue = Venue::from_code(exchange)
201                    .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
202                instrument_id.venue = venue;
203            }
204
205            match decode_instrument_def_msg(msg, instrument_id, None) {
206                Ok(instrument) => instruments.push(instrument),
207                Err(e) => tracing::error!("Failed to decode instrument: {e:?}"),
208            }
209        }
210
211        Ok(instruments)
212    }
213
214    /// Fetches quote ticks for the given parameters.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the API request or data processing fails.
219    pub async fn get_range_quotes(
220        &self,
221        params: RangeQueryParams,
222        schema: Option<String>,
223    ) -> anyhow::Result<Vec<QuoteTick>> {
224        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
225        check_consistent_symbology(&symbols)?;
226
227        let first_symbol = params
228            .symbols
229            .first()
230            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
231        let stype_in = infer_symbology_type(first_symbol);
232        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
233        let time_range = get_date_time_range(params.start, end)?;
234        let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
235        let dbn_schema = dbn::Schema::from_str(&schema)?;
236
237        match dbn_schema {
238            dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
239            _ => anyhow::bail!("Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m"),
240        }
241
242        let range_params = GetRangeParams::builder()
243            .dataset(params.dataset)
244            .date_time_range(time_range)
245            .symbols(symbols)
246            .stype_in(stype_in)
247            .schema(dbn_schema)
248            .limit(params.limit.and_then(NonZeroU64::new))
249            .build();
250
251        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
252
253        let mut client = self.inner.lock().await;
254        let mut decoder = client
255            .timeseries()
256            .get_range(&range_params)
257            .await
258            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
259
260        let metadata = decoder.metadata().clone();
261        let mut metadata_cache = MetadataCache::new(metadata);
262        let mut result: Vec<QuoteTick> = Vec::new();
263
264        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
265            let sym_map = self
266                .symbol_venue_map
267                .read()
268                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
269            let instrument_id = decode_nautilus_instrument_id(
270                &record,
271                &mut metadata_cache,
272                &self.publisher_venue_map,
273                &sym_map,
274            )?;
275
276            let (data, _) = decode_record(
277                &record,
278                instrument_id,
279                price_precision,
280                None,
281                false, // Don't include trades
282                true,
283            )?;
284
285            match data {
286                Some(Data::Quote(quote)) => {
287                    result.push(quote);
288                    Ok(())
289                }
290                _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
291            }
292        };
293
294        match dbn_schema {
295            dbn::Schema::Mbp1 => {
296                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
297                    process_record(dbn::RecordRef::from(msg))?;
298                }
299            }
300            dbn::Schema::Bbo1M => {
301                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
302                    process_record(dbn::RecordRef::from(msg))?;
303                }
304            }
305            dbn::Schema::Bbo1S => {
306                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
307                    process_record(dbn::RecordRef::from(msg))?;
308                }
309            }
310            _ => anyhow::bail!("Invalid schema {dbn_schema}"),
311        }
312
313        Ok(result)
314    }
315
316    /// Fetches trade ticks for the given parameters.
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if the API request or data processing fails.
321    pub async fn get_range_trades(
322        &self,
323        params: RangeQueryParams,
324    ) -> anyhow::Result<Vec<TradeTick>> {
325        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
326        check_consistent_symbology(&symbols)?;
327
328        let first_symbol = params
329            .symbols
330            .first()
331            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
332        let stype_in = infer_symbology_type(first_symbol);
333        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
334        let time_range = get_date_time_range(params.start, end)?;
335
336        let range_params = GetRangeParams::builder()
337            .dataset(params.dataset)
338            .date_time_range(time_range)
339            .symbols(symbols)
340            .stype_in(stype_in)
341            .schema(dbn::Schema::Trades)
342            .limit(params.limit.and_then(NonZeroU64::new))
343            .build();
344
345        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
346
347        let mut client = self.inner.lock().await;
348        let mut decoder = client
349            .timeseries()
350            .get_range(&range_params)
351            .await
352            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
353
354        let metadata = decoder.metadata().clone();
355        let mut metadata_cache = MetadataCache::new(metadata);
356        let mut result: Vec<TradeTick> = Vec::new();
357
358        while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
359            let record = dbn::RecordRef::from(msg);
360            let sym_map = self
361                .symbol_venue_map
362                .read()
363                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
364            let instrument_id = decode_nautilus_instrument_id(
365                &record,
366                &mut metadata_cache,
367                &self.publisher_venue_map,
368                &sym_map,
369            )?;
370
371            let (data, _) = decode_record(
372                &record,
373                instrument_id,
374                price_precision,
375                None,
376                false, // Not applicable (trade will be decoded regardless)
377                true,
378            )?;
379
380            match data {
381                Some(Data::Trade(trade)) => {
382                    result.push(trade);
383                }
384                _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
385            }
386        }
387
388        Ok(result)
389    }
390
391    /// Fetches bars for the given parameters.
392    ///
393    /// # Errors
394    ///
395    /// Returns an error if the API request or data processing fails.
396    pub async fn get_range_bars(
397        &self,
398        params: RangeQueryParams,
399        aggregation: BarAggregation,
400        timestamp_on_close: bool,
401    ) -> anyhow::Result<Vec<Bar>> {
402        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
403        check_consistent_symbology(&symbols)?;
404
405        let first_symbol = params
406            .symbols
407            .first()
408            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
409        let stype_in = infer_symbology_type(first_symbol);
410        let schema = match aggregation {
411            BarAggregation::Second => dbn::Schema::Ohlcv1S,
412            BarAggregation::Minute => dbn::Schema::Ohlcv1M,
413            BarAggregation::Hour => dbn::Schema::Ohlcv1H,
414            BarAggregation::Day => dbn::Schema::Ohlcv1D,
415            _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
416        };
417
418        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
419        let time_range = get_date_time_range(params.start, end)?;
420
421        let range_params = GetRangeParams::builder()
422            .dataset(params.dataset)
423            .date_time_range(time_range)
424            .symbols(symbols)
425            .stype_in(stype_in)
426            .schema(schema)
427            .limit(params.limit.and_then(NonZeroU64::new))
428            .build();
429
430        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
431
432        let mut client = self.inner.lock().await;
433        let mut decoder = client
434            .timeseries()
435            .get_range(&range_params)
436            .await
437            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
438
439        let metadata = decoder.metadata().clone();
440        let mut metadata_cache = MetadataCache::new(metadata);
441        let mut result: Vec<Bar> = Vec::new();
442
443        while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
444            let record = dbn::RecordRef::from(msg);
445            let sym_map = self
446                .symbol_venue_map
447                .read()
448                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
449            let instrument_id = decode_nautilus_instrument_id(
450                &record,
451                &mut metadata_cache,
452                &self.publisher_venue_map,
453                &sym_map,
454            )?;
455
456            let (data, _) = decode_record(
457                &record,
458                instrument_id,
459                price_precision,
460                None,
461                false, // Not applicable
462                timestamp_on_close,
463            )?;
464
465            match data {
466                Some(Data::Bar(bar)) => {
467                    result.push(bar);
468                }
469                _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
470            }
471        }
472
473        Ok(result)
474    }
475
476    /// Fetches imbalance data for the given parameters.
477    ///
478    /// # Errors
479    ///
480    /// Returns an error if the API request or data processing fails.
481    pub async fn get_range_imbalance(
482        &self,
483        params: RangeQueryParams,
484    ) -> anyhow::Result<Vec<DatabentoImbalance>> {
485        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
486        check_consistent_symbology(&symbols)?;
487
488        let first_symbol = params
489            .symbols
490            .first()
491            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
492        let stype_in = infer_symbology_type(first_symbol);
493        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
494        let time_range = get_date_time_range(params.start, end)?;
495
496        let range_params = GetRangeParams::builder()
497            .dataset(params.dataset)
498            .date_time_range(time_range)
499            .symbols(symbols)
500            .stype_in(stype_in)
501            .schema(dbn::Schema::Imbalance)
502            .limit(params.limit.and_then(NonZeroU64::new))
503            .build();
504
505        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
506
507        let mut client = self.inner.lock().await;
508        let mut decoder = client
509            .timeseries()
510            .get_range(&range_params)
511            .await
512            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
513
514        let metadata = decoder.metadata().clone();
515        let mut metadata_cache = MetadataCache::new(metadata);
516        let mut result: Vec<DatabentoImbalance> = Vec::new();
517
518        while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
519            let record = dbn::RecordRef::from(msg);
520            let sym_map = self
521                .symbol_venue_map
522                .read()
523                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
524            let instrument_id = decode_nautilus_instrument_id(
525                &record,
526                &mut metadata_cache,
527                &self.publisher_venue_map,
528                &sym_map,
529            )?;
530
531            let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
532            result.push(imbalance);
533        }
534
535        Ok(result)
536    }
537
538    /// Fetches statistics data for the given parameters.
539    ///
540    /// # Errors
541    ///
542    /// Returns an error if the API request or data processing fails.
543    pub async fn get_range_statistics(
544        &self,
545        params: RangeQueryParams,
546    ) -> anyhow::Result<Vec<DatabentoStatistics>> {
547        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
548        check_consistent_symbology(&symbols)?;
549
550        let first_symbol = params
551            .symbols
552            .first()
553            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
554        let stype_in = infer_symbology_type(first_symbol);
555        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
556        let time_range = get_date_time_range(params.start, end)?;
557
558        let range_params = GetRangeParams::builder()
559            .dataset(params.dataset)
560            .date_time_range(time_range)
561            .symbols(symbols)
562            .stype_in(stype_in)
563            .schema(dbn::Schema::Statistics)
564            .limit(params.limit.and_then(NonZeroU64::new))
565            .build();
566
567        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
568
569        let mut client = self.inner.lock().await;
570        let mut decoder = client
571            .timeseries()
572            .get_range(&range_params)
573            .await
574            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
575
576        let metadata = decoder.metadata().clone();
577        let mut metadata_cache = MetadataCache::new(metadata);
578        let mut result: Vec<DatabentoStatistics> = Vec::new();
579
580        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
581            let record = dbn::RecordRef::from(msg);
582            let sym_map = self
583                .symbol_venue_map
584                .read()
585                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
586            let instrument_id = decode_nautilus_instrument_id(
587                &record,
588                &mut metadata_cache,
589                &self.publisher_venue_map,
590                &sym_map,
591            )?;
592
593            let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
594            result.push(statistics);
595        }
596
597        Ok(result)
598    }
599
600    /// Fetches status data for the given parameters.
601    ///
602    /// # Errors
603    ///
604    /// Returns an error if the API request or data processing fails.
605    pub async fn get_range_status(
606        &self,
607        params: RangeQueryParams,
608    ) -> anyhow::Result<Vec<InstrumentStatus>> {
609        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
610        check_consistent_symbology(&symbols)?;
611
612        let first_symbol = params
613            .symbols
614            .first()
615            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
616        let stype_in = infer_symbology_type(first_symbol);
617        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
618        let time_range = get_date_time_range(params.start, end)?;
619
620        let range_params = GetRangeParams::builder()
621            .dataset(params.dataset)
622            .date_time_range(time_range)
623            .symbols(symbols)
624            .stype_in(stype_in)
625            .schema(dbn::Schema::Status)
626            .limit(params.limit.and_then(NonZeroU64::new))
627            .build();
628
629        let mut client = self.inner.lock().await;
630        let mut decoder = client
631            .timeseries()
632            .get_range(&range_params)
633            .await
634            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
635
636        let metadata = decoder.metadata().clone();
637        let mut metadata_cache = MetadataCache::new(metadata);
638        let mut result: Vec<InstrumentStatus> = Vec::new();
639
640        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
641            let record = dbn::RecordRef::from(msg);
642            let sym_map = self
643                .symbol_venue_map
644                .read()
645                .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
646            let instrument_id = decode_nautilus_instrument_id(
647                &record,
648                &mut metadata_cache,
649                &self.publisher_venue_map,
650                &sym_map,
651            )?;
652
653            let status = decode_status_msg(msg, instrument_id, None)?;
654            result.push(status);
655        }
656
657        Ok(result)
658    }
659
660    /// Helper method to prepare symbols from instrument IDs.
661    ///
662    /// # Errors
663    ///
664    /// Returns an error if the symbol venue map lock is poisoned.
665    pub fn prepare_symbols_from_instrument_ids(
666        &self,
667        instrument_ids: &[InstrumentId],
668    ) -> anyhow::Result<Vec<String>> {
669        let mut symbol_venue_map = self
670            .symbol_venue_map
671            .write()
672            .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
673
674        let symbols: Vec<String> = instrument_ids
675            .iter()
676            .map(|instrument_id| {
677                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
678            })
679            .collect();
680
681        Ok(symbols)
682    }
683}