nautilus_persistence/backend/catalog_operations.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Catalog operations for data consolidation and reset functionality.
17//!
18//! This module contains the consolidation and reset operations for the `ParquetDataCatalog`.
19//! These operations are separated into their own module for better organization and maintainability.
20
21use std::collections::HashSet;
22
23use anyhow::Result;
24use futures::StreamExt;
25use nautilus_core::UnixNanos;
26use nautilus_model::data::{Data, HasTsInit};
27use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
28use object_store::path::Path as ObjectPath;
29
30use crate::{
31 backend::catalog::{
32 CatalogPathPrefix, ParquetDataCatalog, are_intervals_contiguous, are_intervals_disjoint,
33 extract_path_components, make_object_store_path, parse_filename_timestamps,
34 timestamps_to_filename,
35 },
36 parquet::{
37 combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
38 },
39};
40
41/// Information about a consolidation query to be executed.
42///
43/// This struct encapsulates all the information needed to execute a single consolidation
44/// operation, including the data range to query and file naming strategy.
45///
46/// # Fields
47///
48/// - `query_start`: Start timestamp for the data query range (inclusive, in nanoseconds).
49/// - `query_end`: End timestamp for the data query range (inclusive, in nanoseconds).
50/// - `use_period_boundaries`: If true, uses period boundaries for file naming; if false, uses actual data timestamps.
51///
52/// # Usage
53///
54/// This struct is used internally by the consolidation system to plan and execute
55/// data consolidation operations. It allows the system to:
56/// - Separate query planning from execution.
57/// - Handle complex scenarios like data splitting.
58/// - Optimize file naming strategies.
59/// - Batch multiple operations efficiently.
60/// - Maintain file contiguity across periods.
61///
62/// # Examples
63///
64/// ```rust,no_run
65/// use nautilus_persistence::backend::catalog_operations::ConsolidationQuery;
66///
67/// // Regular consolidation query
68/// let query = ConsolidationQuery {
69/// query_start: 1609459200000000000,
70/// query_end: 1609545600000000000,
71/// use_period_boundaries: true,
72/// };
73///
74/// // Split operation to preserve data
75/// let split_query = ConsolidationQuery {
76/// query_start: 1609459200000000000,
77/// query_end: 1609462800000000000,
78/// use_period_boundaries: false,
79/// };
80/// ```
81#[derive(Debug, Clone)]
82pub struct ConsolidationQuery {
83 /// Start timestamp for the query range (inclusive, in nanoseconds)
84 pub query_start: u64,
85 /// End timestamp for the query range (inclusive, in nanoseconds)
86 pub query_end: u64,
87 /// Whether to use period boundaries for file naming (true) or actual data timestamps (false)
88 pub use_period_boundaries: bool,
89}
90
91/// Information about a deletion operation to be executed.
92///
93/// This struct encapsulates all the information needed to execute a single deletion
94/// operation, including the type of operation and file handling details.
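///
/// # Examples
///
/// A minimal sketch of a hypothetical `split_before` operation (all field values are
/// illustrative only):
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::DeleteOperation;
///
/// // Preserve the data that precedes a deletion range before the original file is removed
/// let operation = DeleteOperation {
///     operation_type: "split_before".to_string(),
///     files: vec!["data/quotes/BTCUSD/file.parquet".to_string()],
///     query_start: 1609459200000000000,
///     query_end: 1609462799999999999,
///     file_start_ns: 1609459200000000000,
///     file_end_ns: 1609462799999999999,
/// };
/// ```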
95#[derive(Debug, Clone)]
96pub struct DeleteOperation {
    /// Type of deletion operation (`remove`, `split_before`, or `split_after`).
98 pub operation_type: String,
99 /// List of files involved in this operation.
100 pub files: Vec<String>,
101 /// Start timestamp for data query (used for split operations).
102 pub query_start: u64,
103 /// End timestamp for data query (used for split operations).
104 pub query_end: u64,
105 /// Start timestamp for new file naming (used for split operations).
106 pub file_start_ns: u64,
107 /// End timestamp for new file naming (used for split operations).
108 pub file_end_ns: u64,
109}
110
111impl ParquetDataCatalog {
112 /// Consolidates all data files in the catalog.
113 ///
114 /// This method identifies all leaf directories in the catalog that contain parquet files
115 /// and consolidates them. A leaf directory is one that contains files but no subdirectories.
116 /// This is a convenience method that effectively calls `consolidate_data` for all data types
117 /// and instrument IDs in the catalog.
118 ///
119 /// # Parameters
120 ///
121 /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
122 /// greater than or equal to this value will be consolidated. If None, all files
123 /// from the beginning of time will be considered.
124 /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
125 /// less than or equal to this value will be consolidated. If None, all files
126 /// up to the end of time will be considered.
127 /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
128 ///
129 /// # Returns
130 ///
131 /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
132 ///
133 /// # Errors
134 ///
135 /// This function will return an error if:
136 /// - Directory listing fails.
137 /// - File consolidation operations fail.
138 /// - Interval validation fails (when `ensure_contiguous_files` is true).
139 ///
140 /// # Examples
141 ///
142 /// ```rust,no_run
143 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
144 /// use nautilus_core::UnixNanos;
145 ///
146 /// let catalog = ParquetDataCatalog::new(/* ... */);
147 ///
148 /// // Consolidate all files in the catalog
149 /// catalog.consolidate_catalog(None, None, None)?;
150 ///
151 /// // Consolidate only files within a specific time range
152 /// catalog.consolidate_catalog(
153 /// Some(UnixNanos::from(1609459200000000000)),
154 /// Some(UnixNanos::from(1609545600000000000)),
155 /// Some(true)
156 /// )?;
157 /// # Ok::<(), anyhow::Error>(())
158 /// ```
159 pub fn consolidate_catalog(
160 &self,
161 start: Option<UnixNanos>,
162 end: Option<UnixNanos>,
163 ensure_contiguous_files: Option<bool>,
164 ) -> Result<()> {
165 let leaf_directories = self.find_leaf_data_directories()?;
166
167 for directory in leaf_directories {
168 self.consolidate_directory(&directory, start, end, ensure_contiguous_files)?;
169 }
170
171 Ok(())
172 }
173
174 /// Consolidates data files for a specific data type and instrument.
175 ///
176 /// This method consolidates Parquet files within a specific directory (defined by data type
177 /// and optional instrument ID) by merging multiple files into a single file. This improves
178 /// query performance and can reduce storage overhead.
179 ///
180 /// # Parameters
181 ///
182 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
183 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
184 /// - `start`: Optional start timestamp to limit consolidation to files within this range.
185 /// - `end`: Optional end timestamp to limit consolidation to files within this range.
186 /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
187 ///
188 /// # Returns
189 ///
190 /// Returns `Ok(())` on success, or an error if consolidation fails.
191 ///
192 /// # Errors
193 ///
194 /// This function will return an error if:
195 /// - The directory path cannot be constructed.
196 /// - File consolidation operations fail.
197 /// - Interval validation fails (when `ensure_contiguous_files` is true).
198 ///
199 /// # Examples
200 ///
201 /// ```rust,no_run
202 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
203 /// use nautilus_core::UnixNanos;
204 ///
205 /// let catalog = ParquetDataCatalog::new(/* ... */);
206 ///
207 /// // Consolidate all quote files for a specific instrument
208 /// catalog.consolidate_data(
209 /// "quotes",
210 /// Some("BTCUSD".to_string()),
211 /// None,
212 /// None,
213 /// None
214 /// )?;
215 ///
216 /// // Consolidate trade files within a time range
217 /// catalog.consolidate_data(
218 /// "trades",
219 /// None,
220 /// Some(UnixNanos::from(1609459200000000000)),
221 /// Some(UnixNanos::from(1609545600000000000)),
222 /// Some(true)
223 /// )?;
224 /// # Ok::<(), anyhow::Error>(())
225 /// ```
226 pub fn consolidate_data(
227 &self,
228 type_name: &str,
229 instrument_id: Option<String>,
230 start: Option<UnixNanos>,
231 end: Option<UnixNanos>,
232 ensure_contiguous_files: Option<bool>,
233 ) -> Result<()> {
234 let directory = self.make_path(type_name, instrument_id)?;
235 self.consolidate_directory(&directory, start, end, ensure_contiguous_files)
236 }
237
238 /// Consolidates Parquet files within a specific directory by merging them into a single file.
239 ///
240 /// This internal method performs the actual consolidation work for a single directory.
241 /// It identifies files within the specified time range, validates their intervals,
242 /// and combines them into a single Parquet file with optimized storage.
243 ///
244 /// # Parameters
245 ///
246 /// - `directory`: The directory path containing Parquet files to consolidate.
247 /// - `start`: Optional start timestamp to limit consolidation to files within this range.
248 /// - `end`: Optional end timestamp to limit consolidation to files within this range.
249 /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous.
250 ///
251 /// # Returns
252 ///
253 /// Returns `Ok(())` on success, or an error if consolidation fails.
254 ///
255 /// # Behavior
256 ///
257 /// - Skips consolidation if directory contains 1 or fewer files.
258 /// - Filters files by timestamp range if start/end are specified.
259 /// - Sorts intervals by start timestamp before consolidation.
260 /// - Creates a new file spanning the entire time range of input files.
261 /// - Validates interval disjointness after consolidation (if enabled).
262 ///
263 /// # Errors
264 ///
265 /// This function will return an error if:
266 /// - Directory listing fails.
267 /// - File combination operations fail.
268 /// - Interval validation fails (when `ensure_contiguous_files` is true).
269 /// - Object store operations fail.
270 fn consolidate_directory(
271 &self,
272 directory: &str,
273 start: Option<UnixNanos>,
274 end: Option<UnixNanos>,
275 ensure_contiguous_files: Option<bool>,
276 ) -> Result<()> {
277 let parquet_files = self.list_parquet_files(directory)?;
278
279 if parquet_files.len() <= 1 {
280 return Ok(());
281 }
282
283 let mut files_to_consolidate = Vec::new();
284 let mut intervals = Vec::new();
285 let start = start.map(|t| t.as_u64());
286 let end = end.map(|t| t.as_u64());
287
288 for file in parquet_files {
289 if let Some(interval) = parse_filename_timestamps(&file) {
290 let (interval_start, interval_end) = interval;
291 let include_file = match (start, end) {
292 (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
293 (Some(s), None) => interval_start >= s,
294 (None, Some(e)) => interval_end <= e,
295 (None, None) => true,
296 };
297
298 if include_file {
299 files_to_consolidate.push(file);
300 intervals.push(interval);
301 }
302 }
303 }
304
305 intervals.sort_by_key(|&(start, _)| start);
306
307 if !intervals.is_empty() {
308 let file_name = timestamps_to_filename(
309 UnixNanos::from(intervals[0].0),
310 UnixNanos::from(intervals.last().unwrap().1),
311 );
312 let path = make_object_store_path(directory, &[&file_name]);
313
314 // Convert string paths to ObjectPath for the function call
315 let object_paths: Vec<ObjectPath> = files_to_consolidate
316 .iter()
317 .map(|path| ObjectPath::from(path.as_str()))
318 .collect();
319
320 self.execute_async(async {
321 combine_parquet_files_from_object_store(
322 self.object_store.clone(),
323 object_paths,
324 &ObjectPath::from(path),
325 Some(self.compression),
326 Some(self.max_row_group_size),
327 )
328 .await
329 })?;
330 }
331
332 if ensure_contiguous_files.unwrap_or(true) && !are_intervals_disjoint(&intervals) {
333 anyhow::bail!("Intervals are not disjoint after consolidating a directory");
334 }
335
336 Ok(())
337 }
338
339 /// Consolidates all data files in the catalog by splitting them into fixed time periods.
340 ///
341 /// This method identifies all leaf directories in the catalog that contain parquet files
342 /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
343 /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
344 /// and instrument IDs in the catalog.
345 ///
346 /// # Parameters
347 ///
348 /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
349 /// Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
350 /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
351 /// greater than or equal to this value will be consolidated. If None, all files
352 /// from the beginning of time will be considered.
353 /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
354 /// less than or equal to this value will be consolidated. If None, all files
355 /// up to the end of time will be considered.
356 /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
357 /// If false, uses actual data timestamps for file naming.
358 ///
359 /// # Returns
360 ///
361 /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
362 ///
363 /// # Errors
364 ///
365 /// This function will return an error if:
366 /// - Directory listing fails.
367 /// - Data type extraction from path fails.
368 /// - Period-based consolidation operations fail.
369 ///
370 /// # Notes
371 ///
    /// - This operation can be resource-intensive for large catalogs with many data types
    ///   and instruments.
    /// - The consolidation process splits data into fixed time periods rather than combining
    ///   all files into a single file per directory.
    /// - Uses the same period-based consolidation logic as `consolidate_data_by_period`.
    /// - Original files are removed and replaced with period-based consolidated files.
    /// - This method is useful for periodic maintenance of the catalog to standardize
    ///   file organization by time periods.
380 ///
381 /// # Examples
382 ///
383 /// ```rust,no_run
384 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
385 /// use nautilus_core::UnixNanos;
386 ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
388 ///
389 /// // Consolidate all files in the catalog by 1-day periods
390 /// catalog.consolidate_catalog_by_period(
391 /// Some(86400000000000), // 1 day in nanoseconds
392 /// None,
393 /// None,
394 /// Some(true)
395 /// )?;
396 ///
397 /// // Consolidate only files within a specific time range by 1-hour periods
398 /// catalog.consolidate_catalog_by_period(
399 /// Some(3600000000000), // 1 hour in nanoseconds
400 /// Some(UnixNanos::from(1609459200000000000)),
401 /// Some(UnixNanos::from(1609545600000000000)),
402 /// Some(false)
403 /// )?;
404 /// # Ok::<(), anyhow::Error>(())
405 /// ```
406 pub fn consolidate_catalog_by_period(
407 &mut self,
408 period_nanos: Option<u64>,
409 start: Option<UnixNanos>,
410 end: Option<UnixNanos>,
411 ensure_contiguous_files: Option<bool>,
412 ) -> Result<()> {
413 let leaf_directories = self.find_leaf_data_directories()?;
414
415 for directory in leaf_directories {
416 let (data_cls, identifier) =
417 self.extract_data_cls_and_identifier_from_path(&directory)?;
418
419 if let Some(data_cls_name) = data_cls {
420 // Use match statement to call the generic consolidate_data_by_period for various types
421 match data_cls_name.as_str() {
422 "quotes" => {
423 use nautilus_model::data::QuoteTick;
424 self.consolidate_data_by_period_generic::<QuoteTick>(
425 identifier,
426 period_nanos,
427 start,
428 end,
429 ensure_contiguous_files,
430 )?;
431 }
432 "trades" => {
433 use nautilus_model::data::TradeTick;
434 self.consolidate_data_by_period_generic::<TradeTick>(
435 identifier,
436 period_nanos,
437 start,
438 end,
439 ensure_contiguous_files,
440 )?;
441 }
442 "order_book_deltas" => {
443 use nautilus_model::data::OrderBookDelta;
444 self.consolidate_data_by_period_generic::<OrderBookDelta>(
445 identifier,
446 period_nanos,
447 start,
448 end,
449 ensure_contiguous_files,
450 )?;
451 }
452 "order_book_depths" => {
453 use nautilus_model::data::OrderBookDepth10;
454 self.consolidate_data_by_period_generic::<OrderBookDepth10>(
455 identifier,
456 period_nanos,
457 start,
458 end,
459 ensure_contiguous_files,
460 )?;
461 }
462 "bars" => {
463 use nautilus_model::data::Bar;
464 self.consolidate_data_by_period_generic::<Bar>(
465 identifier,
466 period_nanos,
467 start,
468 end,
469 ensure_contiguous_files,
470 )?;
471 }
472 "index_prices" => {
473 use nautilus_model::data::IndexPriceUpdate;
474 self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
475 identifier,
476 period_nanos,
477 start,
478 end,
479 ensure_contiguous_files,
480 )?;
481 }
482 "mark_prices" => {
483 use nautilus_model::data::MarkPriceUpdate;
484 self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
485 identifier,
486 period_nanos,
487 start,
488 end,
489 ensure_contiguous_files,
490 )?;
491 }
492 "instrument_closes" => {
493 use nautilus_model::data::close::InstrumentClose;
494 self.consolidate_data_by_period_generic::<InstrumentClose>(
495 identifier,
496 period_nanos,
497 start,
498 end,
499 ensure_contiguous_files,
500 )?;
501 }
502 _ => {
503 // Skip unknown data types
504 log::warn!("Unknown data type for consolidation: {data_cls_name}");
505 continue;
506 }
507 }
508 }
509 }
510
511 Ok(())
512 }
513
514 /// Extracts data class and identifier from a directory path.
515 ///
516 /// This method parses a directory path to extract the data type and optional
517 /// instrument identifier. It's used to determine what type of data consolidation
518 /// to perform for each directory.
519 ///
520 /// # Parameters
521 ///
522 /// - `path`: The directory path to parse.
523 ///
524 /// # Returns
525 ///
526 /// Returns a tuple of (`data_class`, identifier) where both are optional strings.
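    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog (the path below is illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let (data_cls, identifier) =
    ///     catalog.extract_data_cls_and_identifier_from_path("base/data/quotes/BTCUSD")?;
    /// assert_eq!(data_cls.as_deref(), Some("quotes"));
    /// assert_eq!(identifier.as_deref(), Some("BTCUSD"));
    /// # Ok::<(), anyhow::Error>(())
    /// ```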
527 pub fn extract_data_cls_and_identifier_from_path(
528 &self,
529 path: &str,
530 ) -> Result<(Option<String>, Option<String>)> {
531 // Use cross-platform path parsing
532 let path_components = extract_path_components(path);
533
534 // Find the "data" directory in the path
535 if let Some(data_index) = path_components.iter().position(|part| part == "data")
536 && data_index + 1 < path_components.len()
537 {
538 let data_cls = path_components[data_index + 1].clone();
539
540 // Check if there's an identifier (instrument ID) after the data class
541 let identifier = if data_index + 2 < path_components.len() {
542 Some(path_components[data_index + 2].clone())
543 } else {
544 None
545 };
546
547 return Ok((Some(data_cls), identifier));
548 }
549
550 // If we can't parse the path, return None for both
551 Ok((None, None))
552 }
553
554 /// Consolidates data files by splitting them into fixed time periods.
555 ///
556 /// This method queries data by period and writes consolidated files immediately,
557 /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
558 /// the function automatically splits those files to preserve all data.
559 ///
560 /// # Parameters
561 ///
562 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
563 /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments.
564 /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
565 /// Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
566 /// - `start`: Optional start timestamp for consolidation range. If None, uses earliest available data.
567 /// If specified and intersects existing files, those files will be split to preserve
568 /// data outside the consolidation range.
569 /// - `end`: Optional end timestamp for consolidation range. If None, uses latest available data.
570 /// If specified and intersects existing files, those files will be split to preserve
571 /// data outside the consolidation range.
572 /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
573 /// If false, uses actual data timestamps for file naming.
574 ///
575 /// # Returns
576 ///
577 /// Returns `Ok(())` on success, or an error if consolidation fails.
578 ///
579 /// # Errors
580 ///
581 /// This function will return an error if:
582 /// - The directory path cannot be constructed.
583 /// - File operations fail.
584 /// - Data querying or writing fails.
585 ///
586 /// # Notes
587 ///
588 /// - Uses two-phase approach: first determines all queries, then executes them.
589 /// - Groups intervals into contiguous groups to preserve holes between groups.
590 /// - Allows consolidation across multiple files within each contiguous group.
591 /// - Skips queries if target files already exist for efficiency.
592 /// - Original files are removed immediately after querying each period.
593 /// - When `ensure_contiguous_files=false`, file timestamps match actual data range.
594 /// - When `ensure_contiguous_files=true`, file timestamps use period boundaries.
595 /// - Uses modulo arithmetic for efficient period boundary calculation.
596 /// - Preserves holes in data by preventing queries from spanning across gaps.
597 /// - Automatically splits files at start/end boundaries to preserve all data.
598 /// - Split operations are executed before consolidation to ensure data preservation.
599 ///
600 /// # Examples
601 ///
602 /// ```rust,no_run
603 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
604 /// use nautilus_core::UnixNanos;
605 ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
607 ///
608 /// // Consolidate all quote files by 1-day periods
609 /// catalog.consolidate_data_by_period(
610 /// "quotes",
611 /// None,
612 /// Some(86400000000000), // 1 day in nanoseconds
613 /// None,
614 /// None,
615 /// Some(true)
616 /// )?;
617 ///
618 /// // Consolidate specific instrument by 1-hour periods
619 /// catalog.consolidate_data_by_period(
620 /// "trades",
621 /// Some("BTCUSD".to_string()),
622 /// Some(3600000000000), // 1 hour in nanoseconds
623 /// Some(UnixNanos::from(1609459200000000000)),
624 /// Some(UnixNanos::from(1609545600000000000)),
625 /// Some(false)
626 /// )?;
627 /// # Ok::<(), anyhow::Error>(())
628 /// ```
629 pub fn consolidate_data_by_period(
630 &mut self,
631 type_name: &str,
632 identifier: Option<String>,
633 period_nanos: Option<u64>,
634 start: Option<UnixNanos>,
635 end: Option<UnixNanos>,
636 ensure_contiguous_files: Option<bool>,
637 ) -> Result<()> {
638 // Use match statement to call the generic consolidate_data_by_period for various types
639 match type_name {
640 "quotes" => {
641 use nautilus_model::data::QuoteTick;
642 self.consolidate_data_by_period_generic::<QuoteTick>(
643 identifier,
644 period_nanos,
645 start,
646 end,
647 ensure_contiguous_files,
648 )?;
649 }
650 "trades" => {
651 use nautilus_model::data::TradeTick;
652 self.consolidate_data_by_period_generic::<TradeTick>(
653 identifier,
654 period_nanos,
655 start,
656 end,
657 ensure_contiguous_files,
658 )?;
659 }
660 "order_book_deltas" => {
661 use nautilus_model::data::OrderBookDelta;
662 self.consolidate_data_by_period_generic::<OrderBookDelta>(
663 identifier,
664 period_nanos,
665 start,
666 end,
667 ensure_contiguous_files,
668 )?;
669 }
670 "order_book_depths" => {
671 use nautilus_model::data::OrderBookDepth10;
672 self.consolidate_data_by_period_generic::<OrderBookDepth10>(
673 identifier,
674 period_nanos,
675 start,
676 end,
677 ensure_contiguous_files,
678 )?;
679 }
680 "bars" => {
681 use nautilus_model::data::Bar;
682 self.consolidate_data_by_period_generic::<Bar>(
683 identifier,
684 period_nanos,
685 start,
686 end,
687 ensure_contiguous_files,
688 )?;
689 }
690 "index_prices" => {
691 use nautilus_model::data::IndexPriceUpdate;
692 self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
693 identifier,
694 period_nanos,
695 start,
696 end,
697 ensure_contiguous_files,
698 )?;
699 }
700 "mark_prices" => {
701 use nautilus_model::data::MarkPriceUpdate;
702 self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
703 identifier,
704 period_nanos,
705 start,
706 end,
707 ensure_contiguous_files,
708 )?;
709 }
710 "instrument_closes" => {
711 use nautilus_model::data::close::InstrumentClose;
712 self.consolidate_data_by_period_generic::<InstrumentClose>(
713 identifier,
714 period_nanos,
715 start,
716 end,
717 ensure_contiguous_files,
718 )?;
719 }
720 _ => {
721 anyhow::bail!("Unknown data type for consolidation: {}", type_name);
722 }
723 }
724
725 Ok(())
726 }
727
    /// Consolidates data files by splitting them into fixed time periods (generic implementation).
729 ///
730 /// This is a type-safe version of `consolidate_data_by_period` that uses generic types
731 /// to ensure compile-time correctness and enable reuse across different data types.
732 ///
733 /// # Type Parameters
734 ///
735 /// - `T`: The data type to consolidate, must implement required traits for serialization.
736 ///
737 /// # Parameters
738 ///
739 /// - `identifier`: Optional instrument ID to target a specific instrument's data.
740 /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
741 /// - `start`: Optional start timestamp for consolidation range.
742 /// - `end`: Optional end timestamp for consolidation range.
743 /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
744 ///
745 /// # Returns
746 ///
747 /// Returns `Ok(())` on success, or an error if consolidation fails.
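    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog with quote data on disk:
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files into 1-day periods
    /// catalog.consolidate_data_by_period_generic::<QuoteTick>(
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```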
748 pub fn consolidate_data_by_period_generic<T>(
749 &mut self,
750 identifier: Option<String>,
751 period_nanos: Option<u64>,
752 start: Option<UnixNanos>,
753 end: Option<UnixNanos>,
754 ensure_contiguous_files: Option<bool>,
755 ) -> Result<()>
756 where
757 T: DecodeDataFromRecordBatch
758 + CatalogPathPrefix
759 + EncodeToRecordBatch
760 + HasTsInit
761 + TryFrom<Data>
762 + Clone,
763 {
764 let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
765 let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);
766
767 // Use get_intervals for cleaner implementation
768 let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;
769
770 if intervals.is_empty() {
771 return Ok(()); // No files to consolidate
772 }
773
774 // Use auxiliary function to prepare all queries for execution
775 let queries_to_execute = self.prepare_consolidation_queries(
776 T::path_prefix(),
777 identifier.clone(),
778 &intervals,
779 period_nanos,
780 start,
781 end,
782 ensure_contiguous_files,
783 )?;
784
785 if queries_to_execute.is_empty() {
786 return Ok(()); // No queries to execute
787 }
788
789 // Get directory for file operations
790 let directory = self.make_path(T::path_prefix(), identifier.clone())?;
791 let mut existing_files = self.list_parquet_files(&directory)?;
792 existing_files.sort();
793
794 // Track files to remove and maintain existing_files list
795 let mut files_to_remove = HashSet::new();
796 let original_files_count = existing_files.len();
797
798 // Phase 2: Execute queries, write, and delete
799 let mut file_start_ns: Option<u64> = None; // Track contiguity across periods
800
801 for query_info in queries_to_execute {
802 // Query data for this period using query_typed_data
803 let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
804
805 let period_data = self.query_typed_data::<T>(
806 instrument_ids,
807 Some(UnixNanos::from(query_info.query_start)),
808 Some(UnixNanos::from(query_info.query_end)),
809 None,
810 Some(existing_files.clone()),
811 )?;
812
813 if period_data.is_empty() {
814 // Skip if no data found, but maintain contiguity by using query start
815 if file_start_ns.is_none() {
816 file_start_ns = Some(query_info.query_start);
817 }
818 continue;
819 }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, maintaining contiguity with
                // any preceding empty periods tracked in `file_start_ns`
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                (file_start_ns.unwrap(), query_info.query_end)
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };

            // Reset contiguity tracking only after it has been consumed for file naming
            file_start_ns = None;
835
836 // Check again if target file exists (in case it was created during this process)
837 let target_filename = format!(
838 "{}/{}",
839 directory,
840 timestamps_to_filename(
841 UnixNanos::from(final_start_ns),
842 UnixNanos::from(final_end_ns)
843 )
844 );
845
846 if self.file_exists(&target_filename)? {
847 // Skip if target file already exists
848 continue;
849 }
850
851 // Write consolidated data for this period using write_to_parquet
852 // Use skip_disjoint_check since we're managing file removal carefully
853 let start_ts = UnixNanos::from(final_start_ns);
854 let end_ts = UnixNanos::from(final_end_ns);
855 self.write_to_parquet(period_data, Some(start_ts), Some(end_ts), Some(true))?;
856
857 // Identify files that are completely covered by this period
858 // Only remove files AFTER successfully writing a new file
859 // Use slice copy to avoid modification during iteration (match Python logic)
860 for file in existing_files.clone() {
861 if let Some(interval) = parse_filename_timestamps(&file)
862 && interval.1 <= query_info.query_end
863 {
864 files_to_remove.insert(file.clone());
865 existing_files.retain(|f| f != &file);
866 }
867 }
868
869 // Remove files as soon as we have some to remove
870 if !files_to_remove.is_empty() {
871 for file in files_to_remove.drain() {
872 self.delete_file(&file)?;
873 }
874 }
875 }
876
877 // Remove any remaining files that weren't removed in the loop
878 // This matches the Python implementation's final cleanup step
879 // Only remove files if any consolidation actually happened (i.e., files were processed)
880 let files_were_processed = existing_files.len() < original_files_count;
881 if files_were_processed {
882 for file in existing_files {
883 self.delete_file(&file)?;
884 }
885 }
886
887 Ok(())
888 }
889
890 /// Prepares all queries for consolidation by filtering, grouping, and handling splits.
891 ///
892 /// This auxiliary function handles all the preparation logic for consolidation:
893 /// 1. Filters intervals by time range.
894 /// 2. Groups intervals into contiguous groups.
895 /// 3. Identifies and creates split operations for data preservation.
896 /// 4. Generates period-based consolidation queries.
897 /// 5. Checks for existing target files.
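    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog (the intervals and period below are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Plan 1-hour consolidation queries over two contiguous file intervals
    /// let intervals = vec![(0, 3599999999999), (3600000000000, 7199999999999)];
    /// let queries = catalog.prepare_consolidation_queries(
    ///     "quotes",
    ///     None,
    ///     &intervals,
    ///     3600000000000,
    ///     None,
    ///     None,
    ///     true,
    /// )?;
    /// println!("{} consolidation queries planned", queries.len());
    /// # Ok::<(), anyhow::Error>(())
    /// ```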
898 #[allow(clippy::too_many_arguments)]
899 pub fn prepare_consolidation_queries(
900 &self,
901 type_name: &str,
902 identifier: Option<String>,
903 intervals: &[(u64, u64)],
904 period_nanos: u64,
905 start: Option<UnixNanos>,
906 end: Option<UnixNanos>,
907 ensure_contiguous_files: bool,
908 ) -> Result<Vec<ConsolidationQuery>> {
909 // Filter intervals by time range if specified
910 let used_start = start.map(|s| s.as_u64());
911 let used_end = end.map(|e| e.as_u64());
912
913 let mut filtered_intervals = Vec::new();
914 for &(interval_start, interval_end) in intervals {
915 // Check if interval overlaps with the specified range
916 if (used_start.is_none() || used_start.unwrap() <= interval_end)
917 && (used_end.is_none() || interval_start <= used_end.unwrap())
918 {
919 filtered_intervals.push((interval_start, interval_end));
920 }
921 }
922
923 if filtered_intervals.is_empty() {
924 return Ok(Vec::new()); // No intervals in the specified range
925 }
926
927 // Check contiguity of filtered intervals if required
928 if ensure_contiguous_files && !are_intervals_contiguous(&filtered_intervals) {
929 anyhow::bail!(
930 "Intervals are not contiguous. When ensure_contiguous_files=true, \
931 all files in the consolidation range must have contiguous timestamps."
932 );
933 }
934
935 // Group intervals into contiguous groups to preserve holes between groups
936 // but allow consolidation within each contiguous group
937 let contiguous_groups = self.group_contiguous_intervals(&filtered_intervals);
938
939 let mut queries_to_execute = Vec::new();
940
941 // Handle interval splitting by creating split operations for data preservation
942 if !filtered_intervals.is_empty() {
943 if let Some(start_ts) = used_start {
944 let first_interval = filtered_intervals[0];
945 if first_interval.0 < start_ts && start_ts <= first_interval.1 {
946 // Split before start: preserve data from interval_start to start-1
947 queries_to_execute.push(ConsolidationQuery {
948 query_start: first_interval.0,
949 query_end: start_ts - 1,
950 use_period_boundaries: false,
951 });
952 }
953 }
954
955 if let Some(end_ts) = used_end {
956 let last_interval = filtered_intervals[filtered_intervals.len() - 1];
957 if last_interval.0 <= end_ts && end_ts < last_interval.1 {
958 // Split after end: preserve data from end+1 to interval_end
959 queries_to_execute.push(ConsolidationQuery {
960 query_start: end_ts + 1,
961 query_end: last_interval.1,
962 use_period_boundaries: false,
963 });
964 }
965 }
966 }
967
968 // Generate period-based consolidation queries for each contiguous group
969 for group in contiguous_groups {
970 let group_start = group[0].0;
971 let group_end = group[group.len() - 1].1;
972
973 // Apply start/end filtering to the group
974 let effective_start = used_start.map_or(group_start, |s| s.max(group_start));
975 let effective_end = used_end.map_or(group_end, |e| e.min(group_end));
976
977 if effective_start > effective_end {
978 continue; // Skip if no overlap
979 }
980
981 // Generate period-based queries within this contiguous group
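            // Floor the group start down to the nearest period boundary (integer division truncates)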
982 let mut current_start_ns = (effective_start / period_nanos) * period_nanos;
983
984 // Add safety check to prevent infinite loops (match Python logic)
985 let max_iterations = 10000;
986 let mut iteration_count = 0;
987
988 while current_start_ns <= effective_end {
989 iteration_count += 1;
990 if iteration_count > max_iterations {
991 // Safety break to prevent infinite loops
992 break;
993 }
994 let current_end_ns = (current_start_ns + period_nanos - 1).min(effective_end);
995
996 // Check if target file already exists (only when ensure_contiguous_files is true)
997 if ensure_contiguous_files {
998 let directory = self.make_path(type_name, identifier.clone())?;
999 let target_filename = format!(
1000 "{}/{}",
1001 directory,
1002 timestamps_to_filename(
1003 UnixNanos::from(current_start_ns),
1004 UnixNanos::from(current_end_ns)
1005 )
1006 );
1007
1008 if self.file_exists(&target_filename)? {
1009 // Skip if target file already exists
1010 current_start_ns += period_nanos;
1011 continue;
1012 }
1013 }
1014
1015 // Add query to execution list
1016 queries_to_execute.push(ConsolidationQuery {
1017 query_start: current_start_ns,
1018 query_end: current_end_ns,
1019 use_period_boundaries: ensure_contiguous_files,
1020 });
1021
1022 // Move to next period
1023 current_start_ns += period_nanos;
1024
1025 if current_start_ns > effective_end {
1026 break;
1027 }
1028 }
1029 }
1030
        // Sort queries by start timestamp to enable efficient file removal:
        // a file can be removed once its end timestamp is <= the current query's end,
        // and processing in chronological order ensures optimal cleanup.
1034 queries_to_execute.sort_by_key(|q| q.query_start);
1035
1036 Ok(queries_to_execute)
1037 }
1038
1039 /// Groups intervals into contiguous groups for efficient consolidation.
1040 ///
1041 /// This method analyzes a list of time intervals and groups them into contiguous sequences.
1042 /// Intervals are considered contiguous if the end of one interval is exactly one nanosecond
1043 /// before the start of the next interval. This grouping preserves data gaps while allowing
1044 /// consolidation within each contiguous group.
1045 ///
1046 /// # Parameters
1047 ///
1048 /// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
1049 ///
1050 /// # Returns
1051 ///
1052 /// Returns a vector of groups, where each group is a vector of contiguous intervals.
1053 /// Returns an empty vector if the input is empty.
1054 ///
1055 /// # Algorithm
1056 ///
1057 /// 1. Starts with the first interval in a new group.
1058 /// 2. For each subsequent interval, checks if it's contiguous with the previous.
1059 /// 3. If contiguous (`prev_end` + 1 == `curr_start`), adds to current group.
1060 /// 4. If not contiguous, starts a new group.
1061 /// 5. Returns all groups.
1062 ///
1063 /// # Examples
1064 ///
1065 /// ```text
1066 /// Contiguous intervals: [(1,5), (6,10), (11,15)]
1067 /// Returns: [[(1,5), (6,10), (11,15)]]
1068 ///
1069 /// Non-contiguous intervals: [(1,5), (8,10), (12,15)]
1070 /// Returns: [[(1,5)], [(8,10)], [(12,15)]]
1071 /// ```
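    ///
    /// A minimal sketch, assuming a constructed catalog:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let groups = catalog.group_contiguous_intervals(&[(1, 5), (6, 10), (12, 15)]);
    /// assert_eq!(groups, vec![vec![(1, 5), (6, 10)], vec![(12, 15)]]);
    /// ```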
1072 ///
1073 /// # Notes
1074 ///
1075 /// - Input intervals should be sorted by start timestamp.
1076 /// - Gaps between groups are preserved and not consolidated.
1077 /// - Used internally by period-based consolidation methods.
1078 #[must_use]
1079 pub fn group_contiguous_intervals(&self, intervals: &[(u64, u64)]) -> Vec<Vec<(u64, u64)>> {
1080 if intervals.is_empty() {
1081 return Vec::new();
1082 }
1083
1084 let mut contiguous_groups = Vec::new();
1085 let mut current_group = vec![intervals[0]];
1086
1087 for i in 1..intervals.len() {
1088 let prev_interval = intervals[i - 1];
1089 let curr_interval = intervals[i];
1090
1091 // Check if current interval is contiguous with previous (end + 1 == start)
1092 if prev_interval.1 + 1 == curr_interval.0 {
1093 current_group.push(curr_interval);
1094 } else {
1095 // Gap found, start new group
1096 contiguous_groups.push(current_group);
1097 current_group = vec![curr_interval];
1098 }
1099 }
1100
1101 // Add the last group
1102 contiguous_groups.push(current_group);
1103
1104 contiguous_groups
1105 }
1106
1107 /// Checks if a file exists in the object store.
1108 ///
1109 /// This method performs a HEAD operation on the object store to determine if a file
1110 /// exists without downloading its content. It works with both local and remote object stores.
1111 ///
1112 /// # Parameters
1113 ///
1114 /// - `path`: The file path to check, relative to the catalog structure.
1115 ///
1116 /// # Returns
1117 ///
1118 /// Returns `true` if the file exists, `false` if it doesn't exist.
1119 ///
1120 /// # Errors
1121 ///
1122 /// Returns an error if the object store operation fails due to network issues,
1123 /// authentication problems, or other I/O errors.
1124 fn file_exists(&self, path: &str) -> Result<bool> {
1125 let object_path = self.to_object_path(path);
1126 let exists =
1127 self.execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
1128 Ok(exists)
1129 }
1130
1131 /// Deletes a file from the object store.
1132 ///
1133 /// This method removes a file from the object store. The operation is permanent
1134 /// and cannot be undone. It works with both local filesystems and remote object stores.
1135 ///
1136 /// # Parameters
1137 ///
1138 /// - `path`: The file path to delete, relative to the catalog structure.
1139 ///
1140 /// # Returns
1141 ///
1142 /// Returns `Ok(())` on successful deletion.
1143 ///
1144 /// # Errors
1145 ///
1146 /// Returns an error if:
1147 /// - The file doesn't exist.
1148 /// - Permission is denied.
1149 /// - Network issues occur (for remote stores).
1150 /// - The object store operation fails.
1151 ///
1152 /// # Safety
1153 ///
1154 /// This operation is irreversible. Ensure the file is no longer needed before deletion.
1155 fn delete_file(&self, path: &str) -> Result<()> {
1156 let object_path = self.to_object_path(path);
1157 self.execute_async(async {
1158 self.object_store
1159 .delete(&object_path)
1160 .await
1161 .map_err(anyhow::Error::from)
1162 })?;
1163 Ok(())
1164 }
1165
1166 /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
1167 ///
1168 /// This method scans all leaf data directories in the catalog and renames files based on
1169 /// the actual timestamp range of their content. This is useful when files have been
1170 /// modified or when filename conventions have changed.
1171 ///
1172 /// # Returns
1173 ///
1174 /// Returns `Ok(())` on success, or an error if the operation fails.
1175 ///
1176 /// # Errors
1177 ///
1178 /// This function will return an error if:
1179 /// - Directory listing fails.
1180 /// - File metadata reading fails.
1181 /// - File rename operations fail.
1182 /// - Interval validation fails after renaming.
1183 ///
1184 /// # Examples
1185 ///
1186 /// ```rust,no_run
1187 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1188 ///
1189 /// let catalog = ParquetDataCatalog::new(/* ... */);
1190 ///
1191 /// // Reset all filenames in the catalog
1192 /// catalog.reset_all_file_names()?;
1193 /// # Ok::<(), anyhow::Error>(())
1194 /// ```
1195 pub fn reset_all_file_names(&self) -> Result<()> {
1196 let leaf_directories = self.find_leaf_data_directories()?;
1197
1198 for directory in leaf_directories {
1199 self.reset_file_names(&directory)?;
1200 }
1201
1202 Ok(())
1203 }
1204
1205 /// Resets the filenames of Parquet files for a specific data type and instrument ID.
1206 ///
1207 /// This method renames files in a specific directory based on the actual timestamp
1208 /// range of their content. This is useful for correcting filenames after data
1209 /// modifications or when filename conventions have changed.
1210 ///
1211 /// # Parameters
1212 ///
1213 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1214 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1215 ///
1216 /// # Returns
1217 ///
1218 /// Returns `Ok(())` on success, or an error if the operation fails.
1219 ///
1220 /// # Errors
1221 ///
1222 /// This function will return an error if:
1223 /// - The directory path cannot be constructed.
1224 /// - File metadata reading fails.
1225 /// - File rename operations fail.
1226 /// - Interval validation fails after renaming.
1227 ///
1228 /// # Examples
1229 ///
1230 /// ```rust,no_run
1231 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1232 ///
1233 /// let catalog = ParquetDataCatalog::new(/* ... */);
1234 ///
1235 /// // Reset filenames for all quote files
1236 /// catalog.reset_data_file_names("quotes", None)?;
1237 ///
1238 /// // Reset filenames for a specific instrument's trade files
1239 /// catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
1240 /// # Ok::<(), anyhow::Error>(())
1241 /// ```
1242 pub fn reset_data_file_names(
1243 &self,
1244 data_cls: &str,
1245 instrument_id: Option<String>,
1246 ) -> Result<()> {
1247 let directory = self.make_path(data_cls, instrument_id)?;
1248 self.reset_file_names(&directory)
1249 }
1250
1251 /// Resets the filenames of Parquet files in a directory to match their actual content timestamps.
1252 ///
1253 /// This internal method scans all Parquet files in a directory, reads their metadata to
1254 /// determine the actual timestamp range of their content, and renames the files accordingly.
1255 /// This ensures that filenames accurately reflect the data they contain.
1256 ///
1257 /// # Parameters
1258 ///
1259 /// - `directory`: The directory path containing Parquet files to rename.
1260 ///
1261 /// # Returns
1262 ///
1263 /// Returns `Ok(())` on success, or an error if the operation fails.
1264 ///
1265 /// # Process
1266 ///
1267 /// 1. Lists all Parquet files in the directory
1268 /// 2. For each file, reads metadata to extract min/max timestamps
1269 /// 3. Generates a new filename based on actual timestamp range
1270 /// 4. Moves the file to the new name using object store operations
1271 /// 5. Validates that intervals remain disjoint after renaming
1272 ///
1273 /// # Errors
1274 ///
1275 /// This function will return an error if:
1276 /// - Directory listing fails.
1277 /// - Metadata reading fails for any file.
1278 /// - File move operations fail.
1279 /// - Interval validation fails after renaming.
1280 /// - Object store operations fail.
1281 ///
1282 /// # Notes
1283 ///
1284 /// - This operation can be time-consuming for directories with many files.
1285 /// - Files are processed sequentially to avoid conflicts.
1286 /// - The operation is atomic per file but not across the entire directory.
1287 fn reset_file_names(&self, directory: &str) -> Result<()> {
1288 let parquet_files = self.list_parquet_files(directory)?;
1289
1290 for file in parquet_files {
1291 let object_path = ObjectPath::from(file.as_str());
1292 let (first_ts, last_ts) = self.execute_async(async {
1293 min_max_from_parquet_metadata_object_store(
1294 self.object_store.clone(),
1295 &object_path,
1296 "ts_init",
1297 )
1298 .await
1299 })?;
1300
1301 let new_filename =
1302 timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
1303 let new_file_path = make_object_store_path(directory, &[&new_filename]);
1304 let new_object_path = ObjectPath::from(new_file_path);
1305
1306 self.move_file(&object_path, &new_object_path)?;
1307 }
1308
1309 let intervals = self.get_directory_intervals(directory)?;
1310
1311 if !are_intervals_disjoint(&intervals) {
1312 anyhow::bail!("Intervals are not disjoint after resetting file names");
1313 }
1314
1315 Ok(())
1316 }
1317
1318 /// Finds all leaf data directories in the catalog.
1319 ///
1320 /// A leaf directory is one that contains data files but no subdirectories.
1321 /// This method is used to identify directories that can be processed for
1322 /// consolidation or other operations.
1323 ///
1324 /// # Returns
1325 ///
1326 /// Returns a vector of directory path strings representing leaf directories,
1327 /// or an error if directory traversal fails.
1328 ///
1329 /// # Errors
1330 ///
1331 /// This function will return an error if:
1332 /// - Object store listing operations fail.
1333 /// - Directory structure cannot be analyzed.
1334 ///
1335 /// # Examples
1336 ///
1337 /// ```rust,no_run
1338 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1339 ///
1340 /// let catalog = ParquetDataCatalog::new(/* ... */);
1341 ///
1342 /// let leaf_dirs = catalog.find_leaf_data_directories()?;
1343 /// for dir in leaf_dirs {
1344 /// println!("Found leaf directory: {}", dir);
1345 /// }
1346 /// # Ok::<(), anyhow::Error>(())
1347 /// ```
1348 pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<String>> {
1349 let data_dir = make_object_store_path(&self.base_path, &["data"]);
1350
1351 let leaf_dirs = self.execute_async(async {
1352 let mut all_paths = std::collections::HashSet::new();
1353 let mut directories = std::collections::HashSet::new();
1354 let mut files_in_dirs = std::collections::HashMap::new();
1355
1356 // List all objects under the data directory
1357 let prefix = ObjectPath::from(format!("{data_dir}/"));
1358 let mut stream = self.object_store.list(Some(&prefix));
1359
1360 while let Some(object) = stream.next().await {
1361 let object = object?;
1362 let path_str = object.location.to_string();
1363 all_paths.insert(path_str.clone());
1364
1365 // Extract directory path
1366 if let Some(parent) = std::path::Path::new(&path_str).parent() {
1367 let parent_str = parent.to_string_lossy().to_string();
1368 directories.insert(parent_str.clone());
1369
1370 // Track files in each directory
1371 files_in_dirs
1372 .entry(parent_str)
1373 .or_insert_with(Vec::new)
1374 .push(path_str);
1375 }
1376 }
1377
1378 // Find leaf directories (directories with files but no subdirectories)
1379 let mut leaf_dirs = Vec::new();
1380 for dir in &directories {
1381 let has_files = files_in_dirs
1382 .get(dir)
1383 .is_some_and(|files| !files.is_empty());
1384 let has_subdirs = directories
1385 .iter()
1386 .any(|d| d.starts_with(&make_object_store_path(dir, &[""])) && d != dir);
1387
1388 if has_files && !has_subdirs {
1389 leaf_dirs.push(dir.clone());
1390 }
1391 }
1392
1393 Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
1394 })?;
1395
1396 Ok(leaf_dirs)
1397 }
1398
1399 /// Deletes data within a specified time range for a specific data type and instrument.
1400 ///
1401 /// This method identifies all parquet files that intersect with the specified time range
1402 /// and handles them appropriately:
1403 /// - Files completely within the range are deleted
1404 /// - Files partially overlapping the range are split to preserve data outside the range
1405 /// - The original intersecting files are removed after processing
1406 ///
1407 /// # Parameters
1408 ///
1409 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
1410 /// - `identifier`: Optional instrument ID to delete data for. If None, deletes data across all instruments.
1411 /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1412 /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1413 ///
1414 /// # Returns
1415 ///
1416 /// Returns `Ok(())` on success, or an error if deletion fails.
1417 ///
1418 /// # Errors
1419 ///
1420 /// This function will return an error if:
1421 /// - The directory path cannot be constructed.
1422 /// - File operations fail.
1423 /// - Data querying or writing fails.
1424 ///
1425 /// # Notes
1426 ///
1427 /// - This operation permanently removes data and cannot be undone.
1428 /// - Files that partially overlap the deletion range are split to preserve data outside the range.
1429 /// - The method ensures data integrity by using atomic operations where possible.
1430 /// - Empty directories are not automatically removed after deletion.
1431 ///
1432 /// # Examples
1433 ///
1434 /// ```rust,no_run
1435 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1436 /// use nautilus_core::UnixNanos;
1437 ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1439 ///
1440 /// // Delete all quote data for a specific instrument
1441 /// catalog.delete_data_range(
1442 /// "quotes",
1443 /// Some("BTCUSD".to_string()),
1444 /// None,
1445 /// None
1446 /// )?;
1447 ///
1448 /// // Delete trade data within a specific time range
1449 /// catalog.delete_data_range(
1450 /// "trades",
1451 /// None,
1452 /// Some(UnixNanos::from(1609459200000000000)),
1453 /// Some(UnixNanos::from(1609545600000000000))
1454 /// )?;
1455 /// # Ok::<(), anyhow::Error>(())
1456 /// ```
1457 pub fn delete_data_range(
1458 &mut self,
1459 type_name: &str,
1460 identifier: Option<String>,
1461 start: Option<UnixNanos>,
1462 end: Option<UnixNanos>,
1463 ) -> Result<()> {
1464 // Use match statement to call the generic delete_data_range for various types
1465 match type_name {
1466 "quotes" => {
1467 use nautilus_model::data::QuoteTick;
1468 self.delete_data_range_generic::<QuoteTick>(identifier, start, end)
1469 }
1470 "trades" => {
1471 use nautilus_model::data::TradeTick;
1472 self.delete_data_range_generic::<TradeTick>(identifier, start, end)
1473 }
1474 "bars" => {
1475 use nautilus_model::data::Bar;
1476 self.delete_data_range_generic::<Bar>(identifier, start, end)
1477 }
1478 "order_book_deltas" => {
1479 use nautilus_model::data::OrderBookDelta;
1480 self.delete_data_range_generic::<OrderBookDelta>(identifier, start, end)
1481 }
            // Accept both naming variants for depth data; "order_book_depths" matches the
            // directory naming used by the consolidation match arms above
            "order_book_depths" | "order_book_depth10" => {
                use nautilus_model::data::OrderBookDepth10;
                self.delete_data_range_generic::<OrderBookDepth10>(identifier, start, end)
            }
            "index_prices" => {
                use nautilus_model::data::IndexPriceUpdate;
                self.delete_data_range_generic::<IndexPriceUpdate>(identifier, start, end)
            }
            "mark_prices" => {
                use nautilus_model::data::MarkPriceUpdate;
                self.delete_data_range_generic::<MarkPriceUpdate>(identifier, start, end)
            }
            "instrument_closes" => {
                use nautilus_model::data::close::InstrumentClose;
                self.delete_data_range_generic::<InstrumentClose>(identifier, start, end)
            }
            _ => Err(anyhow::anyhow!("Unsupported data type: {}", type_name)),
1487 }
1488 }
1489
1490 /// Deletes data within a specified time range across the entire catalog.
1491 ///
1492 /// This method identifies all leaf directories in the catalog that contain parquet files
1493 /// and deletes data within the specified time range from each directory. A leaf directory
1494 /// is one that contains files but no subdirectories. This is a convenience method that
1495 /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
1496 ///
1497 /// # Parameters
1498 ///
1499 /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1500 /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1501 ///
1502 /// # Returns
1503 ///
1504 /// Returns `Ok(())` on success, or an error if deletion fails.
1505 ///
1506 /// # Errors
1507 ///
1508 /// This function will return an error if:
1509 /// - Directory traversal fails.
1510 /// - Data class extraction from paths fails.
1511 /// - Individual delete operations fail.
1512 ///
1513 /// # Notes
1514 ///
1515 /// - This operation permanently removes data and cannot be undone.
1516 /// - The deletion process handles file intersections intelligently by splitting files
1517 /// when they partially overlap with the deletion range.
1518 /// - Files completely within the deletion range are removed entirely.
1519 /// - Files partially overlapping the deletion range are split to preserve data outside the range.
1520 /// - This method is useful for bulk data cleanup operations across the entire catalog.
1521 /// - Empty directories are not automatically removed after deletion.
1522 ///
1523 /// # Examples
1524 ///
1525 /// ```rust,no_run
1526 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1527 /// use nautilus_core::UnixNanos;
1528 ///
1529 /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1530 ///
1531 /// // Delete all data before a specific date across entire catalog
1532 /// catalog.delete_catalog_range(
1533 /// None,
1534 /// Some(UnixNanos::from(1609459200000000000))
1535 /// )?;
1536 ///
1537 /// // Delete all data within a specific range across entire catalog
1538 /// catalog.delete_catalog_range(
1539 /// Some(UnixNanos::from(1609459200000000000)),
1540 /// Some(UnixNanos::from(1609545600000000000))
1541 /// )?;
1542 ///
1543 /// // Delete all data after a specific date across entire catalog
1544 /// catalog.delete_catalog_range(
1545 /// Some(UnixNanos::from(1609459200000000000)),
1546 /// None
1547 /// )?;
1548 /// # Ok::<(), anyhow::Error>(())
1549 /// ```
1550 pub fn delete_catalog_range(
1551 &mut self,
1552 start: Option<UnixNanos>,
1553 end: Option<UnixNanos>,
1554 ) -> Result<()> {
1555 let leaf_directories = self.find_leaf_data_directories()?;
1556
1557 for directory in leaf_directories {
1558 if let Ok((Some(data_type), identifier)) =
1559 self.extract_data_cls_and_identifier_from_path(&directory)
1560 {
1561 // Call the existing delete_data_range method
1562 if let Err(e) = self.delete_data_range(&data_type, identifier, start, end) {
                    log::error!("Failed to delete data in directory {directory}: {e}");
1564 // Continue with other directories instead of failing completely
1565 }
1566 }
1567 }
1568
1569 Ok(())
1570 }
1571
1572 /// Generic implementation for deleting data within a specified time range.
1573 ///
1574 /// This method provides the core deletion logic that works with any data type
1575 /// that implements the required traits. It handles file intersection analysis,
1576 /// data splitting for partial overlaps, and file cleanup.
1577 ///
1578 /// # Type Parameters
1579 ///
1580 /// - `T`: The data type that implements required traits for catalog operations.
1581 ///
1582 /// # Parameters
1583 ///
1584 /// - `identifier`: Optional instrument ID to delete data for.
1585 /// - `start`: Optional start timestamp for the deletion range.
1586 /// - `end`: Optional end timestamp for the deletion range.
1587 ///
1588 /// # Returns
1589 ///
1590 /// Returns `Ok(())` on success, or an error if deletion fails.
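    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog with quote data (timestamps are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete one day of quotes for a single instrument
    /// catalog.delete_data_range_generic::<QuoteTick>(
    ///     Some("BTCUSD".to_string()),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```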
1591 pub fn delete_data_range_generic<T>(
1592 &mut self,
1593 identifier: Option<String>,
1594 start: Option<UnixNanos>,
1595 end: Option<UnixNanos>,
1596 ) -> Result<()>
1597 where
1598 T: DecodeDataFromRecordBatch
1599 + CatalogPathPrefix
1600 + EncodeToRecordBatch
1601 + HasTsInit
1602 + TryFrom<Data>
1603 + Clone,
1604 {
1605 // Get intervals for cleaner implementation
1606 let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;
1607
1608 if intervals.is_empty() {
1609 return Ok(()); // No files to process
1610 }
1611
1612 // Prepare all operations for execution
1613 let operations_to_execute = self.prepare_delete_operations(
1614 T::path_prefix(),
1615 identifier.clone(),
1616 &intervals,
1617 start,
1618 end,
1619 )?;
1620
1621 if operations_to_execute.is_empty() {
1622 return Ok(()); // No operations to execute
1623 }
1624
1625 // Execute all operations
1626 let mut files_to_remove = std::collections::HashSet::new();
1627
1628 for operation in operations_to_execute {
1629 // Reset the session before each operation to ensure fresh data is loaded
1630 // This clears any cached table registrations that might interfere with file operations
1631 self.reset_session();
1632 match operation.operation_type.as_str() {
1633 "split_before" => {
1634 // Query data before the deletion range and write it
1635 let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1636 let before_data = self.query_typed_data::<T>(
1637 instrument_ids,
1638 Some(UnixNanos::from(operation.query_start)),
1639 Some(UnixNanos::from(operation.query_end)),
1640 None,
1641 Some(operation.files.clone()),
1642 )?;
1643
1644 if !before_data.is_empty() {
1645 let start_ts = UnixNanos::from(operation.file_start_ns);
1646 let end_ts = UnixNanos::from(operation.file_end_ns);
1647 self.write_to_parquet(
1648 before_data,
1649 Some(start_ts),
1650 Some(end_ts),
1651 Some(true),
1652 )?;
1653 }
1654 }
1655 "split_after" => {
1656 // Query data after the deletion range and write it
1657 let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1658 let after_data = self.query_typed_data::<T>(
1659 instrument_ids,
1660 Some(UnixNanos::from(operation.query_start)),
1661 Some(UnixNanos::from(operation.query_end)),
1662 None,
1663 Some(operation.files.clone()),
1664 )?;
1665
1666 if !after_data.is_empty() {
1667 let start_ts = UnixNanos::from(operation.file_start_ns);
1668 let end_ts = UnixNanos::from(operation.file_end_ns);
1669 self.write_to_parquet(
1670 after_data,
1671 Some(start_ts),
1672 Some(end_ts),
1673 Some(true),
1674 )?;
1675 }
1676 }
1677 _ => {
1678 // For "remove" operations, just mark files for removal
1679 }
1680 }
1681
1682 // Mark files for removal (applies to all operation types)
1683 for file in operation.files {
1684 files_to_remove.insert(file);
1685 }
1686 }
1687
1688 // Remove all files that were processed
1689 for file in files_to_remove {
1690 if let Err(e) = self.delete_file(&file) {
                log::error!("Failed to delete file {file}: {e}");
1692 }
1693 }
1694
1695 Ok(())
1696 }
1697
1698 /// Prepares all operations for data deletion by identifying files that need to be
1699 /// split or removed.
1700 ///
1701 /// This auxiliary function handles all the preparation logic for deletion:
1702 /// 1. Filters intervals by time range
1703 /// 2. Identifies files that intersect with the deletion range
1704 /// 3. Creates split operations for files that partially overlap
1705 /// 4. Generates removal operations for files completely within the range
1706 ///
1707 /// # Parameters
1708 ///
1709 /// - `type_name`: The data type directory name for path generation.
1710 /// - `identifier`: Optional instrument identifier for path generation.
1711 /// - `intervals`: List of (`start_ts`, `end_ts`) tuples representing existing file intervals.
1712 /// - `start`: Optional start timestamp for deletion range.
1713 /// - `end`: Optional end timestamp for deletion range.
1714 ///
1715 /// # Returns
1716 ///
1717 /// Returns a vector of `DeleteOperation` structs ready for execution.
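    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog (the interval and deletion range are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // One existing file covers [0, 1000]; deleting [500, 1000] should yield a single
    /// // `split_before` operation that preserves the data in [0, 499]
    /// let operations = catalog.prepare_delete_operations(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     &[(0, 1000)],
    ///     Some(UnixNanos::from(500)),
    ///     Some(UnixNanos::from(1000)),
    /// )?;
    /// println!("{} delete operations planned", operations.len());
    /// # Ok::<(), anyhow::Error>(())
    /// ```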
1718 pub fn prepare_delete_operations(
1719 &self,
1720 type_name: &str,
1721 identifier: Option<String>,
1722 intervals: &[(u64, u64)],
1723 start: Option<UnixNanos>,
1724 end: Option<UnixNanos>,
1725 ) -> Result<Vec<DeleteOperation>> {
1726 // Convert start/end to nanoseconds
1727 let delete_start_ns = start.map(|s| s.as_u64());
1728 let delete_end_ns = end.map(|e| e.as_u64());
1729
1730 let mut operations = Vec::new();
1731
1732 // Get directory for file path construction
1733 let directory = self.make_path(type_name, identifier)?;
1734
1735 // Process each interval (which represents an actual file)
1736 for &(file_start_ns, file_end_ns) in intervals {
1737 // Check if file intersects with deletion range
1738 let intersects = (delete_start_ns.is_none() || delete_start_ns.unwrap() <= file_end_ns)
1739 && (delete_end_ns.is_none() || file_start_ns <= delete_end_ns.unwrap());
1740
1741 if !intersects {
1742 continue; // File doesn't intersect with deletion range
1743 }
1744
1745 // Construct file path from interval timestamps
1746 let filename = timestamps_to_filename(
1747 UnixNanos::from(file_start_ns),
1748 UnixNanos::from(file_end_ns),
1749 );
1750 let file_path = make_object_store_path(&directory, &[&filename]);
1751
1752 // Determine what type of operation is needed
1753 let file_completely_within_range = (delete_start_ns.is_none()
1754 || delete_start_ns.unwrap() <= file_start_ns)
1755 && (delete_end_ns.is_none() || file_end_ns <= delete_end_ns.unwrap());
1756
1757 if file_completely_within_range {
1758 // File is completely within deletion range - just mark for removal
1759 operations.push(DeleteOperation {
1760 operation_type: "remove".to_string(),
1761 files: vec![file_path],
1762 query_start: 0,
1763 query_end: 0,
1764 file_start_ns: 0,
1765 file_end_ns: 0,
1766 });
1767 } else {
1768 // File partially overlaps - need to split
1769 if let Some(delete_start) = delete_start_ns
1770 && file_start_ns < delete_start
1771 {
1772 // Keep data before deletion range
1773 operations.push(DeleteOperation {
1774 operation_type: "split_before".to_string(),
1775 files: vec![file_path.clone()],
1776 query_start: file_start_ns,
1777 query_end: delete_start.saturating_sub(1), // Exclusive end
1778 file_start_ns,
1779 file_end_ns: delete_start.saturating_sub(1),
1780 });
1781 }
1782
1783 if let Some(delete_end) = delete_end_ns
1784 && delete_end < file_end_ns
1785 {
1786 // Keep data after deletion range
1787 operations.push(DeleteOperation {
1788 operation_type: "split_after".to_string(),
1789 files: vec![file_path.clone()],
1790 query_start: delete_end.saturating_add(1), // Exclusive start
1791 query_end: file_end_ns,
1792 file_start_ns: delete_end.saturating_add(1),
1793 file_end_ns,
1794 });
1795 }
1796 }
1797 }
1798
1799 Ok(operations)
1800 }
1801}