nautilus_persistence/python/catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
};
use pyo3::{exceptions::PyIOError, prelude::*};

use crate::backend::catalog::ParquetDataCatalog;

/// A catalog for writing Nautilus market data to, and querying it from, Parquet files.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.persistence")
)]
pub struct ParquetDataCatalogV2 {
    inner: ParquetDataCatalog,
}

#[pymethods]
impl ParquetDataCatalogV2 {
    /// Create a new `ParquetDataCatalogV2` with the given base path and optional parameters.
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base path for the catalog
    /// - `storage_options`: Optional storage configuration for cloud backends
    /// - `batch_size`: Optional batch size for processing (default: 5000)
    /// - `compression`: Optional compression type (0=UNCOMPRESSED, 1=SNAPPY, 2=GZIP, 3=LZO, 4=BROTLI, 5=LZ4, 6=ZSTD; any other value falls back to SNAPPY)
    /// - `max_row_group_size`: Optional maximum row group size (default: 5000)
    ///
    /// # Panics
    ///
    /// Panics if the underlying `ParquetDataCatalog` cannot be created from `base_path`.
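    ///
    /// # Example
    ///
    /// A minimal usage sketch from Python (the path is a hypothetical placeholder;
    /// `compression=6` selects ZSTD at its default level per the mapping below):
    ///
    /// ```python
    /// from nautilus_trader.core.nautilus_pyo3.persistence import ParquetDataCatalogV2
    ///
    /// catalog = ParquetDataCatalogV2(
    ///     "/tmp/catalog",  # local path; cloud URIs may also need storage_options
    ///     batch_size=5000,
    ///     compression=6,  # ZSTD
    ///     max_row_group_size=5000,
    /// )
    /// ```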
    #[new]
    #[pyo3(signature = (base_path, storage_options=None, batch_size=None, compression=None, max_row_group_size=None))]
    #[must_use]
    pub fn new(
        base_path: String,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<u8>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let compression = compression.map(|c| match c {
            0 => parquet::basic::Compression::UNCOMPRESSED,
            1 => parquet::basic::Compression::SNAPPY,
            // For GZIP, BROTLI, and ZSTD we use the default compression level,
            // since a level parameter cannot be passed through this PyO3 signature
            2 => parquet::basic::Compression::GZIP(Default::default()),
            3 => parquet::basic::Compression::LZO,
            4 => parquet::basic::Compression::BROTLI(Default::default()),
            5 => parquet::basic::Compression::LZ4,
            6 => parquet::basic::Compression::ZSTD(Default::default()),
            // Unrecognized codes fall back to SNAPPY
            _ => parquet::basic::Compression::SNAPPY,
        });

        Self {
            inner: ParquetDataCatalog::from_uri(
                &base_path,
                storage_options,
                batch_size,
                compression,
                max_row_group_size,
            )
            .expect("Failed to create ParquetDataCatalog"),
        }
    }

    // TODO: Cannot pass mixed data across pyo3 as a single type
    // pub fn write_data(mut slf: PyRefMut<'_, Self>, data_type: NautilusDataType, data: Vec<Data>) {}

    /// Write quote tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of quote ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
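    ///
    /// # Example
    ///
    /// A sketch assuming `catalog` is a `ParquetDataCatalogV2` and `ticks` is a list of
    /// pyo3 `QuoteTick` objects (for example, the result of a prior query):
    ///
    /// ```python
    /// path = catalog.write_quote_ticks(ticks)
    /// # Or override the recorded start/end timestamps (values hypothetical):
    /// path = catalog.write_quote_ticks(ticks, start=1_704_067_200_000_000_000, end=1_704_153_600_000_000_000)
    /// ```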
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_quote_ticks(
        &self,
        data: Vec<QuoteTick>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write quote ticks: {e}")))
    }

    /// Write trade tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of trade ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_trade_ticks(
        &self,
        data: Vec<TradeTick>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write trade ticks: {e}")))
    }

    /// Write order book delta data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book deltas to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_order_book_deltas(
        &self,
        data: Vec<OrderBookDelta>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book deltas: {e}")))
    }

    /// Write bar data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of bars to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_bars(
        &self,
        data: Vec<Bar>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write bars: {e}")))
    }

    /// Write order book depth data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book depths to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_order_book_depths(
        &self,
        data: Vec<OrderBookDepth10>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book depths: {e}")))
    }

    /// Write mark price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of mark price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_mark_price_updates(
        &self,
        data: Vec<MarkPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write mark price updates: {e}")))
    }

    /// Write index price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of index price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_index_price_updates(
        &self,
        data: Vec<IndexPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write index price updates: {e}")))
    }

    /// Extend file names in the catalog with additional timestamp information.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
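    ///
    /// # Example
    ///
    /// A sketch assuming `catalog` is a `ParquetDataCatalogV2`; `start` and `end` are
    /// keyword-only in the Python signature, and all values here are hypothetical:
    ///
    /// ```python
    /// catalog.extend_file_name(
    ///     "quotes",
    ///     instrument_id="EURUSD.SIM",
    ///     start=1_704_067_200_000_000_000,
    ///     end=1_704_153_600_000_000_000,
    /// )
    /// ```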
    #[pyo3(signature = (data_cls, instrument_id=None, *, start, end))]
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: u64,
        end: u64,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = UnixNanos::from(start);
        let end_nanos = UnixNanos::from(end);

        self.inner
            .extend_file_name(data_cls, instrument_id, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to extend file name: {e}")))
    }

    /// Consolidate all data files in the catalog within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
    #[pyo3(signature = (start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_catalog(
        &self,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_catalog(start_nanos, end_nanos, ensure_contiguous_files)
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate catalog: {e}")))
    }

    /// Consolidate data files for a specific data type within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type name to consolidate
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_data(
                type_name,
                instrument_id,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data: {e}")))
    }

    /// Consolidate all data files in the catalog by splitting them into fixed time periods.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for the consolidation range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the consolidation range (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy
    #[pyo3(signature = (period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_catalog_by_period(
        &mut self,
        period_nanos: Option<u64>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_catalog_by_period(
                period_nanos,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| {
                PyIOError::new_err(format!("Failed to consolidate catalog by period: {e}"))
            })
    }

    /// Consolidate data files by splitting them into fixed time periods.
    ///
    /// This method queries data by period and writes consolidated files immediately,
    /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
    /// the function automatically splits those files to preserve all data.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments
    /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for consolidation range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for consolidation range (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy
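    ///
    /// # Example
    ///
    /// A sketch assuming `catalog` is a `ParquetDataCatalogV2`: consolidate quote files for a
    /// single (hypothetical) instrument into one file per hour:
    ///
    /// ```python
    /// HOUR_NS = 3_600_000_000_000  # 1 hour in nanoseconds
    ///
    /// catalog.consolidate_data_by_period(
    ///     "quotes",
    ///     identifier="EURUSD.SIM",
    ///     period_nanos=HOUR_NS,
    /// )
    /// ```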
    #[pyo3(signature = (type_name, identifier=None, period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_data_by_period(
                type_name,
                identifier,
                period_nanos,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data by period: {e}")))
    }

    /// Reset all catalog file names to their canonical form.
    pub fn reset_all_file_names(&self) -> PyResult<()> {
        self.inner
            .reset_all_file_names()
            .map_err(|e| PyIOError::new_err(format!("Failed to reset catalog file names: {e}")))
    }

    /// Reset data file names for a specific data class to their canonical form.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn reset_data_file_names(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<()> {
        self.inner
            .reset_data_file_names(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to reset data file names: {e}")))
    }

    /// Delete data within a specified time range across the entire catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and deletes data within the specified time range from each directory. A leaf directory
    /// is one that contains files but no subdirectories. This is a convenience method that
    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone
    /// - The deletion process handles file intersections intelligently by splitting files
    ///   when they partially overlap with the deletion range
    /// - Files completely within the deletion range are removed entirely
    /// - Files partially overlapping the deletion range are split to preserve data outside the range
    /// - This method is useful for bulk data cleanup operations across the entire catalog
    /// - Empty directories are not automatically removed after deletion
    #[pyo3(signature = (start=None, end=None))]
    pub fn delete_catalog_range(&mut self, start: Option<u64>, end: Option<u64>) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .delete_catalog_range(start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to delete catalog range: {e}")))
    }

    /// Delete data within a specified time range for a specific data type and instrument.
    ///
    /// This method identifies all parquet files that intersect with the specified time range
    /// and handles them appropriately:
    /// - Files completely within the range are deleted
    /// - Files partially overlapping the range are split to preserve data outside the range
    /// - The original intersecting files are removed after processing
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
    /// - `instrument_id`: Optional instrument ID to delete data for. If None, deletes data across all instruments
    /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone
    /// - Files that partially overlap the deletion range are split to preserve data outside the range
    /// - The method ensures data integrity by using atomic operations where possible
    /// - Empty directories are not automatically removed after deletion
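    ///
    /// # Example
    ///
    /// A sketch assuming `catalog` is a `ParquetDataCatalogV2`: permanently delete all trade
    /// data for a (hypothetical) instrument before a cutoff timestamp:
    ///
    /// ```python
    /// cutoff_ns = 1_704_067_200_000_000_000  # hypothetical cutoff (nanoseconds since Unix epoch)
    ///
    /// catalog.delete_data_range("trades", instrument_id="EURUSD.SIM", end=cutoff_ns)
    /// ```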
    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None))]
    pub fn delete_data_range(
        &mut self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .delete_data_range(type_name, instrument_id, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to delete data range: {e}")))
    }

    /// Query files in the catalog matching the specified criteria.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name to query
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns a list of file paths matching the criteria.
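    ///
    /// # Example
    ///
    /// A sketch assuming `catalog` is a `ParquetDataCatalogV2` (data class and instrument
    /// ID values are hypothetical):
    ///
    /// ```python
    /// files = catalog.query_files("quotes", instrument_ids=["EURUSD.SIM"])
    /// for path in files:
    ///     print(path)
    /// ```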
    #[pyo3(signature = (data_cls, instrument_ids=None, start=None, end=None))]
    pub fn query_files(
        &self,
        data_cls: &str,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<Vec<String>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_files(data_cls, instrument_ids, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to query files: {e}")))
    }

    /// Get missing time intervals for a data request.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing missing intervals.
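    ///
    /// # Example
    ///
    /// A sketch assuming `catalog` is a `ParquetDataCatalogV2`: find which sub-ranges of a
    /// requested window are not yet covered by the catalog (all values hypothetical):
    ///
    /// ```python
    /// gaps = catalog.get_missing_intervals_for_request(
    ///     1_704_067_200_000_000_000,
    ///     1_704_153_600_000_000_000,
    ///     "quotes",
    ///     instrument_id="EURUSD.SIM",
    /// )
    /// for gap_start, gap_end in gaps:
    ///     print(gap_start, gap_end)  # ranges still to be fetched from a data provider
    /// ```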
    #[pyo3(signature = (start, end, data_cls, instrument_id=None))]
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_missing_intervals_for_request(start, end, data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to get missing intervals: {e}")))
    }

    /// Query the last timestamp for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns the last timestamp as nanoseconds since Unix epoch, or None if no data exists.
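    ///
    /// # Example
    ///
    /// A sketch assuming `catalog` is a `ParquetDataCatalogV2`: resume an incremental update
    /// from just after the last stored timestamp (data class and ID hypothetical):
    ///
    /// ```python
    /// last_ns = catalog.query_last_timestamp("quotes", instrument_id="EURUSD.SIM")
    /// next_start = (last_ns + 1) if last_ns is not None else 0
    /// ```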
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Option<u64>> {
        self.inner
            .query_last_timestamp(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to query last timestamp: {e}")))
    }

    /// Get time intervals covered by data for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing covered intervals.
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn get_intervals(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_intervals(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to get intervals: {e}")))
    }

    /// Query quote tick data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `QuoteTick` objects matching the query criteria.
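    ///
    /// # Example
    ///
    /// A sketch assuming `catalog` is a `ParquetDataCatalogV2`; the instrument ID is
    /// hypothetical and the `where_clause` assumes a `ts_init` column in the backing schema:
    ///
    /// ```python
    /// quotes = catalog.query_quote_ticks(
    ///     instrument_ids=["EURUSD.SIM"],
    ///     start=1_704_067_200_000_000_000,
    ///     end=1_704_153_600_000_000_000,
    ///     where_clause="ts_init > 0",  # hypothetical column; adjust to the actual schema
    /// )
    /// print(len(quotes))
    /// ```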
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_quote_ticks(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<QuoteTick>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<QuoteTick>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query trade tick data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `TradeTick` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_trade_ticks(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<TradeTick>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<TradeTick>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query order book delta data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `OrderBookDelta` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_order_book_deltas(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<OrderBookDelta>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<OrderBookDelta>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query bar data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `Bar` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_bars(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<Bar>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<Bar>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query order book depth data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `OrderBookDepth10` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_order_book_depths(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<OrderBookDepth10>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<OrderBookDepth10>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query mark price update data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `MarkPriceUpdate` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_mark_price_updates(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<MarkPriceUpdate>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<MarkPriceUpdate>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query index price update data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `IndexPriceUpdate` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_index_price_updates(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<IndexPriceUpdate>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<IndexPriceUpdate>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }
}