nautilus_persistence/python/catalog.rs
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
// https://nautechsystems.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
};
use pyo3::{exceptions::PyIOError, prelude::*};

use crate::backend::catalog::ParquetDataCatalog;

/// A catalog for writing, consolidating, and querying Nautilus market data in Parquet files.
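///
/// This class wraps the Rust `ParquetDataCatalog` backend and is exposed to Python as
/// `nautilus_trader.core.nautilus_pyo3.persistence.ParquetDataCatalogV2`.
///
/// # Examples
///
/// A minimal construction sketch from Python; the path and settings are illustrative:
///
/// ```python
/// from nautilus_trader.core.nautilus_pyo3.persistence import ParquetDataCatalogV2
///
/// # compression=1 selects SNAPPY (see the constructor docs for all codes)
/// catalog = ParquetDataCatalogV2("/tmp/catalog", batch_size=5000, compression=1)
/// ```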
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.persistence")
)]
pub struct ParquetDataCatalogV2 {
    inner: ParquetDataCatalog,
}

#[pymethods]
impl ParquetDataCatalogV2 {
    /// Create a new `ParquetDataCatalogV2` with the given base path and optional parameters.
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base path for the catalog
    /// - `storage_options`: Optional storage configuration for cloud backends
    /// - `batch_size`: Optional batch size for processing (default: 5000)
    /// - `compression`: Optional compression type (0=UNCOMPRESSED, 1=SNAPPY, 2=GZIP, 3=LZO,
    ///   4=BROTLI, 5=LZ4, 6=ZSTD); unrecognized values fall back to SNAPPY
    /// - `max_row_group_size`: Optional maximum row group size (default: 5000)
    ///
    /// # Errors
    ///
    /// Returns a `PyIOError` if the catalog cannot be created at `base_path`.
    #[new]
    #[pyo3(signature = (base_path, storage_options=None, batch_size=None, compression=None, max_row_group_size=None))]
    pub fn new(
        base_path: String,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<u8>,
        max_row_group_size: Option<usize>,
    ) -> PyResult<Self> {
        // For GZIP, BROTLI, and ZSTD we use the default compression level,
        // since the level parameter cannot be passed through PyO3
        let compression = compression.map(|c| match c {
            0 => parquet::basic::Compression::UNCOMPRESSED,
            1 => parquet::basic::Compression::SNAPPY,
            2 => parquet::basic::Compression::GZIP(Default::default()),
            3 => parquet::basic::Compression::LZO,
            4 => parquet::basic::Compression::BROTLI(Default::default()),
            5 => parquet::basic::Compression::LZ4,
            6 => parquet::basic::Compression::ZSTD(Default::default()),
            // Unrecognized codes fall back to SNAPPY rather than erroring
            _ => parquet::basic::Compression::SNAPPY,
        });

        let inner = ParquetDataCatalog::from_uri(
            &base_path,
            storage_options,
            batch_size,
            compression,
            max_row_group_size,
        )
        .map_err(|e| PyIOError::new_err(format!("Failed to create ParquetDataCatalog: {e}")))?;

        Ok(Self { inner })
    }

    // TODO: Cannot pass mixed data across pyo3 as a single type
    // pub fn write_data(mut slf: PyRefMut<'_, Self>, data_type: NautilusDataType, data: Vec<Data>) {}

    /// Write quote tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of quote ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
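    ///
    /// # Examples
    ///
    /// A usage sketch from Python, assuming `catalog` is a `ParquetDataCatalogV2` and
    /// `quotes` is a list of `QuoteTick` objects (the timestamp is illustrative):
    ///
    /// ```python
    /// # Optionally override the recorded start/end timestamps (nanoseconds)
    /// path = catalog.write_quote_ticks(quotes, start=1_700_000_000_000_000_000)
    /// print(path)  # path of the Parquet file that was written
    /// ```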
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_quote_ticks(
        &self,
        data: Vec<QuoteTick>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write quote ticks: {e}")))
    }

    /// Write trade tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of trade ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_trade_ticks(
        &self,
        data: Vec<TradeTick>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write trade ticks: {e}")))
    }

    /// Write order book delta data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book deltas to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_order_book_deltas(
        &self,
        data: Vec<OrderBookDelta>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book deltas: {e}")))
    }

    /// Write bar data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of bars to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_bars(
        &self,
        data: Vec<Bar>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write bars: {e}")))
    }

    /// Write order book depth data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book depths to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_order_book_depths(
        &self,
        data: Vec<OrderBookDepth10>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book depths: {e}")))
    }

    /// Write mark price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of mark price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_mark_price_updates(
        &self,
        data: Vec<MarkPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write mark price updates: {e}")))
    }

    /// Write index price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of index price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If `true`, skip the check that data intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_index_price_updates(
        &self,
        data: Vec<IndexPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write index price updates: {e}")))
    }

    /// Extend file names in the catalog with additional timestamp information.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Start timestamp (nanoseconds since Unix epoch, keyword-only)
    /// - `end`: End timestamp (nanoseconds since Unix epoch, keyword-only)
    #[pyo3(signature = (data_cls, instrument_id=None, *, start, end))]
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: u64,
        end: u64,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = UnixNanos::from(start);
        let end_nanos = UnixNanos::from(end);

        self.inner
            .extend_file_name(data_cls, instrument_id, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to extend file name: {e}")))
    }

    /// Consolidate all data files in the catalog within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
    #[pyo3(signature = (start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_catalog(
        &self,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_catalog(start_nanos, end_nanos, ensure_contiguous_files)
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate catalog: {e}")))
    }

    /// Consolidate data files for a specific data type within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type name to consolidate
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_data(
                type_name,
                instrument_id,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data: {e}")))
    }

    /// Consolidate all data files in the catalog by splitting them into fixed time periods.
    ///
    /// This method identifies all leaf directories in the catalog that contain Parquet files
    /// and consolidates them by period. A leaf directory is one that contains files but no
    /// subdirectories. This is a convenience method that effectively calls
    /// `consolidate_data_by_period` for all data types and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is
    ///   1 day (86400000000000). Examples: 3600000000000 (1 hour), 604800000000000 (7 days),
    ///   1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for the consolidation range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the consolidation range (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to control the file naming strategy
    #[pyo3(signature = (period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_catalog_by_period(
        &mut self,
        period_nanos: Option<u64>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_catalog_by_period(
                period_nanos,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| {
                PyIOError::new_err(format!("Failed to consolidate catalog by period: {e}"))
            })
    }

    /// Consolidate data files by splitting them into fixed time periods.
    ///
    /// This method queries data by period and writes consolidated files immediately, using
    /// efficient period-based consolidation logic. When start/end boundaries intersect existing
    /// files, those files are automatically split so that all data is preserved.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
    /// - `identifier`: Optional instrument ID to consolidate. If `None`, consolidates all instruments
    /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is
    ///   1 day (86400000000000). Examples: 3600000000000 (1 hour), 604800000000000 (7 days),
    ///   1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for the consolidation range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the consolidation range (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to control the file naming strategy
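    ///
    /// # Examples
    ///
    /// A sketch from Python consolidating quote files into one-hour periods; the type
    /// name and period are illustrative:
    ///
    /// ```python
    /// HOUR_NS = 3_600_000_000_000  # one hour in nanoseconds
    /// catalog.consolidate_data_by_period("quotes", period_nanos=HOUR_NS)
    /// ```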
    #[pyo3(signature = (type_name, identifier=None, period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_data_by_period(
                type_name,
                identifier,
                period_nanos,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data by period: {e}")))
    }

    /// Reset all catalog file names to their canonical form.
    pub fn reset_catalog_file_names(&self) -> PyResult<()> {
        self.inner
            .reset_catalog_file_names()
            .map_err(|e| PyIOError::new_err(format!("Failed to reset catalog file names: {e}")))
    }

    /// Reset data file names for a specific data class to their canonical form.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn reset_data_file_names(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<()> {
        self.inner
            .reset_data_file_names(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to reset data file names: {e}")))
    }

    /// Delete data within a specified time range across the entire catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain Parquet files
    /// and deletes data within the specified time range from each directory. A leaf directory
    /// is one that contains files but no subdirectories. This is a convenience method that
    /// effectively calls `delete_data_range` for all data types and instrument IDs in the
    /// catalog, which makes it useful for bulk data cleanup.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone
    /// - Files completely within the deletion range are removed entirely
    /// - Files partially overlapping the deletion range are split to preserve data outside the range
    /// - Empty directories are not automatically removed after deletion
    #[pyo3(signature = (start=None, end=None))]
    pub fn delete_catalog_range(&mut self, start: Option<u64>, end: Option<u64>) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .delete_catalog_range(start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to delete catalog range: {e}")))
    }

    /// Delete data within a specified time range for a specific data type and instrument.
    ///
    /// This method identifies all Parquet files that intersect the specified time range
    /// and handles them appropriately:
    /// - Files completely within the range are deleted
    /// - Files partially overlapping the range are split to preserve data outside the range
    /// - The original intersecting files are removed after processing
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
    /// - `instrument_id`: Optional instrument ID to delete data for. If `None`, deletes data across all instruments
    /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone
    /// - The method ensures data integrity by using atomic operations where possible
    /// - Empty directories are not automatically removed after deletion
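    ///
    /// # Examples
    ///
    /// A sketch from Python deleting one day of trade data for a single instrument;
    /// the identifier and timestamps are illustrative:
    ///
    /// ```python
    /// DAY_NS = 86_400_000_000_000  # one day in nanoseconds
    /// start = 1_700_000_000_000_000_000
    /// catalog.delete_data_range("trades", "BTCUSDT-PERP.BINANCE", start, start + DAY_NS)
    /// ```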
    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None))]
    pub fn delete_data_range(
        &mut self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .delete_data_range(type_name, instrument_id, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to delete data range: {e}")))
    }

    /// Query files in the catalog matching the specified criteria.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name to query
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns a list of file paths matching the criteria.
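    ///
    /// # Examples
    ///
    /// A sketch from Python listing the Parquet files backing one instrument's quotes;
    /// the data class name and instrument ID are illustrative:
    ///
    /// ```python
    /// files = catalog.query_files("quotes", instrument_ids=["BTCUSDT-PERP.BINANCE"])
    /// for f in files:
    ///     print(f)
    /// ```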
    #[pyo3(signature = (data_cls, instrument_ids=None, start=None, end=None))]
    pub fn query_files(
        &self,
        data_cls: &str,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<Vec<String>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_files(data_cls, instrument_ids, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to query files: {e}")))
    }

    /// Get missing time intervals for a data request.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing missing intervals.
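    ///
    /// # Examples
    ///
    /// A sketch from Python finding the sub-ranges of a request that the catalog does
    /// not yet cover (values are illustrative):
    ///
    /// ```python
    /// gaps = catalog.get_missing_intervals_for_request(start, end, "quotes")
    /// for gap_start, gap_end in gaps:
    ///     print(f"missing data from {gap_start} to {gap_end}")
    /// ```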
    #[pyo3(signature = (start, end, data_cls, instrument_id=None))]
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_missing_intervals_for_request(start, end, data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to get missing intervals: {e}")))
    }

    /// Query the last timestamp for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns the last timestamp as nanoseconds since Unix epoch, or `None` if no data exists.
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Option<u64>> {
        self.inner
            .query_last_timestamp(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to query last timestamp: {e}")))
    }

    /// Get time intervals covered by data for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing covered intervals.
    #[pyo3(signature = (data_cls, instrument_id=None))]
    pub fn get_intervals(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_intervals(data_cls, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to get intervals: {e}")))
    }

    /// Query quote tick data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `QuoteTick` objects matching the query criteria.
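    ///
    /// # Examples
    ///
    /// A sketch from Python combining an instrument filter with a SQL WHERE clause;
    /// the instrument ID and column name are illustrative:
    ///
    /// ```python
    /// quotes = catalog.query_quote_ticks(
    ///     instrument_ids=["BTCUSDT-PERP.BINANCE"],
    ///     where_clause="bid_price > 0",
    /// )
    /// ```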
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_quote_ticks(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<QuoteTick>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<QuoteTick>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query quote ticks: {e}")))
    }

    /// Query trade tick data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `TradeTick` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_trade_ticks(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<TradeTick>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<TradeTick>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query trade ticks: {e}")))
    }

    /// Query order book delta data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `OrderBookDelta` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_order_book_deltas(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<OrderBookDelta>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<OrderBookDelta>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query order book deltas: {e}")))
    }

    /// Query bar data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `Bar` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_bars(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<Bar>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<Bar>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query bars: {e}")))
    }

    /// Query order book depth data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `OrderBookDepth10` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_order_book_depths(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<OrderBookDepth10>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<OrderBookDepth10>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query order book depths: {e}")))
    }

    /// Query mark price update data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `MarkPriceUpdate` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_mark_price_updates(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<MarkPriceUpdate>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<MarkPriceUpdate>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query mark price updates: {e}")))
    }

    /// Query index price update data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `IndexPriceUpdate` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_index_price_updates(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<String>,
    ) -> PyResult<Vec<IndexPriceUpdate>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        // Use the backend catalog's generic query_typed_data function
        self.inner
            .query_typed_data::<IndexPriceUpdate>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause.as_deref(),
                None,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query index price updates: {e}")))
    }
}