nautilus_persistence/backend/catalog_operations.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Catalog operations for data consolidation and reset functionality.
17//!
18//! This module contains the consolidation and reset operations for the `ParquetDataCatalog`.
19//! These operations are separated into their own module for better organization and maintainability.
20
21use std::collections::HashSet;
22
23use anyhow::Result;
24use futures::StreamExt;
25use nautilus_core::UnixNanos;
26use nautilus_model::data::{Data, HasTsInit};
27use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
28use object_store::path::Path as ObjectPath;
29
30use crate::{
31    backend::catalog::{
32        CatalogPathPrefix, ParquetDataCatalog, are_intervals_contiguous, are_intervals_disjoint,
33        extract_path_components, make_object_store_path, parse_filename_timestamps,
34        timestamps_to_filename,
35    },
36    parquet::{
37        combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
38    },
39};
40
41/// Information about a consolidation query to be executed.
42///
43/// This struct encapsulates all the information needed to execute a single consolidation
44/// operation, including the data range to query and file naming strategy.
45///
46/// # Fields
47///
48/// - `query_start`: Start timestamp for the data query range (inclusive, in nanoseconds).
49/// - `query_end`: End timestamp for the data query range (inclusive, in nanoseconds).
50/// - `use_period_boundaries`: If true, uses period boundaries for file naming; if false, uses actual data timestamps.
51///
52/// # Usage
53///
54/// This struct is used internally by the consolidation system to plan and execute
55/// data consolidation operations. It allows the system to:
56/// - Separate query planning from execution.
57/// - Handle complex scenarios like data splitting.
58/// - Optimize file naming strategies.
59/// - Batch multiple operations efficiently.
60/// - Maintain file contiguity across periods.
61///
62/// # Examples
63///
64/// ```rust,no_run
65/// use nautilus_persistence::backend::catalog_operations::ConsolidationQuery;
66///
67/// // Regular consolidation query
68/// let query = ConsolidationQuery {
69///     query_start: 1609459200000000000,
70///     query_end: 1609545600000000000,
71///     use_period_boundaries: true,
72/// };
73///
74/// // Split operation to preserve data
75/// let split_query = ConsolidationQuery {
76///     query_start: 1609459200000000000,
77///     query_end: 1609462800000000000,
78///     use_period_boundaries: false,
79/// };
80/// ```
81#[derive(Debug, Clone)]
82pub struct ConsolidationQuery {
83    /// Start timestamp for the query range (inclusive, in nanoseconds)
84    pub query_start: u64,
85    /// End timestamp for the query range (inclusive, in nanoseconds)
86    pub query_end: u64,
87    /// Whether to use period boundaries for file naming (true) or actual data timestamps (false)
88    pub use_period_boundaries: bool,
89}
90
91/// Information about a deletion operation to be executed.
92///
93/// This struct encapsulates all the information needed to execute a single deletion
94/// operation, including the type of operation and file handling details.
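///
/// # Examples
///
/// A minimal construction sketch with hypothetical timestamps and file paths (values are
/// illustrative, not taken from a real catalog):
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::DeleteOperation;
///
/// let op = DeleteOperation {
///     operation_type: "split_before".to_string(),
///     files: vec!["data/quotes/BTCUSD/example.parquet".to_string()],
///     query_start: 1609459200000000000,
///     query_end: 1609462799999999999,
///     file_start_ns: 1609459200000000000,
///     file_end_ns: 1609462799999999999,
/// };
/// ```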
95#[derive(Debug, Clone)]
96pub struct DeleteOperation {
97    /// Type of deletion operation ("remove", "`split_before`", "`split_after`").
98    pub operation_type: String,
99    /// List of files involved in this operation.
100    pub files: Vec<String>,
101    /// Start timestamp for data query (used for split operations).
102    pub query_start: u64,
103    /// End timestamp for data query (used for split operations).
104    pub query_end: u64,
105    /// Start timestamp for new file naming (used for split operations).
106    pub file_start_ns: u64,
107    /// End timestamp for new file naming (used for split operations).
108    pub file_end_ns: u64,
109}
110
111impl ParquetDataCatalog {
112    /// Consolidates all data files in the catalog.
113    ///
114    /// This method identifies all leaf directories in the catalog that contain parquet files
115    /// and consolidates them. A leaf directory is one that contains files but no subdirectories.
116    /// This is a convenience method that effectively calls `consolidate_data` for all data types
117    /// and instrument IDs in the catalog.
118    ///
119    /// # Parameters
120    ///
121    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
122    ///   greater than or equal to this value will be consolidated. If None, all files
123    ///   from the beginning of time will be considered.
124    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
125    ///   less than or equal to this value will be consolidated. If None, all files
126    ///   up to the end of time will be considered.
127    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
128    ///
129    /// # Returns
130    ///
131    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
132    ///
133    /// # Errors
134    ///
135    /// This function will return an error if:
136    /// - Directory listing fails.
137    /// - File consolidation operations fail.
138    /// - Interval validation fails (when `ensure_contiguous_files` is true).
139    ///
140    /// # Examples
141    ///
142    /// ```rust,no_run
143    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
144    /// use nautilus_core::UnixNanos;
145    ///
146    /// let catalog = ParquetDataCatalog::new(/* ... */);
147    ///
148    /// // Consolidate all files in the catalog
149    /// catalog.consolidate_catalog(None, None, None)?;
150    ///
151    /// // Consolidate only files within a specific time range
152    /// catalog.consolidate_catalog(
153    ///     Some(UnixNanos::from(1609459200000000000)),
154    ///     Some(UnixNanos::from(1609545600000000000)),
155    ///     Some(true)
156    /// )?;
157    /// # Ok::<(), anyhow::Error>(())
158    /// ```
159    pub fn consolidate_catalog(
160        &self,
161        start: Option<UnixNanos>,
162        end: Option<UnixNanos>,
163        ensure_contiguous_files: Option<bool>,
164    ) -> Result<()> {
165        let leaf_directories = self.find_leaf_data_directories()?;
166
167        for directory in leaf_directories {
168            self.consolidate_directory(&directory, start, end, ensure_contiguous_files)?;
169        }
170
171        Ok(())
172    }
173
174    /// Consolidates data files for a specific data type and instrument.
175    ///
176    /// This method consolidates Parquet files within a specific directory (defined by data type
177    /// and optional instrument ID) by merging multiple files into a single file. This improves
178    /// query performance and can reduce storage overhead.
179    ///
180    /// # Parameters
181    ///
182    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
183    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
184    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
185    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
186    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
187    ///
188    /// # Returns
189    ///
190    /// Returns `Ok(())` on success, or an error if consolidation fails.
191    ///
192    /// # Errors
193    ///
194    /// This function will return an error if:
195    /// - The directory path cannot be constructed.
196    /// - File consolidation operations fail.
197    /// - Interval validation fails (when `ensure_contiguous_files` is true).
198    ///
199    /// # Examples
200    ///
201    /// ```rust,no_run
202    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
203    /// use nautilus_core::UnixNanos;
204    ///
205    /// let catalog = ParquetDataCatalog::new(/* ... */);
206    ///
207    /// // Consolidate all quote files for a specific instrument
208    /// catalog.consolidate_data(
209    ///     "quotes",
210    ///     Some("BTCUSD".to_string()),
211    ///     None,
212    ///     None,
213    ///     None
214    /// )?;
215    ///
216    /// // Consolidate trade files within a time range
217    /// catalog.consolidate_data(
218    ///     "trades",
219    ///     None,
220    ///     Some(UnixNanos::from(1609459200000000000)),
221    ///     Some(UnixNanos::from(1609545600000000000)),
222    ///     Some(true)
223    /// )?;
224    /// # Ok::<(), anyhow::Error>(())
225    /// ```
226    pub fn consolidate_data(
227        &self,
228        type_name: &str,
229        instrument_id: Option<String>,
230        start: Option<UnixNanos>,
231        end: Option<UnixNanos>,
232        ensure_contiguous_files: Option<bool>,
233    ) -> Result<()> {
234        let directory = self.make_path(type_name, instrument_id)?;
235        self.consolidate_directory(&directory, start, end, ensure_contiguous_files)
236    }
237
238    /// Consolidates Parquet files within a specific directory by merging them into a single file.
239    ///
240    /// This internal method performs the actual consolidation work for a single directory.
241    /// It identifies files within the specified time range, validates their intervals,
242    /// and combines them into a single Parquet file with optimized storage.
243    ///
244    /// # Parameters
245    ///
246    /// - `directory`: The directory path containing Parquet files to consolidate.
247    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
248    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
249    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous.
250    ///
251    /// # Returns
252    ///
253    /// Returns `Ok(())` on success, or an error if consolidation fails.
254    ///
255    /// # Behavior
256    ///
257    /// - Skips consolidation if directory contains 1 or fewer files.
258    /// - Filters files by timestamp range if start/end are specified.
259    /// - Sorts intervals by start timestamp before consolidation.
260    /// - Creates a new file spanning the entire time range of input files.
261    /// - Validates interval disjointness after consolidation (if enabled).
262    ///
263    /// # Errors
264    ///
265    /// This function will return an error if:
266    /// - Directory listing fails.
267    /// - File combination operations fail.
268    /// - Interval validation fails (when `ensure_contiguous_files` is true).
269    /// - Object store operations fail.
270    fn consolidate_directory(
271        &self,
272        directory: &str,
273        start: Option<UnixNanos>,
274        end: Option<UnixNanos>,
275        ensure_contiguous_files: Option<bool>,
276    ) -> Result<()> {
277        let parquet_files = self.list_parquet_files(directory)?;
278
279        if parquet_files.len() <= 1 {
280            return Ok(());
281        }
282
283        let mut files_to_consolidate = Vec::new();
284        let mut intervals = Vec::new();
285        let start = start.map(|t| t.as_u64());
286        let end = end.map(|t| t.as_u64());
287
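        // Include a file only when its filename interval lies entirely within the
        // requested [start, end] range; partially overlapping files are left untouched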
288        for file in parquet_files {
289            if let Some(interval) = parse_filename_timestamps(&file) {
290                let (interval_start, interval_end) = interval;
291                let include_file = match (start, end) {
292                    (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
293                    (Some(s), None) => interval_start >= s,
294                    (None, Some(e)) => interval_end <= e,
295                    (None, None) => true,
296                };
297
298                if include_file {
299                    files_to_consolidate.push(file);
300                    intervals.push(interval);
301                }
302            }
303        }
304
305        intervals.sort_by_key(|&(start, _)| start);
306
307        if !intervals.is_empty() {
308            let file_name = timestamps_to_filename(
309                UnixNanos::from(intervals[0].0),
310                UnixNanos::from(intervals.last().unwrap().1),
311            );
312            let path = make_object_store_path(directory, &[&file_name]);
313
314            // Convert string paths to ObjectPath for the function call
315            let object_paths: Vec<ObjectPath> = files_to_consolidate
316                .iter()
317                .map(|path| ObjectPath::from(path.as_str()))
318                .collect();
319
320            self.execute_async(async {
321                combine_parquet_files_from_object_store(
322                    self.object_store.clone(),
323                    object_paths,
324                    &ObjectPath::from(path),
325                    Some(self.compression),
326                    Some(self.max_row_group_size),
327                )
328                .await
329            })?;
330        }
331
332        if ensure_contiguous_files.unwrap_or(true) && !are_intervals_disjoint(&intervals) {
333            anyhow::bail!("Intervals are not disjoint after consolidating a directory");
334        }
335
336        Ok(())
337    }
338
339    /// Consolidates all data files in the catalog by splitting them into fixed time periods.
340    ///
341    /// This method identifies all leaf directories in the catalog that contain parquet files
342    /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
343    /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
344    /// and instrument IDs in the catalog.
345    ///
346    /// # Parameters
347    ///
348    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
349    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
350    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
351    ///   greater than or equal to this value will be consolidated. If None, all files
352    ///   from the beginning of time will be considered.
353    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
354    ///   less than or equal to this value will be consolidated. If None, all files
355    ///   up to the end of time will be considered.
356    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
357    ///   If false, uses actual data timestamps for file naming.
358    ///
359    /// # Returns
360    ///
361    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
362    ///
363    /// # Errors
364    ///
365    /// This function will return an error if:
366    /// - Directory listing fails.
367    /// - Data type extraction from path fails.
368    /// - Period-based consolidation operations fail.
369    ///
370    /// # Notes
371    ///
    /// - This operation can be resource-intensive for large catalogs with many data types
    ///   and instruments.
    /// - The consolidation process splits data into fixed time periods rather than combining
    ///   all files into a single file per directory.
    /// - Uses the same period-based consolidation logic as `consolidate_data_by_period`.
    /// - Original files are removed and replaced with period-based consolidated files.
    /// - This method is useful for periodic maintenance of the catalog to standardize
    ///   file organization by time periods.
380    ///
381    /// # Examples
382    ///
383    /// ```rust,no_run
384    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
385    /// use nautilus_core::UnixNanos;
386    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
388    ///
389    /// // Consolidate all files in the catalog by 1-day periods
390    /// catalog.consolidate_catalog_by_period(
391    ///     Some(86400000000000), // 1 day in nanoseconds
392    ///     None,
393    ///     None,
394    ///     Some(true)
395    /// )?;
396    ///
397    /// // Consolidate only files within a specific time range by 1-hour periods
398    /// catalog.consolidate_catalog_by_period(
399    ///     Some(3600000000000), // 1 hour in nanoseconds
400    ///     Some(UnixNanos::from(1609459200000000000)),
401    ///     Some(UnixNanos::from(1609545600000000000)),
402    ///     Some(false)
403    /// )?;
404    /// # Ok::<(), anyhow::Error>(())
405    /// ```
406    pub fn consolidate_catalog_by_period(
407        &mut self,
408        period_nanos: Option<u64>,
409        start: Option<UnixNanos>,
410        end: Option<UnixNanos>,
411        ensure_contiguous_files: Option<bool>,
412    ) -> Result<()> {
413        let leaf_directories = self.find_leaf_data_directories()?;
414
415        for directory in leaf_directories {
416            let (data_cls, identifier) =
417                self.extract_data_cls_and_identifier_from_path(&directory)?;
418
419            if let Some(data_cls_name) = data_cls {
420                // Use match statement to call the generic consolidate_data_by_period for various types
421                match data_cls_name.as_str() {
422                    "quotes" => {
423                        use nautilus_model::data::QuoteTick;
424                        self.consolidate_data_by_period_generic::<QuoteTick>(
425                            identifier,
426                            period_nanos,
427                            start,
428                            end,
429                            ensure_contiguous_files,
430                        )?;
431                    }
432                    "trades" => {
433                        use nautilus_model::data::TradeTick;
434                        self.consolidate_data_by_period_generic::<TradeTick>(
435                            identifier,
436                            period_nanos,
437                            start,
438                            end,
439                            ensure_contiguous_files,
440                        )?;
441                    }
442                    "order_book_deltas" => {
443                        use nautilus_model::data::OrderBookDelta;
444                        self.consolidate_data_by_period_generic::<OrderBookDelta>(
445                            identifier,
446                            period_nanos,
447                            start,
448                            end,
449                            ensure_contiguous_files,
450                        )?;
451                    }
452                    "order_book_depths" => {
453                        use nautilus_model::data::OrderBookDepth10;
454                        self.consolidate_data_by_period_generic::<OrderBookDepth10>(
455                            identifier,
456                            period_nanos,
457                            start,
458                            end,
459                            ensure_contiguous_files,
460                        )?;
461                    }
462                    "bars" => {
463                        use nautilus_model::data::Bar;
464                        self.consolidate_data_by_period_generic::<Bar>(
465                            identifier,
466                            period_nanos,
467                            start,
468                            end,
469                            ensure_contiguous_files,
470                        )?;
471                    }
472                    "index_prices" => {
473                        use nautilus_model::data::IndexPriceUpdate;
474                        self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
475                            identifier,
476                            period_nanos,
477                            start,
478                            end,
479                            ensure_contiguous_files,
480                        )?;
481                    }
482                    "mark_prices" => {
483                        use nautilus_model::data::MarkPriceUpdate;
484                        self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
485                            identifier,
486                            period_nanos,
487                            start,
488                            end,
489                            ensure_contiguous_files,
490                        )?;
491                    }
492                    "instrument_closes" => {
493                        use nautilus_model::data::close::InstrumentClose;
494                        self.consolidate_data_by_period_generic::<InstrumentClose>(
495                            identifier,
496                            period_nanos,
497                            start,
498                            end,
499                            ensure_contiguous_files,
500                        )?;
501                    }
502                    _ => {
503                        // Skip unknown data types
504                        log::warn!("Unknown data type for consolidation: {data_cls_name}");
505                        continue;
506                    }
507                }
508            }
509        }
510
511        Ok(())
512    }
513
514    /// Extracts data class and identifier from a directory path.
515    ///
516    /// This method parses a directory path to extract the data type and optional
517    /// instrument identifier. It's used to determine what type of data consolidation
518    /// to perform for each directory.
519    ///
520    /// # Parameters
521    ///
522    /// - `path`: The directory path to parse.
523    ///
524    /// # Returns
525    ///
526    /// Returns a tuple of (`data_class`, identifier) where both are optional strings.
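    ///
    /// # Examples
    ///
    /// A hypothetical path sketch (component names are illustrative, not taken from a real
    /// catalog layout):
    ///
    /// ```text
    /// "catalog/data/quotes/BTCUSD" => (Some("quotes"), Some("BTCUSD"))
    /// "catalog/data/trades"        => (Some("trades"), None)
    /// "catalog/other/quotes"       => (None, None)
    /// ```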
527    pub fn extract_data_cls_and_identifier_from_path(
528        &self,
529        path: &str,
530    ) -> Result<(Option<String>, Option<String>)> {
531        // Use cross-platform path parsing
532        let path_components = extract_path_components(path);
533
534        // Find the "data" directory in the path
535        if let Some(data_index) = path_components.iter().position(|part| part == "data")
536            && data_index + 1 < path_components.len()
537        {
538            let data_cls = path_components[data_index + 1].clone();
539
540            // Check if there's an identifier (instrument ID) after the data class
541            let identifier = if data_index + 2 < path_components.len() {
542                Some(path_components[data_index + 2].clone())
543            } else {
544                None
545            };
546
547            return Ok((Some(data_cls), identifier));
548        }
549
550        // If we can't parse the path, return None for both
551        Ok((None, None))
552    }
553
554    /// Consolidates data files by splitting them into fixed time periods.
555    ///
556    /// This method queries data by period and writes consolidated files immediately,
557    /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
558    /// the function automatically splits those files to preserve all data.
559    ///
560    /// # Parameters
561    ///
562    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
563    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments.
564    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
565    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
566    /// - `start`: Optional start timestamp for consolidation range. If None, uses earliest available data.
567    ///   If specified and intersects existing files, those files will be split to preserve
568    ///   data outside the consolidation range.
569    /// - `end`: Optional end timestamp for consolidation range. If None, uses latest available data.
570    ///   If specified and intersects existing files, those files will be split to preserve
571    ///   data outside the consolidation range.
572    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
573    ///   If false, uses actual data timestamps for file naming.
574    ///
575    /// # Returns
576    ///
577    /// Returns `Ok(())` on success, or an error if consolidation fails.
578    ///
579    /// # Errors
580    ///
581    /// This function will return an error if:
582    /// - The directory path cannot be constructed.
583    /// - File operations fail.
584    /// - Data querying or writing fails.
585    ///
586    /// # Notes
587    ///
588    /// - Uses two-phase approach: first determines all queries, then executes them.
589    /// - Groups intervals into contiguous groups to preserve holes between groups.
590    /// - Allows consolidation across multiple files within each contiguous group.
591    /// - Skips queries if target files already exist for efficiency.
592    /// - Original files are removed immediately after querying each period.
593    /// - When `ensure_contiguous_files=false`, file timestamps match actual data range.
594    /// - When `ensure_contiguous_files=true`, file timestamps use period boundaries.
595    /// - Uses modulo arithmetic for efficient period boundary calculation.
596    /// - Preserves holes in data by preventing queries from spanning across gaps.
597    /// - Automatically splits files at start/end boundaries to preserve all data.
598    /// - Split operations are executed before consolidation to ensure data preservation.
599    ///
600    /// # Examples
601    ///
602    /// ```rust,no_run
603    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
604    /// use nautilus_core::UnixNanos;
605    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
607    ///
608    /// // Consolidate all quote files by 1-day periods
609    /// catalog.consolidate_data_by_period(
610    ///     "quotes",
611    ///     None,
612    ///     Some(86400000000000), // 1 day in nanoseconds
613    ///     None,
614    ///     None,
615    ///     Some(true)
616    /// )?;
617    ///
618    /// // Consolidate specific instrument by 1-hour periods
619    /// catalog.consolidate_data_by_period(
620    ///     "trades",
621    ///     Some("BTCUSD".to_string()),
622    ///     Some(3600000000000), // 1 hour in nanoseconds
623    ///     Some(UnixNanos::from(1609459200000000000)),
624    ///     Some(UnixNanos::from(1609545600000000000)),
625    ///     Some(false)
626    /// )?;
627    /// # Ok::<(), anyhow::Error>(())
628    /// ```
629    pub fn consolidate_data_by_period(
630        &mut self,
631        type_name: &str,
632        identifier: Option<String>,
633        period_nanos: Option<u64>,
634        start: Option<UnixNanos>,
635        end: Option<UnixNanos>,
636        ensure_contiguous_files: Option<bool>,
637    ) -> Result<()> {
638        // Use match statement to call the generic consolidate_data_by_period for various types
639        match type_name {
640            "quotes" => {
641                use nautilus_model::data::QuoteTick;
642                self.consolidate_data_by_period_generic::<QuoteTick>(
643                    identifier,
644                    period_nanos,
645                    start,
646                    end,
647                    ensure_contiguous_files,
648                )?;
649            }
650            "trades" => {
651                use nautilus_model::data::TradeTick;
652                self.consolidate_data_by_period_generic::<TradeTick>(
653                    identifier,
654                    period_nanos,
655                    start,
656                    end,
657                    ensure_contiguous_files,
658                )?;
659            }
660            "order_book_deltas" => {
661                use nautilus_model::data::OrderBookDelta;
662                self.consolidate_data_by_period_generic::<OrderBookDelta>(
663                    identifier,
664                    period_nanos,
665                    start,
666                    end,
667                    ensure_contiguous_files,
668                )?;
669            }
670            "order_book_depths" => {
671                use nautilus_model::data::OrderBookDepth10;
672                self.consolidate_data_by_period_generic::<OrderBookDepth10>(
673                    identifier,
674                    period_nanos,
675                    start,
676                    end,
677                    ensure_contiguous_files,
678                )?;
679            }
680            "bars" => {
681                use nautilus_model::data::Bar;
682                self.consolidate_data_by_period_generic::<Bar>(
683                    identifier,
684                    period_nanos,
685                    start,
686                    end,
687                    ensure_contiguous_files,
688                )?;
689            }
690            "index_prices" => {
691                use nautilus_model::data::IndexPriceUpdate;
692                self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
693                    identifier,
694                    period_nanos,
695                    start,
696                    end,
697                    ensure_contiguous_files,
698                )?;
699            }
700            "mark_prices" => {
701                use nautilus_model::data::MarkPriceUpdate;
702                self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
703                    identifier,
704                    period_nanos,
705                    start,
706                    end,
707                    ensure_contiguous_files,
708                )?;
709            }
710            "instrument_closes" => {
711                use nautilus_model::data::close::InstrumentClose;
712                self.consolidate_data_by_period_generic::<InstrumentClose>(
713                    identifier,
714                    period_nanos,
715                    start,
716                    end,
717                    ensure_contiguous_files,
718                )?;
719            }
720            _ => {
721                anyhow::bail!("Unknown data type for consolidation: {}", type_name);
722            }
723        }
724
725        Ok(())
726    }
727
728    /// Generic consolidate data files by splitting them into fixed time periods.
729    ///
730    /// This is a type-safe version of `consolidate_data_by_period` that uses generic types
731    /// to ensure compile-time correctness and enable reuse across different data types.
732    ///
733    /// # Type Parameters
734    ///
735    /// - `T`: The data type to consolidate, must implement required traits for serialization.
736    ///
737    /// # Parameters
738    ///
739    /// - `identifier`: Optional instrument ID to target a specific instrument's data.
740    /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
741    /// - `start`: Optional start timestamp for consolidation range.
742    /// - `end`: Optional end timestamp for consolidation range.
743    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
744    ///
745    /// # Returns
746    ///
747    /// Returns `Ok(())` on success, or an error if consolidation fails.
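    ///
    /// # Examples
    ///
    /// A minimal usage sketch, assuming a constructed catalog (argument values are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_model::data::QuoteTick;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files into 1-day periods
    /// catalog.consolidate_data_by_period_generic::<QuoteTick>(
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```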
748    pub fn consolidate_data_by_period_generic<T>(
749        &mut self,
750        identifier: Option<String>,
751        period_nanos: Option<u64>,
752        start: Option<UnixNanos>,
753        end: Option<UnixNanos>,
754        ensure_contiguous_files: Option<bool>,
755    ) -> Result<()>
756    where
757        T: DecodeDataFromRecordBatch
758            + CatalogPathPrefix
759            + EncodeToRecordBatch
760            + HasTsInit
761            + TryFrom<Data>
762            + Clone,
763    {
764        let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
765        let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);
766
767        // Use get_intervals for cleaner implementation
768        let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;
769
770        if intervals.is_empty() {
771            return Ok(()); // No files to consolidate
772        }
773
774        // Use auxiliary function to prepare all queries for execution
775        let queries_to_execute = self.prepare_consolidation_queries(
776            T::path_prefix(),
777            identifier.clone(),
778            &intervals,
779            period_nanos,
780            start,
781            end,
782            ensure_contiguous_files,
783        )?;
784
785        if queries_to_execute.is_empty() {
786            return Ok(()); // No queries to execute
787        }
788
789        // Get directory for file operations
790        let directory = self.make_path(T::path_prefix(), identifier.clone())?;
791        let mut existing_files = self.list_parquet_files(&directory)?;
792        existing_files.sort();
793
794        // Track files to remove and maintain existing_files list
795        let mut files_to_remove = HashSet::new();
796        let original_files_count = existing_files.len();
797
798        // Phase 2: Execute queries, write, and delete
799        let mut file_start_ns: Option<u64> = None; // Track contiguity across periods
800
801        for query_info in queries_to_execute {
802            // Query data for this period using query_typed_data
803            let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
804
805            let period_data = self.query_typed_data::<T>(
806                instrument_ids,
807                Some(UnixNanos::from(query_info.query_start)),
808                Some(UnixNanos::from(query_info.query_end)),
809                None,
810                Some(existing_files.clone()),
811            )?;
812
            if period_data.is_empty() {
                // Skip if no data found, but maintain contiguity by carrying the query start
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                continue;
            }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, maintaining contiguity with any
                // preceding empty periods whose start was carried forward in `file_start_ns`
                (
                    file_start_ns.unwrap_or(query_info.query_start),
                    query_info.query_end,
                )
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };

            // Reset contiguity tracking only after the carried start has been consumed,
            // so empty periods remain covered by the next written file
            file_start_ns = None;
835
836            // Check again if target file exists (in case it was created during this process)
837            let target_filename = format!(
838                "{}/{}",
839                directory,
840                timestamps_to_filename(
841                    UnixNanos::from(final_start_ns),
842                    UnixNanos::from(final_end_ns)
843                )
844            );
845
846            if self.file_exists(&target_filename)? {
847                // Skip if target file already exists
848                continue;
849            }
850
851            // Write consolidated data for this period using write_to_parquet
852            // Use skip_disjoint_check since we're managing file removal carefully
853            let start_ts = UnixNanos::from(final_start_ns);
854            let end_ts = UnixNanos::from(final_end_ns);
855            self.write_to_parquet(period_data, Some(start_ts), Some(end_ts), Some(true))?;
856
857            // Identify files that are completely covered by this period
858            // Only remove files AFTER successfully writing a new file
859            // Use slice copy to avoid modification during iteration (match Python logic)
860            for file in existing_files.clone() {
861                if let Some(interval) = parse_filename_timestamps(&file)
862                    && interval.1 <= query_info.query_end
863                {
864                    files_to_remove.insert(file.clone());
865                    existing_files.retain(|f| f != &file);
866                }
867            }
868
869            // Remove files as soon as we have some to remove
870            if !files_to_remove.is_empty() {
871                for file in files_to_remove.drain() {
872                    self.delete_file(&file)?;
873                }
874            }
875        }
876
877        // Remove any remaining files that weren't removed in the loop
878        // This matches the Python implementation's final cleanup step
879        // Only remove files if any consolidation actually happened (i.e., files were processed)
880        let files_were_processed = existing_files.len() < original_files_count;
881        if files_were_processed {
882            for file in existing_files {
883                self.delete_file(&file)?;
884            }
885        }
886
887        Ok(())
888    }
889
890    /// Prepares all queries for consolidation by filtering, grouping, and handling splits.
891    ///
892    /// This auxiliary function handles all the preparation logic for consolidation:
893    /// 1. Filters intervals by time range.
894    /// 2. Groups intervals into contiguous groups.
895    /// 3. Identifies and creates split operations for data preservation.
896    /// 4. Generates period-based consolidation queries.
897    /// 5. Checks for existing target files.
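    ///
    /// # Examples
    ///
    /// A minimal sketch of planning daily consolidation queries for an assumed set of
    /// intervals (values are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let intervals = vec![(1, 1000), (1001, 2000)];
    /// let queries = catalog.prepare_consolidation_queries(
    ///     "quotes",
    ///     None,
    ///     &intervals,
    ///     86400000000000, // 1-day periods
    ///     None,
    ///     None,
    ///     true,
    /// )?;
    /// println!("planned {} queries", queries.len());
    /// # Ok::<(), anyhow::Error>(())
    /// ```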
898    #[allow(clippy::too_many_arguments)]
899    pub fn prepare_consolidation_queries(
900        &self,
901        type_name: &str,
902        identifier: Option<String>,
903        intervals: &[(u64, u64)],
904        period_nanos: u64,
905        start: Option<UnixNanos>,
906        end: Option<UnixNanos>,
907        ensure_contiguous_files: bool,
908    ) -> Result<Vec<ConsolidationQuery>> {
909        // Filter intervals by time range if specified
910        let used_start = start.map(|s| s.as_u64());
911        let used_end = end.map(|e| e.as_u64());
912
913        let mut filtered_intervals = Vec::new();
914        for &(interval_start, interval_end) in intervals {
915            // Check if interval overlaps with the specified range
916            if (used_start.is_none() || used_start.unwrap() <= interval_end)
917                && (used_end.is_none() || interval_start <= used_end.unwrap())
918            {
919                filtered_intervals.push((interval_start, interval_end));
920            }
921        }
922
923        if filtered_intervals.is_empty() {
924            return Ok(Vec::new()); // No intervals in the specified range
925        }
926
927        // Check contiguity of filtered intervals if required
928        if ensure_contiguous_files && !are_intervals_contiguous(&filtered_intervals) {
929            anyhow::bail!(
930                "Intervals are not contiguous. When ensure_contiguous_files=true, \
931                 all files in the consolidation range must have contiguous timestamps."
932            );
933        }
934
935        // Group intervals into contiguous groups to preserve holes between groups
936        // but allow consolidation within each contiguous group
937        let contiguous_groups = self.group_contiguous_intervals(&filtered_intervals);
938
939        let mut queries_to_execute = Vec::new();
940
941        // Handle interval splitting by creating split operations for data preservation
942        if !filtered_intervals.is_empty() {
943            if let Some(start_ts) = used_start {
944                let first_interval = filtered_intervals[0];
945                if first_interval.0 < start_ts && start_ts <= first_interval.1 {
946                    // Split before start: preserve data from interval_start to start-1
947                    queries_to_execute.push(ConsolidationQuery {
948                        query_start: first_interval.0,
949                        query_end: start_ts - 1,
950                        use_period_boundaries: false,
951                    });
952                }
953            }
954
955            if let Some(end_ts) = used_end {
956                let last_interval = filtered_intervals[filtered_intervals.len() - 1];
957                if last_interval.0 <= end_ts && end_ts < last_interval.1 {
958                    // Split after end: preserve data from end+1 to interval_end
959                    queries_to_execute.push(ConsolidationQuery {
960                        query_start: end_ts + 1,
961                        query_end: last_interval.1,
962                        use_period_boundaries: false,
963                    });
964                }
965            }
966        }
967
968        // Generate period-based consolidation queries for each contiguous group
969        for group in contiguous_groups {
970            let group_start = group[0].0;
971            let group_end = group[group.len() - 1].1;
972
973            // Apply start/end filtering to the group
974            let effective_start = used_start.map_or(group_start, |s| s.max(group_start));
975            let effective_end = used_end.map_or(group_end, |e| e.min(group_end));
976
977            if effective_start > effective_end {
978                continue; // Skip if no overlap
979            }
980
981            // Generate period-based queries within this contiguous group
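            // Snap the group start down to the nearest period boundary using integer
            // division (e.g. with a 1-day period, any timestamp within a day maps to
            // that day's 00:00 UTC boundary)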
982            let mut current_start_ns = (effective_start / period_nanos) * period_nanos;
983
984            // Add safety check to prevent infinite loops (match Python logic)
985            let max_iterations = 10000;
986            let mut iteration_count = 0;
987
988            while current_start_ns <= effective_end {
989                iteration_count += 1;
990                if iteration_count > max_iterations {
991                    // Safety break to prevent infinite loops
992                    break;
993                }
994                let current_end_ns = (current_start_ns + period_nanos - 1).min(effective_end);
995
996                // Check if target file already exists (only when ensure_contiguous_files is true)
997                if ensure_contiguous_files {
998                    let directory = self.make_path(type_name, identifier.clone())?;
999                    let target_filename = format!(
1000                        "{}/{}",
1001                        directory,
1002                        timestamps_to_filename(
1003                            UnixNanos::from(current_start_ns),
1004                            UnixNanos::from(current_end_ns)
1005                        )
1006                    );
1007
1008                    if self.file_exists(&target_filename)? {
1009                        // Skip if target file already exists
1010                        current_start_ns += period_nanos;
1011                        continue;
1012                    }
1013                }
1014
1015                // Add query to execution list
1016                queries_to_execute.push(ConsolidationQuery {
1017                    query_start: current_start_ns,
1018                    query_end: current_end_ns,
1019                    use_period_boundaries: ensure_contiguous_files,
1020                });
1021
1022                // Move to next period
1023                current_start_ns += period_nanos;
1024
1025                if current_start_ns > effective_end {
1026                    break;
1027                }
1028            }
1029        }
1030
        // Sort queries by start timestamp to enable efficient file removal:
        // a file can be removed once its interval end is <= `query_info.query_end`,
        // and processing in chronological order ensures optimal cleanup
1034        queries_to_execute.sort_by_key(|q| q.query_start);
1035
1036        Ok(queries_to_execute)
1037    }
1038
1039    /// Groups intervals into contiguous groups for efficient consolidation.
1040    ///
1041    /// This method analyzes a list of time intervals and groups them into contiguous sequences.
1042    /// Intervals are considered contiguous if the end of one interval is exactly one nanosecond
1043    /// before the start of the next interval. This grouping preserves data gaps while allowing
1044    /// consolidation within each contiguous group.
1045    ///
1046    /// # Parameters
1047    ///
1048    /// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
1049    ///
1050    /// # Returns
1051    ///
1052    /// Returns a vector of groups, where each group is a vector of contiguous intervals.
1053    /// Returns an empty vector if the input is empty.
1054    ///
1055    /// # Algorithm
1056    ///
1057    /// 1. Starts with the first interval in a new group.
1058    /// 2. For each subsequent interval, checks if it's contiguous with the previous.
1059    /// 3. If contiguous (`prev_end` + 1 == `curr_start`), adds to current group.
1060    /// 4. If not contiguous, starts a new group.
1061    /// 5. Returns all groups.
1062    ///
1063    /// # Examples
1064    ///
1065    /// ```text
1066    /// Contiguous intervals: [(1,5), (6,10), (11,15)]
1067    /// Returns: [[(1,5), (6,10), (11,15)]]
1068    ///
1069    /// Non-contiguous intervals: [(1,5), (8,10), (12,15)]
1070    /// Returns: [[(1,5)], [(8,10)], [(12,15)]]
1071    /// ```
1072    ///
1073    /// # Notes
1074    ///
1075    /// - Input intervals should be sorted by start timestamp.
1076    /// - Gaps between groups are preserved and not consolidated.
1077    /// - Used internally by period-based consolidation methods.
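    ///
    /// A minimal sketch of the grouping behavior (interval values are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let groups = catalog.group_contiguous_intervals(&[(1, 5), (6, 10), (12, 15)]);
    /// assert_eq!(groups, vec![vec![(1, 5), (6, 10)], vec![(12, 15)]]);
    /// ```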
1078    #[must_use]
1079    pub fn group_contiguous_intervals(&self, intervals: &[(u64, u64)]) -> Vec<Vec<(u64, u64)>> {
1080        if intervals.is_empty() {
1081            return Vec::new();
1082        }
1083
1084        let mut contiguous_groups = Vec::new();
1085        let mut current_group = vec![intervals[0]];
1086
1087        for i in 1..intervals.len() {
1088            let prev_interval = intervals[i - 1];
1089            let curr_interval = intervals[i];
1090
1091            // Check if current interval is contiguous with previous (end + 1 == start)
1092            if prev_interval.1 + 1 == curr_interval.0 {
1093                current_group.push(curr_interval);
1094            } else {
1095                // Gap found, start new group
1096                contiguous_groups.push(current_group);
1097                current_group = vec![curr_interval];
1098            }
1099        }
1100
1101        // Add the last group
1102        contiguous_groups.push(current_group);
1103
1104        contiguous_groups
1105    }
1106
1107    /// Checks if a file exists in the object store.
1108    ///
1109    /// This method performs a HEAD operation on the object store to determine if a file
1110    /// exists without downloading its content. It works with both local and remote object stores.
1111    ///
1112    /// # Parameters
1113    ///
1114    /// - `path`: The file path to check, relative to the catalog structure.
1115    ///
1116    /// # Returns
1117    ///
1118    /// Returns `true` if the file exists, `false` if it doesn't exist.
1119    ///
1120    /// # Errors
1121    ///
1122    /// Returns an error if the object store operation fails due to network issues,
1123    /// authentication problems, or other I/O errors.
1124    fn file_exists(&self, path: &str) -> Result<bool> {
1125        let object_path = self.to_object_path(path);
1126        let exists =
1127            self.execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
1128        Ok(exists)
1129    }
1130
1131    /// Deletes a file from the object store.
1132    ///
1133    /// This method removes a file from the object store. The operation is permanent
1134    /// and cannot be undone. It works with both local filesystems and remote object stores.
1135    ///
1136    /// # Parameters
1137    ///
1138    /// - `path`: The file path to delete, relative to the catalog structure.
1139    ///
1140    /// # Returns
1141    ///
1142    /// Returns `Ok(())` on successful deletion.
1143    ///
1144    /// # Errors
1145    ///
1146    /// Returns an error if:
1147    /// - The file doesn't exist.
1148    /// - Permission is denied.
1149    /// - Network issues occur (for remote stores).
1150    /// - The object store operation fails.
1151    ///
1152    /// # Safety
1153    ///
1154    /// This operation is irreversible. Ensure the file is no longer needed before deletion.
1155    fn delete_file(&self, path: &str) -> Result<()> {
1156        let object_path = self.to_object_path(path);
1157        self.execute_async(async {
1158            self.object_store
1159                .delete(&object_path)
1160                .await
1161                .map_err(anyhow::Error::from)
1162        })?;
1163        Ok(())
1164    }
1165
1166    /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
1167    ///
1168    /// This method scans all leaf data directories in the catalog and renames files based on
1169    /// the actual timestamp range of their content. This is useful when files have been
1170    /// modified or when filename conventions have changed.
1171    ///
1172    /// # Returns
1173    ///
1174    /// Returns `Ok(())` on success, or an error if the operation fails.
1175    ///
1176    /// # Errors
1177    ///
1178    /// This function will return an error if:
1179    /// - Directory listing fails.
1180    /// - File metadata reading fails.
1181    /// - File rename operations fail.
1182    /// - Interval validation fails after renaming.
1183    ///
1184    /// # Examples
1185    ///
1186    /// ```rust,no_run
1187    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1188    ///
1189    /// let catalog = ParquetDataCatalog::new(/* ... */);
1190    ///
1191    /// // Reset all filenames in the catalog
1192    /// catalog.reset_all_file_names()?;
1193    /// # Ok::<(), anyhow::Error>(())
1194    /// ```
1195    pub fn reset_all_file_names(&self) -> Result<()> {
1196        let leaf_directories = self.find_leaf_data_directories()?;
1197
1198        for directory in leaf_directories {
1199            self.reset_file_names(&directory)?;
1200        }
1201
1202        Ok(())
1203    }
1204
1205    /// Resets the filenames of Parquet files for a specific data type and instrument ID.
1206    ///
1207    /// This method renames files in a specific directory based on the actual timestamp
1208    /// range of their content. This is useful for correcting filenames after data
1209    /// modifications or when filename conventions have changed.
1210    ///
1211    /// # Parameters
1212    ///
1213    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1214    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1215    ///
1216    /// # Returns
1217    ///
1218    /// Returns `Ok(())` on success, or an error if the operation fails.
1219    ///
1220    /// # Errors
1221    ///
1222    /// This function will return an error if:
1223    /// - The directory path cannot be constructed.
1224    /// - File metadata reading fails.
1225    /// - File rename operations fail.
1226    /// - Interval validation fails after renaming.
1227    ///
1228    /// # Examples
1229    ///
1230    /// ```rust,no_run
1231    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1232    ///
1233    /// let catalog = ParquetDataCatalog::new(/* ... */);
1234    ///
1235    /// // Reset filenames for all quote files
1236    /// catalog.reset_data_file_names("quotes", None)?;
1237    ///
1238    /// // Reset filenames for a specific instrument's trade files
1239    /// catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
1240    /// # Ok::<(), anyhow::Error>(())
1241    /// ```
1242    pub fn reset_data_file_names(
1243        &self,
1244        data_cls: &str,
1245        instrument_id: Option<String>,
1246    ) -> Result<()> {
1247        let directory = self.make_path(data_cls, instrument_id)?;
1248        self.reset_file_names(&directory)
1249    }
1250
1251    /// Resets the filenames of Parquet files in a directory to match their actual content timestamps.
1252    ///
1253    /// This internal method scans all Parquet files in a directory, reads their metadata to
1254    /// determine the actual timestamp range of their content, and renames the files accordingly.
1255    /// This ensures that filenames accurately reflect the data they contain.
1256    ///
1257    /// # Parameters
1258    ///
1259    /// - `directory`: The directory path containing Parquet files to rename.
1260    ///
1261    /// # Returns
1262    ///
1263    /// Returns `Ok(())` on success, or an error if the operation fails.
1264    ///
1265    /// # Process
1266    ///
1267    /// 1. Lists all Parquet files in the directory
1268    /// 2. For each file, reads metadata to extract min/max timestamps
1269    /// 3. Generates a new filename based on actual timestamp range
1270    /// 4. Moves the file to the new name using object store operations
1271    /// 5. Validates that intervals remain disjoint after renaming
1272    ///
1273    /// # Errors
1274    ///
1275    /// This function will return an error if:
1276    /// - Directory listing fails.
1277    /// - Metadata reading fails for any file.
1278    /// - File move operations fail.
1279    /// - Interval validation fails after renaming.
1280    /// - Object store operations fail.
1281    ///
1282    /// # Notes
1283    ///
1284    /// - This operation can be time-consuming for directories with many files.
1285    /// - Files are processed sequentially to avoid conflicts.
1286    /// - The operation is atomic per file but not across the entire directory.
1287    fn reset_file_names(&self, directory: &str) -> Result<()> {
1288        let parquet_files = self.list_parquet_files(directory)?;
1289
1290        for file in parquet_files {
1291            let object_path = ObjectPath::from(file.as_str());
1292            let (first_ts, last_ts) = self.execute_async(async {
1293                min_max_from_parquet_metadata_object_store(
1294                    self.object_store.clone(),
1295                    &object_path,
1296                    "ts_init",
1297                )
1298                .await
1299            })?;
1300
1301            let new_filename =
1302                timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
1303            let new_file_path = make_object_store_path(directory, &[&new_filename]);
1304            let new_object_path = ObjectPath::from(new_file_path);
1305
1306            self.move_file(&object_path, &new_object_path)?;
1307        }
1308
1309        let intervals = self.get_directory_intervals(directory)?;
1310
1311        if !are_intervals_disjoint(&intervals) {
1312            anyhow::bail!("Intervals are not disjoint after resetting file names");
1313        }
1314
1315        Ok(())
1316    }

    /// Finds all leaf data directories in the catalog.
    ///
    /// A leaf directory is one that contains data files but no subdirectories.
    /// This method is used to identify directories that can be processed for
    /// consolidation or other operations.
    ///
    /// # Returns
    ///
    /// Returns a vector of directory path strings representing leaf directories,
    /// or an error if directory traversal fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Object store listing operations fail.
    /// - Directory structure cannot be analyzed.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let leaf_dirs = catalog.find_leaf_data_directories()?;
    /// for dir in leaf_dirs {
    ///     println!("Found leaf directory: {dir}");
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn find_leaf_data_directories(&self) -> Result<Vec<String>> {
        let data_dir = make_object_store_path(&self.base_path, &["data"]);

        let leaf_dirs = self.execute_async(async {
            let mut directories = HashSet::new();
            let mut files_in_dirs = std::collections::HashMap::new();

            // List all objects under the data directory
            let prefix = ObjectPath::from(format!("{data_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));

            while let Some(object) = stream.next().await {
                let object = object?;
                let path_str = object.location.to_string();

                // Extract directory path
                if let Some(parent) = std::path::Path::new(&path_str).parent() {
                    let parent_str = parent.to_string_lossy().to_string();
                    directories.insert(parent_str.clone());

                    // Track files in each directory
                    files_in_dirs
                        .entry(parent_str)
                        .or_insert_with(Vec::new)
                        .push(path_str);
                }
            }

            // Find leaf directories (directories with files but no subdirectories)
            let mut leaf_dirs = Vec::new();
            for dir in &directories {
                let has_files = files_in_dirs
                    .get(dir)
                    .is_some_and(|files| !files.is_empty());
                // A directory has subdirectories if any other tracked directory is nested under it
                let has_subdirs = directories
                    .iter()
                    .any(|d| d.starts_with(&make_object_store_path(dir, &[""])) && d != dir);

                if has_files && !has_subdirs {
                    leaf_dirs.push(dir.clone());
                }
            }

            Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
        })?;

        Ok(leaf_dirs)
    }

    /// Deletes data within a specified time range for a specific data type and instrument.
    ///
    /// This method identifies all parquet files that intersect with the specified time range
    /// and handles them appropriately:
    /// - Files completely within the range are deleted.
    /// - Files partially overlapping the range are split to preserve data outside the range.
    /// - The original intersecting files are removed after processing.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional instrument ID to delete data for. If None, deletes data across all instruments.
    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - File operations fail.
    /// - Data querying or writing fails.
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone.
    /// - Files that partially overlap the deletion range are split to preserve data outside the range.
    /// - The method ensures data integrity by using atomic operations where possible.
    /// - Empty directories are not automatically removed after deletion.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete all quote data for a specific instrument
    /// catalog.delete_data_range(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Delete trade data within a specific time range
    /// catalog.delete_data_range(
    ///     "trades",
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn delete_data_range(
        &mut self,
        type_name: &str,
        identifier: Option<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> Result<()> {
        // Dispatch to the generic implementation for each supported data type
        match type_name {
            "quotes" => {
                use nautilus_model::data::QuoteTick;
                self.delete_data_range_generic::<QuoteTick>(identifier, start, end)
            }
            "trades" => {
                use nautilus_model::data::TradeTick;
                self.delete_data_range_generic::<TradeTick>(identifier, start, end)
            }
            "bars" => {
                use nautilus_model::data::Bar;
                self.delete_data_range_generic::<Bar>(identifier, start, end)
            }
            "order_book_deltas" => {
                use nautilus_model::data::OrderBookDelta;
                self.delete_data_range_generic::<OrderBookDelta>(identifier, start, end)
            }
            "order_book_depth10" => {
                use nautilus_model::data::OrderBookDepth10;
                self.delete_data_range_generic::<OrderBookDepth10>(identifier, start, end)
            }
            _ => Err(anyhow::anyhow!("Unsupported data type: {type_name}")),
        }
    }

    /// Deletes data within a specified time range across the entire catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and deletes data within the specified time range from each directory. A leaf directory
    /// is one that contains files but no subdirectories. This is a convenience method that
    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if directory traversal fails.
    ///
    /// Failures to extract a data class from a directory path are skipped, and failures to
    /// delete data in an individual directory are reported to stderr; neither is propagated.
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone.
    /// - Files completely within the deletion range are removed entirely.
    /// - Files partially overlapping the deletion range are split to preserve data outside the range.
    /// - This method is useful for bulk data cleanup operations across the entire catalog.
    /// - Empty directories are not automatically removed after deletion.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete all data before a specific date across the entire catalog
    /// catalog.delete_catalog_range(
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000))
    /// )?;
    ///
    /// // Delete all data within a specific range across the entire catalog
    /// catalog.delete_catalog_range(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
    ///
    /// // Delete all data after a specific date across the entire catalog
    /// catalog.delete_catalog_range(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn delete_catalog_range(
        &mut self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            if let Ok((Some(data_type), identifier)) =
                self.extract_data_cls_and_identifier_from_path(&directory)
            {
                // Call the existing delete_data_range method
                if let Err(e) = self.delete_data_range(&data_type, identifier, start, end) {
                    eprintln!("Failed to delete data in directory {directory}: {e}");
                    // Continue with other directories instead of failing completely
                }
            }
        }

        Ok(())
    }

    /// Generic implementation for deleting data within a specified time range.
    ///
    /// This method provides the core deletion logic that works with any data type
    /// that implements the required traits. It handles file intersection analysis,
    /// data splitting for partial overlaps, and file cleanup.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type that implements required traits for catalog operations.
    ///
    /// # Parameters
    ///
    /// - `identifier`: Optional instrument ID to delete data for.
    /// - `start`: Optional start timestamp for the deletion range.
    /// - `end`: Optional end timestamp for the deletion range.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Interval retrieval or operation preparation fails.
    /// - Querying or rewriting data fails while splitting partially overlapping files.
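    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog with existing quote data:
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete all quote data for one instrument before a cutoff timestamp
    /// catalog.delete_data_range_generic::<QuoteTick>(
    ///     Some("BTCUSD".to_string()),
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```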
    pub fn delete_data_range_generic<T>(
        &mut self,
        identifier: Option<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> Result<()>
    where
        T: DecodeDataFromRecordBatch
            + CatalogPathPrefix
            + EncodeToRecordBatch
            + HasTsInit
            + TryFrom<Data>
            + Clone,
    {
        // Work from the existing file intervals (each interval corresponds to one file)
        let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;

        if intervals.is_empty() {
            return Ok(()); // No files to process
        }

        // Prepare all operations for execution
        let operations_to_execute = self.prepare_delete_operations(
            T::path_prefix(),
            identifier.clone(),
            &intervals,
            start,
            end,
        )?;

        if operations_to_execute.is_empty() {
            return Ok(()); // No operations to execute
        }

        // Execute all operations
        let mut files_to_remove = HashSet::new();

        for operation in operations_to_execute {
            // Reset the session before each operation to ensure fresh data is loaded.
            // This clears any cached table registrations that might interfere with file operations.
            self.reset_session();
            match operation.operation_type.as_str() {
                "split_before" => {
                    // Query data before the deletion range and write it
                    let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
                    let before_data = self.query_typed_data::<T>(
                        instrument_ids,
                        Some(UnixNanos::from(operation.query_start)),
                        Some(UnixNanos::from(operation.query_end)),
                        None,
                        Some(operation.files.clone()),
                    )?;

                    if !before_data.is_empty() {
                        let start_ts = UnixNanos::from(operation.file_start_ns);
                        let end_ts = UnixNanos::from(operation.file_end_ns);
                        self.write_to_parquet(
                            before_data,
                            Some(start_ts),
                            Some(end_ts),
                            Some(true),
                        )?;
                    }
                }
                "split_after" => {
                    // Query data after the deletion range and write it
                    let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
                    let after_data = self.query_typed_data::<T>(
                        instrument_ids,
                        Some(UnixNanos::from(operation.query_start)),
                        Some(UnixNanos::from(operation.query_end)),
                        None,
                        Some(operation.files.clone()),
                    )?;

                    if !after_data.is_empty() {
                        let start_ts = UnixNanos::from(operation.file_start_ns);
                        let end_ts = UnixNanos::from(operation.file_end_ns);
                        self.write_to_parquet(
                            after_data,
                            Some(start_ts),
                            Some(end_ts),
                            Some(true),
                        )?;
                    }
                }
                _ => {
                    // For "remove" operations, just mark files for removal
                }
            }

            // Mark files for removal (applies to all operation types)
            for file in operation.files {
                files_to_remove.insert(file);
            }
        }

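        // Deletion is deferred until all split data has been rewritten, so a failed
        // split leaves the original files in place.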
        // Remove all files that were processed
        for file in files_to_remove {
            if let Err(e) = self.delete_file(&file) {
                eprintln!("Failed to delete file {file}: {e}");
            }
        }

        Ok(())
    }

    /// Prepares all operations for data deletion by identifying files that need to be
    /// split or removed.
    ///
    /// This auxiliary function handles all the preparation logic for deletion:
    /// 1. Filters intervals by the requested time range.
    /// 2. Identifies files that intersect with the deletion range.
    /// 3. Creates split operations for files that partially overlap.
    /// 4. Generates removal operations for files completely within the range.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name for path generation.
    /// - `identifier`: Optional instrument identifier for path generation.
    /// - `intervals`: List of (`start_ts`, `end_ts`) tuples representing existing file intervals.
    /// - `start`: Optional start timestamp for deletion range.
    /// - `end`: Optional end timestamp for deletion range.
    ///
    /// # Returns
    ///
    /// Returns a vector of `DeleteOperation` structs ready for execution.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
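    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog; the interval values are illustrative:
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // One existing file covering [1_000, 2_000]; delete everything from 1_500 onwards
    /// let intervals = vec![(1_000u64, 2_000u64)];
    /// let operations = catalog.prepare_delete_operations(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     &intervals,
    ///     Some(UnixNanos::from(1_500)),
    ///     None,
    /// )?;
    ///
    /// // Expect a single "split_before" operation preserving [1_000, 1_499]
    /// assert_eq!(operations.len(), 1);
    /// # Ok::<(), anyhow::Error>(())
    /// ```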
    pub fn prepare_delete_operations(
        &self,
        type_name: &str,
        identifier: Option<String>,
        intervals: &[(u64, u64)],
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> Result<Vec<DeleteOperation>> {
        // Convert start/end to nanoseconds
        let delete_start_ns = start.map(|s| s.as_u64());
        let delete_end_ns = end.map(|e| e.as_u64());

        let mut operations = Vec::new();

        // Get directory for file path construction
        let directory = self.make_path(type_name, identifier)?;

        // Process each interval (which represents an actual file)
        for &(file_start_ns, file_end_ns) in intervals {
            // Check whether the file's interval intersects the deletion range (both are inclusive)
            let intersects = (delete_start_ns.is_none() || delete_start_ns.unwrap() <= file_end_ns)
                && (delete_end_ns.is_none() || file_start_ns <= delete_end_ns.unwrap());

            if !intersects {
                continue; // File doesn't intersect with deletion range
            }

            // Construct file path from interval timestamps
            let filename = timestamps_to_filename(
                UnixNanos::from(file_start_ns),
                UnixNanos::from(file_end_ns),
            );
            let file_path = make_object_store_path(&directory, &[&filename]);

            // Determine what type of operation is needed
            let file_completely_within_range = (delete_start_ns.is_none()
                || delete_start_ns.unwrap() <= file_start_ns)
                && (delete_end_ns.is_none() || file_end_ns <= delete_end_ns.unwrap());

            if file_completely_within_range {
                // File is completely within the deletion range; just mark it for removal
                operations.push(DeleteOperation {
                    operation_type: "remove".to_string(),
                    files: vec![file_path],
                    query_start: 0,
                    query_end: 0,
                    file_start_ns: 0,
                    file_end_ns: 0,
                });
            } else {
                // File partially overlaps the deletion range; split it to preserve data outside the range
                if let Some(delete_start) = delete_start_ns
                    && file_start_ns < delete_start
                {
                    // Keep data before deletion range
                    operations.push(DeleteOperation {
                        operation_type: "split_before".to_string(),
                        files: vec![file_path.clone()],
                        query_start: file_start_ns,
                        query_end: delete_start.saturating_sub(1), // Keep up to just before the deletion start
                        file_start_ns,
                        file_end_ns: delete_start.saturating_sub(1),
                    });
                }

                if let Some(delete_end) = delete_end_ns
                    && delete_end < file_end_ns
                {
                    // Keep data after deletion range
                    operations.push(DeleteOperation {
                        operation_type: "split_after".to_string(),
                        files: vec![file_path.clone()],
                        query_start: delete_end.saturating_add(1), // Keep from just after the deletion end
                        query_end: file_end_ns,
                        file_start_ns: delete_end.saturating_add(1),
                        file_end_ns,
                    });
                }
            }
        }

        Ok(operations)
    }
}