nautilus_persistence/backend/catalog_operations.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Catalog operations for data consolidation and reset functionality.
//!
//! This module contains the consolidation and reset operations for the `ParquetDataCatalog`.
//! These operations are separated into their own module for better organization and maintainability.

use std::collections::HashSet;

use futures::StreamExt;
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::path::Path as ObjectPath;

use crate::{
    backend::catalog::{
        CatalogPathPrefix, ParquetDataCatalog, are_intervals_contiguous, are_intervals_disjoint,
        extract_path_components, make_object_store_path, parse_filename_timestamps,
        timestamps_to_filename,
    },
    parquet::{
        combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
    },
};

/// Information about a consolidation query to be executed.
///
/// This struct encapsulates all the information needed to execute a single consolidation
/// operation, including the data range to query and file naming strategy.
///
/// # Fields
///
/// - `query_start`: Start timestamp for the data query range (inclusive, in nanoseconds).
/// - `query_end`: End timestamp for the data query range (inclusive, in nanoseconds).
/// - `use_period_boundaries`: If true, uses period boundaries for file naming; if false, uses actual data timestamps.
///
/// # Usage
///
/// This struct is used internally by the consolidation system to plan and execute
/// data consolidation operations. It allows the system to:
/// - Separate query planning from execution.
/// - Handle complex scenarios like data splitting.
/// - Optimize file naming strategies.
/// - Batch multiple operations efficiently.
/// - Maintain file contiguity across periods.
///
/// # Examples
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::ConsolidationQuery;
///
/// // Regular consolidation query
/// let query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609545600000000000,
///     use_period_boundaries: true,
/// };
///
/// // Split operation to preserve data
/// let split_query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609462800000000000,
///     use_period_boundaries: false,
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ConsolidationQuery {
    /// Start timestamp for the query range (inclusive, in nanoseconds)
    pub query_start: u64,
    /// End timestamp for the query range (inclusive, in nanoseconds)
    pub query_end: u64,
    /// Whether to use period boundaries for file naming (true) or actual data timestamps (false)
    pub use_period_boundaries: bool,
}

/// Information about a deletion operation to be executed.
///
/// This struct encapsulates all the information needed to execute a single deletion
/// operation, including the type of operation and file handling details.
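///
/// # Examples
///
/// A minimal construction sketch; the values and filename are illustrative only:
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::DeleteOperation;
///
/// // Split operation preserving data before a deletion range
/// let operation = DeleteOperation {
///     operation_type: "split_before".to_string(),
///     files: vec!["file1.parquet".to_string()],
///     query_start: 1609459200000000000,
///     query_end: 1609462799999999999,
///     file_start_ns: 1609459200000000000,
///     file_end_ns: 1609462799999999999,
/// };
/// ```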
#[derive(Debug, Clone)]
pub struct DeleteOperation {
    /// Type of deletion operation ("remove", "`split_before`", "`split_after`").
    pub operation_type: String,
    /// List of files involved in this operation.
    pub files: Vec<String>,
    /// Start timestamp for data query (used for split operations).
    pub query_start: u64,
    /// End timestamp for data query (used for split operations).
    pub query_end: u64,
    /// Start timestamp for new file naming (used for split operations).
    pub file_start_ns: u64,
    /// End timestamp for new file naming (used for split operations).
    pub file_end_ns: u64,
}

impl ParquetDataCatalog {
    /// Consolidates all data files in the catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog
    /// catalog.consolidate_catalog(None, None, None)?;
    ///
    /// // Consolidate only files within a specific time range
    /// catalog.consolidate_catalog(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog(
        &self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.consolidate_directory(&directory, start, end, ensure_contiguous_files)?;
        }

        Ok(())
    }

    /// Consolidates data files for a specific data type and instrument.
    ///
    /// This method consolidates Parquet files within a specific directory (defined by data type
    /// and optional instrument ID) by merging multiple files into a single file. This improves
    /// query performance and can reduce storage overhead.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files for a specific instrument
    /// catalog.consolidate_data(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     None,
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Consolidate trade files within a time range
    /// catalog.consolidate_data(
    ///     "trades",
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(type_name, instrument_id)?;
        self.consolidate_directory(&directory, start, end, ensure_contiguous_files)
    }

    /// Consolidates Parquet files within a specific directory by merging them into a single file.
    ///
    /// This internal method performs the actual consolidation work for a single directory.
    /// It identifies files within the specified time range, validates their intervals,
    /// and combines them into a single Parquet file with optimized storage.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path containing Parquet files to consolidate.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Behavior
    ///
    /// - Skips consolidation if directory contains 1 or fewer files.
    /// - Filters files by timestamp range if start/end are specified.
    /// - Sorts intervals by start timestamp before consolidation.
    /// - Creates a new file spanning the entire time range of input files.
    /// - Validates interval disjointness after consolidation (if enabled).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - File combination operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    /// - Object store operations fail.
    fn consolidate_directory(
        &self,
        directory: &str,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let parquet_files = self.list_parquet_files(directory)?;

        if parquet_files.len() <= 1 {
            return Ok(());
        }

        let mut files_to_consolidate = Vec::new();
        let mut intervals = Vec::new();
        let start = start.map(|t| t.as_u64());
        let end = end.map(|t| t.as_u64());

        for file in parquet_files {
            if let Some(interval) = parse_filename_timestamps(&file) {
                let (interval_start, interval_end) = interval;
                let include_file = match (start, end) {
                    (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
                    (Some(s), None) => interval_start >= s,
                    (None, Some(e)) => interval_end <= e,
                    (None, None) => true,
                };

                if include_file {
                    files_to_consolidate.push(file);
                    intervals.push(interval);
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        if !intervals.is_empty() {
            let file_name = timestamps_to_filename(
                UnixNanos::from(intervals[0].0),
                UnixNanos::from(intervals.last().unwrap().1),
            );
            let path = make_object_store_path(directory, &[&file_name]);

            // Convert string paths to ObjectPath for the function call
            let object_paths: Vec<ObjectPath> = files_to_consolidate
                .iter()
                .map(|path| ObjectPath::from(path.as_str()))
                .collect();

            self.execute_async(async {
                combine_parquet_files_from_object_store(
                    self.object_store.clone(),
                    object_paths,
                    &ObjectPath::from(path),
                    Some(self.compression),
                    Some(self.max_row_group_size),
                )
                .await
            })?;
        }

        if ensure_contiguous_files.unwrap_or(true) && !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after consolidating a directory");
        }

        Ok(())
    }

    /// Consolidates all data files in the catalog by splitting them into fixed time periods.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - Data type extraction from path fails.
    /// - Period-based consolidation operations fail.
    ///
    /// # Notes
    ///
    /// - This operation can be resource-intensive for large catalogs with many data types
    ///   and instruments.
    /// - The consolidation process splits data into fixed time periods rather than combining
    ///   all files into a single file per directory.
    /// - Uses the same period-based consolidation logic as `consolidate_data_by_period`.
    /// - Original files are removed and replaced with period-based consolidated files.
    /// - This method is useful for periodic maintenance of the catalog to standardize
    ///   file organization by time periods.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog by 1-day periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate only files within a specific time range by 1-hour periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog_by_period(
        &mut self,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            let (data_cls, identifier) =
                self.extract_data_cls_and_identifier_from_path(&directory)?;

            if let Some(data_cls_name) = data_cls {
                // Use match statement to call the generic consolidate_data_by_period for various types
                match data_cls_name.as_str() {
                    "quotes" => {
                        self.consolidate_data_by_period_generic::<QuoteTick>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "trades" => {
                        self.consolidate_data_by_period_generic::<TradeTick>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_deltas" => {
                        self.consolidate_data_by_period_generic::<OrderBookDelta>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_depths" => {
                        self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "bars" => {
                        self.consolidate_data_by_period_generic::<Bar>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "index_prices" => {
                        self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "mark_prices" => {
                        self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "instrument_closes" => {
                        self.consolidate_data_by_period_generic::<InstrumentClose>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    _ => {
                        // Skip unknown data types
                        log::warn!("Unknown data type for consolidation: {data_cls_name}");
                        continue;
                    }
                }
            }
        }

        Ok(())
    }

    /// Extracts data class and identifier from a directory path.
    ///
    /// This method parses a directory path to extract the data type and optional
    /// instrument identifier. It's used to determine what type of data consolidation
    /// to perform for each directory.
    ///
    /// # Parameters
    ///
    /// - `path`: The directory path to parse.
    ///
    /// # Returns
    ///
    /// Returns a tuple of (`data_class`, identifier) where both are optional strings.
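    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming an already constructed catalog:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // "data/quotes/BTCUSD" parses to (Some("quotes"), Some("BTCUSD"))
    /// let (data_cls, identifier) =
    ///     catalog.extract_data_cls_and_identifier_from_path("data/quotes/BTCUSD")?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```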
    pub fn extract_data_cls_and_identifier_from_path(
        &self,
        path: &str,
    ) -> anyhow::Result<(Option<String>, Option<String>)> {
        // Use cross-platform path parsing
        let path_components = extract_path_components(path);

        // Find the "data" directory in the path
        if let Some(data_index) = path_components.iter().position(|part| part == "data")
            && data_index + 1 < path_components.len()
        {
            let data_cls = path_components[data_index + 1].clone();

            // Check if there's an identifier (instrument ID) after the data class
            let identifier = if data_index + 2 < path_components.len() {
                Some(path_components[data_index + 2].clone())
            } else {
                None
            };

            return Ok((Some(data_cls), identifier));
        }

        // If we can't parse the path, return None for both
        Ok((None, None))
    }

    /// Consolidates data files by splitting them into fixed time periods.
    ///
    /// This method queries data by period and writes consolidated files immediately,
    /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
    /// the function automatically splits those files to preserve all data.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments.
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for consolidation range. If None, uses earliest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `end`: Optional end timestamp for consolidation range. If None, uses latest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File operations fail.
    /// - Data querying or writing fails.
    ///
    /// # Notes
    ///
    /// - Uses two-phase approach: first determines all queries, then executes them.
    /// - Groups intervals into contiguous groups to preserve holes between groups.
    /// - Allows consolidation across multiple files within each contiguous group.
    /// - Skips queries if target files already exist for efficiency.
    /// - Original files are removed immediately after querying each period.
    /// - When `ensure_contiguous_files=false`, file timestamps match actual data range.
    /// - When `ensure_contiguous_files=true`, file timestamps use period boundaries.
    /// - Uses modulo arithmetic for efficient period boundary calculation.
    /// - Preserves holes in data by preventing queries from spanning across gaps.
    /// - Automatically splits files at start/end boundaries to preserve all data.
    /// - Split operations are executed before consolidation to ensure data preservation.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files by 1-day periods
    /// catalog.consolidate_data_by_period(
    ///     "quotes",
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate specific instrument by 1-hour periods
    /// catalog.consolidate_data_by_period(
    ///     "trades",
    ///     Some("BTCUSD".to_string()),
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        // Use match statement to call the generic consolidate_data_by_period for various types
        match type_name {
            "quotes" => {
                self.consolidate_data_by_period_generic::<QuoteTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "trades" => {
                self.consolidate_data_by_period_generic::<TradeTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_deltas" => {
                self.consolidate_data_by_period_generic::<OrderBookDelta>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_depths" => {
                self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "bars" => {
                self.consolidate_data_by_period_generic::<Bar>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "index_prices" => {
                self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "mark_prices" => {
                self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "instrument_closes" => {
                self.consolidate_data_by_period_generic::<InstrumentClose>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            _ => {
                anyhow::bail!("Unknown data type for consolidation: {}", type_name);
            }
        }

        Ok(())
    }

    /// Consolidates data files by splitting them into fixed time periods (generic implementation).
    ///
    /// This is a type-safe version of `consolidate_data_by_period` that uses generic types
    /// to ensure compile-time correctness and enable reuse across different data types.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to consolidate, must implement required traits for serialization.
    ///
    /// # Parameters
    ///
    /// - `identifier`: Optional instrument ID to target a specific instrument's data.
    /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
    /// - `start`: Optional start timestamp for consolidation range.
    /// - `end`: Optional end timestamp for consolidation range.
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
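    ///
    /// # Examples
    ///
    /// A sketch mirroring `consolidate_data_by_period`, assuming a constructed catalog:
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files by 1-day periods
    /// catalog.consolidate_data_by_period_generic::<QuoteTick>(
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```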
    pub fn consolidate_data_by_period_generic<T>(
        &mut self,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()>
    where
        T: DecodeDataFromRecordBatch
            + CatalogPathPrefix
            + EncodeToRecordBatch
            + HasTsInit
            + TryFrom<Data>
            + Clone,
    {
        let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
        let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);

        // Use get_intervals for cleaner implementation
        let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;

        if intervals.is_empty() {
            return Ok(()); // No files to consolidate
        }

        // Use auxiliary function to prepare all queries for execution
        let queries_to_execute = self.prepare_consolidation_queries(
            T::path_prefix(),
            identifier.clone(),
            &intervals,
            period_nanos,
            start,
            end,
            ensure_contiguous_files,
        )?;

        if queries_to_execute.is_empty() {
            return Ok(()); // No queries to execute
        }

        // Get directory for file operations
        let directory = self.make_path(T::path_prefix(), identifier.clone())?;
        let mut existing_files = self.list_parquet_files(&directory)?;
        existing_files.sort();

        // Track files to remove and maintain existing_files list
        let mut files_to_remove = HashSet::new();
        let original_files_count = existing_files.len();

        // Phase 2: Execute queries, write, and delete
        let mut file_start_ns: Option<u64> = None; // Track contiguity across periods

        for query_info in queries_to_execute {
            // Query data for this period using query_typed_data
            let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);

            let period_data = self.query_typed_data::<T>(
                instrument_ids,
                Some(UnixNanos::from(query_info.query_start)),
                Some(UnixNanos::from(query_info.query_end)),
                None,
                Some(existing_files.clone()),
            )?;

            if period_data.is_empty() {
                // Skip if no data found, but maintain contiguity by carrying the query start
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                continue;
            }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, extending back over any
                // preceding empty periods to maintain contiguity
                let start_ns = file_start_ns.unwrap_or(query_info.query_start);
                (start_ns, query_info.query_end)
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };
            // Reset contiguity tracking now that the carried start has been consumed
            file_start_ns = None;

            // Check again if target file exists (in case it was created during this process)
            let target_filename = format!(
                "{}/{}",
                directory,
                timestamps_to_filename(
                    UnixNanos::from(final_start_ns),
                    UnixNanos::from(final_end_ns)
                )
            );

            if self.file_exists(&target_filename)? {
                // Skip if target file already exists
                continue;
            }

            // Write consolidated data for this period using write_to_parquet
            // Use skip_disjoint_check since we're managing file removal carefully
            let start_ts = UnixNanos::from(final_start_ns);
            let end_ts = UnixNanos::from(final_end_ns);
            self.write_to_parquet(period_data, Some(start_ts), Some(end_ts), Some(true))?;

            // Identify files that are completely covered by this period
            // Only remove files AFTER successfully writing a new file
            // Use slice copy to avoid modification during iteration (match Python logic)
            for file in existing_files.clone() {
                if let Some(interval) = parse_filename_timestamps(&file)
                    && interval.1 <= query_info.query_end
                {
                    files_to_remove.insert(file.clone());
                    existing_files.retain(|f| f != &file);
                }
            }

            // Remove files as soon as we have some to remove
            if !files_to_remove.is_empty() {
                for file in files_to_remove.drain() {
                    self.delete_file(&file)?;
                }
            }
        }

        // Remove any remaining files that weren't removed in the loop
        // This matches the Python implementation's final cleanup step
        // Only remove files if any consolidation actually happened (i.e., files were processed)
        let files_were_processed = existing_files.len() < original_files_count;
        if files_were_processed {
            for file in existing_files {
                self.delete_file(&file)?;
            }
        }

        Ok(())
    }

    /// Prepares all queries for consolidation by filtering, grouping, and handling splits.
    ///
    /// This auxiliary function handles all the preparation logic for consolidation:
    /// 1. Filters intervals by time range.
    /// 2. Groups intervals into contiguous groups.
    /// 3. Identifies and creates split operations for data preservation.
    /// 4. Generates period-based consolidation queries.
    /// 5. Checks for existing target files.
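    ///
    /// # Examples
    ///
    /// An illustrative sketch of the generated queries for a single contiguous group
    /// with 1-day periods (timestamps abbreviated); query starts are floored to period
    /// boundaries and the final query is clipped to the group end:
    ///
    /// ```text
    /// Group:   (Jan 1 12:00, Jan 3 06:00)
    /// Queries: (Jan 1 00:00, Jan 1 23:59:59.999999999)
    ///          (Jan 2 00:00, Jan 2 23:59:59.999999999)
    ///          (Jan 3 00:00, Jan 3 06:00)
    /// ```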
    #[allow(clippy::too_many_arguments)]
    pub fn prepare_consolidation_queries(
        &self,
        type_name: &str,
        identifier: Option<String>,
        intervals: &[(u64, u64)],
        period_nanos: u64,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: bool,
    ) -> anyhow::Result<Vec<ConsolidationQuery>> {
        // Filter intervals by time range if specified
        let used_start = start.map(|s| s.as_u64());
        let used_end = end.map(|e| e.as_u64());

        let mut filtered_intervals = Vec::new();
        for &(interval_start, interval_end) in intervals {
            // Check if interval overlaps with the specified range
            if (used_start.is_none() || used_start.unwrap() <= interval_end)
                && (used_end.is_none() || interval_start <= used_end.unwrap())
            {
                filtered_intervals.push((interval_start, interval_end));
            }
        }

        if filtered_intervals.is_empty() {
            return Ok(Vec::new()); // No intervals in the specified range
        }

        // Check contiguity of filtered intervals if required
        if ensure_contiguous_files && !are_intervals_contiguous(&filtered_intervals) {
            anyhow::bail!(
                "Intervals are not contiguous. When ensure_contiguous_files=true, \
                 all files in the consolidation range must have contiguous timestamps."
            );
        }

        // Group intervals into contiguous groups to preserve holes between groups
        // but allow consolidation within each contiguous group
        let contiguous_groups = self.group_contiguous_intervals(&filtered_intervals);

        let mut queries_to_execute = Vec::new();

        // Handle interval splitting by creating split operations for data preservation
        if !filtered_intervals.is_empty() {
            if let Some(start_ts) = used_start {
                let first_interval = filtered_intervals[0];
                if first_interval.0 < start_ts && start_ts <= first_interval.1 {
                    // Split before start: preserve data from interval_start to start-1
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: first_interval.0,
                        query_end: start_ts - 1,
                        use_period_boundaries: false,
                    });
                }
            }

            if let Some(end_ts) = used_end {
                let last_interval = filtered_intervals[filtered_intervals.len() - 1];
                if last_interval.0 <= end_ts && end_ts < last_interval.1 {
                    // Split after end: preserve data from end+1 to interval_end
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: end_ts + 1,
                        query_end: last_interval.1,
                        use_period_boundaries: false,
                    });
                }
            }
        }

        // Generate period-based consolidation queries for each contiguous group
        for group in contiguous_groups {
            let group_start = group[0].0;
            let group_end = group[group.len() - 1].1;

            // Apply start/end filtering to the group
            let effective_start = used_start.map_or(group_start, |s| s.max(group_start));
            let effective_end = used_end.map_or(group_end, |e| e.min(group_end));

            if effective_start > effective_end {
                continue; // Skip if no overlap
            }

            // Generate period-based queries within this contiguous group
            let mut current_start_ns = (effective_start / period_nanos) * period_nanos;

            // Add safety check to prevent infinite loops (match Python logic)
            let max_iterations = 10000;
            let mut iteration_count = 0;

            while current_start_ns <= effective_end {
                iteration_count += 1;
                if iteration_count > max_iterations {
                    // Safety break to prevent infinite loops
                    break;
                }
                let current_end_ns = (current_start_ns + period_nanos - 1).min(effective_end);

                // Check if target file already exists (only when ensure_contiguous_files is true)
                if ensure_contiguous_files {
                    let directory = self.make_path(type_name, identifier.clone())?;
                    let target_filename = format!(
                        "{}/{}",
                        directory,
                        timestamps_to_filename(
                            UnixNanos::from(current_start_ns),
                            UnixNanos::from(current_end_ns)
                        )
                    );

                    if self.file_exists(&target_filename)? {
                        // Skip if target file already exists
                        current_start_ns += period_nanos;
                        continue;
                    }
                }

                // Add query to execution list
                queries_to_execute.push(ConsolidationQuery {
                    query_start: current_start_ns,
                    query_end: current_end_ns,
                    use_period_boundaries: ensure_contiguous_files,
                });

                // Move to next period
                current_start_ns += period_nanos;

                if current_start_ns > effective_end {
                    break;
                }
            }
        }

        // Sort queries by start date to enable efficient file removal
        // Files can be removed when interval[1] <= query_info["query_end"]
        // and processing in chronological order ensures optimal cleanup
        queries_to_execute.sort_by_key(|q| q.query_start);

        Ok(queries_to_execute)
    }

    /// Groups intervals into contiguous groups for efficient consolidation.
    ///
    /// This method analyzes a list of time intervals and groups them into contiguous sequences.
    /// Intervals are considered contiguous if the end of one interval is exactly one nanosecond
    /// before the start of the next interval. This grouping preserves data gaps while allowing
    /// consolidation within each contiguous group.
    ///
    /// # Parameters
    ///
    /// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
    ///
    /// # Returns
    ///
    /// Returns a vector of groups, where each group is a vector of contiguous intervals.
    /// Returns an empty vector if the input is empty.
    ///
    /// # Algorithm
    ///
    /// 1. Starts with the first interval in a new group.
    /// 2. For each subsequent interval, checks if it's contiguous with the previous.
    /// 3. If contiguous (`prev_end` + 1 == `curr_start`), adds to current group.
    /// 4. If not contiguous, starts a new group.
    /// 5. Returns all groups.
    ///
    /// # Examples
    ///
    /// ```text
    /// Contiguous intervals: [(1,5), (6,10), (11,15)]
    /// Returns: [[(1,5), (6,10), (11,15)]]
    ///
    /// Non-contiguous intervals: [(1,5), (8,10), (12,15)]
    /// Returns: [[(1,5)], [(8,10)], [(12,15)]]
    /// ```
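    ///
    /// A runnable sketch, assuming a constructed catalog:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // (1,5) and (6,10) are contiguous; (12,15) starts a new group
    /// let groups = catalog.group_contiguous_intervals(&[(1, 5), (6, 10), (12, 15)]);
    /// assert_eq!(groups, vec![vec![(1, 5), (6, 10)], vec![(12, 15)]]);
    /// ```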
1058    ///
1059    /// # Notes
1060    ///
1061    /// - Input intervals should be sorted by start timestamp.
1062    /// - Gaps between groups are preserved and not consolidated.
1063    /// - Used internally by period-based consolidation methods.
1064    #[must_use]
1065    pub fn group_contiguous_intervals(&self, intervals: &[(u64, u64)]) -> Vec<Vec<(u64, u64)>> {
1066        if intervals.is_empty() {
1067            return Vec::new();
1068        }
1069
1070        let mut contiguous_groups = Vec::new();
1071        let mut current_group = vec![intervals[0]];
1072
1073        for i in 1..intervals.len() {
1074            let prev_interval = intervals[i - 1];
1075            let curr_interval = intervals[i];
1076
1077            // Check if current interval is contiguous with previous (end + 1 == start)
1078            if prev_interval.1 + 1 == curr_interval.0 {
1079                current_group.push(curr_interval);
1080            } else {
1081                // Gap found, start new group
1082                contiguous_groups.push(current_group);
1083                current_group = vec![curr_interval];
1084            }
1085        }
1086
1087        // Add the last group
1088        contiguous_groups.push(current_group);
1089
1090        contiguous_groups
1091    }
1092
1093    /// Checks if a file exists in the object store.
1094    ///
1095    /// This method performs a HEAD operation on the object store to determine if a file
1096    /// exists without downloading its content. It works with both local and remote object stores.
1097    ///
1098    /// # Parameters
1099    ///
1100    /// - `path`: The file path to check, relative to the catalog structure.
1101    ///
1102    /// # Returns
1103    ///
1104    /// Returns `true` if the file exists, `false` if it doesn't exist.
1105    ///
1106    /// # Errors
1107    ///
1108    /// Returns an error if the object store operation fails due to network issues,
1109    /// authentication problems, or other I/O errors.
1110    fn file_exists(&self, path: &str) -> anyhow::Result<bool> {
1111        let object_path = self.to_object_path(path);
1112        let exists =
1113            self.execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
1114        Ok(exists)
1115    }
1116
1117    /// Deletes a file from the object store.
1118    ///
1119    /// This method removes a file from the object store. The operation is permanent
1120    /// and cannot be undone. It works with both local filesystems and remote object stores.
1121    ///
1122    /// # Parameters
1123    ///
1124    /// - `path`: The file path to delete, relative to the catalog structure.
1125    ///
1126    /// # Returns
1127    ///
1128    /// Returns `Ok(())` on successful deletion.
1129    ///
1130    /// # Errors
1131    ///
1132    /// Returns an error if:
1133    /// - The file doesn't exist.
1134    /// - Permission is denied.
1135    /// - Network issues occur (for remote stores).
1136    /// - The object store operation fails.
1137    ///
1138    /// # Safety
1139    ///
1140    /// This operation is irreversible. Ensure the file is no longer needed before deletion.
1141    fn delete_file(&self, path: &str) -> anyhow::Result<()> {
1142        let object_path = self.to_object_path(path);
1143        self.execute_async(async {
1144            self.object_store
1145                .delete(&object_path)
1146                .await
1147                .map_err(anyhow::Error::from)
1148        })?;
1149        Ok(())
1150    }
1151
1152    /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
1153    ///
1154    /// This method scans all leaf data directories in the catalog and renames files based on
1155    /// the actual timestamp range of their content. This is useful when files have been
1156    /// modified or when filename conventions have changed.
1157    ///
1158    /// # Returns
1159    ///
1160    /// Returns `Ok(())` on success, or an error if the operation fails.
1161    ///
1162    /// # Errors
1163    ///
1164    /// Returns an error if:
1165    /// - Directory listing fails.
1166    /// - File metadata reading fails.
1167    /// - File rename operations fail.
1168    /// - Interval validation fails after renaming.
1169    ///
1170    /// # Examples
1171    ///
1172    /// ```rust,no_run
1173    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1174    ///
1175    /// let catalog = ParquetDataCatalog::new(/* ... */);
1176    ///
1177    /// // Reset all filenames in the catalog
1178    /// catalog.reset_all_file_names()?;
1179    /// # Ok::<(), anyhow::Error>(())
1180    /// ```
1181    pub fn reset_all_file_names(&self) -> anyhow::Result<()> {
1182        let leaf_directories = self.find_leaf_data_directories()?;
1183
1184        for directory in leaf_directories {
1185            self.reset_file_names(&directory)?;
1186        }
1187
1188        Ok(())
1189    }
1190
1191    /// Resets the filenames of Parquet files for a specific data type and instrument ID.
1192    ///
1193    /// This method renames files in a specific directory based on the actual timestamp
1194    /// range of their content. This is useful for correcting filenames after data
1195    /// modifications or when filename conventions have changed.
1196    ///
1197    /// # Parameters
1198    ///
1199    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1200    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1201    ///
1202    /// # Returns
1203    ///
1204    /// Returns `Ok(())` on success, or an error if the operation fails.
1205    ///
1206    /// # Errors
1207    ///
1208    /// Returns an error if:
1209    /// - The directory path cannot be constructed.
1210    /// - File metadata reading fails.
1211    /// - File rename operations fail.
1212    /// - Interval validation fails after renaming.
1213    ///
1214    /// # Examples
1215    ///
1216    /// ```rust,no_run
1217    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1218    ///
1219    /// let catalog = ParquetDataCatalog::new(/* ... */);
1220    ///
1221    /// // Reset filenames for all quote files
1222    /// catalog.reset_data_file_names("quotes", None)?;
1223    ///
1224    /// // Reset filenames for a specific instrument's trade files
1225    /// catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
1226    /// # Ok::<(), anyhow::Error>(())
1227    /// ```
1228    pub fn reset_data_file_names(
1229        &self,
1230        data_cls: &str,
1231        instrument_id: Option<String>,
1232    ) -> anyhow::Result<()> {
1233        let directory = self.make_path(data_cls, instrument_id)?;
1234        self.reset_file_names(&directory)
1235    }
1236
1237    /// Resets the filenames of Parquet files in a directory to match their actual content timestamps.
1238    ///
1239    /// This internal method scans all Parquet files in a directory, reads their metadata to
1240    /// determine the actual timestamp range of their content, and renames the files accordingly.
1241    /// This ensures that filenames accurately reflect the data they contain.
1242    ///
1243    /// # Parameters
1244    ///
1245    /// - `directory`: The directory path containing Parquet files to rename.
1246    ///
1247    /// # Returns
1248    ///
1249    /// Returns `Ok(())` on success, or an error if the operation fails.
1250    ///
1251    /// # Process
1252    ///
1253    /// 1. Lists all Parquet files in the directory
1254    /// 2. For each file, reads metadata to extract min/max timestamps
1255    /// 3. Generates a new filename based on actual timestamp range
1256    /// 4. Moves the file to the new name using object store operations
1257    /// 5. Validates that intervals remain disjoint after renaming
1258    ///
1259    /// # Errors
1260    ///
1261    /// Returns an error if:
1262    /// - Directory listing fails.
1263    /// - Metadata reading fails for any file.
1264    /// - File move operations fail.
1265    /// - Interval validation fails after renaming.
1266    /// - Object store operations fail.
1267    ///
1268    /// # Notes
1269    ///
1270    /// - This operation can be time-consuming for directories with many files.
1271    /// - Files are processed sequentially to avoid conflicts.
1272    /// - The operation is atomic per file but not across the entire directory.
1273    fn reset_file_names(&self, directory: &str) -> anyhow::Result<()> {
1274        let parquet_files = self.list_parquet_files(directory)?;
1275
1276        for file in parquet_files {
1277            let object_path = ObjectPath::from(file.as_str());
1278            let (first_ts, last_ts) = self.execute_async(async {
1279                min_max_from_parquet_metadata_object_store(
1280                    self.object_store.clone(),
1281                    &object_path,
1282                    "ts_init",
1283                )
1284                .await
1285            })?;
1286
1287            let new_filename =
1288                timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
1289            let new_file_path = make_object_store_path(directory, &[&new_filename]);
1290            let new_object_path = ObjectPath::from(new_file_path);
1291
1292            self.move_file(&object_path, &new_object_path)?;
1293        }
1294
1295        let intervals = self.get_directory_intervals(directory)?;
1296
1297        if !are_intervals_disjoint(&intervals) {
1298            anyhow::bail!("Intervals are not disjoint after resetting file names");
1299        }
1300
1301        Ok(())
1302    }
1303
1304    /// Finds all leaf data directories in the catalog.
1305    ///
1306    /// A leaf directory is one that contains data files but no subdirectories.
1307    /// This method is used to identify directories that can be processed for
1308    /// consolidation or other operations.
1309    ///
1310    /// # Returns
1311    ///
1312    /// Returns a vector of directory path strings representing leaf directories,
1313    /// or an error if directory traversal fails.
1314    ///
1315    /// # Errors
1316    ///
1317    /// Returns an error if:
1318    /// - Object store listing operations fail.
1319    /// - Directory structure cannot be analyzed.
1320    ///
1321    /// # Examples
1322    ///
1323    /// ```rust,no_run
1324    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1325    ///
1326    /// let catalog = ParquetDataCatalog::new(/* ... */);
1327    ///
1328    /// let leaf_dirs = catalog.find_leaf_data_directories()?;
1329    /// for dir in leaf_dirs {
1330    ///     println!("Found leaf directory: {}", dir);
1331    /// }
1332    /// # Ok::<(), anyhow::Error>(())
1333    /// ```
1334    pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<String>> {
1335        let data_dir = make_object_store_path(&self.base_path, &["data"]);
1336
1337        let leaf_dirs = self.execute_async(async {
1338            let mut all_paths = std::collections::HashSet::new();
1339            let mut directories = std::collections::HashSet::new();
1340            let mut files_in_dirs = std::collections::HashMap::new();
1341
1342            // List all objects under the data directory
1343            let prefix = ObjectPath::from(format!("{data_dir}/"));
1344            let mut stream = self.object_store.list(Some(&prefix));
1345
1346            while let Some(object) = stream.next().await {
1347                let object = object?;
1348                let path_str = object.location.to_string();
1349                all_paths.insert(path_str.clone());
1350
1351                // Extract directory path
1352                if let Some(parent) = std::path::Path::new(&path_str).parent() {
1353                    let parent_str = parent.to_string_lossy().to_string();
1354                    directories.insert(parent_str.clone());
1355
1356                    // Track files in each directory
1357                    files_in_dirs
1358                        .entry(parent_str)
1359                        .or_default()
1360                        .push(path_str);
1361                }
1362            }
1363
1364            // Find leaf directories (directories with files but no subdirectories)
1365            let mut leaf_dirs = Vec::new();
1366            for dir in &directories {
1367                let has_files = files_in_dirs
1368                    .get(dir)
1369                    .is_some_and(|files| !files.is_empty());
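                // A directory is a leaf when no other listed directory sits underneath it
                // (checked below by prefix-matching on the directory path)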
1370                let has_subdirs = directories
1371                    .iter()
1372                    .any(|d| d.starts_with(&make_object_store_path(dir, &[""])) && d != dir);
1373
1374                if has_files && !has_subdirs {
1375                    leaf_dirs.push(dir.clone());
1376                }
1377            }
1378
1379            Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
1380        })?;
1381
1382        Ok(leaf_dirs)
1383    }
1384
1385    /// Deletes data within a specified time range for a specific data type and instrument.
1386    ///
1387    /// This method identifies all Parquet files that intersect with the specified time range
1388    /// and handles them appropriately:
1389    /// - Files completely within the range are deleted.
1390    /// - Files partially overlapping the range are split to preserve data outside the range.
1391    /// - The original intersecting files are removed after processing.
1392    ///
1393    /// # Parameters
1394    ///
1395    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
1396    /// - `identifier`: Optional instrument ID to delete data for. If None, deletes data across all instruments.
1397    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1398    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1399    ///
1400    /// # Returns
1401    ///
1402    /// Returns `Ok(())` on success, or an error if deletion fails.
1403    ///
1404    /// # Errors
1405    ///
1406    /// Returns an error if:
1407    /// - The directory path cannot be constructed.
1408    /// - File operations fail.
1409    /// - Data querying or writing fails.
1410    ///
1411    /// # Notes
1412    ///
1413    /// - This operation permanently removes data and cannot be undone.
1414    /// - Files that partially overlap the deletion range are split to preserve data outside the range.
1415    /// - The method ensures data integrity by using atomic operations where possible.
1416    /// - Empty directories are not automatically removed after deletion.
1417    ///
1418    /// # Examples
1419    ///
1420    /// ```rust,no_run
1421    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1422    /// use nautilus_core::UnixNanos;
1423    ///
1424    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1425    ///
1426    /// // Delete all quote data for a specific instrument
1427    /// catalog.delete_data_range(
1428    ///     "quotes",
1429    ///     Some("BTCUSD".to_string()),
1430    ///     None,
1431    ///     None
1432    /// )?;
1433    ///
1434    /// // Delete trade data within a specific time range
1435    /// catalog.delete_data_range(
1436    ///     "trades",
1437    ///     None,
1438    ///     Some(UnixNanos::from(1609459200000000000)),
1439    ///     Some(UnixNanos::from(1609545600000000000))
1440    /// )?;
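    ///
    /// // Note: a file that only partially overlaps the deletion range is split, so the
    /// // data outside the range is rewritten to a new file before the original is removed.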
1441    /// # Ok::<(), anyhow::Error>(())
1442    /// ```
1443    pub fn delete_data_range(
1444        &mut self,
1445        type_name: &str,
1446        identifier: Option<String>,
1447        start: Option<UnixNanos>,
1448        end: Option<UnixNanos>,
1449    ) -> anyhow::Result<()> {
1450        // Dispatch to the generic implementation for the concrete data type
1451        match type_name {
1452            "quotes" => self.delete_data_range_generic::<QuoteTick>(identifier, start, end),
1453            "trades" => self.delete_data_range_generic::<TradeTick>(identifier, start, end),
1454            "bars" => self.delete_data_range_generic::<Bar>(identifier, start, end),
1455            "order_book_deltas" => {
1456                self.delete_data_range_generic::<OrderBookDelta>(identifier, start, end)
1457            }
1458            "order_book_depth10" => {
1459                self.delete_data_range_generic::<OrderBookDepth10>(identifier, start, end)
1460            }
1461            _ => anyhow::bail!("Unsupported data type: {type_name}"),
1462        }
1463    }
1464
1465    /// Deletes data within a specified time range across the entire catalog.
1466    ///
1467    /// This method identifies all leaf directories in the catalog that contain parquet files
1468    /// and deletes data within the specified time range from each directory. A leaf directory
1469    /// is one that contains files but no subdirectories. This is a convenience method that
1470    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
1471    ///
1472    /// # Parameters
1473    ///
1474    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1475    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1476    ///
1477    /// # Returns
1478    ///
1479    /// Returns `Ok(())` on success, or an error if deletion fails.
1480    ///
1481    /// # Errors
1482    ///
1483    /// Returns an error if:
1484    /// - Directory traversal fails.
1485    /// - Data class extraction from paths fails.
1486    /// - Individual delete operations fail.
1487    ///
1488    /// # Notes
1489    ///
1490    /// - This operation permanently removes data and cannot be undone.
1491    /// - The deletion process handles file intersections intelligently by splitting files
1492    ///   when they partially overlap with the deletion range.
1493    /// - Files completely within the deletion range are removed entirely.
1494    /// - Files partially overlapping the deletion range are split to preserve data outside the range.
1495    /// - This method is useful for bulk data cleanup operations across the entire catalog.
1496    /// - Empty directories are not automatically removed after deletion.
1497    ///
1498    /// # Examples
1499    ///
1500    /// ```rust,no_run
1501    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1502    /// use nautilus_core::UnixNanos;
1503    ///
1504    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1505    ///
1506    /// // Delete all data before a specific date across entire catalog
1507    /// catalog.delete_catalog_range(
1508    ///     None,
1509    ///     Some(UnixNanos::from(1609459200000000000))
1510    /// )?;
1511    ///
1512    /// // Delete all data within a specific range across entire catalog
1513    /// catalog.delete_catalog_range(
1514    ///     Some(UnixNanos::from(1609459200000000000)),
1515    ///     Some(UnixNanos::from(1609545600000000000))
1516    /// )?;
1517    ///
1518    /// // Delete all data after a specific date across entire catalog
1519    /// catalog.delete_catalog_range(
1520    ///     Some(UnixNanos::from(1609459200000000000)),
1521    ///     None
1522    /// )?;
1523    /// # Ok::<(), anyhow::Error>(())
1524    /// ```
1525    pub fn delete_catalog_range(
1526        &mut self,
1527        start: Option<UnixNanos>,
1528        end: Option<UnixNanos>,
1529    ) -> anyhow::Result<()> {
1530        let leaf_directories = self.find_leaf_data_directories()?;
1531
1532        for directory in leaf_directories {
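            // Directories that cannot be mapped to a known data type are skipped silently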
1533            if let Ok((Some(data_type), identifier)) =
1534                self.extract_data_cls_and_identifier_from_path(&directory)
1535            {
1536                // Call the existing delete_data_range method
1537                if let Err(e) = self.delete_data_range(&data_type, identifier, start, end) {
1538                    eprintln!("Failed to delete data in directory {directory}: {e}");
1539                    // Continue with other directories instead of failing completely
1540                }
1541            }
1542        }
1543
1544        Ok(())
1545    }
1546
1547    /// Generic implementation for deleting data within a specified time range.
1548    ///
1549    /// This method provides the core deletion logic that works with any data type
1550    /// that implements the required traits. It handles file intersection analysis,
1551    /// data splitting for partial overlaps, and file cleanup.
1552    ///
1553    /// # Type Parameters
1554    ///
1555    /// - `T`: The data type that implements required traits for catalog operations.
1556    ///
1557    /// # Parameters
1558    ///
1559    /// - `identifier`: Optional instrument ID to delete data for.
1560    /// - `start`: Optional start timestamp for the deletion range.
1561    /// - `end`: Optional end timestamp for the deletion range.
1562    ///
1563    /// # Returns
1564    ///
1565    /// Returns `Ok(())` on success, or an error if deletion fails.
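    ///
    /// # Examples
    ///
    /// A minimal sketch that mirrors the dispatch performed by `delete_data_range`
    /// (the instrument ID is a placeholder):
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// fn delete_all_quotes(catalog: &mut ParquetDataCatalog, instrument_id: &str) -> anyhow::Result<()> {
    ///     // Statically dispatched equivalent of `catalog.delete_data_range("quotes", ...)`.
    ///     catalog.delete_data_range_generic::<QuoteTick>(Some(instrument_id.to_string()), None, None)
    /// }
    /// ```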
1566    pub fn delete_data_range_generic<T>(
1567        &mut self,
1568        identifier: Option<String>,
1569        start: Option<UnixNanos>,
1570        end: Option<UnixNanos>,
1571    ) -> anyhow::Result<()>
1572    where
1573        T: DecodeDataFromRecordBatch
1574            + CatalogPathPrefix
1575            + EncodeToRecordBatch
1576            + HasTsInit
1577            + TryFrom<Data>
1578            + Clone,
1579    {
1580        // Work from the existing file intervals; each (start, end) pair corresponds to one file
1581        let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;
1582
1583        if intervals.is_empty() {
1584            return Ok(()); // No files to process
1585        }
1586
1587        // Prepare all operations for execution
1588        let operations_to_execute = self.prepare_delete_operations(
1589            T::path_prefix(),
1590            identifier.clone(),
1591            &intervals,
1592            start,
1593            end,
1594        )?;
1595
1596        if operations_to_execute.is_empty() {
1597            return Ok(()); // No operations to execute
1598        }
1599
1600        // Execute all operations
1601        let mut files_to_remove = HashSet::<String>::new();
1602
1603        for operation in operations_to_execute {
1604            // Reset the session before each operation to ensure fresh data is loaded
1605            // This clears any cached table registrations that might interfere with file operations
1606            self.reset_session();
1607            match operation.operation_type.as_str() {
1608                "split_before" => {
1609                    // Query data before the deletion range and write it
1610                    let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1611                    let before_data = self.query_typed_data::<T>(
1612                        instrument_ids,
1613                        Some(UnixNanos::from(operation.query_start)),
1614                        Some(UnixNanos::from(operation.query_end)),
1615                        None,
1616                        Some(operation.files.clone()),
1617                    )?;
1618
1619                    if !before_data.is_empty() {
1620                        let start_ts = UnixNanos::from(operation.file_start_ns);
1621                        let end_ts = UnixNanos::from(operation.file_end_ns);
1622                        self.write_to_parquet(
1623                            before_data,
1624                            Some(start_ts),
1625                            Some(end_ts),
1626                            Some(true),
1627                        )?;
1628                    }
1629                }
1630                "split_after" => {
1631                    // Query data after the deletion range and write it
1632                    let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1633                    let after_data = self.query_typed_data::<T>(
1634                        instrument_ids,
1635                        Some(UnixNanos::from(operation.query_start)),
1636                        Some(UnixNanos::from(operation.query_end)),
1637                        None,
1638                        Some(operation.files.clone()),
1639                    )?;
1640
1641                    if !after_data.is_empty() {
1642                        let start_ts = UnixNanos::from(operation.file_start_ns);
1643                        let end_ts = UnixNanos::from(operation.file_end_ns);
1644                        self.write_to_parquet(
1645                            after_data,
1646                            Some(start_ts),
1647                            Some(end_ts),
1648                            Some(true),
1649                        )?;
1650                    }
1651                }
1652                _ => {
1653                    // For "remove" operations, just mark files for removal
1654                }
1655            }
1656
1657            // Mark files for removal (applies to all operation types)
1658            for file in operation.files {
1659                files_to_remove.insert(file);
1660            }
1661        }
1662
1663        // Remove all files that were processed
1664        for file in files_to_remove {
1665            if let Err(e) = self.delete_file(&file) {
1666                eprintln!("Failed to delete file {file}: {e}");
1667            }
1668        }
1669
1670        Ok(())
1671    }
1672
1673    /// Prepares all operations for data deletion by identifying files that need to be
1674    /// split or removed.
1675    ///
1676    /// This auxiliary function handles all the preparation logic for deletion:
1677    /// 1. Filters intervals by time range
1678    /// 2. Identifies files that intersect with the deletion range
1679    /// 3. Creates split operations for files that partially overlap
1680    /// 4. Generates removal operations for files completely within the range
1681    ///
1682    /// # Parameters
1683    ///
1684    /// - `type_name`: The data type directory name for path generation.
1685    /// - `identifier`: Optional instrument identifier for path generation.
1686    /// - `intervals`: List of (`start_ts`, `end_ts`) tuples representing existing file intervals.
1687    /// - `start`: Optional start timestamp for deletion range.
1688    /// - `end`: Optional end timestamp for deletion range.
1689    ///
1690    /// # Returns
1691    ///
1692    /// Returns a vector of `DeleteOperation` structs ready for execution.
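    ///
    /// # Examples
    ///
    /// A minimal sketch using hand-written intervals (the instrument ID and timestamps are
    /// placeholders):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// fn plan_deletion(catalog: &ParquetDataCatalog) -> anyhow::Result<()> {
    ///     // Two files covering [100, 200] and [201, 300]; delete [150, 250].
    ///     let intervals = [(100u64, 200u64), (201, 300)];
    ///     let operations = catalog.prepare_delete_operations(
    ///         "quotes",
    ///         Some("BTCUSD".to_string()),
    ///         &intervals,
    ///         Some(UnixNanos::from(150u64)),
    ///         Some(UnixNanos::from(250u64)),
    ///     )?;
    ///     // Expect a `split_before` for the first file and a `split_after` for the second
    ///     println!("planned {} delete operations", operations.len());
    ///     Ok(())
    /// }
    /// ```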
1693    pub fn prepare_delete_operations(
1694        &self,
1695        type_name: &str,
1696        identifier: Option<String>,
1697        intervals: &[(u64, u64)],
1698        start: Option<UnixNanos>,
1699        end: Option<UnixNanos>,
1700    ) -> anyhow::Result<Vec<DeleteOperation>> {
1701        // Convert the optional bounds to raw u64 nanosecond values
1702        let delete_start_ns = start.map(|s| s.as_u64());
1703        let delete_end_ns = end.map(|e| e.as_u64());
1704
1705        let mut operations = Vec::new();
1706
1707        // Get directory for file path construction
1708        let directory = self.make_path(type_name, identifier)?;
1709
1710        // Process each interval (which represents an actual file)
1711        for &(file_start_ns, file_end_ns) in intervals {
1712            // Check if file intersects with deletion range
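            // Both file intervals and the deletion bounds are inclusive, so the file intersects
            // when `delete_start <= file_end` and `file_start <= delete_end`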
1713            let intersects = (delete_start_ns.is_none() || delete_start_ns.unwrap() <= file_end_ns)
1714                && (delete_end_ns.is_none() || file_start_ns <= delete_end_ns.unwrap());
1715
1716            if !intersects {
1717                continue; // File doesn't intersect with deletion range
1718            }
1719
1720            // Construct file path from interval timestamps
1721            let filename = timestamps_to_filename(
1722                UnixNanos::from(file_start_ns),
1723                UnixNanos::from(file_end_ns),
1724            );
1725            let file_path = make_object_store_path(&directory, &[&filename]);
1726
1727            // Determine what type of operation is needed
1728            let file_completely_within_range = (delete_start_ns.is_none()
1729                || delete_start_ns.unwrap() <= file_start_ns)
1730                && (delete_end_ns.is_none() || file_end_ns <= delete_end_ns.unwrap());
1731
1732            if file_completely_within_range {
1733                // File is completely within deletion range - just mark for removal
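                // Query and file bounds are unused for plain removals, so they are zeroed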
1734                operations.push(DeleteOperation {
1735                    operation_type: "remove".to_string(),
1736                    files: vec![file_path],
1737                    query_start: 0,
1738                    query_end: 0,
1739                    file_start_ns: 0,
1740                    file_end_ns: 0,
1741                });
1742            } else {
1743                // File partially overlaps - need to split
1744                if let Some(delete_start) = delete_start_ns
1745                    && file_start_ns < delete_start
1746                {
1747                    // Keep data before deletion range
1748                    operations.push(DeleteOperation {
1749                        operation_type: "split_before".to_string(),
1750                        files: vec![file_path.clone()],
1751                        query_start: file_start_ns,
1752                        query_end: delete_start.saturating_sub(1), // Inclusive end just before the deletion start
1753                        file_start_ns,
1754                        file_end_ns: delete_start.saturating_sub(1),
1755                    });
1756                }
1757
1758                if let Some(delete_end) = delete_end_ns
1759                    && delete_end < file_end_ns
1760                {
1761                    // Keep data after deletion range
1762                    operations.push(DeleteOperation {
1763                        operation_type: "split_after".to_string(),
1764                        files: vec![file_path.clone()],
1765                        query_start: delete_end.saturating_add(1), // Inclusive start just after the deletion end
1766                        query_end: file_end_ns,
1767                        file_start_ns: delete_end.saturating_add(1),
1768                        file_end_ns,
1769                    });
1770                }
1771            }
1772        }
1773
1774        Ok(operations)
1775    }
1776}