nautilus_persistence/backend/catalog_operations.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Catalog operations for data consolidation and reset functionality.
17//!
18//! This module contains the consolidation and reset operations for the `ParquetDataCatalog`.
19//! These operations are separated into their own module for better organization and maintainability.
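//!
//! # Examples
//!
//! A typical maintenance pass over a catalog (a minimal sketch; the constructor arguments
//! are elided):
//!
//! ```rust,no_run
//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//!
//! let catalog = ParquetDataCatalog::new(/* ... */);
//!
//! // Merge the parquet files in every leaf data directory
//! catalog.consolidate_catalog(None, None, None)?;
//!
//! // Rename files so that filenames match their actual content timestamps
//! catalog.reset_all_file_names()?;
//! # Ok::<(), anyhow::Error>(())
//! ```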
20
21use ahash::{AHashMap, AHashSet};
22use futures::StreamExt;
23use nautilus_core::UnixNanos;
24use nautilus_model::data::{
25 Bar, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
26 QuoteTick, TradeTick, close::InstrumentClose,
27};
28use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
29use object_store::path::Path as ObjectPath;
30
31use crate::{
32 backend::catalog::{
33 CatalogPathPrefix, ParquetDataCatalog, are_intervals_contiguous, are_intervals_disjoint,
34 extract_path_components, make_object_store_path, parse_filename_timestamps,
35 timestamps_to_filename,
36 },
37 parquet::{
38 combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
39 },
40};
41
42/// Information about a consolidation query to be executed.
43///
44/// This struct encapsulates all the information needed to execute a single consolidation
45/// operation, including the data range to query and file naming strategy.
46///
47/// # Fields
48///
49/// - `query_start`: Start timestamp for the data query range (inclusive, in nanoseconds).
50/// - `query_end`: End timestamp for the data query range (inclusive, in nanoseconds).
51/// - `use_period_boundaries`: If true, uses period boundaries for file naming; if false, uses actual data timestamps.
52///
53/// # Usage
54///
55/// This struct is used internally by the consolidation system to plan and execute
56/// data consolidation operations. It allows the system to:
57/// - Separate query planning from execution.
58/// - Handle complex scenarios like data splitting.
59/// - Optimize file naming strategies.
60/// - Batch multiple operations efficiently.
61/// - Maintain file contiguity across periods.
62///
63/// # Examples
64///
65/// ```rust,no_run
66/// use nautilus_persistence::backend::catalog_operations::ConsolidationQuery;
67///
68/// // Regular consolidation query
69/// let query = ConsolidationQuery {
70/// query_start: 1609459200000000000,
71/// query_end: 1609545600000000000,
72/// use_period_boundaries: true,
73/// };
74///
75/// // Split operation to preserve data
76/// let split_query = ConsolidationQuery {
77/// query_start: 1609459200000000000,
78/// query_end: 1609462800000000000,
79/// use_period_boundaries: false,
80/// };
81/// ```
82#[derive(Debug, Clone)]
83pub struct ConsolidationQuery {
84 /// Start timestamp for the query range (inclusive, in nanoseconds)
85 pub query_start: u64,
86 /// End timestamp for the query range (inclusive, in nanoseconds)
87 pub query_end: u64,
88 /// Whether to use period boundaries for file naming (true) or actual data timestamps (false)
89 pub use_period_boundaries: bool,
90}
91
92/// Information about a deletion operation to be executed.
93///
94/// This struct encapsulates all the information needed to execute a single deletion
95/// operation, including the type of operation and file handling details.
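///
/// # Examples
///
/// A minimal illustration of a split operation (the timestamps and files are hypothetical):
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::DeleteOperation;
///
/// // Preserve the data that precedes a deletion range by re-writing it from the affected file
/// let operation = DeleteOperation {
///     operation_type: "split_before".to_string(),
///     files: vec![/* object-store paths of the affected files */],
///     query_start: 1609459200000000000,
///     query_end: 1609462799999999999,
///     file_start_ns: 1609459200000000000,
///     file_end_ns: 1609462799999999999,
/// };
/// ```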
96#[derive(Debug, Clone)]
97pub struct DeleteOperation {
98 /// Type of deletion operation ("remove", "`split_before`", "`split_after`").
99 pub operation_type: String,
100 /// List of files involved in this operation.
101 pub files: Vec<String>,
102 /// Start timestamp for data query (used for split operations).
103 pub query_start: u64,
104 /// End timestamp for data query (used for split operations).
105 pub query_end: u64,
106 /// Start timestamp for new file naming (used for split operations).
107 pub file_start_ns: u64,
108 /// End timestamp for new file naming (used for split operations).
109 pub file_end_ns: u64,
110}
111
112impl ParquetDataCatalog {
113 /// Consolidates all data files in the catalog.
114 ///
115 /// This method identifies all leaf directories in the catalog that contain parquet files
116 /// and consolidates them. A leaf directory is one that contains files but no subdirectories.
117 /// This is a convenience method that effectively calls `consolidate_data` for all data types
118 /// and instrument IDs in the catalog.
119 ///
120 /// # Parameters
121 ///
122 /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
123 /// greater than or equal to this value will be consolidated. If None, all files
124 /// from the beginning of time will be considered.
125 /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
126 /// less than or equal to this value will be consolidated. If None, all files
127 /// up to the end of time will be considered.
128 /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
129 ///
130 /// # Returns
131 ///
132 /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
133 ///
134 /// # Errors
135 ///
136 /// Returns an error if:
137 /// - Directory listing fails.
138 /// - File consolidation operations fail.
139 /// - Interval validation fails (when `ensure_contiguous_files` is true).
140 ///
141 /// # Examples
142 ///
143 /// ```rust,no_run
144 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
145 /// use nautilus_core::UnixNanos;
146 ///
147 /// let catalog = ParquetDataCatalog::new(/* ... */);
148 ///
149 /// // Consolidate all files in the catalog
150 /// catalog.consolidate_catalog(None, None, None)?;
151 ///
152 /// // Consolidate only files within a specific time range
153 /// catalog.consolidate_catalog(
154 /// Some(UnixNanos::from(1609459200000000000)),
155 /// Some(UnixNanos::from(1609545600000000000)),
156 /// Some(true)
157 /// )?;
158 /// # Ok::<(), anyhow::Error>(())
159 /// ```
160 pub fn consolidate_catalog(
161 &self,
162 start: Option<UnixNanos>,
163 end: Option<UnixNanos>,
164 ensure_contiguous_files: Option<bool>,
165 ) -> anyhow::Result<()> {
166 let leaf_directories = self.find_leaf_data_directories()?;
167
168 for directory in leaf_directories {
169 self.consolidate_directory(&directory, start, end, ensure_contiguous_files)?;
170 }
171
172 Ok(())
173 }
174
175 /// Consolidates data files for a specific data type and instrument.
176 ///
177 /// This method consolidates Parquet files within a specific directory (defined by data type
178 /// and optional instrument ID) by merging multiple files into a single file. This improves
179 /// query performance and can reduce storage overhead.
180 ///
181 /// # Parameters
182 ///
183 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
184 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
185 /// - `start`: Optional start timestamp to limit consolidation to files within this range.
186 /// - `end`: Optional end timestamp to limit consolidation to files within this range.
187 /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
188 ///
189 /// # Returns
190 ///
191 /// Returns `Ok(())` on success, or an error if consolidation fails.
192 ///
193 /// # Errors
194 ///
195 /// Returns an error if:
196 /// - The directory path cannot be constructed.
197 /// - File consolidation operations fail.
198 /// - Interval validation fails (when `ensure_contiguous_files` is true).
199 ///
200 /// # Examples
201 ///
202 /// ```rust,no_run
203 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
204 /// use nautilus_core::UnixNanos;
205 ///
206 /// let catalog = ParquetDataCatalog::new(/* ... */);
207 ///
208 /// // Consolidate all quote files for a specific instrument
209 /// catalog.consolidate_data(
210 /// "quotes",
211 /// Some("BTCUSD".to_string()),
212 /// None,
213 /// None,
214 /// None
215 /// )?;
216 ///
217 /// // Consolidate trade files within a time range
218 /// catalog.consolidate_data(
219 /// "trades",
220 /// None,
221 /// Some(UnixNanos::from(1609459200000000000)),
222 /// Some(UnixNanos::from(1609545600000000000)),
223 /// Some(true)
224 /// )?;
225 /// # Ok::<(), anyhow::Error>(())
226 /// ```
227 pub fn consolidate_data(
228 &self,
229 type_name: &str,
230 instrument_id: Option<String>,
231 start: Option<UnixNanos>,
232 end: Option<UnixNanos>,
233 ensure_contiguous_files: Option<bool>,
234 ) -> anyhow::Result<()> {
235 let directory = self.make_path(type_name, instrument_id)?;
236 self.consolidate_directory(&directory, start, end, ensure_contiguous_files)
237 }
238
239 /// Consolidates Parquet files within a specific directory by merging them into a single file.
240 ///
241 /// This internal method performs the actual consolidation work for a single directory.
242 /// It identifies files within the specified time range, validates their intervals,
243 /// and combines them into a single Parquet file with optimized storage.
244 ///
245 /// # Parameters
246 ///
247 /// - `directory`: The directory path containing Parquet files to consolidate.
248 /// - `start`: Optional start timestamp to limit consolidation to files within this range.
249 /// - `end`: Optional end timestamp to limit consolidation to files within this range.
250 /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous.
251 ///
252 /// # Returns
253 ///
254 /// Returns `Ok(())` on success, or an error if consolidation fails.
255 ///
256 /// # Behavior
257 ///
258 /// - Skips consolidation if directory contains 1 or fewer files.
259 /// - Filters files by timestamp range if start/end are specified.
260 /// - Sorts intervals by start timestamp before consolidation.
261 /// - Creates a new file spanning the entire time range of input files.
262 /// - Validates interval disjointness after consolidation (if enabled).
263 ///
264 /// # Errors
265 ///
266 /// Returns an error if:
267 /// - Directory listing fails.
268 /// - File combination operations fail.
269 /// - Interval validation fails (when `ensure_contiguous_files` is true).
270 /// - Object store operations fail.
271 fn consolidate_directory(
272 &self,
273 directory: &str,
274 start: Option<UnixNanos>,
275 end: Option<UnixNanos>,
276 ensure_contiguous_files: Option<bool>,
277 ) -> anyhow::Result<()> {
278 let parquet_files = self.list_parquet_files(directory)?;
279
280 if parquet_files.len() <= 1 {
281 return Ok(());
282 }
283
284 let mut files_to_consolidate = Vec::new();
285 let mut intervals = Vec::new();
286 let start = start.map(|t| t.as_u64());
287 let end = end.map(|t| t.as_u64());
288
289 for file in parquet_files {
290 if let Some(interval) = parse_filename_timestamps(&file) {
291 let (interval_start, interval_end) = interval;
292 let include_file = match (start, end) {
293 (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
294 (Some(s), None) => interval_start >= s,
295 (None, Some(e)) => interval_end <= e,
296 (None, None) => true,
297 };
298
299 if include_file {
300 files_to_consolidate.push(file);
301 intervals.push(interval);
302 }
303 }
304 }
305
306 intervals.sort_by_key(|&(start, _)| start);
307
308 if !intervals.is_empty() {
309 let file_name = timestamps_to_filename(
310 UnixNanos::from(intervals[0].0),
311 UnixNanos::from(intervals.last().unwrap().1),
312 );
313 let path = make_object_store_path(directory, &[&file_name]);
314
315 // Convert string paths to ObjectPath for the function call
316 let object_paths: Vec<ObjectPath> = files_to_consolidate
317 .iter()
318 .map(|path| ObjectPath::from(path.as_str()))
319 .collect();
320
321 self.execute_async(async {
322 combine_parquet_files_from_object_store(
323 self.object_store.clone(),
324 object_paths,
325 &ObjectPath::from(path),
326 Some(self.compression),
327 Some(self.max_row_group_size),
328 )
329 .await
330 })?;
331 }
332
333 if ensure_contiguous_files.unwrap_or(true) && !are_intervals_disjoint(&intervals) {
334 anyhow::bail!("Intervals are not disjoint after consolidating a directory");
335 }
336
337 Ok(())
338 }
339
340 /// Consolidates all data files in the catalog by splitting them into fixed time periods.
341 ///
342 /// This method identifies all leaf directories in the catalog that contain parquet files
343 /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
344 /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
345 /// and instrument IDs in the catalog.
346 ///
347 /// # Parameters
348 ///
349 /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
350 /// Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
351 /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
352 /// greater than or equal to this value will be consolidated. If None, all files
353 /// from the beginning of time will be considered.
354 /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
355 /// less than or equal to this value will be consolidated. If None, all files
356 /// up to the end of time will be considered.
357 /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
358 /// If false, uses actual data timestamps for file naming.
359 ///
360 /// # Returns
361 ///
362 /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
363 ///
364 /// # Errors
365 ///
366 /// Returns an error if:
367 /// - Directory listing fails.
368 /// - Data type extraction from path fails.
369 /// - Period-based consolidation operations fail.
370 ///
371 /// # Notes
372 ///
    /// - This operation can be resource-intensive for large catalogs with many data types
    ///   and instruments.
    /// - The consolidation process splits data into fixed time periods rather than combining
    ///   all files into a single file per directory.
    /// - Uses the same period-based consolidation logic as `consolidate_data_by_period`.
    /// - Original files are removed and replaced with period-based consolidated files.
    /// - This method is useful for periodic maintenance of the catalog to standardize
    ///   file organization by time periods.
381 ///
382 /// # Examples
383 ///
384 /// ```rust,no_run
385 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
386 /// use nautilus_core::UnixNanos;
387 ///
388 /// let catalog = ParquetDataCatalog::new(/* ... */);
389 ///
390 /// // Consolidate all files in the catalog by 1-day periods
391 /// catalog.consolidate_catalog_by_period(
392 /// Some(86400000000000), // 1 day in nanoseconds
393 /// None,
394 /// None,
395 /// Some(true)
396 /// )?;
397 ///
398 /// // Consolidate only files within a specific time range by 1-hour periods
399 /// catalog.consolidate_catalog_by_period(
400 /// Some(3600000000000), // 1 hour in nanoseconds
401 /// Some(UnixNanos::from(1609459200000000000)),
402 /// Some(UnixNanos::from(1609545600000000000)),
403 /// Some(false)
404 /// )?;
405 /// # Ok::<(), anyhow::Error>(())
406 /// ```
407 pub fn consolidate_catalog_by_period(
408 &mut self,
409 period_nanos: Option<u64>,
410 start: Option<UnixNanos>,
411 end: Option<UnixNanos>,
412 ensure_contiguous_files: Option<bool>,
413 ) -> anyhow::Result<()> {
414 let leaf_directories = self.find_leaf_data_directories()?;
415
416 for directory in leaf_directories {
417 let (data_cls, identifier) =
418 self.extract_data_cls_and_identifier_from_path(&directory)?;
419
420 if let Some(data_cls_name) = data_cls {
421 // Use match statement to call the generic consolidate_data_by_period for various types
422 match data_cls_name.as_str() {
423 "quotes" => {
424 self.consolidate_data_by_period_generic::<QuoteTick>(
425 identifier,
426 period_nanos,
427 start,
428 end,
429 ensure_contiguous_files,
430 )?;
431 }
432 "trades" => {
433 self.consolidate_data_by_period_generic::<TradeTick>(
434 identifier,
435 period_nanos,
436 start,
437 end,
438 ensure_contiguous_files,
439 )?;
440 }
441 "order_book_deltas" => {
442 self.consolidate_data_by_period_generic::<OrderBookDelta>(
443 identifier,
444 period_nanos,
445 start,
446 end,
447 ensure_contiguous_files,
448 )?;
449 }
450 "order_book_depths" => {
451 self.consolidate_data_by_period_generic::<OrderBookDepth10>(
452 identifier,
453 period_nanos,
454 start,
455 end,
456 ensure_contiguous_files,
457 )?;
458 }
459 "bars" => {
460 self.consolidate_data_by_period_generic::<Bar>(
461 identifier,
462 period_nanos,
463 start,
464 end,
465 ensure_contiguous_files,
466 )?;
467 }
468 "index_prices" => {
469 self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
470 identifier,
471 period_nanos,
472 start,
473 end,
474 ensure_contiguous_files,
475 )?;
476 }
477 "mark_prices" => {
478 self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
479 identifier,
480 period_nanos,
481 start,
482 end,
483 ensure_contiguous_files,
484 )?;
485 }
486 "instrument_closes" => {
487 self.consolidate_data_by_period_generic::<InstrumentClose>(
488 identifier,
489 period_nanos,
490 start,
491 end,
492 ensure_contiguous_files,
493 )?;
494 }
495 _ => {
496 // Skip unknown data types
497 log::warn!("Unknown data type for consolidation: {data_cls_name}");
498 continue;
499 }
500 }
501 }
502 }
503
504 Ok(())
505 }
506
507 /// Extracts data class and identifier from a directory path.
508 ///
509 /// This method parses a directory path to extract the data type and optional
510 /// instrument identifier. It's used to determine what type of data consolidation
511 /// to perform for each directory.
512 ///
513 /// # Parameters
514 ///
515 /// - `path`: The directory path to parse.
516 ///
517 /// # Returns
518 ///
519 /// Returns a tuple of (`data_class`, identifier) where both are optional strings.
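    ///
    /// # Examples
    ///
    /// A minimal sketch (the constructor arguments are elided):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // A path of the form ".../data/<data_cls>/<identifier>" yields both components
    /// let (data_cls, identifier) =
    ///     catalog.extract_data_cls_and_identifier_from_path("data/quotes/BTCUSD")?;
    /// assert_eq!(data_cls.as_deref(), Some("quotes"));
    /// assert_eq!(identifier.as_deref(), Some("BTCUSD"));
    /// # Ok::<(), anyhow::Error>(())
    /// ```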
520 pub fn extract_data_cls_and_identifier_from_path(
521 &self,
522 path: &str,
523 ) -> anyhow::Result<(Option<String>, Option<String>)> {
524 // Use cross-platform path parsing
525 let path_components = extract_path_components(path);
526
527 // Find the "data" directory in the path
528 if let Some(data_index) = path_components.iter().position(|part| part == "data")
529 && data_index + 1 < path_components.len()
530 {
531 let data_cls = path_components[data_index + 1].clone();
532
533 // Check if there's an identifier (instrument ID) after the data class
534 let identifier = if data_index + 2 < path_components.len() {
535 Some(path_components[data_index + 2].clone())
536 } else {
537 None
538 };
539
540 return Ok((Some(data_cls), identifier));
541 }
542
543 // If we can't parse the path, return None for both
544 Ok((None, None))
545 }
546
547 /// Consolidates data files by splitting them into fixed time periods.
548 ///
549 /// This method queries data by period and writes consolidated files immediately,
550 /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
551 /// the function automatically splits those files to preserve all data.
552 ///
553 /// # Parameters
554 ///
555 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
556 /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments.
557 /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
558 /// Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
559 /// - `start`: Optional start timestamp for consolidation range. If None, uses earliest available data.
560 /// If specified and intersects existing files, those files will be split to preserve
561 /// data outside the consolidation range.
562 /// - `end`: Optional end timestamp for consolidation range. If None, uses latest available data.
563 /// If specified and intersects existing files, those files will be split to preserve
564 /// data outside the consolidation range.
565 /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
566 /// If false, uses actual data timestamps for file naming.
567 ///
568 /// # Returns
569 ///
570 /// Returns `Ok(())` on success, or an error if consolidation fails.
571 ///
572 /// # Errors
573 ///
574 /// Returns an error if:
575 /// - The directory path cannot be constructed.
576 /// - File operations fail.
577 /// - Data querying or writing fails.
578 ///
579 /// # Notes
580 ///
581 /// - Uses two-phase approach: first determines all queries, then executes them.
582 /// - Groups intervals into contiguous groups to preserve holes between groups.
583 /// - Allows consolidation across multiple files within each contiguous group.
584 /// - Skips queries if target files already exist for efficiency.
585 /// - Original files are removed immediately after querying each period.
586 /// - When `ensure_contiguous_files=false`, file timestamps match actual data range.
587 /// - When `ensure_contiguous_files=true`, file timestamps use period boundaries.
588 /// - Uses modulo arithmetic for efficient period boundary calculation.
589 /// - Preserves holes in data by preventing queries from spanning across gaps.
590 /// - Automatically splits files at start/end boundaries to preserve all data.
591 /// - Split operations are executed before consolidation to ensure data preservation.
592 ///
593 /// # Examples
594 ///
595 /// ```rust,no_run
596 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
597 /// use nautilus_core::UnixNanos;
598 ///
599 /// let catalog = ParquetDataCatalog::new(/* ... */);
600 ///
601 /// // Consolidate all quote files by 1-day periods
602 /// catalog.consolidate_data_by_period(
603 /// "quotes",
604 /// None,
605 /// Some(86400000000000), // 1 day in nanoseconds
606 /// None,
607 /// None,
608 /// Some(true)
609 /// )?;
610 ///
611 /// // Consolidate specific instrument by 1-hour periods
612 /// catalog.consolidate_data_by_period(
613 /// "trades",
614 /// Some("BTCUSD".to_string()),
615 /// Some(3600000000000), // 1 hour in nanoseconds
616 /// Some(UnixNanos::from(1609459200000000000)),
617 /// Some(UnixNanos::from(1609545600000000000)),
618 /// Some(false)
619 /// )?;
620 /// # Ok::<(), anyhow::Error>(())
621 /// ```
622 pub fn consolidate_data_by_period(
623 &mut self,
624 type_name: &str,
625 identifier: Option<String>,
626 period_nanos: Option<u64>,
627 start: Option<UnixNanos>,
628 end: Option<UnixNanos>,
629 ensure_contiguous_files: Option<bool>,
630 ) -> anyhow::Result<()> {
631 // Use match statement to call the generic consolidate_data_by_period for various types
632 match type_name {
633 "quotes" => {
634 self.consolidate_data_by_period_generic::<QuoteTick>(
635 identifier,
636 period_nanos,
637 start,
638 end,
639 ensure_contiguous_files,
640 )?;
641 }
642 "trades" => {
643 self.consolidate_data_by_period_generic::<TradeTick>(
644 identifier,
645 period_nanos,
646 start,
647 end,
648 ensure_contiguous_files,
649 )?;
650 }
651 "order_book_deltas" => {
652 self.consolidate_data_by_period_generic::<OrderBookDelta>(
653 identifier,
654 period_nanos,
655 start,
656 end,
657 ensure_contiguous_files,
658 )?;
659 }
660 "order_book_depths" => {
661 self.consolidate_data_by_period_generic::<OrderBookDepth10>(
662 identifier,
663 period_nanos,
664 start,
665 end,
666 ensure_contiguous_files,
667 )?;
668 }
669 "bars" => {
670 self.consolidate_data_by_period_generic::<Bar>(
671 identifier,
672 period_nanos,
673 start,
674 end,
675 ensure_contiguous_files,
676 )?;
677 }
678 "index_prices" => {
679 self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
680 identifier,
681 period_nanos,
682 start,
683 end,
684 ensure_contiguous_files,
685 )?;
686 }
687 "mark_prices" => {
688 self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
689 identifier,
690 period_nanos,
691 start,
692 end,
693 ensure_contiguous_files,
694 )?;
695 }
696 "instrument_closes" => {
697 self.consolidate_data_by_period_generic::<InstrumentClose>(
698 identifier,
699 period_nanos,
700 start,
701 end,
702 ensure_contiguous_files,
703 )?;
704 }
705 _ => {
706 anyhow::bail!("Unknown data type for consolidation: {type_name}");
707 }
708 }
709
710 Ok(())
711 }
712
713 /// Generic consolidate data files by splitting them into fixed time periods.
714 ///
715 /// This is a type-safe version of `consolidate_data_by_period` that uses generic types
716 /// to ensure compile-time correctness and enable reuse across different data types.
717 ///
718 /// # Type Parameters
719 ///
720 /// - `T`: The data type to consolidate, must implement required traits for serialization.
721 ///
722 /// # Parameters
723 ///
724 /// - `identifier`: Optional instrument ID to target a specific instrument's data.
725 /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
726 /// - `start`: Optional start timestamp for consolidation range.
727 /// - `end`: Optional end timestamp for consolidation range.
728 /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
729 ///
730 /// # Returns
731 ///
732 /// Returns `Ok(())` on success, or an error if consolidation fails.
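    ///
    /// # Examples
    ///
    /// A minimal sketch of consolidating quotes into 1-hour period files (the constructor
    /// arguments are elided):
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// catalog.consolidate_data_by_period_generic::<QuoteTick>(
    ///     Some("BTCUSD".to_string()),
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```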
733 pub fn consolidate_data_by_period_generic<T>(
734 &mut self,
735 identifier: Option<String>,
736 period_nanos: Option<u64>,
737 start: Option<UnixNanos>,
738 end: Option<UnixNanos>,
739 ensure_contiguous_files: Option<bool>,
740 ) -> anyhow::Result<()>
741 where
742 T: DecodeDataFromRecordBatch
743 + CatalogPathPrefix
744 + EncodeToRecordBatch
745 + HasTsInit
746 + TryFrom<Data>
747 + Clone,
748 {
749 let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
750 let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);
751
752 // Use get_intervals for cleaner implementation
753 let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;
754
755 if intervals.is_empty() {
756 return Ok(()); // No files to consolidate
757 }
758
759 // Use auxiliary function to prepare all queries for execution
760 let queries_to_execute = self.prepare_consolidation_queries(
761 T::path_prefix(),
762 identifier.clone(),
763 &intervals,
764 period_nanos,
765 start,
766 end,
767 ensure_contiguous_files,
768 )?;
769
770 if queries_to_execute.is_empty() {
771 return Ok(()); // No queries to execute
772 }
773
774 // Get directory for file operations
775 let directory = self.make_path(T::path_prefix(), identifier.clone())?;
776 let mut existing_files = self.list_parquet_files(&directory)?;
777 existing_files.sort();
778
779 // Track files to remove and maintain existing_files list
780 let mut files_to_remove = AHashSet::new();
781 let original_files_count = existing_files.len();
782
783 // Phase 2: Execute queries, write, and delete
784 let mut file_start_ns: Option<u64> = None; // Track contiguity across periods
785
786 for query_info in queries_to_execute {
787 // Query data for this period using query_typed_data
788 let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
789
790 let period_data = self.query_typed_data::<T>(
791 instrument_ids,
792 Some(UnixNanos::from(query_info.query_start)),
793 Some(UnixNanos::from(query_info.query_end)),
794 None,
795 Some(existing_files.clone()),
796 )?;
797
798 if period_data.is_empty() {
799 // Skip if no data found, but maintain contiguity by using query start
800 if file_start_ns.is_none() {
801 file_start_ns = Some(query_info.query_start);
802 }
803 continue;
804 }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, maintaining contiguity across
                // any preceding empty periods tracked in `file_start_ns`
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                (file_start_ns.unwrap(), query_info.query_end)
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };

            // Reset contiguity tracking only after the file boundaries have been determined
            file_start_ns = None;
820
821 // Check again if target file exists (in case it was created during this process)
822 let target_filename = format!(
823 "{}/{}",
824 directory,
825 timestamps_to_filename(
826 UnixNanos::from(final_start_ns),
827 UnixNanos::from(final_end_ns)
828 )
829 );
830
831 if self.file_exists(&target_filename)? {
832 // Skip if target file already exists
833 continue;
834 }
835
836 // Write consolidated data for this period using write_to_parquet
837 // Use skip_disjoint_check since we're managing file removal carefully
838 let start_ts = UnixNanos::from(final_start_ns);
839 let end_ts = UnixNanos::from(final_end_ns);
840 self.write_to_parquet(period_data, Some(start_ts), Some(end_ts), Some(true))?;
841
842 // Identify files that are completely covered by this period
843 // Only remove files AFTER successfully writing a new file
844 // Use slice copy to avoid modification during iteration (match Python logic)
845 for file in existing_files.clone() {
846 if let Some(interval) = parse_filename_timestamps(&file)
847 && interval.1 <= query_info.query_end
848 {
849 files_to_remove.insert(file.clone());
850 existing_files.retain(|f| f != &file);
851 }
852 }
853
854 // Remove files as soon as we have some to remove
855 if !files_to_remove.is_empty() {
856 for file in files_to_remove.drain() {
857 self.delete_file(&file)?;
858 }
859 }
860 }
861
862 // Remove any remaining files that weren't removed in the loop
863 // This matches the Python implementation's final cleanup step
864 // Only remove files if any consolidation actually happened (i.e., files were processed)
865 let files_were_processed = existing_files.len() < original_files_count;
866 if files_were_processed {
867 for file in existing_files {
868 self.delete_file(&file)?;
869 }
870 }
871
872 Ok(())
873 }
874
875 /// Prepares all queries for consolidation by filtering, grouping, and handling splits.
876 ///
877 /// This auxiliary function handles all the preparation logic for consolidation:
878 /// 1. Filters intervals by time range.
879 /// 2. Groups intervals into contiguous groups.
880 /// 3. Identifies and creates split operations for data preservation.
881 /// 4. Generates period-based consolidation queries.
882 /// 5. Checks for existing target files.
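    ///
    /// # Examples
    ///
    /// A minimal sketch of planning queries over two contiguous file intervals (the
    /// constructor arguments are elided and the intervals are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let intervals: &[(u64, u64)] = &[(1, 1_000), (1_001, 2_000)];
    /// let queries = catalog.prepare_consolidation_queries(
    ///     "quotes",
    ///     None,
    ///     intervals,
    ///     500, // period size in nanoseconds
    ///     None,
    ///     None,
    ///     true,
    /// )?;
    ///
    /// for query in queries {
    ///     println!("{} -> {}", query.query_start, query.query_end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```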
883 #[allow(clippy::too_many_arguments)]
884 pub fn prepare_consolidation_queries(
885 &self,
886 type_name: &str,
887 identifier: Option<String>,
888 intervals: &[(u64, u64)],
889 period_nanos: u64,
890 start: Option<UnixNanos>,
891 end: Option<UnixNanos>,
892 ensure_contiguous_files: bool,
893 ) -> anyhow::Result<Vec<ConsolidationQuery>> {
894 // Filter intervals by time range if specified
895 let used_start = start.map(|s| s.as_u64());
896 let used_end = end.map(|e| e.as_u64());
897
898 let mut filtered_intervals = Vec::new();
899 for &(interval_start, interval_end) in intervals {
900 // Check if interval overlaps with the specified range
901 if (used_start.is_none() || used_start.unwrap() <= interval_end)
902 && (used_end.is_none() || interval_start <= used_end.unwrap())
903 {
904 filtered_intervals.push((interval_start, interval_end));
905 }
906 }
907
908 if filtered_intervals.is_empty() {
909 return Ok(Vec::new()); // No intervals in the specified range
910 }
911
912 // Check contiguity of filtered intervals if required
913 if ensure_contiguous_files && !are_intervals_contiguous(&filtered_intervals) {
914 anyhow::bail!(
915 "Intervals are not contiguous. When ensure_contiguous_files=true, \
916 all files in the consolidation range must have contiguous timestamps."
917 );
918 }
919
920 // Group intervals into contiguous groups to preserve holes between groups
921 // but allow consolidation within each contiguous group
922 let contiguous_groups = self.group_contiguous_intervals(&filtered_intervals);
923
924 let mut queries_to_execute = Vec::new();
925
926 // Handle interval splitting by creating split operations for data preservation
927 if !filtered_intervals.is_empty() {
928 if let Some(start_ts) = used_start {
929 let first_interval = filtered_intervals[0];
930 if first_interval.0 < start_ts && start_ts <= first_interval.1 {
931 // Split before start: preserve data from interval_start to start-1
932 queries_to_execute.push(ConsolidationQuery {
933 query_start: first_interval.0,
934 query_end: start_ts - 1,
935 use_period_boundaries: false,
936 });
937 }
938 }
939
940 if let Some(end_ts) = used_end {
941 let last_interval = filtered_intervals[filtered_intervals.len() - 1];
942 if last_interval.0 <= end_ts && end_ts < last_interval.1 {
943 // Split after end: preserve data from end+1 to interval_end
944 queries_to_execute.push(ConsolidationQuery {
945 query_start: end_ts + 1,
946 query_end: last_interval.1,
947 use_period_boundaries: false,
948 });
949 }
950 }
951 }
952
953 // Generate period-based consolidation queries for each contiguous group
954 for group in contiguous_groups {
955 let group_start = group[0].0;
956 let group_end = group[group.len() - 1].1;
957
958 // Apply start/end filtering to the group
959 let effective_start = used_start.map_or(group_start, |s| s.max(group_start));
960 let effective_end = used_end.map_or(group_end, |e| e.min(group_end));
961
962 if effective_start > effective_end {
963 continue; // Skip if no overlap
964 }
965
966 // Generate period-based queries within this contiguous group
967 let mut current_start_ns = (effective_start / period_nanos) * period_nanos;
968
969 // Add safety check to prevent infinite loops (match Python logic)
970 let max_iterations = 10000;
971 let mut iteration_count = 0;
972
973 while current_start_ns <= effective_end {
974 iteration_count += 1;
975 if iteration_count > max_iterations {
976 // Safety break to prevent infinite loops
977 break;
978 }
979 let current_end_ns = (current_start_ns + period_nanos - 1).min(effective_end);
980
981 // Check if target file already exists (only when ensure_contiguous_files is true)
982 if ensure_contiguous_files {
983 let directory = self.make_path(type_name, identifier.clone())?;
984 let target_filename = format!(
985 "{}/{}",
986 directory,
987 timestamps_to_filename(
988 UnixNanos::from(current_start_ns),
989 UnixNanos::from(current_end_ns)
990 )
991 );
992
993 if self.file_exists(&target_filename)? {
994 // Skip if target file already exists
995 current_start_ns += period_nanos;
996 continue;
997 }
998 }
999
1000 // Add query to execution list
1001 queries_to_execute.push(ConsolidationQuery {
1002 query_start: current_start_ns,
1003 query_end: current_end_ns,
1004 use_period_boundaries: ensure_contiguous_files,
1005 });
1006
1007 // Move to next period
1008 current_start_ns += period_nanos;
1009
1010 if current_start_ns > effective_end {
1011 break;
1012 }
1013 }
1014 }
1015
1016 // Sort queries by start date to enable efficient file removal
1017 // Files can be removed when interval[1] <= query_info["query_end"]
1018 // and processing in chronological order ensures optimal cleanup
1019 queries_to_execute.sort_by_key(|q| q.query_start);
1020
1021 Ok(queries_to_execute)
1022 }
1023
1024 /// Groups intervals into contiguous groups for efficient consolidation.
1025 ///
1026 /// This method analyzes a list of time intervals and groups them into contiguous sequences.
1027 /// Intervals are considered contiguous if the end of one interval is exactly one nanosecond
1028 /// before the start of the next interval. This grouping preserves data gaps while allowing
1029 /// consolidation within each contiguous group.
1030 ///
1031 /// # Parameters
1032 ///
1033 /// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
1034 ///
1035 /// # Returns
1036 ///
1037 /// Returns a vector of groups, where each group is a vector of contiguous intervals.
1038 /// Returns an empty vector if the input is empty.
1039 ///
1040 /// # Algorithm
1041 ///
1042 /// 1. Starts with the first interval in a new group.
1043 /// 2. For each subsequent interval, checks if it's contiguous with the previous.
1044 /// 3. If contiguous (`prev_end` + 1 == `curr_start`), adds to current group.
1045 /// 4. If not contiguous, starts a new group.
1046 /// 5. Returns all groups.
1047 ///
1048 /// # Examples
1049 ///
1050 /// ```text
1051 /// Contiguous intervals: [(1,5), (6,10), (11,15)]
1052 /// Returns: [[(1,5), (6,10), (11,15)]]
1053 ///
1054 /// Non-contiguous intervals: [(1,5), (8,10), (12,15)]
1055 /// Returns: [[(1,5)], [(8,10)], [(12,15)]]
1056 /// ```
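    ///
    /// Expressed as a call (a minimal sketch; the constructor arguments are elided):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // (1, 5) and (6, 10) are contiguous; (12, 15) starts a new group
    /// let groups = catalog.group_contiguous_intervals(&[(1, 5), (6, 10), (12, 15)]);
    /// assert_eq!(groups, vec![vec![(1, 5), (6, 10)], vec![(12, 15)]]);
    /// ```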
1057 ///
1058 /// # Notes
1059 ///
1060 /// - Input intervals should be sorted by start timestamp.
1061 /// - Gaps between groups are preserved and not consolidated.
1062 /// - Used internally by period-based consolidation methods.
1063 #[must_use]
1064 pub fn group_contiguous_intervals(&self, intervals: &[(u64, u64)]) -> Vec<Vec<(u64, u64)>> {
1065 if intervals.is_empty() {
1066 return Vec::new();
1067 }
1068
1069 let mut contiguous_groups = Vec::new();
1070 let mut current_group = vec![intervals[0]];
1071
1072 for i in 1..intervals.len() {
1073 let prev_interval = intervals[i - 1];
1074 let curr_interval = intervals[i];
1075
1076 // Check if current interval is contiguous with previous (end + 1 == start)
1077 if prev_interval.1 + 1 == curr_interval.0 {
1078 current_group.push(curr_interval);
1079 } else {
1080 // Gap found, start new group
1081 contiguous_groups.push(current_group);
1082 current_group = vec![curr_interval];
1083 }
1084 }
1085
1086 // Add the last group
1087 contiguous_groups.push(current_group);
1088
1089 contiguous_groups
1090 }
1091
1092 /// Checks if a file exists in the object store.
1093 ///
1094 /// This method performs a HEAD operation on the object store to determine if a file
1095 /// exists without downloading its content. It works with both local and remote object stores.
1096 ///
1097 /// # Parameters
1098 ///
1099 /// - `path`: The file path to check, relative to the catalog structure.
1100 ///
1101 /// # Returns
1102 ///
1103 /// Returns `true` if the file exists, `false` if it doesn't exist.
1104 ///
1105 /// # Errors
1106 ///
1107 /// Returns an error if the object store operation fails due to network issues,
1108 /// authentication problems, or other I/O errors.
1109 fn file_exists(&self, path: &str) -> anyhow::Result<bool> {
1110 let object_path = self.to_object_path(path);
1111 let exists =
1112 self.execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
1113 Ok(exists)
1114 }
1115
1116 /// Deletes a file from the object store.
1117 ///
1118 /// This method removes a file from the object store. The operation is permanent
1119 /// and cannot be undone. It works with both local filesystems and remote object stores.
1120 ///
1121 /// # Parameters
1122 ///
1123 /// - `path`: The file path to delete, relative to the catalog structure.
1124 ///
1125 /// # Returns
1126 ///
1127 /// Returns `Ok(())` on successful deletion.
1128 ///
1129 /// # Errors
1130 ///
1131 /// Returns an error if:
1132 /// - The file doesn't exist.
1133 /// - Permission is denied.
1134 /// - Network issues occur (for remote stores).
1135 /// - The object store operation fails.
1136 ///
1137 /// # Safety
1138 ///
1139 /// This operation is irreversible. Ensure the file is no longer needed before deletion.
1140 fn delete_file(&self, path: &str) -> anyhow::Result<()> {
1141 let object_path = self.to_object_path(path);
1142 self.execute_async(async {
1143 self.object_store
1144 .delete(&object_path)
1145 .await
1146 .map_err(anyhow::Error::from)
1147 })?;
1148 Ok(())
1149 }
1150
1151 /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
1152 ///
1153 /// This method scans all leaf data directories in the catalog and renames files based on
1154 /// the actual timestamp range of their content. This is useful when files have been
1155 /// modified or when filename conventions have changed.
1156 ///
1157 /// # Returns
1158 ///
1159 /// Returns `Ok(())` on success, or an error if the operation fails.
1160 ///
1161 /// # Errors
1162 ///
1163 /// Returns an error if:
1164 /// - Directory listing fails.
1165 /// - File metadata reading fails.
1166 /// - File rename operations fail.
1167 /// - Interval validation fails after renaming.
1168 ///
1169 /// # Examples
1170 ///
1171 /// ```rust,no_run
1172 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1173 ///
1174 /// let catalog = ParquetDataCatalog::new(/* ... */);
1175 ///
1176 /// // Reset all filenames in the catalog
1177 /// catalog.reset_all_file_names()?;
1178 /// # Ok::<(), anyhow::Error>(())
1179 /// ```
1180 pub fn reset_all_file_names(&self) -> anyhow::Result<()> {
1181 let leaf_directories = self.find_leaf_data_directories()?;
1182
1183 for directory in leaf_directories {
1184 self.reset_file_names(&directory)?;
1185 }
1186
1187 Ok(())
1188 }
1189
1190 /// Resets the filenames of Parquet files for a specific data type and instrument ID.
1191 ///
1192 /// This method renames files in a specific directory based on the actual timestamp
1193 /// range of their content. This is useful for correcting filenames after data
1194 /// modifications or when filename conventions have changed.
1195 ///
1196 /// # Parameters
1197 ///
1198 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1199 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1200 ///
1201 /// # Returns
1202 ///
1203 /// Returns `Ok(())` on success, or an error if the operation fails.
1204 ///
1205 /// # Errors
1206 ///
1207 /// Returns an error if:
1208 /// - The directory path cannot be constructed.
1209 /// - File metadata reading fails.
1210 /// - File rename operations fail.
1211 /// - Interval validation fails after renaming.
1212 ///
1213 /// # Examples
1214 ///
1215 /// ```rust,no_run
1216 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1217 ///
1218 /// let catalog = ParquetDataCatalog::new(/* ... */);
1219 ///
1220 /// // Reset filenames for all quote files
1221 /// catalog.reset_data_file_names("quotes", None)?;
1222 ///
1223 /// // Reset filenames for a specific instrument's trade files
1224 /// catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
1225 /// # Ok::<(), anyhow::Error>(())
1226 /// ```
1227 pub fn reset_data_file_names(
1228 &self,
1229 data_cls: &str,
1230 instrument_id: Option<String>,
1231 ) -> anyhow::Result<()> {
1232 let directory = self.make_path(data_cls, instrument_id)?;
1233 self.reset_file_names(&directory)
1234 }
1235
1236 /// Resets the filenames of Parquet files in a directory to match their actual content timestamps.
1237 ///
1238 /// This internal method scans all Parquet files in a directory, reads their metadata to
1239 /// determine the actual timestamp range of their content, and renames the files accordingly.
1240 /// This ensures that filenames accurately reflect the data they contain.
1241 ///
1242 /// # Parameters
1243 ///
1244 /// - `directory`: The directory path containing Parquet files to rename.
1245 ///
1246 /// # Returns
1247 ///
1248 /// Returns `Ok(())` on success, or an error if the operation fails.
1249 ///
1250 /// # Process
1251 ///
1252 /// 1. Lists all Parquet files in the directory
1253 /// 2. For each file, reads metadata to extract min/max timestamps
1254 /// 3. Generates a new filename based on actual timestamp range
1255 /// 4. Moves the file to the new name using object store operations
1256 /// 5. Validates that intervals remain disjoint after renaming
1257 ///
1258 /// # Errors
1259 ///
1260 /// Returns an error if:
1261 /// - Directory listing fails.
1262 /// - Metadata reading fails for any file.
1263 /// - File move operations fail.
1264 /// - Interval validation fails after renaming.
1265 /// - Object store operations fail.
1266 ///
1267 /// # Notes
1268 ///
1269 /// - This operation can be time-consuming for directories with many files.
1270 /// - Files are processed sequentially to avoid conflicts.
1271 /// - The operation is atomic per file but not across the entire directory.
1272 fn reset_file_names(&self, directory: &str) -> anyhow::Result<()> {
1273 let parquet_files = self.list_parquet_files(directory)?;
1274
1275 for file in parquet_files {
1276 let object_path = ObjectPath::from(file.as_str());
1277 let (first_ts, last_ts) = self.execute_async(async {
1278 min_max_from_parquet_metadata_object_store(
1279 self.object_store.clone(),
1280 &object_path,
1281 "ts_init",
1282 )
1283 .await
1284 })?;
1285
1286 let new_filename =
1287 timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
1288 let new_file_path = make_object_store_path(directory, &[&new_filename]);
1289 let new_object_path = ObjectPath::from(new_file_path);
1290
1291 self.move_file(&object_path, &new_object_path)?;
1292 }
1293
1294 let intervals = self.get_directory_intervals(directory)?;
1295
1296 if !are_intervals_disjoint(&intervals) {
1297 anyhow::bail!("Intervals are not disjoint after resetting file names");
1298 }
1299
1300 Ok(())
1301 }
1302
1303 /// Finds all leaf data directories in the catalog.
1304 ///
1305 /// A leaf directory is one that contains data files but no subdirectories.
1306 /// This method is used to identify directories that can be processed for
1307 /// consolidation or other operations.
1308 ///
1309 /// # Returns
1310 ///
1311 /// Returns a vector of directory path strings representing leaf directories,
1312 /// or an error if directory traversal fails.
1313 ///
1314 /// # Errors
1315 ///
1316 /// Returns an error if:
1317 /// - Object store listing operations fail.
1318 /// - Directory structure cannot be analyzed.
1319 ///
1320 /// # Examples
1321 ///
1322 /// ```rust,no_run
1323 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1324 ///
1325 /// let catalog = ParquetDataCatalog::new(/* ... */);
1326 ///
1327 /// let leaf_dirs = catalog.find_leaf_data_directories()?;
1328 /// for dir in leaf_dirs {
1329 /// println!("Found leaf directory: {}", dir);
1330 /// }
1331 /// # Ok::<(), anyhow::Error>(())
1332 /// ```
1333 pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<String>> {
1334 let data_dir = make_object_store_path(&self.base_path, &["data"]);
1335
1336 let leaf_dirs = self.execute_async(async {
1337 let mut all_paths = AHashSet::new();
1338 let mut directories = AHashSet::new();
1339 let mut files_in_dirs = AHashMap::new();
1340
1341 // List all objects under the data directory
1342 let prefix = ObjectPath::from(format!("{data_dir}/"));
1343 let mut stream = self.object_store.list(Some(&prefix));
1344
1345 while let Some(object) = stream.next().await {
1346 let object = object?;
1347 let path_str = object.location.to_string();
1348 all_paths.insert(path_str.clone());
1349
1350 // Extract directory path
1351 if let Some(parent) = std::path::Path::new(&path_str).parent() {
1352 let parent_str = parent.to_string_lossy().to_string();
1353 directories.insert(parent_str.clone());
1354
1355 // Track files in each directory
1356 files_in_dirs
1357 .entry(parent_str)
1358 .or_insert_with(Vec::new)
1359 .push(path_str);
1360 }
1361 }
1362
1363 // Find leaf directories (directories with files but no subdirectories)
1364 let mut leaf_dirs = Vec::new();
1365 for dir in &directories {
1366 let has_files = files_in_dirs
1367 .get(dir)
1368 .is_some_and(|files| !files.is_empty());
1369 let has_subdirs = directories
1370 .iter()
1371 .any(|d| d.starts_with(&make_object_store_path(dir, &[""])) && d != dir);
1372
1373 if has_files && !has_subdirs {
1374 leaf_dirs.push(dir.clone());
1375 }
1376 }
1377
1378 Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
1379 })?;
1380
1381 Ok(leaf_dirs)
1382 }
1383
1384 /// Deletes data within a specified time range for a specific data type and instrument.
1385 ///
1386 /// This method identifies all parquet files that intersect with the specified time range
1387 /// and handles them appropriately:
    /// - Files completely within the range are deleted.
    /// - Files partially overlapping the range are split to preserve data outside the range.
    /// - The original intersecting files are removed after processing.
1391 ///
1392 /// # Parameters
1393 ///
1394 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
1395 /// - `identifier`: Optional instrument ID to delete data for. If None, deletes data across all instruments.
1396 /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1397 /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1398 ///
1399 /// # Returns
1400 ///
1401 /// Returns `Ok(())` on success, or an error if deletion fails.
1402 ///
1403 /// # Errors
1404 ///
1405 /// Returns an error if:
1406 /// - The directory path cannot be constructed.
1407 /// - File operations fail.
1408 /// - Data querying or writing fails.
1409 ///
1410 /// # Notes
1411 ///
1412 /// - This operation permanently removes data and cannot be undone.
1413 /// - Files that partially overlap the deletion range are split to preserve data outside the range.
1414 /// - The method ensures data integrity by using atomic operations where possible.
1415 /// - Empty directories are not automatically removed after deletion.
1416 ///
1417 /// # Examples
1418 ///
1419 /// ```rust,no_run
1420 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1421 /// use nautilus_core::UnixNanos;
1422 ///
1423 /// let catalog = ParquetDataCatalog::new(/* ... */);
1424 ///
1425 /// // Delete all quote data for a specific instrument
1426 /// catalog.delete_data_range(
1427 /// "quotes",
1428 /// Some("BTCUSD".to_string()),
1429 /// None,
1430 /// None
1431 /// )?;
1432 ///
1433 /// // Delete trade data within a specific time range
1434 /// catalog.delete_data_range(
1435 /// "trades",
1436 /// None,
1437 /// Some(UnixNanos::from(1609459200000000000)),
1438 /// Some(UnixNanos::from(1609545600000000000))
1439 /// )?;
1440 /// # Ok::<(), anyhow::Error>(())
1441 /// ```
1442 pub fn delete_data_range(
1443 &mut self,
1444 type_name: &str,
1445 identifier: Option<String>,
1446 start: Option<UnixNanos>,
1447 end: Option<UnixNanos>,
1448 ) -> anyhow::Result<()> {
1449 // Use match statement to call the generic delete_data_range for various types
1450 match type_name {
1451 "quotes" => self.delete_data_range_generic::<QuoteTick>(identifier, start, end),
1452 "trades" => self.delete_data_range_generic::<TradeTick>(identifier, start, end),
1453 "bars" => self.delete_data_range_generic::<Bar>(identifier, start, end),
1454 "order_book_deltas" => {
1455 self.delete_data_range_generic::<OrderBookDelta>(identifier, start, end)
1456 }
            "order_book_depths" => {
1458 self.delete_data_range_generic::<OrderBookDepth10>(identifier, start, end)
1459 }
1460 _ => anyhow::bail!("Unsupported data type: {type_name}"),
1461 }
1462 }
1463
1464 /// Deletes data within a specified time range across the entire catalog.
1465 ///
1466 /// This method identifies all leaf directories in the catalog that contain parquet files
1467 /// and deletes data within the specified time range from each directory. A leaf directory
1468 /// is one that contains files but no subdirectories. This is a convenience method that
1469 /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
1470 ///
1471 /// # Parameters
1472 ///
1473 /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1474 /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1475 ///
1476 /// # Returns
1477 ///
1478 /// Returns `Ok(())` on success, or an error if deletion fails.
1479 ///
1480 /// # Errors
1481 ///
1482 /// Returns an error if:
1483 /// - Directory traversal fails.
1484 /// - Data class extraction from paths fails.
1485 /// - Individual delete operations fail.
1486 ///
1487 /// # Notes
1488 ///
1489 /// - This operation permanently removes data and cannot be undone.
1490 /// - The deletion process handles file intersections intelligently by splitting files
1491 /// when they partially overlap with the deletion range.
1492 /// - Files completely within the deletion range are removed entirely.
1493 /// - Files partially overlapping the deletion range are split to preserve data outside the range.
1494 /// - This method is useful for bulk data cleanup operations across the entire catalog.
1495 /// - Empty directories are not automatically removed after deletion.
1496 ///
1497 /// # Examples
1498 ///
1499 /// ```rust,no_run
1500 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1501 /// use nautilus_core::UnixNanos;
1502 ///
1503 /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1504 ///
1505 /// // Delete all data before a specific date across entire catalog
1506 /// catalog.delete_catalog_range(
1507 /// None,
1508 /// Some(UnixNanos::from(1609459200000000000))
1509 /// )?;
1510 ///
1511 /// // Delete all data within a specific range across entire catalog
1512 /// catalog.delete_catalog_range(
1513 /// Some(UnixNanos::from(1609459200000000000)),
1514 /// Some(UnixNanos::from(1609545600000000000))
1515 /// )?;
1516 ///
1517 /// // Delete all data after a specific date across entire catalog
1518 /// catalog.delete_catalog_range(
1519 /// Some(UnixNanos::from(1609459200000000000)),
1520 /// None
1521 /// )?;
1522 /// # Ok::<(), anyhow::Error>(())
1523 /// ```
1524 pub fn delete_catalog_range(
1525 &mut self,
1526 start: Option<UnixNanos>,
1527 end: Option<UnixNanos>,
1528 ) -> anyhow::Result<()> {
1529 let leaf_directories = self.find_leaf_data_directories()?;
1530
1531 for directory in leaf_directories {
1532 if let Ok((Some(data_type), identifier)) =
1533 self.extract_data_cls_and_identifier_from_path(&directory)
1534 {
1535 // Call the existing delete_data_range method
1536 if let Err(e) = self.delete_data_range(&data_type, identifier, start, end) {
                    log::error!("Failed to delete data in directory {directory}: {e}");
1538 // Continue with other directories instead of failing completely
1539 }
1540 }
1541 }
1542
1543 Ok(())
1544 }
1545
1546 /// Generic implementation for deleting data within a specified time range.
1547 ///
1548 /// This method provides the core deletion logic that works with any data type
1549 /// that implements the required traits. It handles file intersection analysis,
1550 /// data splitting for partial overlaps, and file cleanup.
1551 ///
1552 /// # Type Parameters
1553 ///
1554 /// - `T`: The data type that implements required traits for catalog operations.
1555 ///
1556 /// # Parameters
1557 ///
1558 /// - `identifier`: Optional instrument ID to delete data for.
1559 /// - `start`: Optional start timestamp for the deletion range.
1560 /// - `end`: Optional end timestamp for the deletion range.
1561 ///
1562 /// # Returns
1563 ///
1564 /// Returns `Ok(())` on success, or an error if deletion fails.
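    ///
    /// # Examples
    ///
    /// A minimal sketch of deleting all trades at or before a given timestamp (the
    /// constructor arguments are elided):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_model::data::TradeTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// catalog.delete_data_range_generic::<TradeTick>(
    ///     Some("BTCUSD".to_string()),
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```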
1565 pub fn delete_data_range_generic<T>(
1566 &mut self,
1567 identifier: Option<String>,
1568 start: Option<UnixNanos>,
1569 end: Option<UnixNanos>,
1570 ) -> anyhow::Result<()>
1571 where
1572 T: DecodeDataFromRecordBatch
1573 + CatalogPathPrefix
1574 + EncodeToRecordBatch
1575 + HasTsInit
1576 + TryFrom<Data>
1577 + Clone,
1578 {
1579 // Get intervals for cleaner implementation
1580 let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;
1581
1582 if intervals.is_empty() {
1583 return Ok(()); // No files to process
1584 }
1585
1586 // Prepare all operations for execution
1587 let operations_to_execute = self.prepare_delete_operations(
1588 T::path_prefix(),
1589 identifier.clone(),
1590 &intervals,
1591 start,
1592 end,
1593 )?;
1594
1595 if operations_to_execute.is_empty() {
1596 return Ok(()); // No operations to execute
1597 }
1598
1599 // Execute all operations
1600 let mut files_to_remove = AHashSet::<String>::new();
1601
1602 for operation in operations_to_execute {
1603 // Reset the session before each operation to ensure fresh data is loaded
1604 // This clears any cached table registrations that might interfere with file operations
1605 self.reset_session();
1606 match operation.operation_type.as_str() {
1607 "split_before" => {
1608 // Query data before the deletion range and write it
1609 let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1610 let before_data = self.query_typed_data::<T>(
1611 instrument_ids,
1612 Some(UnixNanos::from(operation.query_start)),
1613 Some(UnixNanos::from(operation.query_end)),
1614 None,
1615 Some(operation.files.clone()),
1616 )?;
1617
1618 if !before_data.is_empty() {
1619 let start_ts = UnixNanos::from(operation.file_start_ns);
1620 let end_ts = UnixNanos::from(operation.file_end_ns);
1621 self.write_to_parquet(
1622 before_data,
1623 Some(start_ts),
1624 Some(end_ts),
1625 Some(true),
1626 )?;
1627 }
1628 }
1629 "split_after" => {
1630 // Query data after the deletion range and write it
1631 let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1632 let after_data = self.query_typed_data::<T>(
1633 instrument_ids,
1634 Some(UnixNanos::from(operation.query_start)),
1635 Some(UnixNanos::from(operation.query_end)),
1636 None,
1637 Some(operation.files.clone()),
1638 )?;
1639
1640 if !after_data.is_empty() {
1641 let start_ts = UnixNanos::from(operation.file_start_ns);
1642 let end_ts = UnixNanos::from(operation.file_end_ns);
1643 self.write_to_parquet(
1644 after_data,
1645 Some(start_ts),
1646 Some(end_ts),
1647 Some(true),
1648 )?;
1649 }
1650 }
1651 _ => {
1652 // For "remove" operations, just mark files for removal
1653 }
1654 }
1655
1656 // Mark files for removal (applies to all operation types)
1657 for file in operation.files {
1658 files_to_remove.insert(file);
1659 }
1660 }
1661
1662 // Remove all files that were processed
1663 for file in files_to_remove {
1664 if let Err(e) = self.delete_file(&file) {
                log::error!("Failed to delete file {file}: {e}");
1666 }
1667 }
1668
1669 Ok(())
1670 }
1671
1672 /// Prepares all operations for data deletion by identifying files that need to be
1673 /// split or removed.
1674 ///
1675 /// This auxiliary function handles all the preparation logic for deletion:
1676 /// 1. Filters intervals by time range
1677 /// 2. Identifies files that intersect with the deletion range
1678 /// 3. Creates split operations for files that partially overlap
1679 /// 4. Generates removal operations for files completely within the range
1680 ///
1681 /// # Parameters
1682 ///
1683 /// - `type_name`: The data type directory name for path generation.
1684 /// - `identifier`: Optional instrument identifier for path generation.
1685 /// - `intervals`: List of (`start_ts`, `end_ts`) tuples representing existing file intervals.
1686 /// - `start`: Optional start timestamp for deletion range.
1687 /// - `end`: Optional end timestamp for deletion range.
1688 ///
1689 /// # Returns
1690 ///
1691 /// Returns a vector of `DeleteOperation` structs ready for execution.
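    ///
    /// # Examples
    ///
    /// A minimal sketch (the constructor arguments are elided and the interval is illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // One existing file covers [0, 1_000]; delete everything from 500 onwards
    /// let operations = catalog.prepare_delete_operations(
    ///     "quotes",
    ///     None,
    ///     &[(0, 1_000)],
    ///     Some(UnixNanos::from(500)),
    ///     None,
    /// )?;
    ///
    /// // A single "split_before" operation preserves the data in [0, 499]
    /// assert_eq!(operations.len(), 1);
    /// assert_eq!(operations[0].operation_type, "split_before");
    /// # Ok::<(), anyhow::Error>(())
    /// ```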
1692 pub fn prepare_delete_operations(
1693 &self,
1694 type_name: &str,
1695 identifier: Option<String>,
1696 intervals: &[(u64, u64)],
1697 start: Option<UnixNanos>,
1698 end: Option<UnixNanos>,
1699 ) -> anyhow::Result<Vec<DeleteOperation>> {
1700 // Convert start/end to nanoseconds
1701 let delete_start_ns = start.map(|s| s.as_u64());
1702 let delete_end_ns = end.map(|e| e.as_u64());
1703
1704 let mut operations = Vec::new();
1705
1706 // Get directory for file path construction
1707 let directory = self.make_path(type_name, identifier)?;
1708
1709 // Process each interval (which represents an actual file)
1710 for &(file_start_ns, file_end_ns) in intervals {
1711 // Check if file intersects with deletion range
1712 let intersects = (delete_start_ns.is_none() || delete_start_ns.unwrap() <= file_end_ns)
1713 && (delete_end_ns.is_none() || file_start_ns <= delete_end_ns.unwrap());
1714
1715 if !intersects {
1716 continue; // File doesn't intersect with deletion range
1717 }
1718
1719 // Construct file path from interval timestamps
1720 let filename = timestamps_to_filename(
1721 UnixNanos::from(file_start_ns),
1722 UnixNanos::from(file_end_ns),
1723 );
1724 let file_path = make_object_store_path(&directory, &[&filename]);
1725
1726 // Determine what type of operation is needed
1727 let file_completely_within_range = (delete_start_ns.is_none()
1728 || delete_start_ns.unwrap() <= file_start_ns)
1729 && (delete_end_ns.is_none() || file_end_ns <= delete_end_ns.unwrap());
1730
1731 if file_completely_within_range {
1732 // File is completely within deletion range - just mark for removal
1733 operations.push(DeleteOperation {
1734 operation_type: "remove".to_string(),
1735 files: vec![file_path],
1736 query_start: 0,
1737 query_end: 0,
1738 file_start_ns: 0,
1739 file_end_ns: 0,
1740 });
1741 } else {
1742 // File partially overlaps - need to split
1743 if let Some(delete_start) = delete_start_ns
1744 && file_start_ns < delete_start
1745 {
1746 // Keep data before deletion range
1747 operations.push(DeleteOperation {
1748 operation_type: "split_before".to_string(),
1749 files: vec![file_path.clone()],
1750 query_start: file_start_ns,
1751 query_end: delete_start.saturating_sub(1), // Exclusive end
1752 file_start_ns,
1753 file_end_ns: delete_start.saturating_sub(1),
1754 });
1755 }
1756
1757 if let Some(delete_end) = delete_end_ns
1758 && delete_end < file_end_ns
1759 {
1760 // Keep data after deletion range
1761 operations.push(DeleteOperation {
1762 operation_type: "split_after".to_string(),
1763 files: vec![file_path.clone()],
1764 query_start: delete_end.saturating_add(1), // Exclusive start
1765 query_end: file_end_ns,
1766 file_start_ns: delete_end.saturating_add(1),
1767 file_end_ns,
1768 });
1769 }
1770 }
1771 }
1772
1773 Ok(operations)
1774 }
1775}