nautilus_persistence/backend/catalog_operations.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Catalog operations for data consolidation and reset functionality.
17//!
18//! This module contains the consolidation and reset operations for the `ParquetDataCatalog`.
19//! These operations are separated into their own module for better organization and maintainability.
20
21use std::collections::HashSet;
22
23use futures::StreamExt;
24use nautilus_core::UnixNanos;
25use nautilus_model::data::{
26 Bar, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
27 QuoteTick, TradeTick, close::InstrumentClose,
28};
29use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
30use object_store::path::Path as ObjectPath;
31
32use crate::{
33 backend::catalog::{
34 CatalogPathPrefix, ParquetDataCatalog, are_intervals_contiguous, are_intervals_disjoint,
35 extract_path_components, make_object_store_path, parse_filename_timestamps,
36 timestamps_to_filename,
37 },
38 parquet::{
39 combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
40 },
41};
42
43/// Information about a consolidation query to be executed.
44///
45/// This struct encapsulates all the information needed to execute a single consolidation
46/// operation, including the data range to query and file naming strategy.
47///
48/// # Fields
49///
50/// - `query_start`: Start timestamp for the data query range (inclusive, in nanoseconds).
51/// - `query_end`: End timestamp for the data query range (inclusive, in nanoseconds).
52/// - `use_period_boundaries`: If true, uses period boundaries for file naming; if false, uses actual data timestamps.
53///
54/// # Usage
55///
56/// This struct is used internally by the consolidation system to plan and execute
57/// data consolidation operations. It allows the system to:
58/// - Separate query planning from execution.
59/// - Handle complex scenarios like data splitting.
60/// - Optimize file naming strategies.
61/// - Batch multiple operations efficiently.
62/// - Maintain file contiguity across periods.
63///
64/// # Examples
65///
66/// ```rust,no_run
67/// use nautilus_persistence::backend::catalog_operations::ConsolidationQuery;
68///
69/// // Regular consolidation query
70/// let query = ConsolidationQuery {
71/// query_start: 1609459200000000000,
72/// query_end: 1609545600000000000,
73/// use_period_boundaries: true,
74/// };
75///
76/// // Split operation to preserve data
77/// let split_query = ConsolidationQuery {
78/// query_start: 1609459200000000000,
79/// query_end: 1609462800000000000,
80/// use_period_boundaries: false,
81/// };
82/// ```
83#[derive(Debug, Clone)]
84pub struct ConsolidationQuery {
85 /// Start timestamp for the query range (inclusive, in nanoseconds)
86 pub query_start: u64,
87 /// End timestamp for the query range (inclusive, in nanoseconds)
88 pub query_end: u64,
89 /// Whether to use period boundaries for file naming (true) or actual data timestamps (false)
90 pub use_period_boundaries: bool,
91}
92
93/// Information about a deletion operation to be executed.
94///
95/// This struct encapsulates all the information needed to execute a single deletion
96/// operation, including the type of operation and file handling details.
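///
/// # Examples
///
/// A minimal sketch describing a `split_before` operation (the file path and timestamps
/// are illustrative only):
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::DeleteOperation;
///
/// // Preserve the portion of a file that falls before a deletion range
/// let operation = DeleteOperation {
///     operation_type: "split_before".to_string(),
///     files: vec!["data/quotes/BTCUSD/file.parquet".to_string()],
///     query_start: 1609459200000000000,
///     query_end: 1609462799999999999,
///     file_start_ns: 1609459200000000000,
///     file_end_ns: 1609462799999999999,
/// };
/// ```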
97#[derive(Debug, Clone)]
98pub struct DeleteOperation {
    /// Type of deletion operation (`remove`, `split_before`, or `split_after`).
100 pub operation_type: String,
101 /// List of files involved in this operation.
102 pub files: Vec<String>,
103 /// Start timestamp for data query (used for split operations).
104 pub query_start: u64,
105 /// End timestamp for data query (used for split operations).
106 pub query_end: u64,
107 /// Start timestamp for new file naming (used for split operations).
108 pub file_start_ns: u64,
109 /// End timestamp for new file naming (used for split operations).
110 pub file_end_ns: u64,
111}
112
113impl ParquetDataCatalog {
114 /// Consolidates all data files in the catalog.
115 ///
116 /// This method identifies all leaf directories in the catalog that contain parquet files
117 /// and consolidates them. A leaf directory is one that contains files but no subdirectories.
118 /// This is a convenience method that effectively calls `consolidate_data` for all data types
119 /// and instrument IDs in the catalog.
120 ///
121 /// # Parameters
122 ///
123 /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
124 /// greater than or equal to this value will be consolidated. If None, all files
125 /// from the beginning of time will be considered.
126 /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
127 /// less than or equal to this value will be consolidated. If None, all files
128 /// up to the end of time will be considered.
    /// - `ensure_contiguous_files`: Whether to validate that the remaining intervals are disjoint after consolidation (default: true).
130 ///
131 /// # Returns
132 ///
133 /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
134 ///
135 /// # Errors
136 ///
137 /// Returns an error if:
138 /// - Directory listing fails.
139 /// - File consolidation operations fail.
140 /// - Interval validation fails (when `ensure_contiguous_files` is true).
141 ///
142 /// # Examples
143 ///
144 /// ```rust,no_run
145 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
146 /// use nautilus_core::UnixNanos;
147 ///
148 /// let catalog = ParquetDataCatalog::new(/* ... */);
149 ///
150 /// // Consolidate all files in the catalog
151 /// catalog.consolidate_catalog(None, None, None)?;
152 ///
153 /// // Consolidate only files within a specific time range
154 /// catalog.consolidate_catalog(
155 /// Some(UnixNanos::from(1609459200000000000)),
156 /// Some(UnixNanos::from(1609545600000000000)),
157 /// Some(true)
158 /// )?;
159 /// # Ok::<(), anyhow::Error>(())
160 /// ```
161 pub fn consolidate_catalog(
162 &self,
163 start: Option<UnixNanos>,
164 end: Option<UnixNanos>,
165 ensure_contiguous_files: Option<bool>,
166 ) -> anyhow::Result<()> {
167 let leaf_directories = self.find_leaf_data_directories()?;
168
169 for directory in leaf_directories {
170 self.consolidate_directory(&directory, start, end, ensure_contiguous_files)?;
171 }
172
173 Ok(())
174 }
175
176 /// Consolidates data files for a specific data type and instrument.
177 ///
178 /// This method consolidates Parquet files within a specific directory (defined by data type
179 /// and optional instrument ID) by merging multiple files into a single file. This improves
180 /// query performance and can reduce storage overhead.
181 ///
182 /// # Parameters
183 ///
184 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
185 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
186 /// - `start`: Optional start timestamp to limit consolidation to files within this range.
187 /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that the remaining intervals are disjoint after consolidation (default: true).
189 ///
190 /// # Returns
191 ///
192 /// Returns `Ok(())` on success, or an error if consolidation fails.
193 ///
194 /// # Errors
195 ///
196 /// Returns an error if:
197 /// - The directory path cannot be constructed.
198 /// - File consolidation operations fail.
199 /// - Interval validation fails (when `ensure_contiguous_files` is true).
200 ///
201 /// # Examples
202 ///
203 /// ```rust,no_run
204 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
205 /// use nautilus_core::UnixNanos;
206 ///
207 /// let catalog = ParquetDataCatalog::new(/* ... */);
208 ///
209 /// // Consolidate all quote files for a specific instrument
210 /// catalog.consolidate_data(
211 /// "quotes",
212 /// Some("BTCUSD".to_string()),
213 /// None,
214 /// None,
215 /// None
216 /// )?;
217 ///
218 /// // Consolidate trade files within a time range
219 /// catalog.consolidate_data(
220 /// "trades",
221 /// None,
222 /// Some(UnixNanos::from(1609459200000000000)),
223 /// Some(UnixNanos::from(1609545600000000000)),
224 /// Some(true)
225 /// )?;
226 /// # Ok::<(), anyhow::Error>(())
227 /// ```
228 pub fn consolidate_data(
229 &self,
230 type_name: &str,
231 instrument_id: Option<String>,
232 start: Option<UnixNanos>,
233 end: Option<UnixNanos>,
234 ensure_contiguous_files: Option<bool>,
235 ) -> anyhow::Result<()> {
236 let directory = self.make_path(type_name, instrument_id)?;
237 self.consolidate_directory(&directory, start, end, ensure_contiguous_files)
238 }
239
240 /// Consolidates Parquet files within a specific directory by merging them into a single file.
241 ///
242 /// This internal method performs the actual consolidation work for a single directory.
243 /// It identifies files within the specified time range, validates their intervals,
244 /// and combines them into a single Parquet file with optimized storage.
245 ///
246 /// # Parameters
247 ///
248 /// - `directory`: The directory path containing Parquet files to consolidate.
249 /// - `start`: Optional start timestamp to limit consolidation to files within this range.
250 /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that the remaining intervals are disjoint after consolidation.
252 ///
253 /// # Returns
254 ///
255 /// Returns `Ok(())` on success, or an error if consolidation fails.
256 ///
257 /// # Behavior
258 ///
259 /// - Skips consolidation if directory contains 1 or fewer files.
260 /// - Filters files by timestamp range if start/end are specified.
261 /// - Sorts intervals by start timestamp before consolidation.
262 /// - Creates a new file spanning the entire time range of input files.
263 /// - Validates interval disjointness after consolidation (if enabled).
264 ///
265 /// # Errors
266 ///
267 /// Returns an error if:
268 /// - Directory listing fails.
269 /// - File combination operations fail.
270 /// - Interval validation fails (when `ensure_contiguous_files` is true).
271 /// - Object store operations fail.
272 fn consolidate_directory(
273 &self,
274 directory: &str,
275 start: Option<UnixNanos>,
276 end: Option<UnixNanos>,
277 ensure_contiguous_files: Option<bool>,
278 ) -> anyhow::Result<()> {
279 let parquet_files = self.list_parquet_files(directory)?;
280
281 if parquet_files.len() <= 1 {
282 return Ok(());
283 }
284
285 let mut files_to_consolidate = Vec::new();
286 let mut intervals = Vec::new();
287 let start = start.map(|t| t.as_u64());
288 let end = end.map(|t| t.as_u64());
289
290 for file in parquet_files {
291 if let Some(interval) = parse_filename_timestamps(&file) {
292 let (interval_start, interval_end) = interval;
293 let include_file = match (start, end) {
294 (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
295 (Some(s), None) => interval_start >= s,
296 (None, Some(e)) => interval_end <= e,
297 (None, None) => true,
298 };
299
300 if include_file {
301 files_to_consolidate.push(file);
302 intervals.push(interval);
303 }
304 }
305 }
306
307 intervals.sort_by_key(|&(start, _)| start);
308
309 if !intervals.is_empty() {
310 let file_name = timestamps_to_filename(
311 UnixNanos::from(intervals[0].0),
312 UnixNanos::from(intervals.last().unwrap().1),
313 );
314 let path = make_object_store_path(directory, &[&file_name]);
315
316 // Convert string paths to ObjectPath for the function call
317 let object_paths: Vec<ObjectPath> = files_to_consolidate
318 .iter()
319 .map(|path| ObjectPath::from(path.as_str()))
320 .collect();
321
322 self.execute_async(async {
323 combine_parquet_files_from_object_store(
324 self.object_store.clone(),
325 object_paths,
326 &ObjectPath::from(path),
327 Some(self.compression),
328 Some(self.max_row_group_size),
329 )
330 .await
331 })?;
332 }
333
334 if ensure_contiguous_files.unwrap_or(true) && !are_intervals_disjoint(&intervals) {
335 anyhow::bail!("Intervals are not disjoint after consolidating a directory");
336 }
337
338 Ok(())
339 }
340
341 /// Consolidates all data files in the catalog by splitting them into fixed time periods.
342 ///
343 /// This method identifies all leaf directories in the catalog that contain parquet files
344 /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
345 /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
346 /// and instrument IDs in the catalog.
347 ///
348 /// # Parameters
349 ///
350 /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
351 /// Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
352 /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
353 /// greater than or equal to this value will be consolidated. If None, all files
354 /// from the beginning of time will be considered.
355 /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
356 /// less than or equal to this value will be consolidated. If None, all files
357 /// up to the end of time will be considered.
358 /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
359 /// If false, uses actual data timestamps for file naming.
360 ///
361 /// # Returns
362 ///
363 /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
364 ///
365 /// # Errors
366 ///
367 /// Returns an error if:
368 /// - Directory listing fails.
369 /// - Data type extraction from path fails.
370 /// - Period-based consolidation operations fail.
371 ///
372 /// # Notes
373 ///
    /// - This operation can be resource-intensive for large catalogs with many data types
    ///   and instruments.
    /// - The consolidation process splits data into fixed time periods rather than combining
    ///   all files into a single file per directory.
    /// - Uses the same period-based consolidation logic as `consolidate_data_by_period`.
    /// - Original files are removed and replaced with period-based consolidated files.
    /// - This method is useful for periodic maintenance of the catalog to standardize
    ///   file organization by time periods.
382 ///
383 /// # Examples
384 ///
385 /// ```rust,no_run
386 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
387 /// use nautilus_core::UnixNanos;
388 ///
389 /// let catalog = ParquetDataCatalog::new(/* ... */);
390 ///
391 /// // Consolidate all files in the catalog by 1-day periods
392 /// catalog.consolidate_catalog_by_period(
393 /// Some(86400000000000), // 1 day in nanoseconds
394 /// None,
395 /// None,
396 /// Some(true)
397 /// )?;
398 ///
399 /// // Consolidate only files within a specific time range by 1-hour periods
400 /// catalog.consolidate_catalog_by_period(
401 /// Some(3600000000000), // 1 hour in nanoseconds
402 /// Some(UnixNanos::from(1609459200000000000)),
403 /// Some(UnixNanos::from(1609545600000000000)),
404 /// Some(false)
405 /// )?;
406 /// # Ok::<(), anyhow::Error>(())
407 /// ```
408 pub fn consolidate_catalog_by_period(
409 &mut self,
410 period_nanos: Option<u64>,
411 start: Option<UnixNanos>,
412 end: Option<UnixNanos>,
413 ensure_contiguous_files: Option<bool>,
414 ) -> anyhow::Result<()> {
415 let leaf_directories = self.find_leaf_data_directories()?;
416
417 for directory in leaf_directories {
418 let (data_cls, identifier) =
419 self.extract_data_cls_and_identifier_from_path(&directory)?;
420
421 if let Some(data_cls_name) = data_cls {
422 // Use match statement to call the generic consolidate_data_by_period for various types
423 match data_cls_name.as_str() {
424 "quotes" => {
425 self.consolidate_data_by_period_generic::<QuoteTick>(
426 identifier,
427 period_nanos,
428 start,
429 end,
430 ensure_contiguous_files,
431 )?;
432 }
433 "trades" => {
434 self.consolidate_data_by_period_generic::<TradeTick>(
435 identifier,
436 period_nanos,
437 start,
438 end,
439 ensure_contiguous_files,
440 )?;
441 }
442 "order_book_deltas" => {
443 self.consolidate_data_by_period_generic::<OrderBookDelta>(
444 identifier,
445 period_nanos,
446 start,
447 end,
448 ensure_contiguous_files,
449 )?;
450 }
451 "order_book_depths" => {
452 self.consolidate_data_by_period_generic::<OrderBookDepth10>(
453 identifier,
454 period_nanos,
455 start,
456 end,
457 ensure_contiguous_files,
458 )?;
459 }
460 "bars" => {
461 self.consolidate_data_by_period_generic::<Bar>(
462 identifier,
463 period_nanos,
464 start,
465 end,
466 ensure_contiguous_files,
467 )?;
468 }
469 "index_prices" => {
470 self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
471 identifier,
472 period_nanos,
473 start,
474 end,
475 ensure_contiguous_files,
476 )?;
477 }
478 "mark_prices" => {
479 self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
480 identifier,
481 period_nanos,
482 start,
483 end,
484 ensure_contiguous_files,
485 )?;
486 }
487 "instrument_closes" => {
488 self.consolidate_data_by_period_generic::<InstrumentClose>(
489 identifier,
490 period_nanos,
491 start,
492 end,
493 ensure_contiguous_files,
494 )?;
495 }
496 _ => {
497 // Skip unknown data types
498 log::warn!("Unknown data type for consolidation: {data_cls_name}");
499 continue;
500 }
501 }
502 }
503 }
504
505 Ok(())
506 }
507
508 /// Extracts data class and identifier from a directory path.
509 ///
510 /// This method parses a directory path to extract the data type and optional
511 /// instrument identifier. It's used to determine what type of data consolidation
512 /// to perform for each directory.
513 ///
514 /// # Parameters
515 ///
516 /// - `path`: The directory path to parse.
517 ///
518 /// # Returns
519 ///
520 /// Returns a tuple of (`data_class`, identifier) where both are optional strings.
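    ///
    /// # Examples
    ///
    /// A minimal sketch (the catalog path shown is illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // "quotes" is the data class, "BTCUSD" the instrument identifier
    /// let (data_cls, identifier) =
    ///     catalog.extract_data_cls_and_identifier_from_path("/some/catalog/data/quotes/BTCUSD")?;
    /// assert_eq!(data_cls.as_deref(), Some("quotes"));
    /// assert_eq!(identifier.as_deref(), Some("BTCUSD"));
    /// # Ok::<(), anyhow::Error>(())
    /// ```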
521 pub fn extract_data_cls_and_identifier_from_path(
522 &self,
523 path: &str,
524 ) -> anyhow::Result<(Option<String>, Option<String>)> {
525 // Use cross-platform path parsing
526 let path_components = extract_path_components(path);
527
528 // Find the "data" directory in the path
529 if let Some(data_index) = path_components.iter().position(|part| part == "data")
530 && data_index + 1 < path_components.len()
531 {
532 let data_cls = path_components[data_index + 1].clone();
533
534 // Check if there's an identifier (instrument ID) after the data class
535 let identifier = if data_index + 2 < path_components.len() {
536 Some(path_components[data_index + 2].clone())
537 } else {
538 None
539 };
540
541 return Ok((Some(data_cls), identifier));
542 }
543
544 // If we can't parse the path, return None for both
545 Ok((None, None))
546 }
547
548 /// Consolidates data files by splitting them into fixed time periods.
549 ///
550 /// This method queries data by period and writes consolidated files immediately,
551 /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
552 /// the function automatically splits those files to preserve all data.
553 ///
554 /// # Parameters
555 ///
556 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
557 /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments.
558 /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
559 /// Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
560 /// - `start`: Optional start timestamp for consolidation range. If None, uses earliest available data.
561 /// If specified and intersects existing files, those files will be split to preserve
562 /// data outside the consolidation range.
563 /// - `end`: Optional end timestamp for consolidation range. If None, uses latest available data.
564 /// If specified and intersects existing files, those files will be split to preserve
565 /// data outside the consolidation range.
566 /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
567 /// If false, uses actual data timestamps for file naming.
568 ///
569 /// # Returns
570 ///
571 /// Returns `Ok(())` on success, or an error if consolidation fails.
572 ///
573 /// # Errors
574 ///
575 /// Returns an error if:
576 /// - The directory path cannot be constructed.
577 /// - File operations fail.
578 /// - Data querying or writing fails.
579 ///
580 /// # Notes
581 ///
582 /// - Uses two-phase approach: first determines all queries, then executes them.
583 /// - Groups intervals into contiguous groups to preserve holes between groups.
584 /// - Allows consolidation across multiple files within each contiguous group.
585 /// - Skips queries if target files already exist for efficiency.
586 /// - Original files are removed immediately after querying each period.
587 /// - When `ensure_contiguous_files=false`, file timestamps match actual data range.
588 /// - When `ensure_contiguous_files=true`, file timestamps use period boundaries.
589 /// - Uses modulo arithmetic for efficient period boundary calculation.
590 /// - Preserves holes in data by preventing queries from spanning across gaps.
591 /// - Automatically splits files at start/end boundaries to preserve all data.
592 /// - Split operations are executed before consolidation to ensure data preservation.
593 ///
594 /// # Examples
595 ///
596 /// ```rust,no_run
597 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
598 /// use nautilus_core::UnixNanos;
599 ///
600 /// let catalog = ParquetDataCatalog::new(/* ... */);
601 ///
602 /// // Consolidate all quote files by 1-day periods
603 /// catalog.consolidate_data_by_period(
604 /// "quotes",
605 /// None,
606 /// Some(86400000000000), // 1 day in nanoseconds
607 /// None,
608 /// None,
609 /// Some(true)
610 /// )?;
611 ///
612 /// // Consolidate specific instrument by 1-hour periods
613 /// catalog.consolidate_data_by_period(
614 /// "trades",
615 /// Some("BTCUSD".to_string()),
616 /// Some(3600000000000), // 1 hour in nanoseconds
617 /// Some(UnixNanos::from(1609459200000000000)),
618 /// Some(UnixNanos::from(1609545600000000000)),
619 /// Some(false)
620 /// )?;
621 /// # Ok::<(), anyhow::Error>(())
622 /// ```
623 pub fn consolidate_data_by_period(
624 &mut self,
625 type_name: &str,
626 identifier: Option<String>,
627 period_nanos: Option<u64>,
628 start: Option<UnixNanos>,
629 end: Option<UnixNanos>,
630 ensure_contiguous_files: Option<bool>,
631 ) -> anyhow::Result<()> {
632 // Use match statement to call the generic consolidate_data_by_period for various types
633 match type_name {
634 "quotes" => {
635 self.consolidate_data_by_period_generic::<QuoteTick>(
636 identifier,
637 period_nanos,
638 start,
639 end,
640 ensure_contiguous_files,
641 )?;
642 }
643 "trades" => {
644 self.consolidate_data_by_period_generic::<TradeTick>(
645 identifier,
646 period_nanos,
647 start,
648 end,
649 ensure_contiguous_files,
650 )?;
651 }
652 "order_book_deltas" => {
653 self.consolidate_data_by_period_generic::<OrderBookDelta>(
654 identifier,
655 period_nanos,
656 start,
657 end,
658 ensure_contiguous_files,
659 )?;
660 }
661 "order_book_depths" => {
662 self.consolidate_data_by_period_generic::<OrderBookDepth10>(
663 identifier,
664 period_nanos,
665 start,
666 end,
667 ensure_contiguous_files,
668 )?;
669 }
670 "bars" => {
671 self.consolidate_data_by_period_generic::<Bar>(
672 identifier,
673 period_nanos,
674 start,
675 end,
676 ensure_contiguous_files,
677 )?;
678 }
679 "index_prices" => {
680 self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
681 identifier,
682 period_nanos,
683 start,
684 end,
685 ensure_contiguous_files,
686 )?;
687 }
688 "mark_prices" => {
689 self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
690 identifier,
691 period_nanos,
692 start,
693 end,
694 ensure_contiguous_files,
695 )?;
696 }
697 "instrument_closes" => {
698 self.consolidate_data_by_period_generic::<InstrumentClose>(
699 identifier,
700 period_nanos,
701 start,
702 end,
703 ensure_contiguous_files,
704 )?;
705 }
706 _ => {
707 anyhow::bail!("Unknown data type for consolidation: {}", type_name);
708 }
709 }
710
711 Ok(())
712 }
713
    /// Consolidates data files by splitting them into fixed time periods (generic version).
715 ///
716 /// This is a type-safe version of `consolidate_data_by_period` that uses generic types
717 /// to ensure compile-time correctness and enable reuse across different data types.
718 ///
719 /// # Type Parameters
720 ///
721 /// - `T`: The data type to consolidate, must implement required traits for serialization.
722 ///
723 /// # Parameters
724 ///
725 /// - `identifier`: Optional instrument ID to target a specific instrument's data.
726 /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
727 /// - `start`: Optional start timestamp for consolidation range.
728 /// - `end`: Optional end timestamp for consolidation range.
729 /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
730 ///
731 /// # Returns
732 ///
733 /// Returns `Ok(())` on success, or an error if consolidation fails.
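    ///
    /// # Examples
    ///
    /// A minimal sketch using `QuoteTick` (the instrument ID and period are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate quote files for one instrument into 1-hour periods
    /// catalog.consolidate_data_by_period_generic::<QuoteTick>(
    ///     Some("BTCUSD".to_string()),
    ///     Some(3_600_000_000_000), // 1 hour in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```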
734 pub fn consolidate_data_by_period_generic<T>(
735 &mut self,
736 identifier: Option<String>,
737 period_nanos: Option<u64>,
738 start: Option<UnixNanos>,
739 end: Option<UnixNanos>,
740 ensure_contiguous_files: Option<bool>,
741 ) -> anyhow::Result<()>
742 where
743 T: DecodeDataFromRecordBatch
744 + CatalogPathPrefix
745 + EncodeToRecordBatch
746 + HasTsInit
747 + TryFrom<Data>
748 + Clone,
749 {
750 let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
751 let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);
752
753 // Use get_intervals for cleaner implementation
754 let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;
755
756 if intervals.is_empty() {
757 return Ok(()); // No files to consolidate
758 }
759
760 // Use auxiliary function to prepare all queries for execution
761 let queries_to_execute = self.prepare_consolidation_queries(
762 T::path_prefix(),
763 identifier.clone(),
764 &intervals,
765 period_nanos,
766 start,
767 end,
768 ensure_contiguous_files,
769 )?;
770
771 if queries_to_execute.is_empty() {
772 return Ok(()); // No queries to execute
773 }
774
775 // Get directory for file operations
776 let directory = self.make_path(T::path_prefix(), identifier.clone())?;
777 let mut existing_files = self.list_parquet_files(&directory)?;
778 existing_files.sort();
779
780 // Track files to remove and maintain existing_files list
781 let mut files_to_remove = HashSet::new();
782 let original_files_count = existing_files.len();
783
784 // Phase 2: Execute queries, write, and delete
785 let mut file_start_ns: Option<u64> = None; // Track contiguity across periods
786
787 for query_info in queries_to_execute {
788 // Query data for this period using query_typed_data
789 let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
790
791 let period_data = self.query_typed_data::<T>(
792 instrument_ids,
793 Some(UnixNanos::from(query_info.query_start)),
794 Some(UnixNanos::from(query_info.query_end)),
795 None,
796 Some(existing_files.clone()),
797 )?;
798
799 if period_data.is_empty() {
800 // Skip if no data found, but maintain contiguity by using query start
801 if file_start_ns.is_none() {
802 file_start_ns = Some(query_info.query_start);
803 }
804 continue;
805 }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, bridging any preceding empty
                // periods tracked in `file_start_ns` to maintain contiguity
                (
                    file_start_ns.unwrap_or(query_info.query_start),
                    query_info.query_end,
                )
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };

            // Reset the contiguity tracker now that any carried start has been consumed
            file_start_ns = None;
821
822 // Check again if target file exists (in case it was created during this process)
823 let target_filename = format!(
824 "{}/{}",
825 directory,
826 timestamps_to_filename(
827 UnixNanos::from(final_start_ns),
828 UnixNanos::from(final_end_ns)
829 )
830 );
831
832 if self.file_exists(&target_filename)? {
833 // Skip if target file already exists
834 continue;
835 }
836
837 // Write consolidated data for this period using write_to_parquet
838 // Use skip_disjoint_check since we're managing file removal carefully
839 let start_ts = UnixNanos::from(final_start_ns);
840 let end_ts = UnixNanos::from(final_end_ns);
841 self.write_to_parquet(period_data, Some(start_ts), Some(end_ts), Some(true))?;
842
843 // Identify files that are completely covered by this period
844 // Only remove files AFTER successfully writing a new file
845 // Use slice copy to avoid modification during iteration (match Python logic)
846 for file in existing_files.clone() {
847 if let Some(interval) = parse_filename_timestamps(&file)
848 && interval.1 <= query_info.query_end
849 {
850 files_to_remove.insert(file.clone());
851 existing_files.retain(|f| f != &file);
852 }
853 }
854
855 // Remove files as soon as we have some to remove
856 if !files_to_remove.is_empty() {
857 for file in files_to_remove.drain() {
858 self.delete_file(&file)?;
859 }
860 }
861 }
862
863 // Remove any remaining files that weren't removed in the loop
864 // This matches the Python implementation's final cleanup step
865 // Only remove files if any consolidation actually happened (i.e., files were processed)
866 let files_were_processed = existing_files.len() < original_files_count;
867 if files_were_processed {
868 for file in existing_files {
869 self.delete_file(&file)?;
870 }
871 }
872
873 Ok(())
874 }
875
876 /// Prepares all queries for consolidation by filtering, grouping, and handling splits.
877 ///
878 /// This auxiliary function handles all the preparation logic for consolidation:
879 /// 1. Filters intervals by time range.
880 /// 2. Groups intervals into contiguous groups.
881 /// 3. Identifies and creates split operations for data preservation.
882 /// 4. Generates period-based consolidation queries.
883 /// 5. Checks for existing target files.
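    ///
    /// # Examples
    ///
    /// A minimal sketch of planning period queries over two contiguous daily files
    /// (interval values are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let intervals = vec![(1, 86_400_000_000_000), (86_400_000_000_001, 172_800_000_000_000)];
    /// let queries = catalog.prepare_consolidation_queries(
    ///     "quotes",
    ///     None,
    ///     &intervals,
    ///     86_400_000_000_000, // 1-day periods
    ///     None,
    ///     None,
    ///     true,
    /// )?;
    ///
    /// for query in &queries {
    ///     println!("{} -> {}", query.query_start, query.query_end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```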
884 #[allow(clippy::too_many_arguments)]
885 pub fn prepare_consolidation_queries(
886 &self,
887 type_name: &str,
888 identifier: Option<String>,
889 intervals: &[(u64, u64)],
890 period_nanos: u64,
891 start: Option<UnixNanos>,
892 end: Option<UnixNanos>,
893 ensure_contiguous_files: bool,
894 ) -> anyhow::Result<Vec<ConsolidationQuery>> {
895 // Filter intervals by time range if specified
896 let used_start = start.map(|s| s.as_u64());
897 let used_end = end.map(|e| e.as_u64());
898
899 let mut filtered_intervals = Vec::new();
900 for &(interval_start, interval_end) in intervals {
901 // Check if interval overlaps with the specified range
902 if (used_start.is_none() || used_start.unwrap() <= interval_end)
903 && (used_end.is_none() || interval_start <= used_end.unwrap())
904 {
905 filtered_intervals.push((interval_start, interval_end));
906 }
907 }
908
909 if filtered_intervals.is_empty() {
910 return Ok(Vec::new()); // No intervals in the specified range
911 }
912
913 // Check contiguity of filtered intervals if required
914 if ensure_contiguous_files && !are_intervals_contiguous(&filtered_intervals) {
915 anyhow::bail!(
916 "Intervals are not contiguous. When ensure_contiguous_files=true, \
917 all files in the consolidation range must have contiguous timestamps."
918 );
919 }
920
921 // Group intervals into contiguous groups to preserve holes between groups
922 // but allow consolidation within each contiguous group
923 let contiguous_groups = self.group_contiguous_intervals(&filtered_intervals);
924
925 let mut queries_to_execute = Vec::new();
926
927 // Handle interval splitting by creating split operations for data preservation
928 if !filtered_intervals.is_empty() {
929 if let Some(start_ts) = used_start {
930 let first_interval = filtered_intervals[0];
931 if first_interval.0 < start_ts && start_ts <= first_interval.1 {
932 // Split before start: preserve data from interval_start to start-1
933 queries_to_execute.push(ConsolidationQuery {
934 query_start: first_interval.0,
935 query_end: start_ts - 1,
936 use_period_boundaries: false,
937 });
938 }
939 }
940
941 if let Some(end_ts) = used_end {
942 let last_interval = filtered_intervals[filtered_intervals.len() - 1];
943 if last_interval.0 <= end_ts && end_ts < last_interval.1 {
944 // Split after end: preserve data from end+1 to interval_end
945 queries_to_execute.push(ConsolidationQuery {
946 query_start: end_ts + 1,
947 query_end: last_interval.1,
948 use_period_boundaries: false,
949 });
950 }
951 }
952 }
953
954 // Generate period-based consolidation queries for each contiguous group
955 for group in contiguous_groups {
956 let group_start = group[0].0;
957 let group_end = group[group.len() - 1].1;
958
959 // Apply start/end filtering to the group
960 let effective_start = used_start.map_or(group_start, |s| s.max(group_start));
961 let effective_end = used_end.map_or(group_end, |e| e.min(group_end));
962
963 if effective_start > effective_end {
964 continue; // Skip if no overlap
965 }
966
967 // Generate period-based queries within this contiguous group
968 let mut current_start_ns = (effective_start / period_nanos) * period_nanos;
969
970 // Add safety check to prevent infinite loops (match Python logic)
971 let max_iterations = 10000;
972 let mut iteration_count = 0;
973
974 while current_start_ns <= effective_end {
975 iteration_count += 1;
976 if iteration_count > max_iterations {
977 // Safety break to prevent infinite loops
978 break;
979 }
980 let current_end_ns = (current_start_ns + period_nanos - 1).min(effective_end);
981
982 // Check if target file already exists (only when ensure_contiguous_files is true)
983 if ensure_contiguous_files {
984 let directory = self.make_path(type_name, identifier.clone())?;
985 let target_filename = format!(
986 "{}/{}",
987 directory,
988 timestamps_to_filename(
989 UnixNanos::from(current_start_ns),
990 UnixNanos::from(current_end_ns)
991 )
992 );
993
994 if self.file_exists(&target_filename)? {
995 // Skip if target file already exists
996 current_start_ns += period_nanos;
997 continue;
998 }
999 }
1000
1001 // Add query to execution list
1002 queries_to_execute.push(ConsolidationQuery {
1003 query_start: current_start_ns,
1004 query_end: current_end_ns,
1005 use_period_boundaries: ensure_contiguous_files,
1006 });
1007
1008 // Move to next period
1009 current_start_ns += period_nanos;
1010
1011 if current_start_ns > effective_end {
1012 break;
1013 }
1014 }
1015 }
1016
1017 // Sort queries by start date to enable efficient file removal
        // Files can be removed once their interval end <= query_end of the processed query,
1019 // and processing in chronological order ensures optimal cleanup
1020 queries_to_execute.sort_by_key(|q| q.query_start);
1021
1022 Ok(queries_to_execute)
1023 }
1024
1025 /// Groups intervals into contiguous groups for efficient consolidation.
1026 ///
1027 /// This method analyzes a list of time intervals and groups them into contiguous sequences.
1028 /// Intervals are considered contiguous if the end of one interval is exactly one nanosecond
1029 /// before the start of the next interval. This grouping preserves data gaps while allowing
1030 /// consolidation within each contiguous group.
1031 ///
1032 /// # Parameters
1033 ///
1034 /// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
1035 ///
1036 /// # Returns
1037 ///
1038 /// Returns a vector of groups, where each group is a vector of contiguous intervals.
1039 /// Returns an empty vector if the input is empty.
1040 ///
1041 /// # Algorithm
1042 ///
1043 /// 1. Starts with the first interval in a new group.
1044 /// 2. For each subsequent interval, checks if it's contiguous with the previous.
1045 /// 3. If contiguous (`prev_end` + 1 == `curr_start`), adds to current group.
1046 /// 4. If not contiguous, starts a new group.
1047 /// 5. Returns all groups.
1048 ///
1049 /// # Examples
1050 ///
1051 /// ```text
1052 /// Contiguous intervals: [(1,5), (6,10), (11,15)]
1053 /// Returns: [[(1,5), (6,10), (11,15)]]
1054 ///
1055 /// Non-contiguous intervals: [(1,5), (8,10), (12,15)]
1056 /// Returns: [[(1,5)], [(8,10)], [(12,15)]]
1057 /// ```
1058 ///
1059 /// # Notes
1060 ///
1061 /// - Input intervals should be sorted by start timestamp.
1062 /// - Gaps between groups are preserved and not consolidated.
1063 /// - Used internally by period-based consolidation methods.
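    ///
    /// A runnable sketch of the grouping described above (interval values are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // (1,5) and (6,10) are contiguous; (12,15) starts after a gap
    /// let groups = catalog.group_contiguous_intervals(&[(1, 5), (6, 10), (12, 15)]);
    /// let expected: Vec<Vec<(u64, u64)>> = vec![vec![(1, 5), (6, 10)], vec![(12, 15)]];
    /// assert_eq!(groups, expected);
    /// ```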
1064 #[must_use]
1065 pub fn group_contiguous_intervals(&self, intervals: &[(u64, u64)]) -> Vec<Vec<(u64, u64)>> {
1066 if intervals.is_empty() {
1067 return Vec::new();
1068 }
1069
1070 let mut contiguous_groups = Vec::new();
1071 let mut current_group = vec![intervals[0]];
1072
1073 for i in 1..intervals.len() {
1074 let prev_interval = intervals[i - 1];
1075 let curr_interval = intervals[i];
1076
1077 // Check if current interval is contiguous with previous (end + 1 == start)
1078 if prev_interval.1 + 1 == curr_interval.0 {
1079 current_group.push(curr_interval);
1080 } else {
1081 // Gap found, start new group
1082 contiguous_groups.push(current_group);
1083 current_group = vec![curr_interval];
1084 }
1085 }
1086
1087 // Add the last group
1088 contiguous_groups.push(current_group);
1089
1090 contiguous_groups
1091 }
1092
1093 /// Checks if a file exists in the object store.
1094 ///
1095 /// This method performs a HEAD operation on the object store to determine if a file
1096 /// exists without downloading its content. It works with both local and remote object stores.
1097 ///
1098 /// # Parameters
1099 ///
1100 /// - `path`: The file path to check, relative to the catalog structure.
1101 ///
1102 /// # Returns
1103 ///
1104 /// Returns `true` if the file exists, `false` if it doesn't exist.
1105 ///
1106 /// # Errors
1107 ///
1108 /// Returns an error if the object store operation fails due to network issues,
1109 /// authentication problems, or other I/O errors.
1110 fn file_exists(&self, path: &str) -> anyhow::Result<bool> {
1111 let object_path = self.to_object_path(path);
1112 let exists =
1113 self.execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
1114 Ok(exists)
1115 }
1116
1117 /// Deletes a file from the object store.
1118 ///
1119 /// This method removes a file from the object store. The operation is permanent
1120 /// and cannot be undone. It works with both local filesystems and remote object stores.
1121 ///
1122 /// # Parameters
1123 ///
1124 /// - `path`: The file path to delete, relative to the catalog structure.
1125 ///
1126 /// # Returns
1127 ///
1128 /// Returns `Ok(())` on successful deletion.
1129 ///
1130 /// # Errors
1131 ///
1132 /// Returns an error if:
1133 /// - The file doesn't exist.
1134 /// - Permission is denied.
1135 /// - Network issues occur (for remote stores).
1136 /// - The object store operation fails.
1137 ///
1138 /// # Safety
1139 ///
1140 /// This operation is irreversible. Ensure the file is no longer needed before deletion.
1141 fn delete_file(&self, path: &str) -> anyhow::Result<()> {
1142 let object_path = self.to_object_path(path);
1143 self.execute_async(async {
1144 self.object_store
1145 .delete(&object_path)
1146 .await
1147 .map_err(anyhow::Error::from)
1148 })?;
1149 Ok(())
1150 }
1151
1152 /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
1153 ///
1154 /// This method scans all leaf data directories in the catalog and renames files based on
1155 /// the actual timestamp range of their content. This is useful when files have been
1156 /// modified or when filename conventions have changed.
1157 ///
1158 /// # Returns
1159 ///
1160 /// Returns `Ok(())` on success, or an error if the operation fails.
1161 ///
1162 /// # Errors
1163 ///
1164 /// Returns an error if:
1165 /// - Directory listing fails.
1166 /// - File metadata reading fails.
1167 /// - File rename operations fail.
1168 /// - Interval validation fails after renaming.
1169 ///
1170 /// # Examples
1171 ///
1172 /// ```rust,no_run
1173 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1174 ///
1175 /// let catalog = ParquetDataCatalog::new(/* ... */);
1176 ///
1177 /// // Reset all filenames in the catalog
1178 /// catalog.reset_all_file_names()?;
1179 /// # Ok::<(), anyhow::Error>(())
1180 /// ```
1181 pub fn reset_all_file_names(&self) -> anyhow::Result<()> {
1182 let leaf_directories = self.find_leaf_data_directories()?;
1183
1184 for directory in leaf_directories {
1185 self.reset_file_names(&directory)?;
1186 }
1187
1188 Ok(())
1189 }
1190
1191 /// Resets the filenames of Parquet files for a specific data type and instrument ID.
1192 ///
1193 /// This method renames files in a specific directory based on the actual timestamp
1194 /// range of their content. This is useful for correcting filenames after data
1195 /// modifications or when filename conventions have changed.
1196 ///
1197 /// # Parameters
1198 ///
1199 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1200 /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
1201 ///
1202 /// # Returns
1203 ///
1204 /// Returns `Ok(())` on success, or an error if the operation fails.
1205 ///
1206 /// # Errors
1207 ///
1208 /// Returns an error if:
1209 /// - The directory path cannot be constructed.
1210 /// - File metadata reading fails.
1211 /// - File rename operations fail.
1212 /// - Interval validation fails after renaming.
1213 ///
1214 /// # Examples
1215 ///
1216 /// ```rust,no_run
1217 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1218 ///
1219 /// let catalog = ParquetDataCatalog::new(/* ... */);
1220 ///
1221 /// // Reset filenames for all quote files
1222 /// catalog.reset_data_file_names("quotes", None)?;
1223 ///
1224 /// // Reset filenames for a specific instrument's trade files
1225 /// catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
1226 /// # Ok::<(), anyhow::Error>(())
1227 /// ```
1228 pub fn reset_data_file_names(
1229 &self,
1230 data_cls: &str,
1231 instrument_id: Option<String>,
1232 ) -> anyhow::Result<()> {
1233 let directory = self.make_path(data_cls, instrument_id)?;
1234 self.reset_file_names(&directory)
1235 }
1236
1237 /// Resets the filenames of Parquet files in a directory to match their actual content timestamps.
1238 ///
1239 /// This internal method scans all Parquet files in a directory, reads their metadata to
1240 /// determine the actual timestamp range of their content, and renames the files accordingly.
1241 /// This ensures that filenames accurately reflect the data they contain.
1242 ///
1243 /// # Parameters
1244 ///
1245 /// - `directory`: The directory path containing Parquet files to rename.
1246 ///
1247 /// # Returns
1248 ///
1249 /// Returns `Ok(())` on success, or an error if the operation fails.
1250 ///
1251 /// # Process
1252 ///
1253 /// 1. Lists all Parquet files in the directory
1254 /// 2. For each file, reads metadata to extract min/max timestamps
1255 /// 3. Generates a new filename based on actual timestamp range
1256 /// 4. Moves the file to the new name using object store operations
1257 /// 5. Validates that intervals remain disjoint after renaming
1258 ///
1259 /// # Errors
1260 ///
1261 /// Returns an error if:
1262 /// - Directory listing fails.
1263 /// - Metadata reading fails for any file.
1264 /// - File move operations fail.
1265 /// - Interval validation fails after renaming.
1266 /// - Object store operations fail.
1267 ///
1268 /// # Notes
1269 ///
1270 /// - This operation can be time-consuming for directories with many files.
1271 /// - Files are processed sequentially to avoid conflicts.
1272 /// - The operation is atomic per file but not across the entire directory.
1273 fn reset_file_names(&self, directory: &str) -> anyhow::Result<()> {
1274 let parquet_files = self.list_parquet_files(directory)?;
1275
1276 for file in parquet_files {
1277 let object_path = ObjectPath::from(file.as_str());
1278 let (first_ts, last_ts) = self.execute_async(async {
1279 min_max_from_parquet_metadata_object_store(
1280 self.object_store.clone(),
1281 &object_path,
1282 "ts_init",
1283 )
1284 .await
1285 })?;
1286
1287 let new_filename =
1288 timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
1289 let new_file_path = make_object_store_path(directory, &[&new_filename]);
1290 let new_object_path = ObjectPath::from(new_file_path);
1291
1292 self.move_file(&object_path, &new_object_path)?;
1293 }
1294
1295 let intervals = self.get_directory_intervals(directory)?;
1296
1297 if !are_intervals_disjoint(&intervals) {
1298 anyhow::bail!("Intervals are not disjoint after resetting file names");
1299 }
1300
1301 Ok(())
1302 }
1303
1304 /// Finds all leaf data directories in the catalog.
1305 ///
1306 /// A leaf directory is one that contains data files but no subdirectories.
1307 /// This method is used to identify directories that can be processed for
1308 /// consolidation or other operations.
1309 ///
1310 /// # Returns
1311 ///
1312 /// Returns a vector of directory path strings representing leaf directories,
1313 /// or an error if directory traversal fails.
1314 ///
1315 /// # Errors
1316 ///
1317 /// Returns an error if:
1318 /// - Object store listing operations fail.
1319 /// - Directory structure cannot be analyzed.
1320 ///
1321 /// # Examples
1322 ///
1323 /// ```rust,no_run
1324 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1325 ///
1326 /// let catalog = ParquetDataCatalog::new(/* ... */);
1327 ///
1328 /// let leaf_dirs = catalog.find_leaf_data_directories()?;
1329 /// for dir in leaf_dirs {
1330 /// println!("Found leaf directory: {}", dir);
1331 /// }
1332 /// # Ok::<(), anyhow::Error>(())
1333 /// ```
1334 pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<String>> {
1335 let data_dir = make_object_store_path(&self.base_path, &["data"]);
1336
1337 let leaf_dirs = self.execute_async(async {
1338 let mut all_paths = std::collections::HashSet::new();
1339 let mut directories = std::collections::HashSet::new();
1340 let mut files_in_dirs = std::collections::HashMap::new();
1341
1342 // List all objects under the data directory
1343 let prefix = ObjectPath::from(format!("{data_dir}/"));
1344 let mut stream = self.object_store.list(Some(&prefix));
1345
1346 while let Some(object) = stream.next().await {
1347 let object = object?;
1348 let path_str = object.location.to_string();
1349 all_paths.insert(path_str.clone());
1350
1351 // Extract directory path
1352 if let Some(parent) = std::path::Path::new(&path_str).parent() {
1353 let parent_str = parent.to_string_lossy().to_string();
1354 directories.insert(parent_str.clone());
1355
1356 // Track files in each directory
1357 files_in_dirs
1358 .entry(parent_str)
1359 .or_insert_with(Vec::new)
1360 .push(path_str);
1361 }
1362 }
1363
1364 // Find leaf directories (directories with files but no subdirectories)
1365 let mut leaf_dirs = Vec::new();
1366 for dir in &directories {
1367 let has_files = files_in_dirs
1368 .get(dir)
1369 .is_some_and(|files| !files.is_empty());
1370 let has_subdirs = directories
1371 .iter()
1372 .any(|d| d.starts_with(&make_object_store_path(dir, &[""])) && d != dir);
1373
1374 if has_files && !has_subdirs {
1375 leaf_dirs.push(dir.clone());
1376 }
1377 }
1378
1379 Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
1380 })?;
1381
1382 Ok(leaf_dirs)
1383 }
1384
1385 /// Deletes data within a specified time range for a specific data type and instrument.
1386 ///
1387 /// This method identifies all parquet files that intersect with the specified time range
1388 /// and handles them appropriately:
1389 /// - Files completely within the range are deleted
1390 /// - Files partially overlapping the range are split to preserve data outside the range
1391 /// - The original intersecting files are removed after processing
1392 ///
1393 /// # Parameters
1394 ///
1395 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
1396 /// - `identifier`: Optional instrument ID to delete data for. If None, deletes data across all instruments.
1397 /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1398 /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1399 ///
1400 /// # Returns
1401 ///
1402 /// Returns `Ok(())` on success, or an error if deletion fails.
1403 ///
1404 /// # Errors
1405 ///
1406 /// Returns an error if:
1407 /// - The directory path cannot be constructed.
1408 /// - File operations fail.
1409 /// - Data querying or writing fails.
1410 ///
1411 /// # Notes
1412 ///
1413 /// - This operation permanently removes data and cannot be undone.
1414 /// - Files that partially overlap the deletion range are split to preserve data outside the range.
1415 /// - The method ensures data integrity by using atomic operations where possible.
1416 /// - Empty directories are not automatically removed after deletion.
1417 ///
1418 /// # Examples
1419 ///
1420 /// ```rust,no_run
1421 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1422 /// use nautilus_core::UnixNanos;
1423 ///
1424 /// let catalog = ParquetDataCatalog::new(/* ... */);
1425 ///
1426 /// // Delete all quote data for a specific instrument
1427 /// catalog.delete_data_range(
1428 /// "quotes",
1429 /// Some("BTCUSD".to_string()),
1430 /// None,
1431 /// None
1432 /// )?;
1433 ///
1434 /// // Delete trade data within a specific time range
1435 /// catalog.delete_data_range(
1436 /// "trades",
1437 /// None,
1438 /// Some(UnixNanos::from(1609459200000000000)),
1439 /// Some(UnixNanos::from(1609545600000000000))
1440 /// )?;
1441 /// # Ok::<(), anyhow::Error>(())
1442 /// ```
1443 pub fn delete_data_range(
1444 &mut self,
1445 type_name: &str,
1446 identifier: Option<String>,
1447 start: Option<UnixNanos>,
1448 end: Option<UnixNanos>,
1449 ) -> anyhow::Result<()> {
1450 // Use match statement to call the generic delete_data_range for various types
1451 match type_name {
1452 "quotes" => self.delete_data_range_generic::<QuoteTick>(identifier, start, end),
1453 "trades" => self.delete_data_range_generic::<TradeTick>(identifier, start, end),
1454 "bars" => self.delete_data_range_generic::<Bar>(identifier, start, end),
1455 "order_book_deltas" => {
1456 self.delete_data_range_generic::<OrderBookDelta>(identifier, start, end)
1457 }
1458 "order_book_depth10" => {
1459 self.delete_data_range_generic::<OrderBookDepth10>(identifier, start, end)
1460 }
1461 _ => anyhow::bail!("Unsupported data type: {type_name}"),
1462 }
1463 }
1464
1465 /// Deletes data within a specified time range across the entire catalog.
1466 ///
1467 /// This method identifies all leaf directories in the catalog that contain parquet files
1468 /// and deletes data within the specified time range from each directory. A leaf directory
1469 /// is one that contains files but no subdirectories. This is a convenience method that
1470 /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
1471 ///
1472 /// # Parameters
1473 ///
1474 /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1475 /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1476 ///
1477 /// # Returns
1478 ///
1479 /// Returns `Ok(())` on success, or an error if deletion fails.
1480 ///
1481 /// # Errors
1482 ///
1483 /// Returns an error if:
1484 /// - Directory traversal fails.
1485 /// - Data class extraction from paths fails.
1486 /// - Individual delete operations fail.
1487 ///
1488 /// # Notes
1489 ///
1490 /// - This operation permanently removes data and cannot be undone.
1491 /// - The deletion process handles file intersections intelligently by splitting files
1492 /// when they partially overlap with the deletion range.
1493 /// - Files completely within the deletion range are removed entirely.
1494 /// - Files partially overlapping the deletion range are split to preserve data outside the range.
1495 /// - This method is useful for bulk data cleanup operations across the entire catalog.
1496 /// - Empty directories are not automatically removed after deletion.
1497 ///
1498 /// # Examples
1499 ///
1500 /// ```rust,no_run
1501 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1502 /// use nautilus_core::UnixNanos;
1503 ///
1504 /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1505 ///
1506 /// // Delete all data before a specific date across entire catalog
1507 /// catalog.delete_catalog_range(
1508 /// None,
1509 /// Some(UnixNanos::from(1609459200000000000))
1510 /// )?;
1511 ///
1512 /// // Delete all data within a specific range across entire catalog
1513 /// catalog.delete_catalog_range(
1514 /// Some(UnixNanos::from(1609459200000000000)),
1515 /// Some(UnixNanos::from(1609545600000000000))
1516 /// )?;
1517 ///
1518 /// // Delete all data after a specific date across entire catalog
1519 /// catalog.delete_catalog_range(
1520 /// Some(UnixNanos::from(1609459200000000000)),
1521 /// None
1522 /// )?;
1523 /// # Ok::<(), anyhow::Error>(())
1524 /// ```
1525 pub fn delete_catalog_range(
1526 &mut self,
1527 start: Option<UnixNanos>,
1528 end: Option<UnixNanos>,
1529 ) -> anyhow::Result<()> {
1530 let leaf_directories = self.find_leaf_data_directories()?;
1531
1532 for directory in leaf_directories {
1533 if let Ok((Some(data_type), identifier)) =
1534 self.extract_data_cls_and_identifier_from_path(&directory)
1535 {
1536 // Call the existing delete_data_range method
1537 if let Err(e) = self.delete_data_range(&data_type, identifier, start, end) {
                    log::warn!("Failed to delete data in directory {directory}: {e}");
1539 // Continue with other directories instead of failing completely
1540 }
1541 }
1542 }
1543
1544 Ok(())
1545 }
1546
1547 /// Generic implementation for deleting data within a specified time range.
1548 ///
1549 /// This method provides the core deletion logic that works with any data type
1550 /// that implements the required traits. It handles file intersection analysis,
1551 /// data splitting for partial overlaps, and file cleanup.
1552 ///
1553 /// # Type Parameters
1554 ///
1555 /// - `T`: The data type that implements required traits for catalog operations.
1556 ///
1557 /// # Parameters
1558 ///
1559 /// - `identifier`: Optional instrument ID to delete data for.
1560 /// - `start`: Optional start timestamp for the deletion range.
1561 /// - `end`: Optional end timestamp for the deletion range.
1562 ///
1563 /// # Returns
1564 ///
1565 /// Returns `Ok(())` on success, or an error if deletion fails.
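    ///
    /// # Examples
    ///
    /// A minimal sketch using `TradeTick` (the instrument ID and timestamps are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_model::data::TradeTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete one hour of trades for a single instrument
    /// catalog.delete_data_range_generic::<TradeTick>(
    ///     Some("BTCUSD".to_string()),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609462800000000000)),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```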
1566 pub fn delete_data_range_generic<T>(
1567 &mut self,
1568 identifier: Option<String>,
1569 start: Option<UnixNanos>,
1570 end: Option<UnixNanos>,
1571 ) -> anyhow::Result<()>
1572 where
1573 T: DecodeDataFromRecordBatch
1574 + CatalogPathPrefix
1575 + EncodeToRecordBatch
1576 + HasTsInit
1577 + TryFrom<Data>
1578 + Clone,
1579 {
1580 // Get intervals for cleaner implementation
1581 let intervals = self.get_intervals(T::path_prefix(), identifier.clone())?;
1582
1583 if intervals.is_empty() {
1584 return Ok(()); // No files to process
1585 }
1586
1587 // Prepare all operations for execution
1588 let operations_to_execute = self.prepare_delete_operations(
1589 T::path_prefix(),
1590 identifier.clone(),
1591 &intervals,
1592 start,
1593 end,
1594 )?;
1595
1596 if operations_to_execute.is_empty() {
1597 return Ok(()); // No operations to execute
1598 }
1599
1600 // Execute all operations
1601 let mut files_to_remove = HashSet::<String>::new();
1602
1603 for operation in operations_to_execute {
1604 // Reset the session before each operation to ensure fresh data is loaded
1605 // This clears any cached table registrations that might interfere with file operations
1606 self.reset_session();
1607 match operation.operation_type.as_str() {
1608 "split_before" => {
1609 // Query data before the deletion range and write it
1610 let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1611 let before_data = self.query_typed_data::<T>(
1612 instrument_ids,
1613 Some(UnixNanos::from(operation.query_start)),
1614 Some(UnixNanos::from(operation.query_end)),
1615 None,
1616 Some(operation.files.clone()),
1617 )?;
1618
1619 if !before_data.is_empty() {
1620 let start_ts = UnixNanos::from(operation.file_start_ns);
1621 let end_ts = UnixNanos::from(operation.file_end_ns);
1622 self.write_to_parquet(
1623 before_data,
1624 Some(start_ts),
1625 Some(end_ts),
1626 Some(true),
1627 )?;
1628 }
1629 }
1630 "split_after" => {
1631 // Query data after the deletion range and write it
1632 let instrument_ids = identifier.as_ref().map(|id| vec![id.clone()]);
1633 let after_data = self.query_typed_data::<T>(
1634 instrument_ids,
1635 Some(UnixNanos::from(operation.query_start)),
1636 Some(UnixNanos::from(operation.query_end)),
1637 None,
1638 Some(operation.files.clone()),
1639 )?;
1640
1641 if !after_data.is_empty() {
1642 let start_ts = UnixNanos::from(operation.file_start_ns);
1643 let end_ts = UnixNanos::from(operation.file_end_ns);
1644 self.write_to_parquet(
1645 after_data,
1646 Some(start_ts),
1647 Some(end_ts),
1648 Some(true),
1649 )?;
1650 }
1651 }
1652 _ => {
1653 // For "remove" operations, just mark files for removal
1654 }
1655 }
1656
1657 // Mark files for removal (applies to all operation types)
1658 for file in operation.files {
1659 files_to_remove.insert(file);
1660 }
1661 }
1662
1663 // Remove all files that were processed
1664 for file in files_to_remove {
1665 if let Err(e) = self.delete_file(&file) {
                log::warn!("Failed to delete file {file}: {e}");
1667 }
1668 }
1669
1670 Ok(())
1671 }
1672
1673 /// Prepares all operations for data deletion by identifying files that need to be
1674 /// split or removed.
1675 ///
1676 /// This auxiliary function handles all the preparation logic for deletion:
1677 /// 1. Filters intervals by time range
1678 /// 2. Identifies files that intersect with the deletion range
1679 /// 3. Creates split operations for files that partially overlap
1680 /// 4. Generates removal operations for files completely within the range
1681 ///
1682 /// # Parameters
1683 ///
1684 /// - `type_name`: The data type directory name for path generation.
1685 /// - `identifier`: Optional instrument identifier for path generation.
1686 /// - `intervals`: List of (`start_ts`, `end_ts`) tuples representing existing file intervals.
1687 /// - `start`: Optional start timestamp for deletion range.
1688 /// - `end`: Optional end timestamp for deletion range.
1689 ///
1690 /// # Returns
1691 ///
1692 /// Returns a vector of `DeleteOperation` structs ready for execution.
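    ///
    /// # Examples
    ///
    /// A minimal sketch (the interval and deletion range are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // One existing file covers [1000, 5000]; deleting from 3000 onward
    /// // yields a split_before operation that preserves [1000, 2999]
    /// let operations = catalog.prepare_delete_operations(
    ///     "quotes",
    ///     None,
    ///     &[(1000, 5000)],
    ///     Some(UnixNanos::from(3000)),
    ///     None,
    /// )?;
    ///
    /// for op in &operations {
    ///     println!("{}: {:?}", op.operation_type, op.files);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```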
1693 pub fn prepare_delete_operations(
1694 &self,
1695 type_name: &str,
1696 identifier: Option<String>,
1697 intervals: &[(u64, u64)],
1698 start: Option<UnixNanos>,
1699 end: Option<UnixNanos>,
1700 ) -> anyhow::Result<Vec<DeleteOperation>> {
1701 // Convert start/end to nanoseconds
1702 let delete_start_ns = start.map(|s| s.as_u64());
1703 let delete_end_ns = end.map(|e| e.as_u64());
1704
1705 let mut operations = Vec::new();
1706
1707 // Get directory for file path construction
1708 let directory = self.make_path(type_name, identifier)?;
1709
1710 // Process each interval (which represents an actual file)
1711 for &(file_start_ns, file_end_ns) in intervals {
1712 // Check if file intersects with deletion range
1713 let intersects = (delete_start_ns.is_none() || delete_start_ns.unwrap() <= file_end_ns)
1714 && (delete_end_ns.is_none() || file_start_ns <= delete_end_ns.unwrap());
1715
1716 if !intersects {
1717 continue; // File doesn't intersect with deletion range
1718 }
1719
1720 // Construct file path from interval timestamps
1721 let filename = timestamps_to_filename(
1722 UnixNanos::from(file_start_ns),
1723 UnixNanos::from(file_end_ns),
1724 );
1725 let file_path = make_object_store_path(&directory, &[&filename]);
1726
1727 // Determine what type of operation is needed
1728 let file_completely_within_range = (delete_start_ns.is_none()
1729 || delete_start_ns.unwrap() <= file_start_ns)
1730 && (delete_end_ns.is_none() || file_end_ns <= delete_end_ns.unwrap());
1731
1732 if file_completely_within_range {
1733 // File is completely within deletion range - just mark for removal
1734 operations.push(DeleteOperation {
1735 operation_type: "remove".to_string(),
1736 files: vec![file_path],
1737 query_start: 0,
1738 query_end: 0,
1739 file_start_ns: 0,
1740 file_end_ns: 0,
1741 });
1742 } else {
1743 // File partially overlaps - need to split
1744 if let Some(delete_start) = delete_start_ns
1745 && file_start_ns < delete_start
1746 {
1747 // Keep data before deletion range
1748 operations.push(DeleteOperation {
1749 operation_type: "split_before".to_string(),
1750 files: vec![file_path.clone()],
1751 query_start: file_start_ns,
                        query_end: delete_start.saturating_sub(1), // Keep data strictly before delete_start
1753 file_start_ns,
1754 file_end_ns: delete_start.saturating_sub(1),
1755 });
1756 }
1757
1758 if let Some(delete_end) = delete_end_ns
1759 && delete_end < file_end_ns
1760 {
1761 // Keep data after deletion range
1762 operations.push(DeleteOperation {
1763 operation_type: "split_after".to_string(),
1764 files: vec![file_path.clone()],
                        query_start: delete_end.saturating_add(1), // Keep data strictly after delete_end
1766 query_end: file_end_ns,
1767 file_start_ns: delete_end.saturating_add(1),
1768 file_end_ns,
1769 });
1770 }
1771 }
1772 }
1773
1774 Ok(operations)
1775 }
1776}