nautilus_persistence/python/catalog.rs
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
// https://nautechsystems.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::collections::HashMap;

use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
};
use pyo3::{exceptions::PyIOError, prelude::*};

use crate::backend::catalog::ParquetDataCatalog;

/// A catalog for writing and querying Nautilus market data in Parquet files.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.persistence")
)]
pub struct ParquetDataCatalogV2 {
    inner: ParquetDataCatalog,
}

#[pymethods]
impl ParquetDataCatalogV2 {
    /// Create a new `ParquetDataCatalogV2` with the given base path and optional parameters.
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base path for the catalog
    /// - `storage_options`: Optional storage configuration for cloud backends
    /// - `batch_size`: Optional batch size for processing (default: 5000)
    /// - `compression`: Optional compression type (0=UNCOMPRESSED, 1=SNAPPY, 2=GZIP, 3=LZO, 4=BROTLI, 5=LZ4, 6=ZSTD)
    /// - `max_row_group_size`: Optional maximum row group size (default: 5000)
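    ///
    /// # Example
    ///
    /// A minimal construction sketch (the path is illustrative; compression
    /// code `6` selects ZSTD at its default level per the mapping above):
    ///
    /// ```rust,ignore
    /// let catalog = ParquetDataCatalogV2::new(
    ///     "/tmp/nautilus_catalog".to_string(),
    ///     None,       // storage_options: none needed for the local filesystem
    ///     Some(5000), // batch_size
    ///     Some(6),    // compression: ZSTD
    ///     Some(5000), // max_row_group_size
    /// );
    /// ```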
    #[new]
    #[pyo3(signature = (base_path, storage_options=None, batch_size=None, compression=None, max_row_group_size=None))]
    #[must_use]
    pub fn new(
        base_path: String,
        storage_options: Option<HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<u8>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let compression = compression.map(|c| match c {
            0 => parquet::basic::Compression::UNCOMPRESSED,
            1 => parquet::basic::Compression::SNAPPY,
            // For GZIP, BROTLI, and ZSTD we use the default compression level,
            // since a level parameter cannot be passed through PyO3 here
            2 => {
                let level = Default::default();
                parquet::basic::Compression::GZIP(level)
            }
            3 => parquet::basic::Compression::LZO,
            4 => {
                let level = Default::default();
                parquet::basic::Compression::BROTLI(level)
            }
            5 => parquet::basic::Compression::LZ4,
            6 => {
                let level = Default::default();
                parquet::basic::Compression::ZSTD(level)
            }
            _ => parquet::basic::Compression::SNAPPY,
        });

        // Convert HashMap to AHashMap for internal use
        let storage_options = storage_options.map(|m| m.into_iter().collect());

        Self {
            inner: ParquetDataCatalog::from_uri(
                &base_path,
                storage_options,
                batch_size,
                compression,
                max_row_group_size,
            )
            .expect("Failed to create ParquetDataCatalog"),
        }
    }

    // TODO: Cannot pass mixed data across PyO3 as a single type
    // pub fn write_data(mut slf: PyRefMut<'_, Self>, data_type: NautilusDataType, data: Vec<Data>) {}

    /// Write quote tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of quote ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
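    ///
    /// # Example
    ///
    /// A sketch of writing a pre-sorted batch (assumes `quotes` is a
    /// `Vec<QuoteTick>` ordered by `ts_init`; the timestamp overrides are
    /// illustrative):
    ///
    /// ```rust,ignore
    /// let path = catalog.write_quote_ticks(
    ///     quotes,
    ///     Some(1_700_000_000_000_000_000), // start override (ns since epoch)
    ///     Some(1_700_000_060_000_000_000), // end override (ns since epoch)
    ///     false,                           // keep the disjoint-interval check
    /// )?;
    /// println!("wrote {path}");
    /// ```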
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_quote_ticks(
        &self,
        data: Vec<QuoteTick>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write quote ticks: {e}")))
    }

    /// Write trade tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of trade ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_trade_ticks(
        &self,
        data: Vec<TradeTick>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write trade ticks: {e}")))
    }

    /// Write order book delta data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book deltas to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_order_book_deltas(
        &self,
        data: Vec<OrderBookDelta>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book deltas: {e}")))
    }

    /// Write bar data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of bars to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_bars(
        &self,
        data: Vec<Bar>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write bars: {e}")))
    }

    /// Write order book depth data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book depths to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_order_book_depths(
        &self,
        data: Vec<OrderBookDepth10>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book depths: {e}")))
    }

    /// Write mark price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of mark price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_mark_price_updates(
        &self,
        data: Vec<MarkPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write mark price updates: {e}")))
    }

    /// Write index price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of index price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_index_price_updates(
        &self,
        data: Vec<IndexPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write index price updates: {e}")))
    }

    /// Extend the timestamp range encoded in a data file's name for the given data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
    #[pyo3(signature = (data_cls, instrument_id=None, *, start, end))]
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: u64,
        end: u64,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = UnixNanos::from(start);
        let end_nanos = UnixNanos::from(end);

        self.inner
            .extend_file_name(data_cls, instrument_id, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to extend file name: {e}")))
    }

    /// Consolidate all data files in the catalog within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
    #[pyo3(signature = (start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_catalog(
        &self,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_catalog(start_nanos, end_nanos, ensure_contiguous_files)
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate catalog: {e}")))
    }

    /// Consolidate data files for a specific data type within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type name to consolidate
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
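    ///
    /// # Example
    ///
    /// A sketch consolidating all quote files for one instrument (the
    /// identifier is illustrative):
    ///
    /// ```rust,ignore
    /// catalog.consolidate_data(
    ///     "quotes",
    ///     Some("BTCUSDT.BINANCE".to_string()),
    ///     None,       // start: from the earliest file
    ///     None,       // end: to the latest file
    ///     Some(true), // ensure_contiguous_files
    /// )?;
    /// ```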
    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_data(
                type_name,
                instrument_id,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data: {e}")))
    }

    /// Consolidate all data files in the catalog by splitting them into fixed time periods.
    ///
    /// This method identifies all leaf directories in the catalog that contain Parquet files
    /// and consolidates them by period. A leaf directory is one that contains files but no
    /// subdirectories. This is a convenience method that effectively calls
    /// `consolidate_data_by_period` for all data types and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for the consolidation range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the consolidation range (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy
    #[pyo3(signature = (period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_catalog_by_period(
        &mut self,
        period_nanos: Option<u64>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_catalog_by_period(
                period_nanos,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| {
                PyIOError::new_err(format!("Failed to consolidate catalog by period: {e}"))
            })
    }

    /// Consolidate data files by splitting them into fixed time periods.
    ///
    /// This method queries data by period and writes consolidated files immediately, using
    /// efficient period-based consolidation logic. When start/end boundaries intersect existing
    /// files, the function automatically splits those files to preserve all data.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments
    /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for consolidation range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for consolidation range (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy
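    ///
    /// # Example
    ///
    /// A sketch splitting all trade files into one-hour periods (values are
    /// illustrative):
    ///
    /// ```rust,ignore
    /// catalog.consolidate_data_by_period(
    ///     "trades",
    ///     None,                    // all instruments
    ///     Some(3_600_000_000_000), // period: 1 hour in nanoseconds
    ///     None,                    // start
    ///     None,                    // end
    ///     Some(true),              // ensure_contiguous_files
    /// )?;
    /// ```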
    #[pyo3(signature = (type_name, identifier=None, period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_data_by_period(
                type_name,
                identifier,
                period_nanos,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data by period: {e}")))
    }

    /// Reset all catalog file names to their canonical form.
    pub fn reset_all_file_names(&self) -> PyResult<()> {
        self.inner
            .reset_all_file_names()
            .map_err(|e| PyIOError::new_err(format!("Failed to reset catalog file names: {e}")))
    }

    /// Reset data file names for a specific data class to their canonical form.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn reset_data_file_names(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<()> {
        self.inner
            .reset_data_file_names(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to reset data file names: {e}")))
    }

    /// Delete data within a specified time range across the entire catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain Parquet files
    /// and deletes data within the specified time range from each directory. A leaf directory
    /// is one that contains files but no subdirectories. This is a convenience method that
    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone
    /// - The deletion process handles file intersections intelligently by splitting files
    ///   when they partially overlap with the deletion range
    /// - Files completely within the deletion range are removed entirely
    /// - Files partially overlapping the deletion range are split to preserve data outside the range
    /// - This method is useful for bulk data cleanup operations across the entire catalog
    /// - Empty directories are not automatically removed after deletion
    #[pyo3(signature = (start=None, end=None))]
    pub fn delete_catalog_range(&mut self, start: Option<u64>, end: Option<u64>) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .delete_catalog_range(start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to delete catalog range: {e}")))
    }

    /// Delete data within a specified time range for a specific data type and instrument.
    ///
    /// This method identifies all Parquet files that intersect with the specified time range
    /// and handles them appropriately:
    /// - Files completely within the range are deleted
    /// - Files partially overlapping the range are split to preserve data outside the range
    /// - The original intersecting files are removed after processing
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
    /// - `instrument_id`: Optional instrument ID to delete data for. If None, deletes data across all instruments
    /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone
    /// - Files that partially overlap the deletion range are split to preserve data outside the range
    /// - The method ensures data integrity by using atomic operations where possible
    /// - Empty directories are not automatically removed after deletion
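    ///
    /// # Example
    ///
    /// A sketch deleting one day of quotes for a single instrument (the
    /// identifier and timestamps are illustrative; files straddling either
    /// boundary are split as described in the notes above):
    ///
    /// ```rust,ignore
    /// catalog.delete_data_range(
    ///     "quotes",
    ///     Some("BTCUSDT.BINANCE".to_string()),
    ///     Some(1_700_000_000_000_000_000),
    ///     Some(1_700_086_400_000_000_000), // start + 24h in nanoseconds
    /// )?;
    /// ```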
    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None))]
    pub fn delete_data_range(
        &mut self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .delete_data_range(type_name, instrument_id, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to delete data range: {e}")))
    }

    /// Query files in the catalog matching the specified criteria.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name to query
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns a list of file paths matching the criteria.
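    ///
    /// # Example
    ///
    /// A sketch listing the quote files that cover a time range (the
    /// identifier and timestamps are illustrative):
    ///
    /// ```rust,ignore
    /// let files = catalog.query_files(
    ///     "quotes",
    ///     Some(vec!["BTCUSDT.BINANCE".to_string()]),
    ///     Some(1_700_000_000_000_000_000),
    ///     Some(1_700_086_400_000_000_000),
    /// )?;
    /// for file in files {
    ///     println!("{file}");
    /// }
    /// ```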
    #[pyo3(signature = (data_cls, instrument_ids=None, start=None, end=None))]
    pub fn query_files(
        &self,
        data_cls: &str,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<Vec<String>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_files(data_cls, instrument_ids, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to query files: {e}")))
    }

    /// Get missing time intervals for a data request.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing missing intervals.
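    ///
    /// # Example
    ///
    /// A sketch finding coverage gaps before requesting data from an external
    /// provider (values are illustrative):
    ///
    /// ```rust,ignore
    /// let gaps = catalog.get_missing_intervals_for_request(
    ///     1_700_000_000_000_000_000,
    ///     1_700_086_400_000_000_000,
    ///     "quotes",
    ///     Some("BTCUSDT.BINANCE".to_string()),
    /// )?;
    /// for (gap_start, gap_end) in gaps {
    ///     println!("missing: {gap_start}..{gap_end}");
    /// }
    /// ```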
    #[pyo3(signature = (start, end, data_cls, instrument_id=None))]
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_missing_intervals_for_request(start, end, data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to get missing intervals: {e}")))
    }

    /// Query the last timestamp for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns the last timestamp as nanoseconds since Unix epoch, or None if no data exists.
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Option<u64>> {
        self.inner
            .query_last_timestamp(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to query last timestamp: {e}")))
    }

    /// Get time intervals covered by data for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing covered intervals.
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn get_intervals(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_intervals(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to get intervals: {e}")))
    }

    /// Query quote tick data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `QuoteTick` objects matching the query criteria.
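    ///
    /// # Example
    ///
    /// A sketch querying one instrument over a range (the identifier and
    /// timestamps are illustrative; the `where_clause` is left unset):
    ///
    /// ```rust,ignore
    /// let quotes = catalog.query_quote_ticks(
    ///     Some(vec!["BTCUSDT.BINANCE".to_string()]),
    ///     Some(1_700_000_000_000_000_000),
    ///     Some(1_700_086_400_000_000_000),
    ///     None, // optional SQL WHERE clause
    /// )?;
    /// println!("loaded {} quotes", quotes.len());
    /// ```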
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_quote_ticks(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<QuoteTick>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<QuoteTick>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query quote ticks: {e}")))
    }

    /// Query trade tick data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `TradeTick` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_trade_ticks(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<TradeTick>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<TradeTick>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query trade ticks: {e}")))
    }

    /// Query order book delta data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `OrderBookDelta` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_order_book_deltas(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<OrderBookDelta>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<OrderBookDelta>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query order book deltas: {e}")))
    }

    /// Query bar data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `Bar` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_bars(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<Bar>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<Bar>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query bars: {e}")))
    }

    /// Query order book depth data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `OrderBookDepth10` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_order_book_depths(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<OrderBookDepth10>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<OrderBookDepth10>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query order book depths: {e}")))
    }

    /// Query mark price update data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `MarkPriceUpdate` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_mark_price_updates(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<MarkPriceUpdate>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<MarkPriceUpdate>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query mark price updates: {e}")))
    }

    /// Query index price update data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `IndexPriceUpdate` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_index_price_updates(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<IndexPriceUpdate>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<IndexPriceUpdate>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query index price updates: {e}")))
    }
}