nautilus_persistence/python/
catalog.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use nautilus_core::UnixNanos;
19use nautilus_model::data::{
20    Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
21};
22use pyo3::{exceptions::PyIOError, prelude::*};
23
24use crate::backend::catalog::ParquetDataCatalog;
25
/// A catalog for writing data to Parquet files.
///
/// This type is a thin wrapper around the backend [`ParquetDataCatalog`]: besides the
/// `write_*` methods, the methods bound to it also consolidate, rename, delete and query
/// catalog data by delegating to the wrapped backend catalog.
///
/// When the `python` feature is enabled the type is exposed as a `pyclass` in the
/// `nautilus_trader.core.nautilus_pyo3.persistence` module.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.persistence")
)]
pub struct ParquetDataCatalogV2 {
    /// Backend catalog that performs the actual work.
    inner: ParquetDataCatalog,
}
34
35#[pymethods]
36impl ParquetDataCatalogV2 {
37    /// Create a new `ParquetCatalog` with the given base path and optional parameters.
38    ///
39    /// # Parameters
40    ///
41    /// - `base_path`: The base path for the catalog
42    /// - `storage_options`: Optional storage configuration for cloud backends
43    /// - `batch_size`: Optional batch size for processing (default: 5000)
44    /// - `compression`: Optional compression type (0=UNCOMPRESSED, 1=SNAPPY, 2=GZIP, 3=LZO, 4=BROTLI, 5=LZ4, 6=ZSTD)
45    /// - `max_row_group_size`: Optional maximum row group size (default: 5000)
46    #[new]
47    #[pyo3(signature = (base_path, storage_options=None, batch_size=None, compression=None, max_row_group_size=None))]
48    #[must_use]
49    pub fn new(
50        base_path: String,
51        storage_options: Option<HashMap<String, String>>,
52        batch_size: Option<usize>,
53        compression: Option<u8>,
54        max_row_group_size: Option<usize>,
55    ) -> Self {
56        let compression = compression.map(|c| match c {
57            0 => parquet::basic::Compression::UNCOMPRESSED,
58            1 => parquet::basic::Compression::SNAPPY,
59            // For GZIP, LZO, BROTLI, LZ4, ZSTD we need to use the default level
60            // since we can't pass the level parameter through PyO3
61            2 => {
62                let level = Default::default();
63                parquet::basic::Compression::GZIP(level)
64            }
65            3 => parquet::basic::Compression::LZO,
66            4 => {
67                let level = Default::default();
68                parquet::basic::Compression::BROTLI(level)
69            }
70            5 => parquet::basic::Compression::LZ4,
71            6 => {
72                let level = Default::default();
73                parquet::basic::Compression::ZSTD(level)
74            }
75            _ => parquet::basic::Compression::SNAPPY,
76        });
77
78        // Convert HashMap to AHashMap for internal use
79        let storage_options = storage_options.map(|m| m.into_iter().collect());
80
81        Self {
82            inner: ParquetDataCatalog::from_uri(
83                &base_path,
84                storage_options,
85                batch_size,
86                compression,
87                max_row_group_size,
88            )
89            .expect("Failed to create ParquetDataCatalog"),
90        }
91    }
92
93    // TODO: Cannot pass mixed data across pyo3 as a single type
94    // pub fn write_data(mut slf: PyRefMut<'_, Self>, data_type: NautilusDataType, data: Vec<Data>) {}
95
96    /// Write quote tick data to Parquet files.
97    ///
98    /// # Parameters
99    ///
100    /// - `data`: Vector of quote ticks to write
101    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
102    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
103    ///
104    /// # Returns
105    ///
106    /// Returns the path of the created file as a string.
107    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
108    pub fn write_quote_ticks(
109        &self,
110        data: Vec<QuoteTick>,
111        start: Option<u64>,
112        end: Option<u64>,
113        skip_disjoint_check: bool,
114    ) -> PyResult<String> {
115        // Convert u64 timestamps to UnixNanos
116        let start_nanos = start.map(UnixNanos::from);
117        let end_nanos = end.map(UnixNanos::from);
118
119        self.inner
120            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
121            .map(|path| path.to_string_lossy().to_string())
122            .map_err(|e| PyIOError::new_err(format!("Failed to write quote ticks: {e}")))
123    }
124
125    /// Write trade tick data to Parquet files.
126    ///
127    /// # Parameters
128    ///
129    /// - `data`: Vector of trade ticks to write
130    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
131    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
132    ///
133    /// # Returns
134    ///
135    /// Returns the path of the created file as a string.
136    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
137    pub fn write_trade_ticks(
138        &self,
139        data: Vec<TradeTick>,
140        start: Option<u64>,
141        end: Option<u64>,
142        skip_disjoint_check: bool,
143    ) -> PyResult<String> {
144        // Convert u64 timestamps to UnixNanos
145        let start_nanos = start.map(UnixNanos::from);
146        let end_nanos = end.map(UnixNanos::from);
147
148        self.inner
149            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
150            .map(|path| path.to_string_lossy().to_string())
151            .map_err(|e| PyIOError::new_err(format!("Failed to write trade ticks: {e}")))
152    }
153
154    /// Write order book delta data to Parquet files.
155    ///
156    /// # Parameters
157    ///
158    /// - `data`: Vector of order book deltas to write
159    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
160    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
161    ///
162    /// # Returns
163    ///
164    /// Returns the path of the created file as a string.
165    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
166    pub fn write_order_book_deltas(
167        &self,
168        data: Vec<OrderBookDelta>,
169        start: Option<u64>,
170        end: Option<u64>,
171        skip_disjoint_check: bool,
172    ) -> PyResult<String> {
173        // Convert u64 timestamps to UnixNanos
174        let start_nanos = start.map(UnixNanos::from);
175        let end_nanos = end.map(UnixNanos::from);
176
177        self.inner
178            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
179            .map(|path| path.to_string_lossy().to_string())
180            .map_err(|e| PyIOError::new_err(format!("Failed to write order book deltas: {e}")))
181    }
182
183    /// Write bar data to Parquet files.
184    ///
185    /// # Parameters
186    ///
187    /// - `data`: Vector of bars to write
188    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
189    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
190    ///
191    /// # Returns
192    ///
193    /// Returns the path of the created file as a string.
194    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
195    pub fn write_bars(
196        &self,
197        data: Vec<Bar>,
198        start: Option<u64>,
199        end: Option<u64>,
200        skip_disjoint_check: bool,
201    ) -> PyResult<String> {
202        // Convert u64 timestamps to UnixNanos
203        let start_nanos = start.map(UnixNanos::from);
204        let end_nanos = end.map(UnixNanos::from);
205
206        self.inner
207            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
208            .map(|path| path.to_string_lossy().to_string())
209            .map_err(|e| PyIOError::new_err(format!("Failed to write bars: {e}")))
210    }
211
212    /// Write order book depth data to Parquet files.
213    ///
214    /// # Parameters
215    ///
216    /// - `data`: Vector of order book depths to write
217    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
218    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
219    ///
220    /// # Returns
221    ///
222    /// Returns the path of the created file as a string.
223    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
224    pub fn write_order_book_depths(
225        &self,
226        data: Vec<OrderBookDepth10>,
227        start: Option<u64>,
228        end: Option<u64>,
229        skip_disjoint_check: bool,
230    ) -> PyResult<String> {
231        // Convert u64 timestamps to UnixNanos
232        let start_nanos = start.map(UnixNanos::from);
233        let end_nanos = end.map(UnixNanos::from);
234
235        self.inner
236            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
237            .map(|path| path.to_string_lossy().to_string())
238            .map_err(|e| PyIOError::new_err(format!("Failed to write order book depths: {e}")))
239    }
240
241    /// Write mark price update data to Parquet files.
242    ///
243    /// # Parameters
244    ///
245    /// - `data`: Vector of mark price updates to write
246    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
247    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
248    ///
249    /// # Returns
250    ///
251    /// Returns the path of the created file as a string.
252    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
253    pub fn write_mark_price_updates(
254        &self,
255        data: Vec<MarkPriceUpdate>,
256        start: Option<u64>,
257        end: Option<u64>,
258        skip_disjoint_check: bool,
259    ) -> PyResult<String> {
260        // Convert u64 timestamps to UnixNanos
261        let start_nanos = start.map(UnixNanos::from);
262        let end_nanos = end.map(UnixNanos::from);
263
264        self.inner
265            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
266            .map(|path| path.to_string_lossy().to_string())
267            .map_err(|e| PyIOError::new_err(format!("Failed to write mark price updates: {e}")))
268    }
269
270    /// Write index price update data to Parquet files.
271    ///
272    /// # Parameters
273    ///
274    /// - `data`: Vector of index price updates to write
275    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
276    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
277    ///
278    /// # Returns
279    ///
280    /// Returns the path of the created file as a string.
281    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
282    pub fn write_index_price_updates(
283        &self,
284        data: Vec<IndexPriceUpdate>,
285        start: Option<u64>,
286        end: Option<u64>,
287        skip_disjoint_check: bool,
288    ) -> PyResult<String> {
289        // Convert u64 timestamps to UnixNanos
290        let start_nanos = start.map(UnixNanos::from);
291        let end_nanos = end.map(UnixNanos::from);
292
293        self.inner
294            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
295            .map(|path| path.to_string_lossy().to_string())
296            .map_err(|e| PyIOError::new_err(format!("Failed to write index price updates: {e}")))
297    }
298
299    /// Extend file names in the catalog with additional timestamp information.
300    ///
301    /// # Parameters
302    ///
303    /// - `data_cls`: The data class name
304    /// - `instrument_id`: Optional instrument ID filter
305    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
306    /// - `end`: End timestamp (nanoseconds since Unix epoch)
307    #[pyo3(signature = (data_cls, instrument_id=None, *, start, end))]
308    pub fn extend_file_name(
309        &self,
310        data_cls: &str,
311        instrument_id: Option<String>,
312        start: u64,
313        end: u64,
314    ) -> PyResult<()> {
315        // Convert u64 timestamps to UnixNanos
316        let start_nanos = UnixNanos::from(start);
317        let end_nanos = UnixNanos::from(end);
318
319        self.inner
320            .extend_file_name(data_cls, instrument_id, start_nanos, end_nanos)
321            .map_err(|e| PyIOError::new_err(format!("Failed to extend file name: {e}")))
322    }
323
324    /// Consolidate all data files in the catalog within the specified time range.
325    ///
326    /// # Parameters
327    ///
328    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
329    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
330    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
331    #[pyo3(signature = (start=None, end=None, ensure_contiguous_files=None))]
332    pub fn consolidate_catalog(
333        &self,
334        start: Option<u64>,
335        end: Option<u64>,
336        ensure_contiguous_files: Option<bool>,
337    ) -> PyResult<()> {
338        // Convert u64 timestamps to UnixNanos
339        let start_nanos = start.map(UnixNanos::from);
340        let end_nanos = end.map(UnixNanos::from);
341
342        self.inner
343            .consolidate_catalog(start_nanos, end_nanos, ensure_contiguous_files)
344            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate catalog: {e}")))
345    }
346
347    /// Consolidate data files for a specific data type within the specified time range.
348    ///
349    /// # Parameters
350    ///
351    /// - `type_name`: The data type name to consolidate
352    /// - `instrument_id`: Optional instrument ID filter
353    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
354    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
355    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
356    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None, ensure_contiguous_files=None))]
357    pub fn consolidate_data(
358        &self,
359        type_name: &str,
360        instrument_id: Option<String>,
361        start: Option<u64>,
362        end: Option<u64>,
363        ensure_contiguous_files: Option<bool>,
364    ) -> PyResult<()> {
365        // Convert u64 timestamps to UnixNanos
366        let start_nanos = start.map(UnixNanos::from);
367        let end_nanos = end.map(UnixNanos::from);
368
369        self.inner
370            .consolidate_data(
371                type_name,
372                instrument_id,
373                start_nanos,
374                end_nanos,
375                ensure_contiguous_files,
376            )
377            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data: {e}")))
378    }
379
380    /// Consolidate all data files in the catalog by splitting them into fixed time periods.
381    ///
382    /// This method identifies all leaf directories in the catalog that contain parquet files
383    /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
384    /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
385    /// and instrument IDs in the catalog.
386    ///
387    /// # Parameters
388    ///
389    /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
390    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
391    /// - `start`: Optional start timestamp for the consolidation range (nanoseconds since Unix epoch)
392    /// - `end`: Optional end timestamp for the consolidation range (nanoseconds since Unix epoch)
393    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy
394    #[pyo3(signature = (period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
395    pub fn consolidate_catalog_by_period(
396        &mut self,
397        period_nanos: Option<u64>,
398        start: Option<u64>,
399        end: Option<u64>,
400        ensure_contiguous_files: Option<bool>,
401    ) -> PyResult<()> {
402        // Convert u64 timestamps to UnixNanos
403        let start_nanos = start.map(UnixNanos::from);
404        let end_nanos = end.map(UnixNanos::from);
405
406        self.inner
407            .consolidate_catalog_by_period(
408                period_nanos,
409                start_nanos,
410                end_nanos,
411                ensure_contiguous_files,
412            )
413            .map_err(|e| {
414                PyIOError::new_err(format!("Failed to consolidate catalog by period: {e}"))
415            })
416    }
417
418    /// Consolidate data files by splitting them into fixed time periods.
419    ///
420    /// This method queries data by period and writes consolidated files immediately,
421    /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
422    /// the function automatically splits those files to preserve all data.
423    ///
424    /// # Parameters
425    ///
426    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
427    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments
428    /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
429    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
430    /// - `start`: Optional start timestamp for consolidation range (nanoseconds since Unix epoch)
431    /// - `end`: Optional end timestamp for consolidation range (nanoseconds since Unix epoch)
432    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy
433    #[pyo3(signature = (type_name, identifier=None, period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
434    pub fn consolidate_data_by_period(
435        &mut self,
436        type_name: &str,
437        identifier: Option<String>,
438        period_nanos: Option<u64>,
439        start: Option<u64>,
440        end: Option<u64>,
441        ensure_contiguous_files: Option<bool>,
442    ) -> PyResult<()> {
443        // Convert u64 timestamps to UnixNanos
444        let start_nanos = start.map(UnixNanos::from);
445        let end_nanos = end.map(UnixNanos::from);
446
447        self.inner
448            .consolidate_data_by_period(
449                type_name,
450                identifier,
451                period_nanos,
452                start_nanos,
453                end_nanos,
454                ensure_contiguous_files,
455            )
456            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data by period: {e}")))
457    }
458
459    /// Reset all catalog file names to their canonical form.
460    pub fn reset_all_file_names(&self) -> PyResult<()> {
461        self.inner
462            .reset_all_file_names()
463            .map_err(|e| PyIOError::new_err(format!("Failed to reset catalog file names: {e}")))
464    }
465
466    /// Reset data file names for a specific data class to their canonical form.
467    ///
468    /// # Parameters
469    ///
470    /// - `data_cls`: The data class name
471    /// - `instrument_id`: Optional instrument ID filter
472    #[pyo3(signature = (data_cls, instrument_id=None))]
473    pub fn reset_data_file_names(
474        &self,
475        data_cls: &str,
476        instrument_id: Option<String>,
477    ) -> PyResult<()> {
478        self.inner
479            .reset_data_file_names(data_cls, instrument_id)
480            .map_err(|e| PyIOError::new_err(format!("Failed to reset data file names: {e}")))
481    }
482
483    /// Delete data within a specified time range across the entire catalog.
484    ///
485    /// This method identifies all leaf directories in the catalog that contain parquet files
486    /// and deletes data within the specified time range from each directory. A leaf directory
487    /// is one that contains files but no subdirectories. This is a convenience method that
488    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
489    ///
490    /// # Parameters
491    ///
492    /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
493    /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
494    ///
495    /// # Notes
496    ///
497    /// - This operation permanently removes data and cannot be undone
498    /// - The deletion process handles file intersections intelligently by splitting files
499    ///   when they partially overlap with the deletion range
500    /// - Files completely within the deletion range are removed entirely
501    /// - Files partially overlapping the deletion range are split to preserve data outside the range
502    /// - This method is useful for bulk data cleanup operations across the entire catalog
503    /// - Empty directories are not automatically removed after deletion
504    #[pyo3(signature = (start=None, end=None))]
505    pub fn delete_catalog_range(&mut self, start: Option<u64>, end: Option<u64>) -> PyResult<()> {
506        // Convert u64 timestamps to UnixNanos
507        let start_nanos = start.map(UnixNanos::from);
508        let end_nanos = end.map(UnixNanos::from);
509
510        self.inner
511            .delete_catalog_range(start_nanos, end_nanos)
512            .map_err(|e| PyIOError::new_err(format!("Failed to delete catalog range: {e}")))
513    }
514
515    /// Delete data within a specified time range for a specific data type and instrument.
516    ///
517    /// This method identifies all parquet files that intersect with the specified time range
518    /// and handles them appropriately:
519    /// - Files completely within the range are deleted
520    /// - Files partially overlapping the range are split to preserve data outside the range
521    /// - The original intersecting files are removed after processing
522    ///
523    /// # Parameters
524    ///
525    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
526    /// - `instrument_id`: Optional instrument ID to delete data for. If None, deletes data across all instruments
527    /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
528    /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
529    ///
530    /// # Notes
531    ///
532    /// - This operation permanently removes data and cannot be undone
533    /// - Files that partially overlap the deletion range are split to preserve data outside the range
534    /// - The method ensures data integrity by using atomic operations where possible
535    /// - Empty directories are not automatically removed after deletion
536    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None))]
537    pub fn delete_data_range(
538        &mut self,
539        type_name: &str,
540        instrument_id: Option<String>,
541        start: Option<u64>,
542        end: Option<u64>,
543    ) -> PyResult<()> {
544        // Convert u64 timestamps to UnixNanos
545        let start_nanos = start.map(UnixNanos::from);
546        let end_nanos = end.map(UnixNanos::from);
547
548        self.inner
549            .delete_data_range(type_name, instrument_id, start_nanos, end_nanos)
550            .map_err(|e| PyIOError::new_err(format!("Failed to delete data range: {e}")))
551    }
552
553    /// Query files in the catalog matching the specified criteria.
554    ///
555    /// # Parameters
556    ///
557    /// - `data_cls`: The data class name to query
558    /// - `instrument_ids`: Optional list of instrument IDs to filter by
559    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
560    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
561    ///
562    /// # Returns
563    ///
564    /// Returns a list of file paths matching the criteria.
565    #[pyo3(signature = (data_cls, instrument_ids=None, start=None, end=None))]
566    pub fn query_files(
567        &self,
568        data_cls: &str,
569        instrument_ids: Option<Vec<String>>,
570        start: Option<u64>,
571        end: Option<u64>,
572    ) -> PyResult<Vec<String>> {
573        // Convert u64 timestamps to UnixNanos
574        let start_nanos = start.map(UnixNanos::from);
575        let end_nanos = end.map(UnixNanos::from);
576
577        self.inner
578            .query_files(data_cls, instrument_ids, start_nanos, end_nanos)
579            .map_err(|e| PyIOError::new_err(format!("Failed to query files list: {e}")))
580    }
581
582    /// Get missing time intervals for a data request.
583    ///
584    /// # Parameters
585    ///
586    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
587    /// - `end`: End timestamp (nanoseconds since Unix epoch)
588    /// - `data_cls`: The data class name
589    /// - `instrument_id`: Optional instrument ID filter
590    ///
591    /// # Returns
592    ///
593    /// Returns a list of (start, end) timestamp tuples representing missing intervals.
594    #[pyo3(signature = (start, end, data_cls, instrument_id=None))]
595    pub fn get_missing_intervals_for_request(
596        &self,
597        start: u64,
598        end: u64,
599        data_cls: &str,
600        instrument_id: Option<String>,
601    ) -> PyResult<Vec<(u64, u64)>> {
602        self.inner
603            .get_missing_intervals_for_request(start, end, data_cls, instrument_id)
604            .map_err(|e| PyIOError::new_err(format!("Failed to get missing intervals: {e}")))
605    }
606
607    /// Query the last timestamp for a specific data class and instrument.
608    ///
609    /// # Parameters
610    ///
611    /// - `data_cls`: The data class name
612    /// - `instrument_id`: Optional instrument ID filter
613    ///
614    /// # Returns
615    ///
616    /// Returns the last timestamp as nanoseconds since Unix epoch, or None if no data exists.
617    #[pyo3(signature = (data_cls, instrument_id=None))]
618    pub fn query_last_timestamp(
619        &self,
620        data_cls: &str,
621        instrument_id: Option<String>,
622    ) -> PyResult<Option<u64>> {
623        self.inner
624            .query_last_timestamp(data_cls, instrument_id)
625            .map_err(|e| PyIOError::new_err(format!("Failed to query last timestamp: {e}")))
626    }
627
628    /// Get time intervals covered by data for a specific data class and instrument.
629    ///
630    /// # Parameters
631    ///
632    /// - `data_cls`: The data class name
633    /// - `instrument_id`: Optional instrument ID filter
634    ///
635    /// # Returns
636    ///
637    /// Returns a list of (start, end) timestamp tuples representing covered intervals.
638    #[pyo3(signature = (data_cls, instrument_id=None))]
639    pub fn get_intervals(
640        &self,
641        data_cls: &str,
642        instrument_id: Option<String>,
643    ) -> PyResult<Vec<(u64, u64)>> {
644        self.inner
645            .get_intervals(data_cls, instrument_id)
646            .map_err(|e| PyIOError::new_err(format!("Failed to get intervals: {e}")))
647    }
648
649    /// Query quote tick data from Parquet files.
650    ///
651    /// # Parameters
652    ///
653    /// - `instrument_ids`: Optional list of instrument IDs to filter by
654    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
655    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
656    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
657    ///
658    /// # Returns
659    ///
660    /// Returns a vector of `QuoteTick` objects matching the query criteria.
661    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
662    pub fn query_quote_ticks(
663        &mut self,
664        instrument_ids: Option<Vec<String>>,
665        start: Option<u64>,
666        end: Option<u64>,
667        where_clause: Option<String>,
668    ) -> PyResult<Vec<QuoteTick>> {
669        // Convert u64 timestamps to UnixNanos
670        let start_nanos = start.map(UnixNanos::from);
671        let end_nanos = end.map(UnixNanos::from);
672
673        // Use the backend catalog's generic query_typed_data function
674        self.inner
675            .query_typed_data::<QuoteTick>(
676                instrument_ids,
677                start_nanos,
678                end_nanos,
679                where_clause.as_deref(),
680                None,
681            )
682            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
683    }
684
685    /// Query trade tick data from Parquet files.
686    ///
687    /// # Parameters
688    ///
689    /// - `instrument_ids`: Optional list of instrument IDs to filter by
690    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
691    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
692    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
693    ///
694    /// # Returns
695    ///
696    /// Returns a vector of `TradeTick` objects matching the query criteria.
697    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
698    pub fn query_trade_ticks(
699        &mut self,
700        instrument_ids: Option<Vec<String>>,
701        start: Option<u64>,
702        end: Option<u64>,
703        where_clause: Option<String>,
704    ) -> PyResult<Vec<TradeTick>> {
705        // Convert u64 timestamps to UnixNanos
706        let start_nanos = start.map(UnixNanos::from);
707        let end_nanos = end.map(UnixNanos::from);
708
709        // Use the backend catalog's generic query_typed_data function
710        self.inner
711            .query_typed_data::<TradeTick>(
712                instrument_ids,
713                start_nanos,
714                end_nanos,
715                where_clause.as_deref(),
716                None,
717            )
718            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
719    }
720
721    /// Query order book delta data from Parquet files.
722    ///
723    /// # Parameters
724    ///
725    /// - `instrument_ids`: Optional list of instrument IDs to filter by
726    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
727    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
728    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
729    ///
730    /// # Returns
731    ///
732    /// Returns a vector of `OrderBookDelta` objects matching the query criteria.
733    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
734    pub fn query_order_book_deltas(
735        &mut self,
736        instrument_ids: Option<Vec<String>>,
737        start: Option<u64>,
738        end: Option<u64>,
739        where_clause: Option<String>,
740    ) -> PyResult<Vec<OrderBookDelta>> {
741        // Convert u64 timestamps to UnixNanos
742        let start_nanos = start.map(UnixNanos::from);
743        let end_nanos = end.map(UnixNanos::from);
744
745        // Use the backend catalog's generic query_typed_data function
746        self.inner
747            .query_typed_data::<OrderBookDelta>(
748                instrument_ids,
749                start_nanos,
750                end_nanos,
751                where_clause.as_deref(),
752                None,
753            )
754            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
755    }
756
757    /// Query bar data from Parquet files.
758    ///
759    /// # Parameters
760    ///
761    /// - `instrument_ids`: Optional list of instrument IDs to filter by
762    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
763    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
764    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
765    ///
766    /// # Returns
767    ///
768    /// Returns a vector of Bar objects matching the query criteria.
769    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
770    pub fn query_bars(
771        &mut self,
772        instrument_ids: Option<Vec<String>>,
773        start: Option<u64>,
774        end: Option<u64>,
775        where_clause: Option<String>,
776    ) -> PyResult<Vec<Bar>> {
777        // Convert u64 timestamps to UnixNanos
778        let start_nanos = start.map(UnixNanos::from);
779        let end_nanos = end.map(UnixNanos::from);
780
781        // Use the backend catalog's generic query_typed_data function
782        self.inner
783            .query_typed_data::<Bar>(
784                instrument_ids,
785                start_nanos,
786                end_nanos,
787                where_clause.as_deref(),
788                None,
789            )
790            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
791    }
792
793    /// Query order book depth data from Parquet files.
794    ///
795    /// # Parameters
796    ///
797    /// - `instrument_ids`: Optional list of instrument IDs to filter by
798    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
799    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
800    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
801    ///
802    /// # Returns
803    ///
804    /// Returns a vector of `OrderBookDepth10` objects matching the query criteria.
805    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
806    pub fn query_order_book_depths(
807        &mut self,
808        instrument_ids: Option<Vec<String>>,
809        start: Option<u64>,
810        end: Option<u64>,
811        where_clause: Option<String>,
812    ) -> PyResult<Vec<OrderBookDepth10>> {
813        // Convert u64 timestamps to UnixNanos
814        let start_nanos = start.map(UnixNanos::from);
815        let end_nanos = end.map(UnixNanos::from);
816
817        // Use the backend catalog's generic query_typed_data function
818        self.inner
819            .query_typed_data::<OrderBookDepth10>(
820                instrument_ids,
821                start_nanos,
822                end_nanos,
823                where_clause.as_deref(),
824                None,
825            )
826            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
827    }
828
829    /// Query mark price update data from Parquet files.
830    ///
831    /// # Parameters
832    ///
833    /// - `instrument_ids`: Optional list of instrument IDs to filter by
834    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
835    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
836    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
837    ///
838    /// # Returns
839    ///
840    /// Returns a vector of `MarkPriceUpdate` objects matching the query criteria.
841    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
842    pub fn query_mark_price_updates(
843        &mut self,
844        instrument_ids: Option<Vec<String>>,
845        start: Option<u64>,
846        end: Option<u64>,
847        where_clause: Option<String>,
848    ) -> PyResult<Vec<MarkPriceUpdate>> {
849        // Convert u64 timestamps to UnixNanos
850        let start_nanos = start.map(UnixNanos::from);
851        let end_nanos = end.map(UnixNanos::from);
852
853        // Use the backend catalog's generic query_typed_data function
854        self.inner
855            .query_typed_data::<MarkPriceUpdate>(
856                instrument_ids,
857                start_nanos,
858                end_nanos,
859                where_clause.as_deref(),
860                None,
861            )
862            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
863    }
864
865    /// Query index price update data from Parquet files.
866    ///
867    /// # Parameters
868    ///
869    /// - `instrument_ids`: Optional list of instrument IDs to filter by
870    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
871    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
872    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
873    ///
874    /// # Returns
875    ///
876    /// Returns a vector of `IndexPriceUpdate` objects matching the query criteria.
877    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
878    pub fn query_index_price_updates(
879        &mut self,
880        instrument_ids: Option<Vec<String>>,
881        start: Option<u64>,
882        end: Option<u64>,
883        where_clause: Option<String>,
884    ) -> PyResult<Vec<IndexPriceUpdate>> {
885        // Convert u64 timestamps to UnixNanos
886        let start_nanos = start.map(UnixNanos::from);
887        let end_nanos = end.map(UnixNanos::from);
888
889        // Use the backend catalog's generic query_typed_data function
890        self.inner
891            .query_typed_data::<IndexPriceUpdate>(
892                instrument_ids,
893                start_nanos,
894                end_nanos,
895                where_clause.as_deref(),
896                None,
897            )
898            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
899    }
900}