nautilus_persistence/backend/catalog_operations.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Catalog operations for data consolidation and reset functionality.
//!
//! This module contains the consolidation and reset operations for the `ParquetDataCatalog`.
//! These operations are separated into their own module for better organization and maintainability.

use std::collections::HashSet;

use anyhow::Result;
use futures::StreamExt;
use nautilus_core::UnixNanos;
use nautilus_model::data::{Data, HasTsInit};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::path::Path as ObjectPath;

use crate::{
    backend::catalog::{
        CatalogPathPrefix, ParquetDataCatalog, are_intervals_contiguous, are_intervals_disjoint,
        parse_filename_timestamps, timestamps_to_filename,
    },
    parquet::{
        combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
    },
};

/// Information about a consolidation query to be executed.
///
/// This struct encapsulates all the information needed to execute a single consolidation
/// operation, including the data range to query and file naming strategy.
///
/// # Fields
///
/// - `query_start`: Start timestamp for the data query range (inclusive, in nanoseconds).
/// - `query_end`: End timestamp for the data query range (inclusive, in nanoseconds).
/// - `use_period_boundaries`: If true, uses period boundaries for file naming; if false, uses actual data timestamps.
///
/// # Usage
///
/// This struct is used internally by the consolidation system to plan and execute
/// data consolidation operations. It allows the system to:
/// - Separate query planning from execution.
/// - Handle complex scenarios like data splitting.
/// - Optimize file naming strategies.
/// - Batch multiple operations efficiently.
/// - Maintain file contiguity across periods.
///
/// # Examples
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::ConsolidationQuery;
///
/// // Regular consolidation query
/// let query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609545600000000000,
///     use_period_boundaries: true,
/// };
///
/// // Split operation to preserve data
/// let split_query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609462800000000000,
///     use_period_boundaries: false,
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ConsolidationQuery {
    /// Start timestamp for the query range (inclusive, in nanoseconds)
    pub query_start: u64,
    /// End timestamp for the query range (inclusive, in nanoseconds)
    pub query_end: u64,
    /// Whether to use period boundaries for file naming (true) or actual data timestamps (false)
    pub use_period_boundaries: bool,
}

/// Information about a deletion operation to be executed.
///
/// This struct encapsulates all the information needed to execute a single deletion
/// operation, including the type of operation and file handling details.
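///
/// # Examples
///
/// A minimal construction sketch (the file path and timestamps are illustrative values):
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::DeleteOperation;
///
/// let operation = DeleteOperation {
///     operation_type: "split_before".to_string(),
///     files: vec!["data/quotes/BTCUSD/part-0.parquet".to_string()],
///     query_start: 1609459200000000000,
///     query_end: 1609462799999999999,
///     file_start_ns: 1609459200000000000,
///     file_end_ns: 1609462799999999999,
/// };
/// ```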
#[derive(Debug, Clone)]
pub struct DeleteOperation {
    /// Type of deletion operation ("remove", "`split_before`", "`split_after`").
    pub operation_type: String,
    /// List of files involved in this operation.
    pub files: Vec<String>,
    /// Start timestamp for data query (used for split operations).
    pub query_start: u64,
    /// End timestamp for data query (used for split operations).
    pub query_end: u64,
    /// Start timestamp for new file naming (used for split operations).
    pub file_start_ns: u64,
    /// End timestamp for new file naming (used for split operations).
    pub file_end_ns: u64,
}

impl ParquetDataCatalog {
    /// Consolidates all data files in the catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory listing fails.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog
    /// catalog.consolidate_catalog(None, None, None)?;
    ///
    /// // Consolidate only files within a specific time range
    /// catalog.consolidate_catalog(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog(
        &self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.consolidate_directory(&directory, start, end, ensure_contiguous_files)?;
        }

        Ok(())
    }

    /// Consolidates data files for a specific data type and instrument.
    ///
    /// This method consolidates Parquet files within a specific directory (defined by data type
    /// and optional instrument ID) by merging multiple files into a single file. This improves
    /// query performance and can reduce storage overhead.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files for a specific instrument
    /// catalog.consolidate_data(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     None,
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Consolidate trade files within a time range
    /// catalog.consolidate_data(
    ///     "trades",
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()> {
        let directory = self.make_path(type_name, instrument_id)?;
        self.consolidate_directory(&directory, start, end, ensure_contiguous_files)
    }

    /// Consolidates Parquet files within a specific directory by merging them into a single file.
    ///
    /// This internal method performs the actual consolidation work for a single directory.
    /// It identifies files within the specified time range, validates their intervals,
    /// and combines them into a single Parquet file with optimized storage.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path containing Parquet files to consolidate.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Behavior
    ///
    /// - Skips consolidation if directory contains 1 or fewer files.
    /// - Filters files by timestamp range if start/end are specified.
    /// - Sorts intervals by start timestamp before consolidation.
    /// - Creates a new file spanning the entire time range of input files.
    /// - Validates interval disjointness after consolidation (if enabled).
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory listing fails.
    /// - File combination operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    /// - Object store operations fail.
    fn consolidate_directory(
        &self,
        directory: &str,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()> {
        let parquet_files = self.list_parquet_files(directory)?;

        if parquet_files.len() <= 1 {
            return Ok(());
        }

        let mut files_to_consolidate = Vec::new();
        let mut intervals = Vec::new();
        let start = start.map(|t| t.as_u64());
        let end = end.map(|t| t.as_u64());

        for file in parquet_files {
            if let Some(interval) = parse_filename_timestamps(&file) {
                let (interval_start, interval_end) = interval;
                let include_file = match (start, end) {
                    (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
                    (Some(s), None) => interval_start >= s,
                    (None, Some(e)) => interval_end <= e,
                    (None, None) => true,
                };

                if include_file {
                    files_to_consolidate.push(file);
                    intervals.push(interval);
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        if !intervals.is_empty() {
            let file_name = timestamps_to_filename(
                UnixNanos::from(intervals[0].0),
                UnixNanos::from(intervals.last().unwrap().1),
            );
            let path = format!("{directory}/{file_name}");

            // Convert string paths to ObjectPath for the function call
            let object_paths: Vec<ObjectPath> = files_to_consolidate
                .iter()
                .map(|path| ObjectPath::from(path.as_str()))
                .collect();

            self.execute_async(async {
                combine_parquet_files_from_object_store(
                    self.object_store.clone(),
                    object_paths,
                    &ObjectPath::from(path),
                    Some(self.compression),
                    Some(self.max_row_group_size),
                )
                .await
            })?;
        }

        if ensure_contiguous_files.unwrap_or(true) && !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after consolidating a directory");
        }

        Ok(())
    }

    /// Consolidates all data files in the catalog by splitting them into fixed time periods.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory listing fails.
    /// - Data type extraction from path fails.
    /// - Period-based consolidation operations fail.
    ///
    /// # Notes
    ///
    /// - This operation can be resource-intensive for large catalogs with many data types
    ///   and instruments.
    /// - The consolidation process splits data into fixed time periods rather than combining
    ///   all files into a single file per directory.
    /// - Uses the same period-based consolidation logic as `consolidate_data_by_period`.
    /// - Original files are removed and replaced with period-based consolidated files.
    /// - This method is useful for periodic maintenance of the catalog to standardize
    ///   file organization by time periods.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog by 1-day periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate only files within a specific time range by 1-hour periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog_by_period(
        &mut self,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            let (data_cls, identifier) =
                self.extract_data_cls_and_identifier_from_path(&directory)?;

            if let Some(data_cls_name) = data_cls {
                // Use match statement to call the generic consolidate_data_by_period for various types
                match data_cls_name.as_str() {
                    "quotes" => {
                        use nautilus_model::data::QuoteTick;
                        self.consolidate_data_by_period_generic::<QuoteTick>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "trades" => {
                        use nautilus_model::data::TradeTick;
                        self.consolidate_data_by_period_generic::<TradeTick>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_deltas" => {
                        use nautilus_model::data::OrderBookDelta;
                        self.consolidate_data_by_period_generic::<OrderBookDelta>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_depths" => {
                        use nautilus_model::data::OrderBookDepth10;
                        self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "bars" => {
                        use nautilus_model::data::Bar;
                        self.consolidate_data_by_period_generic::<Bar>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "index_prices" => {
                        use nautilus_model::data::IndexPriceUpdate;
                        self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "mark_prices" => {
                        use nautilus_model::data::MarkPriceUpdate;
                        self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "instrument_closes" => {
                        use nautilus_model::data::close::InstrumentClose;
                        self.consolidate_data_by_period_generic::<InstrumentClose>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    _ => {
                        // Skip unknown data types
                        log::warn!("Unknown data type for consolidation: {data_cls_name}");
                        continue;
                    }
                }
            }
        }

        Ok(())
    }

    /// Extracts data class and identifier from a directory path.
    ///
    /// This method parses a directory path to extract the data type and optional
    /// instrument identifier. It's used to determine what type of data consolidation
    /// to perform for each directory.
    ///
    /// # Parameters
    ///
    /// - `path`: The directory path to parse.
    ///
    /// # Returns
    ///
    /// Returns a tuple of (`data_class`, identifier) where both are optional strings.
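    ///
    /// # Examples
    ///
    /// A minimal sketch following the conventions of the other examples in this module
    /// (constructor arguments elided):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let (data_cls, identifier) =
    ///     catalog.extract_data_cls_and_identifier_from_path("data/quotes/BTCUSD")?;
    /// assert_eq!(data_cls, Some("quotes".to_string()));
    /// assert_eq!(identifier, Some("BTCUSD".to_string()));
    /// # Ok::<(), anyhow::Error>(())
    /// ```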
    pub fn extract_data_cls_and_identifier_from_path(
        &self,
        path: &str,
    ) -> Result<(Option<String>, Option<String>)> {
        // Split the path and look for the data directory structure
        let path_parts: Vec<&str> = path.split('/').collect();

        // Find the "data" directory in the path
        if let Some(data_index) = path_parts.iter().position(|&part| part == "data")
            && data_index + 1 < path_parts.len()
        {
            let data_cls = path_parts[data_index + 1].to_string();

            // Check if there's an identifier (instrument ID) after the data class
            let identifier = if data_index + 2 < path_parts.len() {
                Some(path_parts[data_index + 2].to_string())
            } else {
                None
            };

            return Ok((Some(data_cls), identifier));
        }

        // If we can't parse the path, return None for both
        Ok((None, None))
    }

    /// Consolidates data files by splitting them into fixed time periods.
    ///
    /// This method queries data by period and writes consolidated files immediately,
    /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
    /// the function automatically splits those files to preserve all data.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments.
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for consolidation range. If None, uses earliest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `end`: Optional end timestamp for consolidation range. If None, uses latest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - File operations fail.
    /// - Data querying or writing fails.
    ///
    /// # Notes
    ///
    /// - Uses two-phase approach: first determines all queries, then executes them.
    /// - Groups intervals into contiguous groups to preserve holes between groups.
    /// - Allows consolidation across multiple files within each contiguous group.
    /// - Skips queries if target files already exist for efficiency.
    /// - Original files are removed immediately after querying each period.
    /// - When `ensure_contiguous_files=false`, file timestamps match actual data range.
    /// - When `ensure_contiguous_files=true`, file timestamps use period boundaries.
    /// - Uses modulo arithmetic for efficient period boundary calculation.
    /// - Preserves holes in data by preventing queries from spanning across gaps.
    /// - Automatically splits files at start/end boundaries to preserve all data.
    /// - Split operations are executed before consolidation to ensure data preservation.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files by 1-day periods
    /// catalog.consolidate_data_by_period(
    ///     "quotes",
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate specific instrument by 1-hour periods
    /// catalog.consolidate_data_by_period(
    ///     "trades",
    ///     Some("BTCUSD".to_string()),
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()> {
        // Use match statement to call the generic consolidate_data_by_period for various types
        match type_name {
            "quotes" => {
                use nautilus_model::data::QuoteTick;
                self.consolidate_data_by_period_generic::<QuoteTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "trades" => {
                use nautilus_model::data::TradeTick;
                self.consolidate_data_by_period_generic::<TradeTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_deltas" => {
                use nautilus_model::data::OrderBookDelta;
                self.consolidate_data_by_period_generic::<OrderBookDelta>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_depths" => {
                use nautilus_model::data::OrderBookDepth10;
                self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "bars" => {
                use nautilus_model::data::Bar;
                self.consolidate_data_by_period_generic::<Bar>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "index_prices" => {
                use nautilus_model::data::IndexPriceUpdate;
                self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "mark_prices" => {
                use nautilus_model::data::MarkPriceUpdate;
                self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "instrument_closes" => {
                use nautilus_model::data::close::InstrumentClose;
                self.consolidate_data_by_period_generic::<InstrumentClose>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            _ => {
                anyhow::bail!("Unknown data type for consolidation: {}", type_name);
            }
        }

        Ok(())
    }

    /// Consolidates data files for a generic data type by splitting them into fixed time periods.
    ///
    /// This is a type-safe version of `consolidate_data_by_period` that uses generic types
    /// to ensure compile-time correctness and enable reuse across different data types.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to consolidate, must implement required traits for serialization.
    ///
    /// # Parameters
    ///
    /// - `identifier`: Optional instrument ID to target a specific instrument's data.
    /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
    /// - `start`: Optional start timestamp for consolidation range.
    /// - `end`: Optional end timestamp for consolidation range.
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
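    ///
    /// # Examples
    ///
    /// A minimal sketch mirroring the other examples in this module (constructor arguments elided):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_model::data::QuoteTick;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files using the default 1-day period
    /// catalog.consolidate_data_by_period_generic::<QuoteTick>(None, None, None, None, None)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```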
    pub fn consolidate_data_by_period_generic<T>(
        &mut self,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()>
    where
        T: DecodeDataFromRecordBatch
            + CatalogPathPrefix
            + EncodeToRecordBatch
            + HasTsInit
            + TryFrom<Data>
            + Clone,
    {
        let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
        let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);

        // Use get_intervals for cleaner implementation
        let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;

        if intervals.is_empty() {
            return Ok(()); // No files to consolidate
        }

        // Use auxiliary function to prepare all queries for execution
        let queries_to_execute = self.prepare_consolidation_queries(
            T::path_prefix(),
            identifier.clone(),
            &intervals,
            period_nanos,
            start,
            end,
            ensure_contiguous_files,
        )?;

        if queries_to_execute.is_empty() {
            return Ok(()); // No queries to execute
        }

        // Get directory for file operations
        let directory = self.make_path(T::path_prefix(), identifier.clone())?;
        let mut existing_files = self.list_parquet_files(&directory)?;
        existing_files.sort();

        // Track files to remove and maintain existing_files list
        let mut files_to_remove = HashSet::new();
        let original_files_count = existing_files.len();

        // Phase 2: Execute queries, write, and delete
        let mut file_start_ns: Option<u64> = None; // Track contiguity across periods

        for query_info in queries_to_execute {
            // Query data for this period using query_typed_data
            let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);

            let period_data = self.query_typed_data::<T>(
                instrument_ids,
                Some(UnixNanos::from(query_info.query_start)),
                Some(UnixNanos::from(query_info.query_end)),
                None,
                Some(existing_files.clone()),
            )?;

            if period_data.is_empty() {
                // Skip if no data found, but maintain contiguity by using query start
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                continue;
            }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, maintaining contiguity with any
                // preceding empty periods tracked in `file_start_ns`
                (file_start_ns.unwrap_or(query_info.query_start), query_info.query_end)
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };
            file_start_ns = None; // Reset contiguity tracking once the carried start is consumed

            // Check again if target file exists (in case it was created during this process)
            let target_filename = format!(
                "{}/{}",
                directory,
                timestamps_to_filename(
                    UnixNanos::from(final_start_ns),
                    UnixNanos::from(final_end_ns)
                )
            );

            if self.file_exists(&target_filename)? {
                // Skip if target file already exists
                continue;
            }

            // Write consolidated data for this period using write_to_parquet
            // Use skip_disjoint_check since we're managing file removal carefully
            let start_ts = UnixNanos::from(final_start_ns);
            let end_ts = UnixNanos::from(final_end_ns);
            self.write_to_parquet(period_data, Some(start_ts), Some(end_ts), Some(true))?;

            // Identify files that are completely covered by this period
            // Only remove files AFTER successfully writing a new file
            // Use slice copy to avoid modification during iteration (match Python logic)
            for file in existing_files.clone() {
                if let Some(interval) = parse_filename_timestamps(&file)
                    && interval.1 <= query_info.query_end
                {
                    files_to_remove.insert(file.clone());
                    existing_files.retain(|f| f != &file);
                }
            }

            // Remove files as soon as we have some to remove
            if !files_to_remove.is_empty() {
                for file in files_to_remove.drain() {
                    self.delete_file(&file)?;
                }
            }
        }

        // Remove any remaining files that weren't removed in the loop
        // This matches the Python implementation's final cleanup step
        // Only remove files if any consolidation actually happened (i.e., files were processed)
        let files_were_processed = existing_files.len() < original_files_count;
        if files_were_processed {
            for file in existing_files {
                self.delete_file(&file)?;
            }
        }

        Ok(())
    }

    /// Prepares all queries for consolidation by filtering, grouping, and handling splits.
    ///
    /// This auxiliary function handles all the preparation logic for consolidation:
    /// 1. Filters intervals by time range.
    /// 2. Groups intervals into contiguous groups.
    /// 3. Identifies and creates split operations for data preservation.
    /// 4. Generates period-based consolidation queries.
    /// 5. Checks for existing target files.
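    ///
    /// # Examples
    ///
    /// A minimal planning sketch with illustrative interval values (constructor arguments elided):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Plan 1-day consolidation queries for two contiguous file intervals
    /// let intervals = [(1, 1_000_000), (1_000_001, 2_000_000)];
    /// let queries = catalog.prepare_consolidation_queries(
    ///     "quotes",
    ///     None,
    ///     &intervals,
    ///     86400000000000, // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     true,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```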
    #[allow(clippy::too_many_arguments)]
    pub fn prepare_consolidation_queries(
        &self,
        type_name: &str,
        identifier: Option<String>,
        intervals: &[(u64, u64)],
        period_nanos: u64,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: bool,
    ) -> Result<Vec<ConsolidationQuery>> {
        // Filter intervals by time range if specified
        let used_start = start.map(|s| s.as_u64());
        let used_end = end.map(|e| e.as_u64());

        let mut filtered_intervals = Vec::new();
        for &(interval_start, interval_end) in intervals {
            // Check if interval overlaps with the specified range
            if (used_start.is_none() || used_start.unwrap() <= interval_end)
                && (used_end.is_none() || interval_start <= used_end.unwrap())
            {
                filtered_intervals.push((interval_start, interval_end));
            }
        }

        if filtered_intervals.is_empty() {
            return Ok(Vec::new()); // No intervals in the specified range
        }

        // Check contiguity of filtered intervals if required
        if ensure_contiguous_files && !are_intervals_contiguous(&filtered_intervals) {
            anyhow::bail!(
                "Intervals are not contiguous. When ensure_contiguous_files=true, \
                 all files in the consolidation range must have contiguous timestamps."
            );
        }

        // Group intervals into contiguous groups to preserve holes between groups
        // but allow consolidation within each contiguous group
        let contiguous_groups = self.group_contiguous_intervals(&filtered_intervals);

        let mut queries_to_execute = Vec::new();

        // Handle interval splitting by creating split operations for data preservation
        if !filtered_intervals.is_empty() {
            if let Some(start_ts) = used_start {
                let first_interval = filtered_intervals[0];
                if first_interval.0 < start_ts && start_ts <= first_interval.1 {
                    // Split before start: preserve data from interval_start to start-1
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: first_interval.0,
                        query_end: start_ts - 1,
                        use_period_boundaries: false,
                    });
                }
            }

            if let Some(end_ts) = used_end {
                let last_interval = filtered_intervals[filtered_intervals.len() - 1];
                if last_interval.0 <= end_ts && end_ts < last_interval.1 {
                    // Split after end: preserve data from end+1 to interval_end
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: end_ts + 1,
                        query_end: last_interval.1,
                        use_period_boundaries: false,
                    });
                }
            }
        }

        // Generate period-based consolidation queries for each contiguous group
        for group in contiguous_groups {
            let group_start = group[0].0;
            let group_end = group[group.len() - 1].1;

            // Apply start/end filtering to the group
            let effective_start = used_start.map_or(group_start, |s| s.max(group_start));
            let effective_end = used_end.map_or(group_end, |e| e.min(group_end));

            if effective_start > effective_end {
                continue; // Skip if no overlap
            }

            // Generate period-based queries within this contiguous group
            let mut current_start_ns = (effective_start / period_nanos) * period_nanos;

            // Add safety check to prevent infinite loops (match Python logic)
            let max_iterations = 10000;
            let mut iteration_count = 0;

            while current_start_ns <= effective_end {
                iteration_count += 1;
                if iteration_count > max_iterations {
                    // Safety break to prevent infinite loops
                    break;
                }
                let current_end_ns = (current_start_ns + period_nanos - 1).min(effective_end);

                // Check if target file already exists (only when ensure_contiguous_files is true)
                if ensure_contiguous_files {
                    let directory = self.make_path(type_name, identifier.clone())?;
                    let target_filename = format!(
                        "{}/{}",
                        directory,
                        timestamps_to_filename(
                            UnixNanos::from(current_start_ns),
                            UnixNanos::from(current_end_ns)
                        )
                    );

                    if self.file_exists(&target_filename)? {
                        // Skip if target file already exists
                        current_start_ns += period_nanos;
                        continue;
                    }
                }

                // Add query to execution list
                queries_to_execute.push(ConsolidationQuery {
                    query_start: current_start_ns,
                    query_end: current_end_ns,
                    use_period_boundaries: ensure_contiguous_files,
                });

                // Move to next period
                current_start_ns += period_nanos;

                if current_start_ns > effective_end {
                    break;
                }
            }
        }

        // Sort queries by start date to enable efficient file removal
        // Files can be removed when interval[1] <= query_info["query_end"]
        // and processing in chronological order ensures optimal cleanup
        queries_to_execute.sort_by_key(|q| q.query_start);

        Ok(queries_to_execute)
    }

    /// Groups intervals into contiguous groups for efficient consolidation.
    ///
    /// This method analyzes a list of time intervals and groups them into contiguous sequences.
    /// Intervals are considered contiguous if the end of one interval is exactly one nanosecond
    /// before the start of the next interval. This grouping preserves data gaps while allowing
    /// consolidation within each contiguous group.
    ///
    /// # Parameters
    ///
    /// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
    ///
    /// # Returns
    ///
    /// Returns a vector of groups, where each group is a vector of contiguous intervals.
    /// Returns an empty vector if the input is empty.
    ///
    /// # Algorithm
    ///
    /// 1. Starts with the first interval in a new group.
    /// 2. For each subsequent interval, checks if it's contiguous with the previous.
    /// 3. If contiguous (`prev_end` + 1 == `curr_start`), adds to current group.
    /// 4. If not contiguous, starts a new group.
    /// 5. Returns all groups.
    ///
    /// # Examples
    ///
    /// ```text
    /// Contiguous intervals: [(1,5), (6,10), (11,15)]
    /// Returns: [[(1,5), (6,10), (11,15)]]
    ///
    /// Non-contiguous intervals: [(1,5), (8,10), (12,15)]
    /// Returns: [[(1,5)], [(8,10)], [(12,15)]]
    /// ```
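    ///
    /// The same cases as a code sketch (constructor arguments elided as in the other examples):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let groups = catalog.group_contiguous_intervals(&[(1, 5), (6, 10), (11, 15)]);
    /// assert_eq!(groups, vec![vec![(1, 5), (6, 10), (11, 15)]]);
    ///
    /// let groups = catalog.group_contiguous_intervals(&[(1, 5), (8, 10), (12, 15)]);
    /// assert_eq!(groups, vec![vec![(1, 5)], vec![(8, 10)], vec![(12, 15)]]);
    /// ```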
1071    ///
1072    /// # Notes
1073    ///
1074    /// - Input intervals should be sorted by start timestamp.
1075    /// - Gaps between groups are preserved and not consolidated.
1076    /// - Used internally by period-based consolidation methods.
1077    #[must_use]
1078    pub fn group_contiguous_intervals(&self, intervals: &[(u64, u64)]) -> Vec<Vec<(u64, u64)>> {
1079        if intervals.is_empty() {
1080            return Vec::new();
1081        }
1082
1083        let mut contiguous_groups = Vec::new();
1084        let mut current_group = vec![intervals[0]];
1085
1086        for i in 1..intervals.len() {
1087            let prev_interval = intervals[i - 1];
1088            let curr_interval = intervals[i];
1089
1090            // Check if current interval is contiguous with previous (end + 1 == start)
1091            if prev_interval.1 + 1 == curr_interval.0 {
1092                current_group.push(curr_interval);
1093            } else {
1094                // Gap found, start new group
1095                contiguous_groups.push(current_group);
1096                current_group = vec![curr_interval];
1097            }
1098        }
1099
1100        // Add the last group
1101        contiguous_groups.push(current_group);
1102
1103        contiguous_groups
1104    }
1105
1106    /// Checks if a file exists in the object store.
1107    ///
1108    /// This method performs a HEAD operation on the object store to determine if a file
1109    /// exists without downloading its content. It works with both local and remote object stores.
1110    ///
1111    /// # Parameters
1112    ///
1113    /// - `path`: The file path to check, relative to the catalog structure.
1114    ///
1115    /// # Returns
1116    ///
1117    /// Returns `true` if the file exists, `false` if it doesn't exist.
1118    ///
1119    /// # Errors
1120    ///
1121    /// Returns an error if the object store operation fails due to network issues,
1122    /// authentication problems, or other I/O errors.
1123    fn file_exists(&self, path: &str) -> Result<bool> {
1124        let object_path = self.to_object_path(path);
1125        let exists =
1126            self.execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
1127        Ok(exists)
1128    }
1129
1130    /// Deletes a file from the object store.
1131    ///
1132    /// This method removes a file from the object store. The operation is permanent
1133    /// and cannot be undone. It works with both local filesystems and remote object stores.
1134    ///
1135    /// # Parameters
1136    ///
1137    /// - `path`: The file path to delete, relative to the catalog structure.
1138    ///
1139    /// # Returns
1140    ///
1141    /// Returns `Ok(())` on successful deletion.
1142    ///
1143    /// # Errors
1144    ///
1145    /// Returns an error if:
1146    /// - The file doesn't exist.
1147    /// - Permission is denied.
1148    /// - Network issues occur (for remote stores).
1149    /// - The object store operation fails.
1150    ///
1151    /// # Safety
1152    ///
1153    /// This operation is irreversible. Ensure the file is no longer needed before deletion.
1154    fn delete_file(&self, path: &str) -> Result<()> {
1155        let object_path = self.to_object_path(path);
1156        self.execute_async(async {
1157            self.object_store
1158                .delete(&object_path)
1159                .await
1160                .map_err(anyhow::Error::from)
1161        })?;
1162        Ok(())
1163    }
1164
1165    /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
1166    ///
1167    /// This method scans all leaf data directories in the catalog and renames files based on
1168    /// the actual timestamp range of their content. This is useful when files have been
1169    /// modified or when filename conventions have changed.
1170    ///
1171    /// # Returns
1172    ///
1173    /// Returns `Ok(())` on success, or an error if the operation fails.
1174    ///
1175    /// # Errors
1176    ///
1177    /// This function will return an error if:
1178    /// - Directory listing fails.
1179    /// - File metadata reading fails.
1180    /// - File rename operations fail.
1181    /// - Interval validation fails after renaming.
1182    ///
1183    /// # Examples
1184    ///
1185    /// ```rust,no_run
1186    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1187    ///
1188    /// let catalog = ParquetDataCatalog::new(/* ... */);
1189    ///
1190    /// // Reset all filenames in the catalog
1191    /// catalog.reset_catalog_file_names()?;
1192    /// # Ok::<(), anyhow::Error>(())
1193    /// ```
1194    pub fn reset_catalog_file_names(&self) -> Result<()> {
1195        let leaf_directories = self.find_leaf_data_directories()?;
1196
1197        for directory in leaf_directories {
1198            self.reset_file_names(&directory)?;
1199        }
1200
1201        Ok(())
1202    }
1203
1204    /// Resets the filenames of Parquet files for a specific data type and instrument ID.
1205    ///
1206    /// This method renames files in a specific directory based on the actual timestamp
1207    /// range of their content. This is useful for correcting filenames after data
1208    /// modifications or when filename conventions have changed.
1209    ///
1210    /// # Parameters
1211    ///
1212    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1213    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1214    ///
1215    /// # Returns
1216    ///
1217    /// Returns `Ok(())` on success, or an error if the operation fails.
1218    ///
1219    /// # Errors
1220    ///
1221    /// This function will return an error if:
1222    /// - The directory path cannot be constructed.
1223    /// - File metadata reading fails.
1224    /// - File rename operations fail.
1225    /// - Interval validation fails after renaming.
1226    ///
1227    /// # Examples
1228    ///
1229    /// ```rust,no_run
1230    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1231    ///
1232    /// let catalog = ParquetDataCatalog::new(/* ... */);
1233    ///
1234    /// // Reset filenames for all quote files
1235    /// catalog.reset_data_file_names("quotes", None)?;
1236    ///
1237    /// // Reset filenames for a specific instrument's trade files
1238    /// catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
1239    /// # Ok::<(), anyhow::Error>(())
1240    /// ```
1241    pub fn reset_data_file_names(
1242        &self,
1243        data_cls: &str,
1244        instrument_id: Option<String>,
1245    ) -> Result<()> {
1246        let directory = self.make_path(data_cls, instrument_id)?;
1247        self.reset_file_names(&directory)
1248    }
1249
1250    /// Resets the filenames of Parquet files in a directory to match their actual content timestamps.
1251    ///
1252    /// This internal method scans all Parquet files in a directory, reads their metadata to
1253    /// determine the actual timestamp range of their content, and renames the files accordingly.
1254    /// This ensures that filenames accurately reflect the data they contain.
1255    ///
1256    /// # Parameters
1257    ///
1258    /// - `directory`: The directory path containing Parquet files to rename.
1259    ///
1260    /// # Returns
1261    ///
1262    /// Returns `Ok(())` on success, or an error if the operation fails.
1263    ///
1264    /// # Process
1265    ///
1266    /// 1. Lists all Parquet files in the directory
1267    /// 2. For each file, reads metadata to extract min/max timestamps
1268    /// 3. Generates a new filename based on actual timestamp range
1269    /// 4. Moves the file to the new name using object store operations
1270    /// 5. Validates that intervals remain disjoint after renaming
1271    ///
1272    /// # Errors
1273    ///
1274    /// This function will return an error if:
1275    /// - Directory listing fails.
1276    /// - Metadata reading fails for any file.
1277    /// - File move operations fail.
1278    /// - Interval validation fails after renaming.
1279    /// - Object store operations fail.
1280    ///
1281    /// # Notes
1282    ///
1283    /// - This operation can be time-consuming for directories with many files.
1284    /// - Files are processed sequentially to avoid conflicts.
1285    /// - The operation is atomic per file but not across the entire directory.
1286    fn reset_file_names(&self, directory: &str) -> Result<()> {
1287        let parquet_files = self.list_parquet_files(directory)?;
1288
1289        for file in parquet_files {
1290            let object_path = ObjectPath::from(file.as_str());
1291            let (first_ts, last_ts) = self.execute_async(async {
1292                min_max_from_parquet_metadata_object_store(
1293                    self.object_store.clone(),
1294                    &object_path,
1295                    "ts_init",
1296                )
1297                .await
1298            })?;
1299
1300            let new_filename =
1301                timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
1302            let new_file_path = format!("{directory}/{new_filename}");
1303            let new_object_path = ObjectPath::from(new_file_path);
1304
1305            self.move_file(&object_path, &new_object_path)?;
1306        }
1307
1308        let intervals = self.get_directory_intervals(directory)?;
1309
1310        if !are_intervals_disjoint(&intervals) {
1311            anyhow::bail!("Intervals are not disjoint after resetting file names");
1312        }
1313
1314        Ok(())
1315    }
1316
1317    /// Finds all leaf data directories in the catalog.
1318    ///
1319    /// A leaf directory is one that contains data files but no subdirectories.
1320    /// This method is used to identify directories that can be processed for
1321    /// consolidation or other operations.
1322    ///
1323    /// # Returns
1324    ///
1325    /// Returns a vector of directory path strings representing leaf directories,
1326    /// or an error if directory traversal fails.
1327    ///
1328    /// # Errors
1329    ///
1330    /// This function will return an error if:
1331    /// - Object store listing operations fail.
1332    /// - Directory structure cannot be analyzed.
1333    ///
1334    /// # Examples
1335    ///
1336    /// ```rust,no_run
1337    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1338    ///
1339    /// let catalog = ParquetDataCatalog::new(/* ... */);
1340    ///
1341    /// let leaf_dirs = catalog.find_leaf_data_directories()?;
1342    /// for dir in leaf_dirs {
1343    ///     println!("Found leaf directory: {}", dir);
1344    /// }
1345    /// # Ok::<(), anyhow::Error>(())
1346    /// ```
1347    pub fn find_leaf_data_directories(&self) -> Result<Vec<String>> {
1348        let data_dir = if self.base_path.is_empty() {
1349            "data".to_string()
1350        } else {
1351            format!("{}/data", self.base_path)
1352        };
1353
1354        let leaf_dirs = self.execute_async(async {
1356            let mut directories = std::collections::HashSet::new();
1357            let mut files_in_dirs = std::collections::HashMap::new();
1358
1359            // List all objects under the data directory
1360            let prefix = ObjectPath::from(format!("{data_dir}/"));
1361            let mut stream = self.object_store.list(Some(&prefix));
1362
1363            while let Some(object) = stream.next().await {
1364                let object = object?;
1365                let path_str = object.location.to_string();
1367
1368                // Extract directory path
1369                if let Some(parent) = std::path::Path::new(&path_str).parent() {
1370                    let parent_str = parent.to_string_lossy().to_string();
1371                    directories.insert(parent_str.clone());
1372
1373                    // Track files in each directory
1374                    files_in_dirs
1375                        .entry(parent_str)
1376                        .or_insert_with(Vec::new)
1377                        .push(path_str);
1378                }
1379            }
1380
1381            // Find leaf directories (directories with files but no subdirectories)
1382            let mut leaf_dirs = Vec::new();
1383            for dir in &directories {
1384                let has_files = files_in_dirs
1385                    .get(dir)
1386                    .is_some_and(|files| !files.is_empty());
1387                let has_subdirs = directories
1388                    .iter()
1389                    .any(|d| d.starts_with(&format!("{dir}/")) && d != dir);
1390
1391                if has_files && !has_subdirs {
1392                    leaf_dirs.push(dir.clone());
1393                }
1394            }
1395
1396            Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
1397        })?;
1398
1399        Ok(leaf_dirs)
1400    }
1401
1402    /// Deletes data within a specified time range for a specific data type and instrument.
1403    ///
1404    /// This method identifies all Parquet files that intersect with the specified time range
1405    /// and handles them appropriately:
1406    /// - Files completely within the range are deleted.
1407    /// - Files partially overlapping the range are split to preserve data outside the range.
1408    /// - The original intersecting files are removed after processing.
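    ///
    /// For example, deleting the inclusive range `[50, 250]` from files covering `[0, 100]` and
    /// `[200, 300]` rewrites them to `[0, 49]` and `[251, 300]`, while a file covering
    /// `[120, 180]` is removed entirely (timestamps illustrative).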
1409    ///
1410    /// # Parameters
1411    ///
1412    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
1413    /// - `identifier`: Optional instrument ID to delete data for. If None, deletes data across all instruments.
1414    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1415    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1416    ///
1417    /// # Returns
1418    ///
1419    /// Returns `Ok(())` on success, or an error if deletion fails.
1420    ///
1421    /// # Errors
1422    ///
1423    /// This function will return an error if:
1424    /// - The directory path cannot be constructed.
1425    /// - File operations fail.
1426    /// - Data querying or writing fails.
1427    ///
1428    /// # Notes
1429    ///
1430    /// - This operation permanently removes data and cannot be undone.
1431    /// - Files that partially overlap the deletion range are split to preserve data outside the range.
1432    /// - The method ensures data integrity by using atomic operations where possible.
1433    /// - Empty directories are not automatically removed after deletion.
1434    ///
1435    /// # Examples
1436    ///
1437    /// ```rust,no_run
1438    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1439    /// use nautilus_core::UnixNanos;
1440    ///
1441    /// let catalog = ParquetDataCatalog::new(/* ... */);
1442    ///
1443    /// // Delete all quote data for a specific instrument
1444    /// catalog.delete_data_range(
1445    ///     "quotes",
1446    ///     Some("BTCUSD".to_string()),
1447    ///     None,
1448    ///     None
1449    /// )?;
1450    ///
1451    /// // Delete trade data within a specific time range
1452    /// catalog.delete_data_range(
1453    ///     "trades",
1454    ///     None,
1455    ///     Some(UnixNanos::from(1609459200000000000)),
1456    ///     Some(UnixNanos::from(1609545600000000000))
1457    /// )?;
1458    /// # Ok::<(), anyhow::Error>(())
1459    /// ```
1460    pub fn delete_data_range(
1461        &mut self,
1462        type_name: &str,
1463        identifier: Option<String>,
1464        start: Option<UnixNanos>,
1465        end: Option<UnixNanos>,
1466    ) -> Result<()> {
1467        // Dispatch to the generic implementation for the concrete data type
1468        match type_name {
1469            "quotes" => {
1470                use nautilus_model::data::QuoteTick;
1471                self.delete_data_range_generic::<QuoteTick>(identifier, start, end)
1472            }
1473            "trades" => {
1474                use nautilus_model::data::TradeTick;
1475                self.delete_data_range_generic::<TradeTick>(identifier, start, end)
1476            }
1477            "bars" => {
1478                use nautilus_model::data::Bar;
1479                self.delete_data_range_generic::<Bar>(identifier, start, end)
1480            }
1481            "order_book_deltas" => {
1482                use nautilus_model::data::OrderBookDelta;
1483                self.delete_data_range_generic::<OrderBookDelta>(identifier, start, end)
1484            }
1485            "order_book_depth10" => {
1486                use nautilus_model::data::OrderBookDepth10;
1487                self.delete_data_range_generic::<OrderBookDepth10>(identifier, start, end)
1488            }
1489            _ => Err(anyhow::anyhow!("Unsupported data type: {}", type_name)),
1490        }
1491    }
1492
1493    /// Deletes data within a specified time range across the entire catalog.
1494    ///
1495    /// This method identifies all leaf directories in the catalog that contain Parquet files
1496    /// and deletes data within the specified time range from each directory. A leaf directory
1497    /// is one that contains files but no subdirectories. This is a convenience method that
1498    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
1499    ///
1500    /// # Parameters
1501    ///
1502    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1503    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1504    ///
1505    /// # Returns
1506    ///
1507    /// Returns `Ok(())` on success, or an error if deletion fails.
1508    ///
1509    /// # Errors
1510    ///
1511    /// This function will return an error if:
1512    /// - Directory traversal fails.
1513    /// - Data class extraction from paths fails.
1514    /// - Individual delete operations fail.
1515    ///
1516    /// # Notes
1517    ///
1518    /// - This operation permanently removes data and cannot be undone.
1519    /// - The deletion process handles file intersections intelligently by splitting files
1520    ///   when they partially overlap with the deletion range.
1521    /// - Files completely within the deletion range are removed entirely.
1522    /// - Files partially overlapping the deletion range are split to preserve data outside the range.
1523    /// - This method is useful for bulk data cleanup operations across the entire catalog.
1524    /// - Empty directories are not automatically removed after deletion.
1525    ///
1526    /// # Examples
1527    ///
1528    /// ```rust,no_run
1529    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1530    /// use nautilus_core::UnixNanos;
1531    ///
1532    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1533    ///
1534    /// // Delete all data before a specific date across entire catalog
1535    /// catalog.delete_catalog_range(
1536    ///     None,
1537    ///     Some(UnixNanos::from(1609459200000000000))
1538    /// )?;
1539    ///
1540    /// // Delete all data within a specific range across entire catalog
1541    /// catalog.delete_catalog_range(
1542    ///     Some(UnixNanos::from(1609459200000000000)),
1543    ///     Some(UnixNanos::from(1609545600000000000))
1544    /// )?;
1545    ///
1546    /// // Delete all data after a specific date across entire catalog
1547    /// catalog.delete_catalog_range(
1548    ///     Some(UnixNanos::from(1609459200000000000)),
1549    ///     None
1550    /// )?;
1551    /// # Ok::<(), anyhow::Error>(())
1552    /// ```
1553    pub fn delete_catalog_range(
1554        &mut self,
1555        start: Option<UnixNanos>,
1556        end: Option<UnixNanos>,
1557    ) -> Result<()> {
1558        let leaf_directories = self.find_leaf_data_directories()?;
1559
1560        for directory in leaf_directories {
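            // Directories that do not map to a known data type are skipped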
1561            if let Ok((Some(data_type), identifier)) =
1562                self.extract_data_cls_and_identifier_from_path(&directory)
1563            {
1564                // Call the existing delete_data_range method
1565                if let Err(e) = self.delete_data_range(&data_type, identifier, start, end) {
1566                    eprintln!("Failed to delete data in directory {directory}: {e}");
1567                    // Continue with other directories instead of failing completely
1568                }
1569            }
1570        }
1571
1572        Ok(())
1573    }
1574
1575    /// Generic implementation for deleting data within a specified time range.
1576    ///
1577    /// This method provides the core deletion logic that works with any data type
1578    /// that implements the required traits. It handles file intersection analysis,
1579    /// data splitting for partial overlaps, and file cleanup.
1580    ///
1581    /// # Type Parameters
1582    ///
1583    /// - `T`: The data type to delete; it must satisfy the trait bounds in the `where` clause.
1584    ///
1585    /// # Parameters
1586    ///
1587    /// - `identifier`: Optional instrument ID to delete data for.
1588    /// - `start`: Optional start timestamp for the deletion range.
1589    /// - `end`: Optional end timestamp for the deletion range.
1590    ///
1591    /// # Returns
1592    ///
1593    /// Returns `Ok(())` on success, or an error if deletion fails.
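    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Preparing the delete operations fails.
    /// - Querying or rewriting the preserved data fails.
    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming a catalog constructed elsewhere), mirroring the `"quotes"`
    /// arm of [`Self::delete_data_range`]:
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete all quote data for a single instrument
    /// catalog.delete_data_range_generic::<QuoteTick>(Some("BTCUSD".to_string()), None, None)?;
    ///
    /// // Delete quote data for all instruments within a time range
    /// catalog.delete_data_range_generic::<QuoteTick>(
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```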
1594    pub fn delete_data_range_generic<T>(
1595        &mut self,
1596        identifier: Option<String>,
1597        start: Option<UnixNanos>,
1598        end: Option<UnixNanos>,
1599    ) -> Result<()>
1600    where
1601        T: DecodeDataFromRecordBatch
1602            + CatalogPathPrefix
1603            + EncodeToRecordBatch
1604            + HasTsInit
1605            + TryFrom<Data>
1606            + Clone,
1607    {
1608        // Work from the existing file intervals for this data type and identifier
1609        let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;
1610
1611        if intervals.is_empty() {
1612            return Ok(()); // No files to process
1613        }
1614
1615        // Prepare all operations for execution
1616        let operations_to_execute = self.prepare_delete_operations(
1617            T::path_prefix(),
1618            identifier.clone(),
1619            &intervals,
1620            start,
1621            end,
1622        )?;
1623
1624        if operations_to_execute.is_empty() {
1625            return Ok(()); // No operations to execute
1626        }
1627
1628        // Execute all operations
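        // A set de-duplicates files referenced by multiple operations (e.g. a file split on
        // both sides of the range) so each original is deleted exactly once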
1629        let mut files_to_remove = std::collections::HashSet::new();
1630
1631        for operation in operations_to_execute {
1632            match operation.operation_type.as_str() {
1633                "split_before" => {
1634                    // Query data before the deletion range and write it
1635                    let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1636                    let before_data = self.query_typed_data::<T>(
1637                        instrument_ids,
1638                        Some(UnixNanos::from(operation.query_start)),
1639                        Some(UnixNanos::from(operation.query_end)),
1640                        None,
1641                        Some(operation.files.clone()),
1642                    )?;
1643
1644                    if !before_data.is_empty() {
1645                        let start_ts = UnixNanos::from(operation.file_start_ns);
1646                        let end_ts = UnixNanos::from(operation.file_end_ns);
1647                        self.write_to_parquet(
1648                            before_data,
1649                            Some(start_ts),
1650                            Some(end_ts),
1651                            Some(true),
1652                        )?;
1653                    }
1654                }
1655                "split_after" => {
1656                    // Query data after the deletion range and write it
1657                    let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1658                    let after_data = self.query_typed_data::<T>(
1659                        instrument_ids,
1660                        Some(UnixNanos::from(operation.query_start)),
1661                        Some(UnixNanos::from(operation.query_end)),
1662                        None,
1663                        Some(operation.files.clone()),
1664                    )?;
1665
1666                    if !after_data.is_empty() {
1667                        let start_ts = UnixNanos::from(operation.file_start_ns);
1668                        let end_ts = UnixNanos::from(operation.file_end_ns);
1669                        self.write_to_parquet(
1670                            after_data,
1671                            Some(start_ts),
1672                            Some(end_ts),
1673                            Some(true),
1674                        )?;
1675                    }
1676                }
1677                _ => {
1678                    // For "remove" operations, just mark files for removal
1679                }
1680            }
1681
1682            // Mark files for removal (applies to all operation types)
1683            for file in operation.files {
1684                files_to_remove.insert(file);
1685            }
1686        }
1687
1688        // Remove all files that were processed
1689        for file in files_to_remove {
1690            if let Err(e) = self.delete_file(&file) {
1691                eprintln!("Failed to delete file {file}: {e}");
1692            }
1693        }
1694
1695        Ok(())
1696    }
1697
1698    /// Prepares all operations for data deletion by identifying files that need to be
1699    /// split or removed.
1700    ///
1701    /// This auxiliary function handles all the preparation logic for deletion:
1702    /// 1. Filters intervals by time range.
1703    /// 2. Identifies files that intersect with the deletion range.
1704    /// 3. Creates split operations for files that partially overlap.
1705    /// 4. Generates removal operations for files completely within the range.
1706    ///
1707    /// # Parameters
1708    ///
1709    /// - `type_name`: The data type directory name for path generation.
1710    /// - `identifier`: Optional instrument identifier for path generation.
1711    /// - `intervals`: List of (`start_ts`, `end_ts`) tuples representing existing file intervals.
1712    /// - `start`: Optional start timestamp for deletion range.
1713    /// - `end`: Optional end timestamp for deletion range.
1714    ///
1715    /// # Returns
1716    ///
1717    /// Returns a vector of `DeleteOperation` structs ready for execution.
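    ///
    /// # Errors
    ///
    /// This function will return an error if the directory path cannot be constructed.
    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming a catalog constructed elsewhere) of how overlapping files
    /// are planned for deletion:
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Two files cover [0, 100] and [200, 300]; deleting [50, 250] plans a "split_before"
    /// // operation keeping [0, 49] and a "split_after" operation keeping [251, 300].
    /// let intervals = vec![(0u64, 100u64), (200u64, 300u64)];
    /// let operations = catalog.prepare_delete_operations(
    ///     "quotes",
    ///     None,
    ///     &intervals,
    ///     Some(UnixNanos::from(50)),
    ///     Some(UnixNanos::from(250)),
    /// )?;
    /// assert_eq!(operations.len(), 2);
    /// # Ok::<(), anyhow::Error>(())
    /// ```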
1718    pub fn prepare_delete_operations(
1719        &self,
1720        type_name: &str,
1721        identifier: Option<String>,
1722        intervals: &[(u64, u64)],
1723        start: Option<UnixNanos>,
1724        end: Option<UnixNanos>,
1725    ) -> Result<Vec<DeleteOperation>> {
1726        // Convert start/end to nanoseconds
1727        let delete_start_ns = start.map(|s| s.as_u64());
1728        let delete_end_ns = end.map(|e| e.as_u64());
1729
1730        let mut operations = Vec::new();
1731
1732        // Get directory for file path construction
1733        let directory = self.make_path(type_name, identifier)?;
1734
1735        // Process each interval (which represents an actual file)
1736        for &(file_start_ns, file_end_ns) in intervals {
1737            // Check whether the file intersects the deletion range (both intervals are inclusive)
1738            let intersects = (delete_start_ns.is_none() || delete_start_ns.unwrap() <= file_end_ns)
1739                && (delete_end_ns.is_none() || file_start_ns <= delete_end_ns.unwrap());
1740
1741            if !intersects {
1742                continue; // File doesn't intersect with deletion range
1743            }
1744
1745            // Construct file path from interval timestamps
1746            let filename = timestamps_to_filename(
1747                UnixNanos::from(file_start_ns),
1748                UnixNanos::from(file_end_ns),
1749            );
1750            let file_path = format!("{directory}/{filename}");
1751
1752            // Determine what type of operation is needed
1753            let file_completely_within_range = (delete_start_ns.is_none()
1754                || delete_start_ns.unwrap() <= file_start_ns)
1755                && (delete_end_ns.is_none() || file_end_ns <= delete_end_ns.unwrap());
1756
1757            if file_completely_within_range {
1758                // File is completely within deletion range - just mark for removal
1759                operations.push(DeleteOperation {
1760                    operation_type: "remove".to_string(),
1761                    files: vec![file_path],
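                    // Query and file bounds are unused for plain removals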
1762                    query_start: 0,
1763                    query_end: 0,
1764                    file_start_ns: 0,
1765                    file_end_ns: 0,
1766                });
1767            } else {
1768                // File partially overlaps - need to split
1769                if let Some(delete_start) = delete_start_ns
1770                    && file_start_ns < delete_start
1771                {
1772                    // Keep data before deletion range
1773                    operations.push(DeleteOperation {
1774                        operation_type: "split_before".to_string(),
1775                        files: vec![file_path.clone()],
1776                        query_start: file_start_ns,
1777                        query_end: delete_start.saturating_sub(1), // Keep data before the range
1778                        file_start_ns,
1779                        file_end_ns: delete_start.saturating_sub(1),
1780                    });
1781                }
1782
1783                if let Some(delete_end) = delete_end_ns
1784                    && delete_end < file_end_ns
1785                {
1786                    // Keep data after deletion range
1787                    operations.push(DeleteOperation {
1788                        operation_type: "split_after".to_string(),
1789                        files: vec![file_path.clone()],
1790                        query_start: delete_end.saturating_add(1), // Keep data after the range
1791                        query_end: file_end_ns,
1792                        file_start_ns: delete_end.saturating_add(1),
1793                        file_end_ns,
1794                    });
1795                }
1796            }
1797        }
1798
1799        Ok(operations)
1800    }
1801}