nautilus_persistence/backend/catalog_operations.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Catalog operations for data consolidation and reset functionality.
//!
//! This module contains the consolidation and reset operations for the `ParquetDataCatalog`.
//! These operations are separated into their own module for better organization and maintainability.

use ahash::{AHashMap, AHashSet};
use futures::StreamExt;
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::path::Path as ObjectPath;

use crate::{
    backend::catalog::{
        CatalogPathPrefix, ParquetDataCatalog, are_intervals_contiguous, are_intervals_disjoint,
        extract_path_components, make_object_store_path, parse_filename_timestamps,
        timestamps_to_filename,
    },
    parquet::{
        combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
    },
};

/// Information about a consolidation query to be executed.
///
/// This struct encapsulates all the information needed to execute a single consolidation
/// operation, including the data range to query and file naming strategy.
///
/// # Fields
///
/// - `query_start`: Start timestamp for the data query range (inclusive, in nanoseconds).
/// - `query_end`: End timestamp for the data query range (inclusive, in nanoseconds).
/// - `use_period_boundaries`: If true, uses period boundaries for file naming; if false, uses actual data timestamps.
///
/// # Usage
///
/// This struct is used internally by the consolidation system to plan and execute
/// data consolidation operations. It allows the system to:
/// - Separate query planning from execution.
/// - Handle complex scenarios like data splitting.
/// - Optimize file naming strategies.
/// - Batch multiple operations efficiently.
/// - Maintain file contiguity across periods.
///
/// # Examples
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::ConsolidationQuery;
///
/// // Regular consolidation query
/// let query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609545600000000000,
///     use_period_boundaries: true,
/// };
///
/// // Split operation to preserve data
/// let split_query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609462800000000000,
///     use_period_boundaries: false,
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ConsolidationQuery {
    /// Start timestamp for the query range (inclusive, in nanoseconds)
    pub query_start: u64,
    /// End timestamp for the query range (inclusive, in nanoseconds)
    pub query_end: u64,
    /// Whether to use period boundaries for file naming (true) or actual data timestamps (false)
    pub use_period_boundaries: bool,
}

/// Information about a deletion operation to be executed.
///
/// This struct encapsulates all the information needed to execute a single deletion
/// operation, including the type of operation and file handling details.
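///
/// # Examples
///
/// A minimal sketch with illustrative values (the file path and timestamps below are
/// hypothetical, not taken from a real catalog):
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::DeleteOperation;
///
/// // Split operation preserving data before a deletion range
/// let op = DeleteOperation {
///     operation_type: "split_before".to_string(),
///     files: vec!["data/quotes/BTCUSD/example.parquet".to_string()],
///     query_start: 1609459200000000000,
///     query_end: 1609462799999999999,
///     file_start_ns: 1609459200000000000,
///     file_end_ns: 1609462799999999999,
/// };
/// ```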
#[derive(Debug, Clone)]
pub struct DeleteOperation {
    /// Type of deletion operation ("remove", "`split_before`", "`split_after`").
    pub operation_type: String,
    /// List of files involved in this operation.
    pub files: Vec<String>,
    /// Start timestamp for data query (used for split operations).
    pub query_start: u64,
    /// End timestamp for data query (used for split operations).
    pub query_end: u64,
    /// Start timestamp for new file naming (used for split operations).
    pub file_start_ns: u64,
    /// End timestamp for new file naming (used for split operations).
    pub file_end_ns: u64,
}

impl ParquetDataCatalog {
    /// Consolidates all data files in the catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog
    /// catalog.consolidate_catalog(None, None, None)?;
    ///
    /// // Consolidate only files within a specific time range
    /// catalog.consolidate_catalog(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog(
        &self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.consolidate_directory(&directory, start, end, ensure_contiguous_files)?;
        }

        Ok(())
    }

    /// Consolidates data files for a specific data type and instrument.
    ///
    /// This method consolidates Parquet files within a specific directory (defined by data type
    /// and optional instrument ID) by merging multiple files into a single file. This improves
    /// query performance and can reduce storage overhead.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files for a specific instrument
    /// catalog.consolidate_data(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     None,
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Consolidate trade files within a time range
    /// catalog.consolidate_data(
    ///     "trades",
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(type_name, instrument_id)?;
        self.consolidate_directory(&directory, start, end, ensure_contiguous_files)
    }

    /// Consolidates Parquet files within a specific directory by merging them into a single file.
    ///
    /// This internal method performs the actual consolidation work for a single directory.
    /// It identifies files within the specified time range, validates their intervals,
    /// and combines them into a single Parquet file with optimized storage.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path containing Parquet files to consolidate.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Behavior
    ///
    /// - Skips consolidation if directory contains 1 or fewer files.
    /// - Filters files by timestamp range if start/end are specified.
    /// - Sorts intervals by start timestamp before consolidation.
    /// - Creates a new file spanning the entire time range of input files.
    /// - Validates interval disjointness after consolidation (if enabled).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - File combination operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    /// - Object store operations fail.
    fn consolidate_directory(
        &self,
        directory: &str,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let parquet_files = self.list_parquet_files(directory)?;

        if parquet_files.len() <= 1 {
            return Ok(());
        }

        let mut files_to_consolidate = Vec::new();
        let mut intervals = Vec::new();
        let start = start.map(|t| t.as_u64());
        let end = end.map(|t| t.as_u64());

        for file in parquet_files {
            if let Some(interval) = parse_filename_timestamps(&file) {
                let (interval_start, interval_end) = interval;
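                // Include a file only when its entire interval lies within the requested range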
                let include_file = match (start, end) {
                    (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
                    (Some(s), None) => interval_start >= s,
                    (None, Some(e)) => interval_end <= e,
                    (None, None) => true,
                };

                if include_file {
                    files_to_consolidate.push(file);
                    intervals.push(interval);
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        if !intervals.is_empty() {
            let file_name = timestamps_to_filename(
                UnixNanos::from(intervals[0].0),
                UnixNanos::from(intervals.last().unwrap().1),
            );
            let path = make_object_store_path(directory, &[&file_name]);

            // Convert string paths to ObjectPath for the function call
            let object_paths: Vec<ObjectPath> = files_to_consolidate
                .iter()
                .map(|path| ObjectPath::from(path.as_str()))
                .collect();

            self.execute_async(async {
                combine_parquet_files_from_object_store(
                    self.object_store.clone(),
                    object_paths,
                    &ObjectPath::from(path),
                    Some(self.compression),
                    Some(self.max_row_group_size),
                )
                .await
            })?;
        }

        if ensure_contiguous_files.unwrap_or(true) && !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after consolidating a directory");
        }

        Ok(())
    }

    /// Consolidates all data files in the catalog by splitting them into fixed time periods.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - Data type extraction from path fails.
    /// - Period-based consolidation operations fail.
    ///
    /// # Notes
    ///
374    ///   and instruments.
375    /// - The consolidation process splits data into fixed time periods rather than combining.
376    ///   all files into a single file per directory.
377    /// - Uses the same period-based consolidation logic as `consolidate_data_by_period`.
378    /// - Original files are removed and replaced with period-based consolidated files.
379    /// - This method is useful for periodic maintenance of the catalog to standardize.
380    ///   file organization by time periods.
381    ///
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog by 1-day periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate only files within a specific time range by 1-hour periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog_by_period(
        &mut self,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            let (data_cls, identifier) =
                self.extract_data_cls_and_identifier_from_path(&directory)?;

            if let Some(data_cls_name) = data_cls {
                // Use match statement to call the generic consolidate_data_by_period for various types
                match data_cls_name.as_str() {
                    "quotes" => {
                        self.consolidate_data_by_period_generic::<QuoteTick>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "trades" => {
                        self.consolidate_data_by_period_generic::<TradeTick>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_deltas" => {
                        self.consolidate_data_by_period_generic::<OrderBookDelta>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_depths" => {
                        self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "bars" => {
                        self.consolidate_data_by_period_generic::<Bar>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "index_prices" => {
                        self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "mark_prices" => {
                        self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "instrument_closes" => {
                        self.consolidate_data_by_period_generic::<InstrumentClose>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    _ => {
                        // Skip unknown data types
                        log::warn!("Unknown data type for consolidation: {data_cls_name}");
                        continue;
                    }
                }
            }
        }

        Ok(())
    }

    /// Extracts data class and identifier from a directory path.
    ///
    /// This method parses a directory path to extract the data type and optional
    /// instrument identifier. It's used to determine what type of data consolidation
    /// to perform for each directory.
    ///
    /// # Parameters
    ///
    /// - `path`: The directory path to parse.
    ///
    /// # Returns
    ///
    /// Returns a tuple of (`data_class`, identifier) where both are optional strings.
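    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog and an illustrative path layout:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // A path of the form "<base>/data/quotes/BTCUSD" yields ("quotes", "BTCUSD")
    /// let (data_cls, identifier) =
    ///     catalog.extract_data_cls_and_identifier_from_path("catalog/data/quotes/BTCUSD")?;
    /// assert_eq!(data_cls, Some("quotes".to_string()));
    /// assert_eq!(identifier, Some("BTCUSD".to_string()));
    /// # Ok::<(), anyhow::Error>(())
    /// ```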
    pub fn extract_data_cls_and_identifier_from_path(
        &self,
        path: &str,
    ) -> anyhow::Result<(Option<String>, Option<String>)> {
        // Use cross-platform path parsing
        let path_components = extract_path_components(path);

        // Find the "data" directory in the path
        if let Some(data_index) = path_components.iter().position(|part| part == "data")
            && data_index + 1 < path_components.len()
        {
            let data_cls = path_components[data_index + 1].clone();

            // Check if there's an identifier (instrument ID) after the data class
            let identifier = if data_index + 2 < path_components.len() {
                Some(path_components[data_index + 2].clone())
            } else {
                None
            };

            return Ok((Some(data_cls), identifier));
        }

        // If we can't parse the path, return None for both
        Ok((None, None))
    }

    /// Consolidates data files by splitting them into fixed time periods.
    ///
    /// This method queries data by period and writes consolidated files immediately,
    /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
    /// the function automatically splits those files to preserve all data.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments.
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for consolidation range. If None, uses earliest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `end`: Optional end timestamp for consolidation range. If None, uses latest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File operations fail.
    /// - Data querying or writing fails.
    ///
    /// # Notes
    ///
    /// - Uses a two-phase approach: first determines all queries, then executes them.
    /// - Groups intervals into contiguous groups to preserve holes between groups.
    /// - Allows consolidation across multiple files within each contiguous group.
    /// - Skips queries if target files already exist for efficiency.
    /// - Original files are removed immediately after querying each period.
    /// - When `ensure_contiguous_files=false`, file timestamps match actual data range.
    /// - When `ensure_contiguous_files=true`, file timestamps use period boundaries.
    /// - Uses modulo arithmetic for efficient period boundary calculation.
    /// - Preserves holes in data by preventing queries from spanning across gaps.
    /// - Automatically splits files at start/end boundaries to preserve all data.
    /// - Split operations are executed before consolidation to ensure data preservation.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files by 1-day periods
    /// catalog.consolidate_data_by_period(
    ///     "quotes",
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate specific instrument by 1-hour periods
    /// catalog.consolidate_data_by_period(
    ///     "trades",
    ///     Some("BTCUSD".to_string()),
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        // Use match statement to call the generic consolidate_data_by_period for various types
        match type_name {
            "quotes" => {
                self.consolidate_data_by_period_generic::<QuoteTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "trades" => {
                self.consolidate_data_by_period_generic::<TradeTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_deltas" => {
                self.consolidate_data_by_period_generic::<OrderBookDelta>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_depths" => {
                self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "bars" => {
                self.consolidate_data_by_period_generic::<Bar>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "index_prices" => {
                self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "mark_prices" => {
                self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "instrument_closes" => {
                self.consolidate_data_by_period_generic::<InstrumentClose>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            _ => {
                anyhow::bail!("Unknown data type for consolidation: {type_name}");
            }
        }

        Ok(())
    }

    /// Consolidates data files by splitting them into fixed time periods (generic implementation).
    ///
    /// This is a type-safe version of `consolidate_data_by_period` that uses generic types
    /// to ensure compile-time correctness and enable reuse across different data types.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to consolidate, must implement required traits for serialization.
    ///
    /// # Parameters
    ///
    /// - `identifier`: Optional instrument ID to target a specific instrument's data.
    /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
    /// - `start`: Optional start timestamp for consolidation range.
    /// - `end`: Optional end timestamp for consolidation range.
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
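    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_model::data::QuoteTick;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files into 1-day periods
    /// catalog.consolidate_data_by_period_generic::<QuoteTick>(
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```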
    pub fn consolidate_data_by_period_generic<T>(
        &mut self,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()>
    where
        T: DecodeDataFromRecordBatch
            + CatalogPathPrefix
            + EncodeToRecordBatch
            + HasTsInit
            + TryFrom<Data>
            + Clone,
    {
        let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
        let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);

        // Use get_intervals for cleaner implementation
        let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;

        if intervals.is_empty() {
            return Ok(()); // No files to consolidate
        }

        // Use auxiliary function to prepare all queries for execution
        let queries_to_execute = self.prepare_consolidation_queries(
            T::path_prefix(),
            identifier.clone(),
            &intervals,
            period_nanos,
            start,
            end,
            ensure_contiguous_files,
        )?;

        if queries_to_execute.is_empty() {
            return Ok(()); // No queries to execute
        }

        // Get directory for file operations
        let directory = self.make_path(T::path_prefix(), identifier.clone())?;
        let mut existing_files = self.list_parquet_files(&directory)?;
        existing_files.sort();

        // Track files to remove and maintain existing_files list
        let mut files_to_remove = AHashSet::new();
        let original_files_count = existing_files.len();

        // Phase 2: Execute queries, write, and delete
        let mut file_start_ns: Option<u64> = None; // Track contiguity across periods

        for query_info in queries_to_execute {
            // Query data for this period using query_typed_data
            let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);

            let period_data = self.query_typed_data::<T>(
                instrument_ids,
                Some(UnixNanos::from(query_info.query_start)),
                Some(UnixNanos::from(query_info.query_end)),
                None,
                Some(existing_files.clone()),
            )?;

            if period_data.is_empty() {
                // Skip if no data found, but maintain contiguity by using query start
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                continue;
            }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, carrying forward the start of any
                // preceding empty periods to maintain contiguity
                (
                    file_start_ns.unwrap_or(query_info.query_start),
                    query_info.query_end,
                )
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };

            // Reset the contiguity tracker only after its value has been consumed
            file_start_ns = None;

            // Check again if target file exists (in case it was created during this process)
            let target_filename = format!(
                "{}/{}",
                directory,
                timestamps_to_filename(
                    UnixNanos::from(final_start_ns),
                    UnixNanos::from(final_end_ns)
                )
            );

            if self.file_exists(&target_filename)? {
                // Skip if target file already exists
                continue;
            }

            // Write consolidated data for this period using write_to_parquet
            // Use skip_disjoint_check since we're managing file removal carefully
            let start_ts = UnixNanos::from(final_start_ns);
            let end_ts = UnixNanos::from(final_end_ns);
            self.write_to_parquet(period_data, Some(start_ts), Some(end_ts), Some(true))?;

            // Identify files that are completely covered by this period
            // Only remove files AFTER successfully writing a new file
            // Use slice copy to avoid modification during iteration (match Python logic)
            for file in existing_files.clone() {
                if let Some(interval) = parse_filename_timestamps(&file)
                    && interval.1 <= query_info.query_end
                {
                    files_to_remove.insert(file.clone());
                    existing_files.retain(|f| f != &file);
                }
            }

            // Remove files as soon as we have some to remove
            if !files_to_remove.is_empty() {
                for file in files_to_remove.drain() {
                    self.delete_file(&file)?;
                }
            }
        }

        // Remove any remaining files that weren't removed in the loop
        // This matches the Python implementation's final cleanup step
        // Only remove files if any consolidation actually happened (i.e., files were processed)
        let files_were_processed = existing_files.len() < original_files_count;
        if files_were_processed {
            for file in existing_files {
                self.delete_file(&file)?;
            }
        }

        Ok(())
    }

    /// Prepares all queries for consolidation by filtering, grouping, and handling splits.
    ///
    /// This auxiliary function handles all the preparation logic for consolidation:
    /// 1. Filters intervals by time range.
    /// 2. Groups intervals into contiguous groups.
    /// 3. Identifies and creates split operations for data preservation.
    /// 4. Generates period-based consolidation queries.
    /// 5. Checks for existing target files.
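    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog and illustrative intervals:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Plan 1-day consolidation queries over two contiguous file intervals
    /// let intervals = vec![(1, 100), (101, 200)];
    /// let queries = catalog.prepare_consolidation_queries(
    ///     "quotes",
    ///     None,
    ///     &intervals,
    ///     86400000000000, // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     true
    /// )?;
    /// println!("{} consolidation queries planned", queries.len());
    /// # Ok::<(), anyhow::Error>(())
    /// ```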
    #[allow(clippy::too_many_arguments)]
    pub fn prepare_consolidation_queries(
        &self,
        type_name: &str,
        identifier: Option<String>,
        intervals: &[(u64, u64)],
        period_nanos: u64,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: bool,
    ) -> anyhow::Result<Vec<ConsolidationQuery>> {
        // Filter intervals by time range if specified
        let used_start = start.map(|s| s.as_u64());
        let used_end = end.map(|e| e.as_u64());

        let mut filtered_intervals = Vec::new();
        for &(interval_start, interval_end) in intervals {
            // Check if interval overlaps with the specified range
            if (used_start.is_none() || used_start.unwrap() <= interval_end)
                && (used_end.is_none() || interval_start <= used_end.unwrap())
            {
                filtered_intervals.push((interval_start, interval_end));
            }
        }

        if filtered_intervals.is_empty() {
            return Ok(Vec::new()); // No intervals in the specified range
        }

        // Check contiguity of filtered intervals if required
        if ensure_contiguous_files && !are_intervals_contiguous(&filtered_intervals) {
            anyhow::bail!(
                "Intervals are not contiguous. When ensure_contiguous_files=true, \
                 all files in the consolidation range must have contiguous timestamps."
            );
        }

        // Group intervals into contiguous groups to preserve holes between groups
        // but allow consolidation within each contiguous group
        let contiguous_groups = self.group_contiguous_intervals(&filtered_intervals);

        let mut queries_to_execute = Vec::new();

        // Handle interval splitting by creating split operations for data preservation
        if !filtered_intervals.is_empty() {
            if let Some(start_ts) = used_start {
                let first_interval = filtered_intervals[0];
                if first_interval.0 < start_ts && start_ts <= first_interval.1 {
                    // Split before start: preserve data from interval_start to start-1
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: first_interval.0,
                        query_end: start_ts - 1,
                        use_period_boundaries: false,
                    });
                }
            }

            if let Some(end_ts) = used_end {
                let last_interval = filtered_intervals[filtered_intervals.len() - 1];
                if last_interval.0 <= end_ts && end_ts < last_interval.1 {
                    // Split after end: preserve data from end+1 to interval_end
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: end_ts + 1,
                        query_end: last_interval.1,
                        use_period_boundaries: false,
                    });
                }
            }
        }

        // Generate period-based consolidation queries for each contiguous group
        for group in contiguous_groups {
            let group_start = group[0].0;
            let group_end = group[group.len() - 1].1;

            // Apply start/end filtering to the group
            let effective_start = used_start.map_or(group_start, |s| s.max(group_start));
            let effective_end = used_end.map_or(group_end, |e| e.min(group_end));

            if effective_start > effective_end {
                continue; // Skip if no overlap
            }

            // Generate period-based queries within this contiguous group
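            // Snap the first query start down to the period boundary containing effective_start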
            let mut current_start_ns = (effective_start / period_nanos) * period_nanos;

            // Add safety check to prevent infinite loops (match Python logic)
            let max_iterations = 10000;
            let mut iteration_count = 0;

            while current_start_ns <= effective_end {
                iteration_count += 1;
                if iteration_count > max_iterations {
                    // Safety break to prevent infinite loops
                    break;
                }
                let current_end_ns = (current_start_ns + period_nanos - 1).min(effective_end);

                // Check if target file already exists (only when ensure_contiguous_files is true)
                if ensure_contiguous_files {
                    let directory = self.make_path(type_name, identifier.clone())?;
                    let target_filename = format!(
                        "{}/{}",
                        directory,
                        timestamps_to_filename(
                            UnixNanos::from(current_start_ns),
                            UnixNanos::from(current_end_ns)
                        )
                    );

                    if self.file_exists(&target_filename)? {
                        // Skip if target file already exists
                        current_start_ns += period_nanos;
                        continue;
                    }
                }

                // Add query to execution list
                queries_to_execute.push(ConsolidationQuery {
                    query_start: current_start_ns,
                    query_end: current_end_ns,
                    use_period_boundaries: ensure_contiguous_files,
                });

                // Move to next period
                current_start_ns += period_nanos;

                if current_start_ns > effective_end {
                    break;
                }
            }
        }

        // Sort queries by start date to enable efficient file removal
        // Files can be removed when interval[1] <= query_info["query_end"]
        // and processing in chronological order ensures optimal cleanup
        queries_to_execute.sort_by_key(|q| q.query_start);

        Ok(queries_to_execute)
    }

    /// Groups intervals into contiguous groups for efficient consolidation.
    ///
    /// This method analyzes a list of time intervals and groups them into contiguous sequences.
    /// Intervals are considered contiguous if the end of one interval is exactly one nanosecond
    /// before the start of the next interval. This grouping preserves data gaps while allowing
    /// consolidation within each contiguous group.
    ///
    /// # Parameters
    ///
    /// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
    ///
    /// # Returns
    ///
    /// Returns a vector of groups, where each group is a vector of contiguous intervals.
    /// Returns an empty vector if the input is empty.
    ///
    /// # Algorithm
    ///
    /// 1. Starts with the first interval in a new group.
    /// 2. For each subsequent interval, checks if it's contiguous with the previous.
    /// 3. If contiguous (`prev_end` + 1 == `curr_start`), adds to current group.
    /// 4. If not contiguous, starts a new group.
    /// 5. Returns all groups.
    ///
    /// # Examples
    ///
    /// ```text
    /// Contiguous intervals: [(1,5), (6,10), (11,15)]
    /// Returns: [[(1,5), (6,10), (11,15)]]
    ///
    /// Non-contiguous intervals: [(1,5), (8,10), (12,15)]
    /// Returns: [[(1,5)], [(8,10)], [(12,15)]]
    /// ```
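    ///
    /// A minimal runnable sketch, assuming a constructed catalog:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // The gap between 5 and 8 splits the intervals into two groups
    /// let groups = catalog.group_contiguous_intervals(&[(1, 5), (8, 10), (11, 15)]);
    /// assert_eq!(groups, vec![vec![(1, 5)], vec![(8, 10), (11, 15)]]);
    /// ```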
    ///
    /// # Notes
    ///
    /// - Input intervals should be sorted by start timestamp.
    /// - Gaps between groups are preserved and not consolidated.
    /// - Used internally by period-based consolidation methods.
    #[must_use]
    pub fn group_contiguous_intervals(&self, intervals: &[(u64, u64)]) -> Vec<Vec<(u64, u64)>> {
        if intervals.is_empty() {
            return Vec::new();
        }

        let mut contiguous_groups = Vec::new();
        let mut current_group = vec![intervals[0]];

        for i in 1..intervals.len() {
            let prev_interval = intervals[i - 1];
            let curr_interval = intervals[i];

            // Check if current interval is contiguous with previous (end + 1 == start)
            if prev_interval.1 + 1 == curr_interval.0 {
                current_group.push(curr_interval);
            } else {
                // Gap found, start new group
                contiguous_groups.push(current_group);
                current_group = vec![curr_interval];
            }
        }

        // Add the last group
        contiguous_groups.push(current_group);

        contiguous_groups
    }

    /// Checks if a file exists in the object store.
    ///
    /// This method performs a HEAD operation on the object store to determine if a file
    /// exists without downloading its content. It works with both local and remote object stores.
    ///
    /// # Parameters
    ///
    /// - `path`: The file path to check, relative to the catalog structure.
    ///
    /// # Returns
    ///
    /// Returns `true` if the file exists, `false` if it doesn't exist.
    ///
    /// # Errors
    ///
    /// Returns an error if the async executor fails. Errors from the underlying HEAD
    /// request (network issues, authentication problems, other I/O errors) are treated
    /// as the file not existing.
    fn file_exists(&self, path: &str) -> anyhow::Result<bool> {
        let object_path = self.to_object_path(path);
        let exists =
            self.execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
        Ok(exists)
    }

    /// Deletes a file from the object store.
    ///
    /// This method removes a file from the object store. The operation is permanent
    /// and cannot be undone. It works with both local filesystems and remote object stores.
    ///
    /// # Parameters
    ///
    /// - `path`: The file path to delete, relative to the catalog structure.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on successful deletion.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The file doesn't exist.
    /// - Permission is denied.
    /// - Network issues occur (for remote stores).
    /// - The object store operation fails.
    ///
    /// # Safety
    ///
    /// This operation is irreversible. Ensure the file is no longer needed before deletion.
    fn delete_file(&self, path: &str) -> anyhow::Result<()> {
        let object_path = self.to_object_path(path);
        self.execute_async(async {
            self.object_store
                .delete(&object_path)
                .await
                .map_err(anyhow::Error::from)
        })?;
        Ok(())
    }

    /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
    ///
    /// This method scans all leaf data directories in the catalog and renames files based on
    /// the actual timestamp range of their content. This is useful when files have been
    /// modified or when filename conventions have changed.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - File metadata reading fails.
    /// - File rename operations fail.
    /// - Interval validation fails after renaming.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Reset all filenames in the catalog
    /// catalog.reset_all_file_names()?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn reset_all_file_names(&self) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.reset_file_names(&directory)?;
        }

        Ok(())
    }

    /// Resets the filenames of Parquet files for a specific data type and instrument ID.
    ///
    /// This method renames files in a specific directory based on the actual timestamp
    /// range of their content. This is useful for correcting filenames after data
    /// modifications or when filename conventions have changed.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File metadata reading fails.
    /// - File rename operations fail.
    /// - Interval validation fails after renaming.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Reset filenames for all quote files
    /// catalog.reset_data_file_names("quotes", None)?;
    ///
    /// // Reset filenames for a specific instrument's trade files
    /// catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn reset_data_file_names(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(data_cls, instrument_id)?;
        self.reset_file_names(&directory)
    }

    /// Resets the filenames of Parquet files in a directory to match their actual content timestamps.
    ///
    /// This internal method scans all Parquet files in a directory, reads their metadata to
    /// determine the actual timestamp range of their content, and renames the files accordingly.
    /// This ensures that filenames accurately reflect the data they contain.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path containing Parquet files to rename.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Process
    ///
    /// 1. Lists all Parquet files in the directory.
    /// 2. For each file, reads metadata to extract min/max timestamps.
    /// 3. Generates a new filename based on the actual timestamp range.
    /// 4. Moves the file to the new name using object store operations.
    /// 5. Validates that intervals remain disjoint after renaming.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - Metadata reading fails for any file.
    /// - File move operations fail.
    /// - Interval validation fails after renaming.
    /// - Object store operations fail.
    ///
    /// # Notes
    ///
    /// - This operation can be time-consuming for directories with many files.
    /// - Files are processed sequentially to avoid conflicts.
    /// - The operation is atomic per file but not across the entire directory.
    fn reset_file_names(&self, directory: &str) -> anyhow::Result<()> {
        let parquet_files = self.list_parquet_files(directory)?;

        for file in parquet_files {
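            // Read the actual ts_init range from the file's Parquet metadata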
1276            let object_path = ObjectPath::from(file.as_str());
1277            let (first_ts, last_ts) = self.execute_async(async {
1278                min_max_from_parquet_metadata_object_store(
1279                    self.object_store.clone(),
1280                    &object_path,
1281                    "ts_init",
1282                )
1283                .await
1284            })?;
1285
1286            let new_filename =
1287                timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
1288            let new_file_path = make_object_store_path(directory, &[&new_filename]);
1289            let new_object_path = ObjectPath::from(new_file_path);
1290
1291            self.move_file(&object_path, &new_object_path)?;
1292        }
1293
1294        let intervals = self.get_directory_intervals(directory)?;
1295
1296        if !are_intervals_disjoint(&intervals) {
1297            anyhow::bail!("Intervals are not disjoint after resetting file names");
1298        }
1299
1300        Ok(())
1301    }
1302
1303    /// Finds all leaf data directories in the catalog.
1304    ///
1305    /// A leaf directory is one that contains data files but no subdirectories.
1306    /// This method is used to identify directories that can be processed for
1307    /// consolidation or other operations.
1308    ///
1309    /// # Returns
1310    ///
1311    /// Returns a vector of directory path strings representing leaf directories,
1312    /// or an error if directory traversal fails.
1313    ///
1314    /// # Errors
1315    ///
1316    /// Returns an error if:
1317    /// - Object store listing operations fail.
1318    /// - Directory structure cannot be analyzed.
1319    ///
1320    /// # Examples
1321    ///
1322    /// ```rust,no_run
1323    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1324    ///
1325    /// let catalog = ParquetDataCatalog::new(/* ... */);
1326    ///
1327    /// let leaf_dirs = catalog.find_leaf_data_directories()?;
1328    /// for dir in leaf_dirs {
1329    ///     println!("Found leaf directory: {}", dir);
1330    /// }
1331    /// # Ok::<(), anyhow::Error>(())
1332    /// ```
1333    pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<String>> {
1334        let data_dir = make_object_store_path(&self.base_path, &["data"]);
1335
1336        let leaf_dirs = self.execute_async(async {
1337            let mut all_paths = AHashSet::new();
1338            let mut directories = AHashSet::new();
1339            let mut files_in_dirs = AHashMap::new();
1340
1341            // List all objects under the data directory
1342            let prefix = ObjectPath::from(format!("{data_dir}/"));
1343            let mut stream = self.object_store.list(Some(&prefix));
1344
1345            while let Some(object) = stream.next().await {
1346                let object = object?;
1347                let path_str = object.location.to_string();
1348                all_paths.insert(path_str.clone());
1349
1350                // Extract directory path
1351                if let Some(parent) = std::path::Path::new(&path_str).parent() {
1352                    let parent_str = parent.to_string_lossy().to_string();
1353                    directories.insert(parent_str.clone());
1354
1355                    // Track files in each directory
1356                    files_in_dirs
1357                        .entry(parent_str)
1358                        .or_insert_with(Vec::new)
1359                        .push(path_str);
1360                }
1361            }
1362
1363            // Find leaf directories (directories with files but no subdirectories)
1364            let mut leaf_dirs = Vec::new();
1365            for dir in &directories {
1366                let has_files = files_in_dirs
1367                    .get(dir)
1368                    .is_some_and(|files| !files.is_empty());
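                // A directory is a leaf when no other discovered directory is nested beneath it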
1369                let has_subdirs = directories
1370                    .iter()
1371                    .any(|d| d.starts_with(&make_object_store_path(dir, &[""])) && d != dir);
1372
1373                if has_files && !has_subdirs {
1374                    leaf_dirs.push(dir.clone());
1375                }
1376            }
1377
1378            Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
1379        })?;
1380
1381        Ok(leaf_dirs)
1382    }
1383
1384    /// Deletes data within a specified time range for a specific data type and instrument.
1385    ///
1386    /// This method identifies all parquet files that intersect with the specified time range
1387    /// and handles them appropriately:
1388    /// - Files completely within the range are deleted.
1389    /// - Files partially overlapping the range are split to preserve data outside the range.
1390    /// - The original intersecting files are removed after processing.
1391    ///
1392    /// # Parameters
1393    ///
1394    /// - `type_name`: The data type directory name. Supported values are `"quotes"`, `"trades"`, `"bars"`, `"order_book_deltas"`, and `"order_book_depth10"`.
1395    /// - `identifier`: Optional instrument ID to delete data for. If None, deletes data across all instruments.
1396    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1397    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1398    ///
1399    /// # Returns
1400    ///
1401    /// Returns `Ok(())` on success, or an error if deletion fails.
1402    ///
1403    /// # Errors
1404    ///
1405    /// Returns an error if:
1406    /// - The directory path cannot be constructed.
1407    /// - File operations fail.
1408    /// - Data querying or writing fails.
1409    ///
1410    /// # Notes
1411    ///
1412    /// - This operation permanently removes data and cannot be undone.
1413    /// - Files that partially overlap the deletion range are split to preserve data outside the range.
1414    /// - The method ensures data integrity by using atomic operations where possible.
1415    /// - Empty directories are not automatically removed after deletion.
1416    ///
1417    /// # Examples
1418    ///
1419    /// ```rust,no_run
1420    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1421    /// use nautilus_core::UnixNanos;
1422    ///
1423    /// let catalog = ParquetDataCatalog::new(/* ... */);
1424    ///
1425    /// // Delete all quote data for a specific instrument
1426    /// catalog.delete_data_range(
1427    ///     "quotes",
1428    ///     Some("BTCUSD".to_string()),
1429    ///     None,
1430    ///     None
1431    /// )?;
1432    ///
1433    /// // Delete trade data within a specific time range
1434    /// catalog.delete_data_range(
1435    ///     "trades",
1436    ///     None,
1437    ///     Some(UnixNanos::from(1609459200000000000)),
1438    ///     Some(UnixNanos::from(1609545600000000000))
1439    /// )?;
1440    /// # Ok::<(), anyhow::Error>(())
1441    /// ```
1442    pub fn delete_data_range(
1443        &mut self,
1444        type_name: &str,
1445        identifier: Option<String>,
1446        start: Option<UnixNanos>,
1447        end: Option<UnixNanos>,
1448    ) -> anyhow::Result<()> {
1449        // Dispatch to the generic implementation for the requested data type
1450        match type_name {
1451            "quotes" => self.delete_data_range_generic::<QuoteTick>(identifier, start, end),
1452            "trades" => self.delete_data_range_generic::<TradeTick>(identifier, start, end),
1453            "bars" => self.delete_data_range_generic::<Bar>(identifier, start, end),
1454            "order_book_deltas" => {
1455                self.delete_data_range_generic::<OrderBookDelta>(identifier, start, end)
1456            }
1457            "order_book_depth10" => {
1458                self.delete_data_range_generic::<OrderBookDepth10>(identifier, start, end)
1459            }
1460            _ => anyhow::bail!("Unsupported data type: {type_name}"),
1461        }
1462    }
1463
1464    /// Deletes data within a specified time range across the entire catalog.
1465    ///
1466    /// This method identifies all leaf directories in the catalog that contain parquet files
1467    /// and deletes data within the specified time range from each directory. A leaf directory
1468    /// is one that contains files but no subdirectories. This is a convenience method that
1469    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
1470    ///
1471    /// # Parameters
1472    ///
1473    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1474    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1475    ///
1476    /// # Returns
1477    ///
1478    /// Returns `Ok(())` on success, or an error if deletion fails.
1479    ///
1480    /// # Errors
1481    ///
1482    /// Returns an error if:
1483    /// - Directory traversal fails.
1484    /// - Data class extraction from paths fails.
1485    /// - Individual delete operations fail.
1486    ///
1487    /// # Notes
1488    ///
1489    /// - This operation permanently removes data and cannot be undone.
1490    /// - Failures in individual directories are logged to stderr and do not abort the
1491    ///   operation; the remaining directories are still processed.
1492    /// - Files completely within the deletion range are removed entirely.
1493    /// - Files partially overlapping the deletion range are split to preserve data outside the range.
1494    /// - This method is useful for bulk data cleanup operations across the entire catalog.
1495    /// - Empty directories are not automatically removed after deletion.
1496    ///
1497    /// # Examples
1498    ///
1499    /// ```rust,no_run
1500    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1501    /// use nautilus_core::UnixNanos;
1502    ///
1503    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1504    ///
1505    /// // Delete all data before a specific date across entire catalog
1506    /// catalog.delete_catalog_range(
1507    ///     None,
1508    ///     Some(UnixNanos::from(1609459200000000000))
1509    /// )?;
1510    ///
1511    /// // Delete all data within a specific range across entire catalog
1512    /// catalog.delete_catalog_range(
1513    ///     Some(UnixNanos::from(1609459200000000000)),
1514    ///     Some(UnixNanos::from(1609545600000000000))
1515    /// )?;
1516    ///
1517    /// // Delete all data after a specific date across entire catalog
1518    /// catalog.delete_catalog_range(
1519    ///     Some(UnixNanos::from(1609459200000000000)),
1520    ///     None
1521    /// )?;
1522    /// # Ok::<(), anyhow::Error>(())
1523    /// ```
1524    pub fn delete_catalog_range(
1525        &mut self,
1526        start: Option<UnixNanos>,
1527        end: Option<UnixNanos>,
1528    ) -> anyhow::Result<()> {
1529        let leaf_directories = self.find_leaf_data_directories()?;
1530
1531        for directory in leaf_directories {
1532            if let Ok((Some(data_type), identifier)) =
1533                self.extract_data_cls_and_identifier_from_path(&directory)
1534            {
1535                // Call the existing delete_data_range method
1536                if let Err(e) = self.delete_data_range(&data_type, identifier, start, end) {
1537                    eprintln!("Failed to delete data in directory {directory}: {e}");
1538                    // Continue with other directories instead of failing completely
1539                }
1540            }
1541        }
1542
1543        Ok(())
1544    }
1545
1546    /// Generic implementation for deleting data within a specified time range.
1547    ///
1548    /// This method provides the core deletion logic that works with any data type
1549    /// that implements the required traits. It handles file intersection analysis,
1550    /// data splitting for partial overlaps, and file cleanup.
1551    ///
1552    /// # Type Parameters
1553    ///
1554    /// - `T`: The data type to delete; it must satisfy the `where` clause bounds (Arrow encode/decode, catalog path prefix, `ts_init` access, `TryFrom<Data>`, and `Clone`).
1555    ///
1556    /// # Parameters
1557    ///
1558    /// - `identifier`: Optional instrument ID to delete data for.
1559    /// - `start`: Optional start timestamp for the deletion range.
1560    /// - `end`: Optional end timestamp for the deletion range.
1561    ///
1562    /// # Returns
1563    ///
1564    /// Returns `Ok(())` on success, or an error if deletion fails.
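    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog; the instrument ID and
    /// timestamps are illustrative:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete one hour of quote data for a single instrument
    /// catalog.delete_data_range_generic::<QuoteTick>(
    ///     Some("BTCUSD".to_string()),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609462800000000000)),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```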
1565    pub fn delete_data_range_generic<T>(
1566        &mut self,
1567        identifier: Option<String>,
1568        start: Option<UnixNanos>,
1569        end: Option<UnixNanos>,
1570    ) -> anyhow::Result<()>
1571    where
1572        T: DecodeDataFromRecordBatch
1573            + CatalogPathPrefix
1574            + EncodeToRecordBatch
1575            + HasTsInit
1576            + TryFrom<Data>
1577            + Clone,
1578    {
1579        // Get intervals for cleaner implementation
1580        let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;
1581
1582        if intervals.is_empty() {
1583            return Ok(()); // No files to process
1584        }
1585
1586        // Prepare all operations for execution
1587        let operations_to_execute = self.prepare_delete_operations(
1588            T::path_prefix(),
1589            identifier.clone(),
1590            &intervals,
1591            start,
1592            end,
1593        )?;
1594
1595        if operations_to_execute.is_empty() {
1596            return Ok(()); // No operations to execute
1597        }
1598
1599        // Execute all operations
1600        let mut files_to_remove = AHashSet::<String>::new();
1601
1602        for operation in operations_to_execute {
1603            // Reset the session before each operation to ensure fresh data is loaded
1604            // This clears any cached table registrations that might interfere with file operations
1605            self.reset_session();
1606            match operation.operation_type.as_str() {
1607                "split_before" => {
1608                    // Query data before the deletion range and write it
1609                    let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1610                    let before_data = self.query_typed_data::<T>(
1611                        instrument_ids,
1612                        Some(UnixNanos::from(operation.query_start)),
1613                        Some(UnixNanos::from(operation.query_end)),
1614                        None,
1615                        Some(operation.files.clone()),
1616                    )?;
1617
1618                    if !before_data.is_empty() {
1619                        let start_ts = UnixNanos::from(operation.file_start_ns);
1620                        let end_ts = UnixNanos::from(operation.file_end_ns);
1621                        self.write_to_parquet(
1622                            before_data,
1623                            Some(start_ts),
1624                            Some(end_ts),
1625                            Some(true),
1626                        )?;
1627                    }
1628                }
1629                "split_after" => {
1630                    // Query data after the deletion range and write it
1631                    let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1632                    let after_data = self.query_typed_data::<T>(
1633                        instrument_ids,
1634                        Some(UnixNanos::from(operation.query_start)),
1635                        Some(UnixNanos::from(operation.query_end)),
1636                        None,
1637                        Some(operation.files.clone()),
1638                    )?;
1639
1640                    if !after_data.is_empty() {
1641                        let start_ts = UnixNanos::from(operation.file_start_ns);
1642                        let end_ts = UnixNanos::from(operation.file_end_ns);
1643                        self.write_to_parquet(
1644                            after_data,
1645                            Some(start_ts),
1646                            Some(end_ts),
1647                            Some(true),
1648                        )?;
1649                    }
1650                }
1651                _ => {
1652                    // For "remove" operations, just mark files for removal
1653                }
1654            }
1655
1656            // Mark files for removal (applies to all operation types)
1657            for file in operation.files {
1658                files_to_remove.insert(file);
1659            }
1660        }
1661
1662        // Remove all files that were processed
1663        for file in files_to_remove {
1664            if let Err(e) = self.delete_file(&file) {
1665                eprintln!("Failed to delete file {file}: {e}");
1666            }
1667        }
1668
1669        Ok(())
1670    }
1671
1672    /// Prepares all operations for data deletion by identifying files that need to be
1673    /// split or removed.
1674    ///
1675    /// This auxiliary function handles all the preparation logic for deletion:
1676    /// 1. Filters intervals by the requested time range.
1677    /// 2. Identifies files that intersect with the deletion range.
1678    /// 3. Creates split operations for files that partially overlap the range.
1679    /// 4. Generates removal operations for files completely within the range.
1680    ///
1681    /// # Parameters
1682    ///
1683    /// - `type_name`: The data type directory name for path generation.
1684    /// - `identifier`: Optional instrument identifier for path generation.
1685    /// - `intervals`: List of (`start_ts`, `end_ts`) tuples representing existing file intervals.
1686    /// - `start`: Optional start timestamp for deletion range.
1687    /// - `end`: Optional end timestamp for deletion range.
1688    ///
1689    /// # Returns
1690    ///
1691    /// Returns a vector of `DeleteOperation` structs ready for execution.
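    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog; the intervals and
    /// timestamps are illustrative:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Files cover [0, 100] and [101, 200]; deleting [50, 150] should plan a
    /// // "split_before" for the first file and a "split_after" for the second.
    /// let operations = catalog.prepare_delete_operations(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     &[(0, 100), (101, 200)],
    ///     Some(UnixNanos::from(50)),
    ///     Some(UnixNanos::from(150)),
    /// )?;
    /// println!("{} operations planned", operations.len());
    /// # Ok::<(), anyhow::Error>(())
    /// ```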
1692    pub fn prepare_delete_operations(
1693        &self,
1694        type_name: &str,
1695        identifier: Option<String>,
1696        intervals: &[(u64, u64)],
1697        start: Option<UnixNanos>,
1698        end: Option<UnixNanos>,
1699    ) -> anyhow::Result<Vec<DeleteOperation>> {
1700        // Convert start/end to nanoseconds
1701        let delete_start_ns = start.map(|s| s.as_u64());
1702        let delete_end_ns = end.map(|e| e.as_u64());
1703
1704        let mut operations = Vec::new();
1705
1706        // Get directory for file path construction
1707        let directory = self.make_path(type_name, identifier)?;
1708
1709        // Process each interval (which represents an actual file)
1710        for &(file_start_ns, file_end_ns) in intervals {
1711            // Check if file intersects with deletion range
1712            let intersects = (delete_start_ns.is_none() || delete_start_ns.unwrap() <= file_end_ns)
1713                && (delete_end_ns.is_none() || file_start_ns <= delete_end_ns.unwrap());
1714
1715            if !intersects {
1716                continue; // File doesn't intersect with deletion range
1717            }
1718
1719            // Construct file path from interval timestamps
1720            let filename = timestamps_to_filename(
1721                UnixNanos::from(file_start_ns),
1722                UnixNanos::from(file_end_ns),
1723            );
1724            let file_path = make_object_store_path(&directory, &[&filename]);
1725
1726            // Determine what type of operation is needed
1727            let file_completely_within_range = (delete_start_ns.is_none()
1728                || delete_start_ns.unwrap() <= file_start_ns)
1729                && (delete_end_ns.is_none() || file_end_ns <= delete_end_ns.unwrap());
1730
1731            if file_completely_within_range {
1732                // File is completely within deletion range - just mark for removal
1733                operations.push(DeleteOperation {
1734                    operation_type: "remove".to_string(),
1735                    files: vec![file_path],
1736                    query_start: 0,
1737                    query_end: 0,
1738                    file_start_ns: 0,
1739                    file_end_ns: 0,
1740                });
1741            } else {
1742                // File partially overlaps - need to split
1743                if let Some(delete_start) = delete_start_ns
1744                    && file_start_ns < delete_start
1745                {
1746                    // Keep data before deletion range
1747                    operations.push(DeleteOperation {
1748                        operation_type: "split_before".to_string(),
1749                        files: vec![file_path.clone()],
1750                        query_start: file_start_ns,
1751                        query_end: delete_start.saturating_sub(1), // Keep rows strictly before the deletion start
1752                        file_start_ns,
1753                        file_end_ns: delete_start.saturating_sub(1),
1754                    });
1755                }
1756
1757                if let Some(delete_end) = delete_end_ns
1758                    && delete_end < file_end_ns
1759                {
1760                    // Keep data after deletion range
1761                    operations.push(DeleteOperation {
1762                        operation_type: "split_after".to_string(),
1763                        files: vec![file_path.clone()],
1764                        query_start: delete_end.saturating_add(1), // Keep rows strictly after the deletion end
1765                        query_end: file_end_ns,
1766                        file_start_ns: delete_end.saturating_add(1),
1767                        file_end_ns,
1768                    });
1769                }
1770            }
1771        }
1772
1773        Ok(operations)
1774    }
1775}