nautilus_persistence/backend/catalog_operations.rs
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
// https://nautechsystems.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Catalog operations for data consolidation and reset functionality.
//!
//! This module contains the consolidation and reset operations for the `ParquetDataCatalog`.
//! These operations are separated into their own module for better organization and maintainability.

use std::collections::HashSet;

use anyhow::Result;
use futures::StreamExt;
use nautilus_core::UnixNanos;
use nautilus_model::data::{Data, HasTsInit};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::path::Path as ObjectPath;

use crate::{
    backend::catalog::{
        CatalogPathPrefix, ParquetDataCatalog, are_intervals_contiguous, are_intervals_disjoint,
        parse_filename_timestamps, timestamps_to_filename,
    },
    parquet::{
        combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
    },
};

/// Information about a consolidation query to be executed.
///
/// This struct encapsulates all the information needed to execute a single consolidation
/// operation, including the data range to query and file naming strategy.
///
/// # Fields
///
/// - `query_start`: Start timestamp for the data query range (inclusive, in nanoseconds).
/// - `query_end`: End timestamp for the data query range (inclusive, in nanoseconds).
/// - `use_period_boundaries`: If true, uses period boundaries for file naming; if false, uses actual data timestamps.
///
/// # Usage
///
/// This struct is used internally by the consolidation system to plan and execute
/// data consolidation operations. It allows the system to:
/// - Separate query planning from execution.
/// - Handle complex scenarios like data splitting.
/// - Optimize file naming strategies.
/// - Batch multiple operations efficiently.
/// - Maintain file contiguity across periods.
///
/// # Examples
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::ConsolidationQuery;
///
/// // Regular consolidation query
/// let query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609545600000000000,
///     use_period_boundaries: true,
/// };
///
/// // Split operation to preserve data
/// let split_query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609462800000000000,
///     use_period_boundaries: false,
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ConsolidationQuery {
    /// Start timestamp for the query range (inclusive, in nanoseconds).
    pub query_start: u64,
    /// End timestamp for the query range (inclusive, in nanoseconds).
    pub query_end: u64,
    /// Whether to use period boundaries for file naming (true) or actual data timestamps (false).
    pub use_period_boundaries: bool,
}

/// Information about a deletion operation to be executed.
///
/// This struct encapsulates all the information needed to execute a single deletion
/// operation, including the type of operation and file handling details.
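///
/// # Examples
///
/// A minimal sketch of a removal operation; the file path shown is a placeholder, and
/// split operations would also populate the query and file timestamp fields:
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::DeleteOperation;
///
/// // Remove a file that falls entirely inside the deletion range
/// let operation = DeleteOperation {
///     operation_type: "remove".to_string(),
///     files: vec!["data/quotes/BTCUSD/file.parquet".to_string()],
///     query_start: 0,
///     query_end: 0,
///     file_start_ns: 0,
///     file_end_ns: 0,
/// };
/// ```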
#[derive(Debug, Clone)]
pub struct DeleteOperation {
    /// Type of deletion operation ("remove", "`split_before`", "`split_after`").
    pub operation_type: String,
    /// List of files involved in this operation.
    pub files: Vec<String>,
    /// Start timestamp for data query (used for split operations).
    pub query_start: u64,
    /// End timestamp for data query (used for split operations).
    pub query_end: u64,
    /// Start timestamp for new file naming (used for split operations).
    pub file_start_ns: u64,
    /// End timestamp for new file naming (used for split operations).
    pub file_end_ns: u64,
}

impl ParquetDataCatalog {
    /// Consolidates all data files in the catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory listing fails.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog
    /// catalog.consolidate_catalog(None, None, None)?;
    ///
    /// // Consolidate only files within a specific time range
    /// catalog.consolidate_catalog(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog(
        &self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.consolidate_directory(&directory, start, end, ensure_contiguous_files)?;
        }

        Ok(())
    }

    /// Consolidates data files for a specific data type and instrument.
    ///
    /// This method consolidates Parquet files within a specific directory (defined by data type
    /// and optional instrument ID) by merging multiple files into a single file. This improves
    /// query performance and can reduce storage overhead.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files for a specific instrument
    /// catalog.consolidate_data(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     None,
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Consolidate trade files within a time range
    /// catalog.consolidate_data(
    ///     "trades",
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()> {
        let directory = self.make_path(type_name, instrument_id)?;
        self.consolidate_directory(&directory, start, end, ensure_contiguous_files)
    }

    /// Consolidates Parquet files within a specific directory by merging them into a single file.
    ///
    /// This internal method performs the actual consolidation work for a single directory.
    /// It identifies files within the specified time range, validates their intervals,
    /// and combines them into a single Parquet file with optimized storage.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path containing Parquet files to consolidate.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Behavior
    ///
    /// - Skips consolidation if directory contains 1 or fewer files.
    /// - Filters files by timestamp range if start/end are specified.
    /// - Sorts intervals by start timestamp before consolidation.
    /// - Creates a new file spanning the entire time range of input files.
    /// - Validates interval disjointness after consolidation (if enabled).
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory listing fails.
    /// - File combination operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    /// - Object store operations fail.
    fn consolidate_directory(
        &self,
        directory: &str,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()> {
        let parquet_files = self.list_parquet_files(directory)?;

        if parquet_files.len() <= 1 {
            return Ok(());
        }

        let mut files_to_consolidate = Vec::new();
        let mut intervals = Vec::new();
        let start = start.map(|t| t.as_u64());
        let end = end.map(|t| t.as_u64());

        for file in parquet_files {
            if let Some(interval) = parse_filename_timestamps(&file) {
                let (interval_start, interval_end) = interval;
                let include_file = match (start, end) {
                    (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
                    (Some(s), None) => interval_start >= s,
                    (None, Some(e)) => interval_end <= e,
                    (None, None) => true,
                };

                if include_file {
                    files_to_consolidate.push(file);
                    intervals.push(interval);
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        if !intervals.is_empty() {
            let file_name = timestamps_to_filename(
                UnixNanos::from(intervals[0].0),
                UnixNanos::from(intervals.last().unwrap().1),
            );
            let path = format!("{directory}/{file_name}");

            // Convert string paths to ObjectPath for the function call
            let object_paths: Vec<ObjectPath> = files_to_consolidate
                .iter()
                .map(|path| ObjectPath::from(path.as_str()))
                .collect();

            self.execute_async(async {
                combine_parquet_files_from_object_store(
                    self.object_store.clone(),
                    object_paths,
                    &ObjectPath::from(path),
                    Some(self.compression),
                    Some(self.max_row_group_size),
                )
                .await
            })?;
        }

        if ensure_contiguous_files.unwrap_or(true) && !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after consolidating a directory");
        }

        Ok(())
    }

    /// Consolidates all data files in the catalog by splitting them into fixed time periods.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes).
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory listing fails.
    /// - Data type extraction from path fails.
    /// - Period-based consolidation operations fail.
    ///
    /// # Notes
    ///
    /// - This operation can be resource-intensive for large catalogs with many data types
    ///   and instruments.
    /// - The consolidation process splits data into fixed time periods rather than combining
    ///   all files into a single file per directory.
    /// - Uses the same period-based consolidation logic as `consolidate_data_by_period`.
    /// - Original files are removed and replaced with period-based consolidated files.
    /// - This method is useful for periodic maintenance of the catalog to standardize
    ///   file organization by time periods.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog by 1-day periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate only files within a specific time range by 1-hour periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog_by_period(
        &mut self,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            let (data_cls, identifier) =
                self.extract_data_cls_and_identifier_from_path(&directory)?;

            if let Some(data_cls_name) = data_cls {
                // Use match statement to call the generic consolidate_data_by_period for various types
                match data_cls_name.as_str() {
                    "quotes" => {
                        use nautilus_model::data::QuoteTick;
                        self.consolidate_data_by_period_generic::<QuoteTick>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "trades" => {
                        use nautilus_model::data::TradeTick;
                        self.consolidate_data_by_period_generic::<TradeTick>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_deltas" => {
                        use nautilus_model::data::OrderBookDelta;
                        self.consolidate_data_by_period_generic::<OrderBookDelta>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_depths" => {
                        use nautilus_model::data::OrderBookDepth10;
                        self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "bars" => {
                        use nautilus_model::data::Bar;
                        self.consolidate_data_by_period_generic::<Bar>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "index_prices" => {
                        use nautilus_model::data::IndexPriceUpdate;
                        self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "mark_prices" => {
                        use nautilus_model::data::MarkPriceUpdate;
                        self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "instrument_closes" => {
                        use nautilus_model::data::close::InstrumentClose;
                        self.consolidate_data_by_period_generic::<InstrumentClose>(
                            identifier,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    _ => {
                        // Skip unknown data types
                        log::warn!("Unknown data type for consolidation: {data_cls_name}");
                        continue;
                    }
                }
            }
        }

        Ok(())
    }

    /// Extracts data class and identifier from a directory path.
    ///
    /// This method parses a directory path to extract the data type and optional
    /// instrument identifier. It's used to determine what type of data consolidation
    /// to perform for each directory.
    ///
    /// # Parameters
    ///
    /// - `path`: The directory path to parse.
    ///
    /// # Returns
    ///
    /// Returns a tuple of (`data_class`, identifier) where both are optional strings.
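    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog and the "data/<type>/<instrument>"
    /// path layout used by the catalog:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let (data_cls, identifier) =
    ///     catalog.extract_data_cls_and_identifier_from_path("data/quotes/BTCUSD")?;
    /// assert_eq!(data_cls, Some("quotes".to_string()));
    /// assert_eq!(identifier, Some("BTCUSD".to_string()));
    /// # Ok::<(), anyhow::Error>(())
    /// ```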
    pub fn extract_data_cls_and_identifier_from_path(
        &self,
        path: &str,
    ) -> Result<(Option<String>, Option<String>)> {
        // Split the path and look for the data directory structure
        let path_parts: Vec<&str> = path.split('/').collect();

        // Find the "data" directory in the path
        if let Some(data_index) = path_parts.iter().position(|&part| part == "data")
            && data_index + 1 < path_parts.len()
        {
            let data_cls = path_parts[data_index + 1].to_string();

            // Check if there's an identifier (instrument ID) after the data class
            let identifier = if data_index + 2 < path_parts.len() {
                Some(path_parts[data_index + 2].to_string())
            } else {
                None
            };

            return Ok((Some(data_cls), identifier));
        }

        // If we can't parse the path, return None for both
        Ok((None, None))
    }

    /// Consolidates data files by splitting them into fixed time periods.
    ///
    /// This method queries data by period and writes consolidated files immediately,
    /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
    /// the function automatically splits those files to preserve all data.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments.
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes).
    /// - `start`: Optional start timestamp for consolidation range. If None, uses earliest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `end`: Optional end timestamp for consolidation range. If None, uses latest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - File operations fail.
    /// - Data querying or writing fails.
    ///
    /// # Notes
    ///
    /// - Uses a two-phase approach: first determines all queries, then executes them.
    /// - Groups intervals into contiguous groups to preserve holes between groups.
    /// - Allows consolidation across multiple files within each contiguous group.
    /// - Skips queries if target files already exist for efficiency.
    /// - Original files are removed immediately after querying each period.
    /// - When `ensure_contiguous_files=false`, file timestamps match actual data range.
    /// - When `ensure_contiguous_files=true`, file timestamps use period boundaries.
    /// - Uses modulo arithmetic for efficient period boundary calculation.
    /// - Preserves holes in data by preventing queries from spanning across gaps.
    /// - Automatically splits files at start/end boundaries to preserve all data.
    /// - Split operations are executed before consolidation to ensure data preservation.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files by 1-day periods
    /// catalog.consolidate_data_by_period(
    ///     "quotes",
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate specific instrument by 1-hour periods
    /// catalog.consolidate_data_by_period(
    ///     "trades",
    ///     Some("BTCUSD".to_string()),
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()> {
        // Use match statement to call the generic consolidate_data_by_period for various types
        match type_name {
            "quotes" => {
                use nautilus_model::data::QuoteTick;
                self.consolidate_data_by_period_generic::<QuoteTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "trades" => {
                use nautilus_model::data::TradeTick;
                self.consolidate_data_by_period_generic::<TradeTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_deltas" => {
                use nautilus_model::data::OrderBookDelta;
                self.consolidate_data_by_period_generic::<OrderBookDelta>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_depths" => {
                use nautilus_model::data::OrderBookDepth10;
                self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "bars" => {
                use nautilus_model::data::Bar;
                self.consolidate_data_by_period_generic::<Bar>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "index_prices" => {
                use nautilus_model::data::IndexPriceUpdate;
                self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "mark_prices" => {
                use nautilus_model::data::MarkPriceUpdate;
                self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "instrument_closes" => {
                use nautilus_model::data::close::InstrumentClose;
                self.consolidate_data_by_period_generic::<InstrumentClose>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            _ => {
                anyhow::bail!("Unknown data type for consolidation: {}", type_name);
            }
        }

        Ok(())
    }

    /// Generic method that consolidates data files by splitting them into fixed time periods.
    ///
    /// This is a type-safe version of `consolidate_data_by_period` that uses generic types
    /// to ensure compile-time correctness and enable reuse across different data types.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to consolidate, must implement required traits for serialization.
    ///
    /// # Parameters
    ///
    /// - `identifier`: Optional instrument ID to target a specific instrument's data.
    /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
    /// - `start`: Optional start timestamp for consolidation range.
    /// - `end`: Optional end timestamp for consolidation range.
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
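    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog containing quote data:
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files into 1-hour periods
    /// catalog.consolidate_data_by_period_generic::<QuoteTick>(
    ///     None,                // All instruments
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```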
    pub fn consolidate_data_by_period_generic<T>(
        &mut self,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> Result<()>
    where
        T: DecodeDataFromRecordBatch
            + CatalogPathPrefix
            + EncodeToRecordBatch
            + HasTsInit
            + TryFrom<Data>
            + Clone,
    {
        let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
        let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);

        // Use get_intervals for cleaner implementation
        let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;

        if intervals.is_empty() {
            return Ok(()); // No files to consolidate
        }

        // Use auxiliary function to prepare all queries for execution
        let queries_to_execute = self.prepare_consolidation_queries(
            T::path_prefix(),
            identifier.clone(),
            &intervals,
            period_nanos,
            start,
            end,
            ensure_contiguous_files,
        )?;

        if queries_to_execute.is_empty() {
            return Ok(()); // No queries to execute
        }

        // Get directory for file operations
        let directory = self.make_path(T::path_prefix(), identifier.clone())?;
        let mut existing_files = self.list_parquet_files(&directory)?;
        existing_files.sort();

        // Track files to remove and maintain existing_files list
        let mut files_to_remove = HashSet::new();
        let original_files_count = existing_files.len();

        // Phase 2: Execute queries, write, and delete
        let mut file_start_ns: Option<u64> = None; // Track contiguity across periods

        for query_info in queries_to_execute {
            // Query data for this period using query_typed_data
            let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);

            let period_data = self.query_typed_data::<T>(
                instrument_ids,
                Some(UnixNanos::from(query_info.query_start)),
                Some(UnixNanos::from(query_info.query_end)),
                None,
                Some(existing_files.clone()),
            )?;

            if period_data.is_empty() {
                // Skip if no data found, but maintain contiguity by using query start
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                continue;
            }
            file_start_ns = None;

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, maintaining contiguity
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                (file_start_ns.unwrap(), query_info.query_end)
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };

            // Check again if target file exists (in case it was created during this process)
            let target_filename = format!(
                "{}/{}",
                directory,
                timestamps_to_filename(
                    UnixNanos::from(final_start_ns),
                    UnixNanos::from(final_end_ns)
                )
            );

            if self.file_exists(&target_filename)? {
                // Skip if target file already exists
                continue;
            }

            // Write consolidated data for this period using write_to_parquet
            // Use skip_disjoint_check since we're managing file removal carefully
            let start_ts = UnixNanos::from(final_start_ns);
            let end_ts = UnixNanos::from(final_end_ns);
            self.write_to_parquet(period_data, Some(start_ts), Some(end_ts), Some(true))?;

            // Identify files that are completely covered by this period
            // Only remove files AFTER successfully writing a new file
            // Use slice copy to avoid modification during iteration (match Python logic)
            for file in existing_files.clone() {
                if let Some(interval) = parse_filename_timestamps(&file)
                    && interval.1 <= query_info.query_end
                {
                    files_to_remove.insert(file.clone());
                    existing_files.retain(|f| f != &file);
                }
            }

            // Remove files as soon as we have some to remove
            if !files_to_remove.is_empty() {
                for file in files_to_remove.drain() {
                    self.delete_file(&file)?;
                }
            }
        }

        // Remove any remaining files that weren't removed in the loop
        // This matches the Python implementation's final cleanup step
        // Only remove files if any consolidation actually happened (i.e., files were processed)
        let files_were_processed = existing_files.len() < original_files_count;
        if files_were_processed {
            for file in existing_files {
                self.delete_file(&file)?;
            }
        }

        Ok(())
    }

    /// Prepares all queries for consolidation by filtering, grouping, and handling splits.
    ///
    /// This auxiliary function handles all the preparation logic for consolidation:
    /// 1. Filters intervals by time range.
    /// 2. Groups intervals into contiguous groups.
    /// 3. Identifies and creates split operations for data preservation.
    /// 4. Generates period-based consolidation queries.
    /// 5. Checks for existing target files.
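    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog; the interval values below are
    /// placeholders representing two contiguous files:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Two contiguous file intervals covering [0, 199]
    /// let intervals: Vec<(u64, u64)> = vec![(0, 99), (100, 199)];
    ///
    /// let queries = catalog.prepare_consolidation_queries(
    ///     "quotes",
    ///     None,
    ///     &intervals,
    ///     86400000000000, // 1-day periods
    ///     None,
    ///     None,
    ///     true,
    /// )?;
    ///
    /// for query in queries {
    ///     println!("{} -> {}", query.query_start, query.query_end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```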
    #[allow(clippy::too_many_arguments)]
    pub fn prepare_consolidation_queries(
        &self,
        type_name: &str,
        identifier: Option<String>,
        intervals: &[(u64, u64)],
        period_nanos: u64,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: bool,
    ) -> Result<Vec<ConsolidationQuery>> {
        // Filter intervals by time range if specified
        let used_start = start.map(|s| s.as_u64());
        let used_end = end.map(|e| e.as_u64());

        let mut filtered_intervals = Vec::new();
        for &(interval_start, interval_end) in intervals {
            // Check if interval overlaps with the specified range
            if (used_start.is_none() || used_start.unwrap() <= interval_end)
                && (used_end.is_none() || interval_start <= used_end.unwrap())
            {
                filtered_intervals.push((interval_start, interval_end));
            }
        }

        if filtered_intervals.is_empty() {
            return Ok(Vec::new()); // No intervals in the specified range
        }

        // Check contiguity of filtered intervals if required
        if ensure_contiguous_files && !are_intervals_contiguous(&filtered_intervals) {
            anyhow::bail!(
                "Intervals are not contiguous. When ensure_contiguous_files=true, \
                all files in the consolidation range must have contiguous timestamps."
            );
        }

        // Group intervals into contiguous groups to preserve holes between groups
        // but allow consolidation within each contiguous group
        let contiguous_groups = self.group_contiguous_intervals(&filtered_intervals);

        let mut queries_to_execute = Vec::new();

        // Handle interval splitting by creating split operations for data preservation
        if !filtered_intervals.is_empty() {
            if let Some(start_ts) = used_start {
                let first_interval = filtered_intervals[0];
                if first_interval.0 < start_ts && start_ts <= first_interval.1 {
                    // Split before start: preserve data from interval_start to start-1
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: first_interval.0,
                        query_end: start_ts - 1,
                        use_period_boundaries: false,
                    });
                }
            }

            if let Some(end_ts) = used_end {
                let last_interval = filtered_intervals[filtered_intervals.len() - 1];
                if last_interval.0 <= end_ts && end_ts < last_interval.1 {
                    // Split after end: preserve data from end+1 to interval_end
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: end_ts + 1,
                        query_end: last_interval.1,
                        use_period_boundaries: false,
                    });
                }
            }
        }

        // Generate period-based consolidation queries for each contiguous group
        for group in contiguous_groups {
            let group_start = group[0].0;
            let group_end = group[group.len() - 1].1;

            // Apply start/end filtering to the group
            let effective_start = used_start.map_or(group_start, |s| s.max(group_start));
            let effective_end = used_end.map_or(group_end, |e| e.min(group_end));

            if effective_start > effective_end {
                continue; // Skip if no overlap
            }

            // Generate period-based queries within this contiguous group
            let mut current_start_ns = (effective_start / period_nanos) * period_nanos;

            // Add safety check to prevent infinite loops (match Python logic)
            let max_iterations = 10000;
            let mut iteration_count = 0;

            while current_start_ns <= effective_end {
                iteration_count += 1;
                if iteration_count > max_iterations {
                    // Safety break to prevent infinite loops
                    break;
                }
                let current_end_ns = (current_start_ns + period_nanos - 1).min(effective_end);

                // Check if target file already exists (only when ensure_contiguous_files is true)
                if ensure_contiguous_files {
                    let directory = self.make_path(type_name, identifier.clone())?;
                    let target_filename = format!(
                        "{}/{}",
                        directory,
                        timestamps_to_filename(
                            UnixNanos::from(current_start_ns),
                            UnixNanos::from(current_end_ns)
                        )
                    );

                    if self.file_exists(&target_filename)? {
                        // Skip if target file already exists
                        current_start_ns += period_nanos;
                        continue;
                    }
                }

                // Add query to execution list
                queries_to_execute.push(ConsolidationQuery {
                    query_start: current_start_ns,
                    query_end: current_end_ns,
                    use_period_boundaries: ensure_contiguous_files,
                });

                // Move to next period
                current_start_ns += period_nanos;

                if current_start_ns > effective_end {
                    break;
                }
            }
        }

        // Sort queries by start date to enable efficient file removal
        // Files can be removed when interval[1] <= query_info["query_end"]
        // and processing in chronological order ensures optimal cleanup
        queries_to_execute.sort_by_key(|q| q.query_start);

        Ok(queries_to_execute)
    }

    /// Groups intervals into contiguous groups for efficient consolidation.
    ///
    /// This method analyzes a list of time intervals and groups them into contiguous sequences.
    /// Intervals are considered contiguous if the end of one interval is exactly one nanosecond
    /// before the start of the next interval. This grouping preserves data gaps while allowing
    /// consolidation within each contiguous group.
    ///
    /// # Parameters
    ///
    /// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
    ///
    /// # Returns
    ///
    /// Returns a vector of groups, where each group is a vector of contiguous intervals.
    /// Returns an empty vector if the input is empty.
    ///
    /// # Algorithm
    ///
    /// 1. Starts with the first interval in a new group.
    /// 2. For each subsequent interval, checks if it's contiguous with the previous.
    /// 3. If contiguous (`prev_end` + 1 == `curr_start`), adds to current group.
    /// 4. If not contiguous, starts a new group.
    /// 5. Returns all groups.
    ///
    /// # Examples
    ///
    /// ```text
    /// Contiguous intervals: [(1,5), (6,10), (11,15)]
    /// Returns: [[(1,5), (6,10), (11,15)]]
    ///
    /// Non-contiguous intervals: [(1,5), (8,10), (12,15)]
    /// Returns: [[(1,5)], [(8,10)], [(12,15)]]
    /// ```
    ///
    /// # Notes
    ///
    /// - Input intervals should be sorted by start timestamp.
    /// - Gaps between groups are preserved and not consolidated.
    /// - Used internally by period-based consolidation methods.
    #[must_use]
    pub fn group_contiguous_intervals(&self, intervals: &[(u64, u64)]) -> Vec<Vec<(u64, u64)>> {
        if intervals.is_empty() {
            return Vec::new();
        }

        let mut contiguous_groups = Vec::new();
        let mut current_group = vec![intervals[0]];

        for i in 1..intervals.len() {
            let prev_interval = intervals[i - 1];
            let curr_interval = intervals[i];

            // Check if current interval is contiguous with previous (end + 1 == start)
            if prev_interval.1 + 1 == curr_interval.0 {
                current_group.push(curr_interval);
            } else {
                // Gap found, start new group
                contiguous_groups.push(current_group);
                current_group = vec![curr_interval];
            }
        }

        // Add the last group
        contiguous_groups.push(current_group);

        contiguous_groups
    }

    /// Checks if a file exists in the object store.
    ///
    /// This method performs a HEAD operation on the object store to determine if a file
    /// exists without downloading its content. It works with both local and remote object stores.
    ///
    /// # Parameters
    ///
    /// - `path`: The file path to check, relative to the catalog structure.
    ///
    /// # Returns
    ///
    /// Returns `true` if the file exists, `false` if it doesn't exist.
    ///
    /// # Errors
    ///
    /// Returns an error if the object store operation fails due to network issues,
    /// authentication problems, or other I/O errors.
    fn file_exists(&self, path: &str) -> Result<bool> {
        let object_path = self.to_object_path(path);
        let exists =
            self.execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
        Ok(exists)
    }

    /// Deletes a file from the object store.
    ///
    /// This method removes a file from the object store. The operation is permanent
    /// and cannot be undone. It works with both local filesystems and remote object stores.
    ///
    /// # Parameters
    ///
    /// - `path`: The file path to delete, relative to the catalog structure.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on successful deletion.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The file doesn't exist.
    /// - Permission is denied.
    /// - Network issues occur (for remote stores).
    /// - The object store operation fails.
    ///
    /// # Safety
    ///
    /// This operation is irreversible. Ensure the file is no longer needed before deletion.
    fn delete_file(&self, path: &str) -> Result<()> {
        let object_path = self.to_object_path(path);
        self.execute_async(async {
            self.object_store
                .delete(&object_path)
                .await
                .map_err(anyhow::Error::from)
        })?;
        Ok(())
    }

    /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
    ///
    /// This method scans all leaf data directories in the catalog and renames files based on
    /// the actual timestamp range of their content. This is useful when files have been
    /// modified or when filename conventions have changed.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory listing fails.
    /// - File metadata reading fails.
    /// - File rename operations fail.
    /// - Interval validation fails after renaming.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Reset all filenames in the catalog
    /// catalog.reset_catalog_file_names()?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn reset_catalog_file_names(&self) -> Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.reset_file_names(&directory)?;
        }

        Ok(())
    }

    /// Resets the filenames of Parquet files for a specific data type and instrument ID.
    ///
    /// This method renames files in a specific directory based on the actual timestamp
    /// range of their content. This is useful for correcting filenames after data
    /// modifications or when filename conventions have changed.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - File metadata reading fails.
    /// - File rename operations fail.
    /// - Interval validation fails after renaming.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Reset filenames for all quote files
    /// catalog.reset_data_file_names("quotes", None)?;
    ///
    /// // Reset filenames for a specific instrument's trade files
    /// catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn reset_data_file_names(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> Result<()> {
        let directory = self.make_path(data_cls, instrument_id)?;
        self.reset_file_names(&directory)
    }

    /// Resets the filenames of Parquet files in a directory to match their actual content timestamps.
    ///
    /// This internal method scans all Parquet files in a directory, reads their metadata to
    /// determine the actual timestamp range of their content, and renames the files accordingly.
    /// This ensures that filenames accurately reflect the data they contain.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path containing Parquet files to rename.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Process
    ///
    /// 1. Lists all Parquet files in the directory.
    /// 2. For each file, reads metadata to extract min/max timestamps.
    /// 3. Generates a new filename based on actual timestamp range.
    /// 4. Moves the file to the new name using object store operations.
    /// 5. Validates that intervals remain disjoint after renaming.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory listing fails.
    /// - Metadata reading fails for any file.
    /// - File move operations fail.
    /// - Interval validation fails after renaming.
    /// - Object store operations fail.
    ///
    /// # Notes
    ///
    /// - This operation can be time-consuming for directories with many files.
    /// - Files are processed sequentially to avoid conflicts.
    /// - The operation is atomic per file but not across the entire directory.
    fn reset_file_names(&self, directory: &str) -> Result<()> {
        let parquet_files = self.list_parquet_files(directory)?;

        for file in parquet_files {
            let object_path = ObjectPath::from(file.as_str());
            let (first_ts, last_ts) = self.execute_async(async {
                min_max_from_parquet_metadata_object_store(
                    self.object_store.clone(),
                    &object_path,
                    "ts_init",
                )
                .await
            })?;

            let new_filename =
                timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
            let new_file_path = format!("{directory}/{new_filename}");
            let new_object_path = ObjectPath::from(new_file_path);

            self.move_file(&object_path, &new_object_path)?;
        }

        let intervals = self.get_directory_intervals(directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after resetting file names");
        }

        Ok(())
    }

    /// Finds all leaf data directories in the catalog.
    ///
    /// A leaf directory is one that contains data files but no subdirectories.
    /// This method is used to identify directories that can be processed for
    /// consolidation or other operations.
    ///
    /// # Returns
    ///
    /// Returns a vector of directory path strings representing leaf directories,
    /// or an error if directory traversal fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Object store listing operations fail.
    /// - Directory structure cannot be analyzed.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let leaf_dirs = catalog.find_leaf_data_directories()?;
    /// for dir in leaf_dirs {
    ///     println!("Found leaf directory: {}", dir);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<String>> {
        let data_dir = if self.base_path.is_empty() {
            "data".to_string()
        } else {
            format!("{}/data", self.base_path)
        };

        let leaf_dirs = self.execute_async(async {
            let mut all_paths = std::collections::HashSet::new();
            let mut directories = std::collections::HashSet::new();
            let mut files_in_dirs = std::collections::HashMap::new();

            // List all objects under the data directory
            let prefix = ObjectPath::from(format!("{data_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));

            while let Some(object) = stream.next().await {
                let object = object?;
                let path_str = object.location.to_string();
                all_paths.insert(path_str.clone());

                // Extract directory path
                if let Some(parent) = std::path::Path::new(&path_str).parent() {
                    let parent_str = parent.to_string_lossy().to_string();
                    directories.insert(parent_str.clone());

                    // Track files in each directory
                    files_in_dirs
                        .entry(parent_str)
                        .or_insert_with(Vec::new)
                        .push(path_str);
                }
            }

            // Find leaf directories (directories with files but no subdirectories)
            let mut leaf_dirs = Vec::new();
            for dir in &directories {
                let has_files = files_in_dirs
                    .get(dir)
                    .is_some_and(|files| !files.is_empty());
                let has_subdirs = directories
                    .iter()
                    .any(|d| d.starts_with(&format!("{dir}/")) && d != dir);

                if has_files && !has_subdirs {
                    leaf_dirs.push(dir.clone());
                }
            }

            Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
        })?;

        Ok(leaf_dirs)
    }

    /// Deletes data within a specified time range for a specific data type and instrument.
    ///
    /// This method identifies all parquet files that intersect with the specified time range
    /// and handles them appropriately:
    /// - Files completely within the range are deleted.
    /// - Files partially overlapping the range are split to preserve data outside the range.
    /// - The original intersecting files are removed after processing.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional instrument ID to delete data for. If None, deletes data across all instruments.
    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - File operations fail.
    /// - Data querying or writing fails.
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone.
    /// - Files that partially overlap the deletion range are split to preserve data outside the range.
    /// - The method ensures data integrity by using atomic operations where possible.
    /// - Empty directories are not automatically removed after deletion.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete all quote data for a specific instrument
    /// catalog.delete_data_range(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Delete trade data within a specific time range
    /// catalog.delete_data_range(
    ///     "trades",
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn delete_data_range(
        &mut self,
        type_name: &str,
        identifier: Option<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> Result<()> {
        // Use match statement to call the generic delete_data_range for various types
        match type_name {
            "quotes" => {
                use nautilus_model::data::QuoteTick;
                self.delete_data_range_generic::<QuoteTick>(identifier, start, end)
            }
            "trades" => {
                use nautilus_model::data::TradeTick;
                self.delete_data_range_generic::<TradeTick>(identifier, start, end)
            }
            "bars" => {
                use nautilus_model::data::Bar;
                self.delete_data_range_generic::<Bar>(identifier, start, end)
            }
            "order_book_deltas" => {
                use nautilus_model::data::OrderBookDelta;
                self.delete_data_range_generic::<OrderBookDelta>(identifier, start, end)
            }
            "order_book_depths" => {
                use nautilus_model::data::OrderBookDepth10;
                self.delete_data_range_generic::<OrderBookDepth10>(identifier, start, end)
            }
            _ => Err(anyhow::anyhow!("Unsupported data type: {}", type_name)),
        }
    }

    /// Deletes data within a specified time range across the entire catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and deletes data within the specified time range from each directory. A leaf directory
    /// is one that contains files but no subdirectories. This is a convenience method that
    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Directory traversal fails.
    /// - Data class extraction from paths fails.
    /// - Individual delete operations fail.
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone.
    /// - The deletion process handles file intersections intelligently by splitting files
    ///   when they partially overlap with the deletion range.
    /// - Files completely within the deletion range are removed entirely.
    /// - Files partially overlapping the deletion range are split to preserve data outside the range.
    /// - This method is useful for bulk data cleanup operations across the entire catalog.
    /// - Empty directories are not automatically removed after deletion.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete all data before a specific date across entire catalog
    /// catalog.delete_catalog_range(
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000))
    /// )?;
    ///
    /// // Delete all data within a specific range across entire catalog
    /// catalog.delete_catalog_range(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
    ///
    /// // Delete all data after a specific date across entire catalog
    /// catalog.delete_catalog_range(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn delete_catalog_range(
        &mut self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            if let Ok((Some(data_type), identifier)) =
                self.extract_data_cls_and_identifier_from_path(&directory)
            {
                // Call the existing delete_data_range method
                if let Err(e) = self.delete_data_range(&data_type, identifier, start, end) {
                    eprintln!("Failed to delete data in directory {directory}: {e}");
                    // Continue with other directories instead of failing completely
                }
            }
        }

        Ok(())
    }

    /// Generic implementation for deleting data within a specified time range.
    ///
    /// This method provides the core deletion logic that works with any data type
    /// that implements the required traits. It handles file intersection analysis,
    /// data splitting for partial overlaps, and file cleanup.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type that implements required traits for catalog operations.
    ///
    /// # Parameters
    ///
    /// - `identifier`: Optional instrument ID to delete data for.
    /// - `start`: Optional start timestamp for the deletion range.
    /// - `end`: Optional end timestamp for the deletion range.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
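    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog containing trade data
    /// (the timestamps below are placeholders):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_model::data::TradeTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete trade data for one instrument within a specific range
    /// catalog.delete_data_range_generic::<TradeTick>(
    ///     Some("BTCUSD".to_string()),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```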
    pub fn delete_data_range_generic<T>(
        &mut self,
        identifier: Option<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> Result<()>
    where
        T: DecodeDataFromRecordBatch
            + CatalogPathPrefix
            + EncodeToRecordBatch
            + HasTsInit
            + TryFrom<Data>
            + Clone,
    {
        // Get intervals for cleaner implementation
        let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;

        if intervals.is_empty() {
            return Ok(()); // No files to process
        }

        // Prepare all operations for execution
        let operations_to_execute = self.prepare_delete_operations(
            T::path_prefix(),
            identifier.clone(),
            &intervals,
            start,
            end,
        )?;

        if operations_to_execute.is_empty() {
            return Ok(()); // No operations to execute
        }

        // Execute all operations
        let mut files_to_remove = std::collections::HashSet::new();

        for operation in operations_to_execute {
            match operation.operation_type.as_str() {
                "split_before" => {
                    // Query data before the deletion range and write it
                    let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
                    let before_data = self.query_typed_data::<T>(
                        instrument_ids,
                        Some(UnixNanos::from(operation.query_start)),
                        Some(UnixNanos::from(operation.query_end)),
                        None,
                        Some(operation.files.clone()),
                    )?;

                    if !before_data.is_empty() {
                        let start_ts = UnixNanos::from(operation.file_start_ns);
                        let end_ts = UnixNanos::from(operation.file_end_ns);
                        self.write_to_parquet(
                            before_data,
                            Some(start_ts),
                            Some(end_ts),
                            Some(true),
                        )?;
                    }
                }
                "split_after" => {
                    // Query data after the deletion range and write it
                    let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
                    let after_data = self.query_typed_data::<T>(
                        instrument_ids,
                        Some(UnixNanos::from(operation.query_start)),
                        Some(UnixNanos::from(operation.query_end)),
                        None,
                        Some(operation.files.clone()),
                    )?;

                    if !after_data.is_empty() {
                        let start_ts = UnixNanos::from(operation.file_start_ns);
                        let end_ts = UnixNanos::from(operation.file_end_ns);
                        self.write_to_parquet(
                            after_data,
                            Some(start_ts),
                            Some(end_ts),
                            Some(true),
                        )?;
                    }
                }
                _ => {
                    // For "remove" operations, just mark files for removal
                }
            }

            // Mark files for removal (applies to all operation types)
            for file in operation.files {
                files_to_remove.insert(file);
            }
        }

        // Remove all files that were processed
        for file in files_to_remove {
            if let Err(e) = self.delete_file(&file) {
                eprintln!("Failed to delete file {file}: {e}");
            }
        }

        Ok(())
    }

    /// Prepares all operations for data deletion by identifying files that need to be
    /// split or removed.
    ///
    /// This auxiliary function handles all the preparation logic for deletion:
    /// 1. Filters intervals by time range.
    /// 2. Identifies files that intersect with the deletion range.
    /// 3. Creates split operations for files that partially overlap.
    /// 4. Generates removal operations for files completely within the range.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name for path generation.
    /// - `identifier`: Optional instrument identifier for path generation.
    /// - `intervals`: List of (`start_ts`, `end_ts`) tuples representing existing file intervals.
    /// - `start`: Optional start timestamp for deletion range.
    /// - `end`: Optional end timestamp for deletion range.
    ///
    /// # Returns
    ///
    /// Returns a vector of `DeleteOperation` structs ready for execution.
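    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog; the interval values are
    /// placeholders chosen so the deletion range cuts through the first file:
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Existing file intervals for the data type
    /// let intervals: Vec<(u64, u64)> = vec![(0, 99), (100, 199)];
    ///
    /// // Deleting [50, 199] yields a "split_before" for the first file and a "remove" for the second
    /// let operations = catalog.prepare_delete_operations(
    ///     "quotes",
    ///     None,
    ///     &intervals,
    ///     Some(UnixNanos::from(50u64)),
    ///     Some(UnixNanos::from(199u64)),
    /// )?;
    ///
    /// for op in operations {
    ///     println!("{}: {:?}", op.operation_type, op.files);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```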
    pub fn prepare_delete_operations(
        &self,
        type_name: &str,
        identifier: Option<String>,
        intervals: &[(u64, u64)],
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> Result<Vec<DeleteOperation>> {
        // Convert start/end to nanoseconds
        let delete_start_ns = start.map(|s| s.as_u64());
        let delete_end_ns = end.map(|e| e.as_u64());

        let mut operations = Vec::new();

        // Get directory for file path construction
        let directory = self.make_path(type_name, identifier)?;

        // Process each interval (which represents an actual file)
        for &(file_start_ns, file_end_ns) in intervals {
            // Check if file intersects with deletion range
            let intersects = (delete_start_ns.is_none() || delete_start_ns.unwrap() <= file_end_ns)
                && (delete_end_ns.is_none() || file_start_ns <= delete_end_ns.unwrap());

            if !intersects {
                continue; // File doesn't intersect with deletion range
            }

            // Construct file path from interval timestamps
            let filename = timestamps_to_filename(
                UnixNanos::from(file_start_ns),
                UnixNanos::from(file_end_ns),
            );
            let file_path = format!("{directory}/{filename}");

            // Determine what type of operation is needed
            let file_completely_within_range = (delete_start_ns.is_none()
                || delete_start_ns.unwrap() <= file_start_ns)
                && (delete_end_ns.is_none() || file_end_ns <= delete_end_ns.unwrap());

            if file_completely_within_range {
                // File is completely within deletion range - just mark for removal
                operations.push(DeleteOperation {
                    operation_type: "remove".to_string(),
                    files: vec![file_path],
                    query_start: 0,
                    query_end: 0,
                    file_start_ns: 0,
                    file_end_ns: 0,
                });
            } else {
                // File partially overlaps - need to split
                if let Some(delete_start) = delete_start_ns
                    && file_start_ns < delete_start
                {
                    // Keep data before deletion range
                    operations.push(DeleteOperation {
                        operation_type: "split_before".to_string(),
                        files: vec![file_path.clone()],
                        query_start: file_start_ns,
                        query_end: delete_start.saturating_sub(1), // Exclusive end
                        file_start_ns,
                        file_end_ns: delete_start.saturating_sub(1),
                    });
                }

                if let Some(delete_end) = delete_end_ns
                    && delete_end < file_end_ns
                {
                    // Keep data after deletion range
                    operations.push(DeleteOperation {
                        operation_type: "split_after".to_string(),
                        files: vec![file_path.clone()],
                        query_start: delete_end.saturating_add(1), // Exclusive start
                        query_end: file_end_ns,
                        file_start_ns: delete_end.saturating_add(1),
                        file_end_ns,
                    });
                }
            }
        }

        Ok(operations)
    }
}
1801}