nautilus_persistence/backend/catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Parquet data catalog for efficient storage and retrieval of financial market data.
//!
//! This module provides a comprehensive data catalog implementation that uses the Apache Parquet
//! format for storing financial market data with object store backends. The catalog supports
//! various data types including quotes, trades, bars, order book data, and other market events.
//!
//! # Key Features
//!
//! - **Object Store Integration**: Works with local filesystems, S3, and other object stores.
//! - **Data Type Support**: Handles all major financial data types (quotes, trades, bars, etc.).
//! - **Time-based Organization**: Organizes data by timestamp ranges for efficient querying.
//! - **Consolidation**: Merges multiple files to optimize storage and query performance.
//! - **Validation**: Ensures data integrity with timestamp ordering and interval validation.
//!
//! # Architecture
//!
//! The catalog organizes data in a hierarchical structure:
//! ```text
//! data/
//! ├── quotes/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! ├── trades/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! └── bars/
//!     └── INSTRUMENT_ID/
//!         └── start_ts-end_ts.parquet
//! ```
//!
//! # Usage
//!
//! ```rust,no_run
//! use std::path::PathBuf;
//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//!
//! // Create a new catalog
//! let catalog = ParquetDataCatalog::new(
//!     PathBuf::from("/path/to/data"),
//!     None,        // storage_options
//!     Some(5000),  // batch_size
//!     None,        // compression (defaults to SNAPPY)
//!     None,        // max_row_group_size (defaults to 5000)
//! );
//!
//! // Write data to the catalog
//! // catalog.write_to_parquet(data, None, None, None)?;
//! ```

use std::{
    fmt::Debug,
    ops::Bound,
    path::{Path, PathBuf},
    sync::Arc,
};

use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use heck::ToSnakeCase;
use itertools::Itertools;
use log::info;
use nautilus_core::{
    UnixNanos,
    datetime::{iso8601_to_unix_nanos, unix_nanos_to_iso8601},
};
use nautilus_model::data::{
    Bar, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose, to_variant,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::{ObjectStore, path::Path as ObjectPath};
use serde::Serialize;
use unbounded_interval_tree::interval_tree::IntervalTree;

use super::session::{self, DataBackendSession, QueryResult, build_query};
use crate::parquet::write_batches_to_object_store;

/// A high-performance data catalog for storing and retrieving financial market data using Apache Parquet format.
///
/// The `ParquetDataCatalog` provides a comprehensive solution for managing large volumes of financial
/// market data with efficient storage, querying, and consolidation capabilities. It supports various
/// object store backends including local filesystems, AWS S3, and other cloud storage providers.
///
/// # Features
///
/// - **Efficient Storage**: Uses Apache Parquet format with configurable compression.
/// - **Object Store Backend**: Supports multiple storage backends through the `object_store` crate.
/// - **Time-based Organization**: Organizes data by timestamp ranges for optimal query performance.
/// - **Data Validation**: Ensures timestamp ordering and interval consistency.
/// - **Consolidation**: Merges multiple files to reduce storage overhead and improve query speed.
/// - **Type Safety**: Strongly typed data handling with compile-time guarantees.
///
/// # Data Organization
///
/// Data is organized hierarchically by data type and instrument:
/// - `data/{data_type}/{instrument_id}/{start_ts}-{end_ts}.parquet`.
/// - Files are named with their timestamp ranges for efficient range queries.
/// - Intervals are validated to be disjoint to prevent data overlap.
///
/// # Performance Considerations
///
/// - **Batch Size**: Controls memory usage during data processing.
/// - **Compression**: SNAPPY compression provides a good balance of speed and size.
/// - **Row Group Size**: Affects query performance and memory usage.
/// - **File Consolidation**: Reduces the number of files for better query performance.
pub struct ParquetDataCatalog {
    /// The base path for data storage within the object store.
    pub base_path: String,
    /// The original URI provided when creating the catalog.
    pub original_uri: String,
    /// The object store backend for data persistence.
    pub object_store: Arc<dyn ObjectStore>,
    /// The DataFusion session for query execution.
    pub session: DataBackendSession,
    /// The number of records to process in each batch.
    pub batch_size: usize,
    /// The compression algorithm used for Parquet files.
    pub compression: parquet::basic::Compression,
    /// The maximum number of rows in each Parquet row group.
    pub max_row_group_size: usize,
}

impl Debug for ParquetDataCatalog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(ParquetDataCatalog))
            .field("base_path", &self.base_path)
            .finish()
    }
}

impl ParquetDataCatalog {
    /// Creates a new [`ParquetDataCatalog`] instance from a local file path.
    ///
    /// This is a convenience constructor that converts a local path to a URI format
    /// and delegates to [`Self::from_uri`].
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base directory path for data storage.
    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Panics
    ///
    /// Panics if the path cannot be converted to a valid URI or if the object store
    /// cannot be created from the path.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None,        // no storage options
    ///     Some(1000),  // smaller batch size
    ///     None,        // default compression
    ///     None,        // default row group size
    /// );
    /// ```
    #[must_use]
    pub fn new(
        base_path: PathBuf,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let path_str = base_path.to_string_lossy().to_string();
        Self::from_uri(
            &path_str,
            storage_options,
            batch_size,
            compression,
            max_row_group_size,
        )
        .expect("Failed to create catalog from path")
    }

    /// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
    ///
    /// Supports various URI schemes including local file paths and multiple cloud storage backends
    /// supported by the `object_store` crate.
    ///
    /// # Supported URI Schemes
    ///
    /// - **AWS S3**: `s3://bucket/path`.
    /// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`.
    /// - **Azure Blob Storage**: `azure://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`.
    /// - **HTTP/WebDAV**: `http://` or `https://`.
    /// - **Local files**: `file://path` or plain paths.
    ///
    /// # Parameters
    ///
    /// - `uri`: The URI for the data storage location.
    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
    ///   - For S3: `endpoint_url`, `region`, `access_key_id`, `secret_access_key`, `session_token`, etc.
    ///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
    ///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URI format is invalid or unsupported.
    /// - The object store cannot be created or accessed.
    /// - Authentication fails for cloud storage backends.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::collections::HashMap;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// // Local filesystem
    /// let local_catalog = ParquetDataCatalog::from_uri(
    ///     "/tmp/nautilus_data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 bucket
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Google Cloud Storage
    /// let gcs_catalog = ParquetDataCatalog::from_uri(
    ///     "gs://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Azure Blob Storage
    /// let azure_catalog = ParquetDataCatalog::from_uri(
    ///     "azure://account/container/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 with custom endpoint and credentials
    /// let mut storage_options = HashMap::new();
    /// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
    /// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
    /// storage_options.insert("secret_access_key".to_string(), "my-secret".to_string());
    ///
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     Some(storage_options),
    ///     None, None, None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn from_uri(
        uri: &str,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> anyhow::Result<Self> {
        let batch_size = batch_size.unwrap_or(5000);
        let compression = compression.unwrap_or(parquet::basic::Compression::SNAPPY);
        let max_row_group_size = max_row_group_size.unwrap_or(5000);

        let (object_store, base_path, original_uri) =
            crate::parquet::create_object_store_from_path(uri, storage_options)?;

        Ok(Self {
            base_path,
            original_uri,
            object_store,
            session: session::DataBackendSession::new(batch_size),
            batch_size,
            compression,
            max_row_group_size,
        })
    }

    /// Returns the base path of the catalog for testing purposes.
    #[must_use]
    pub fn get_base_path(&self) -> String {
        self.base_path.clone()
    }

    /// Writes mixed data types to the catalog by separating them into type-specific collections.
    ///
    /// This method takes a heterogeneous collection of market data and separates it by type,
    /// then writes each type to its appropriate location in the catalog. This is useful when
    /// processing mixed data streams or bulk data imports.
    ///
    /// # Parameters
    ///
    /// - `data`: A vector of mixed [`Data`] enum variants.
    /// - `start`: Optional start timestamp to override the data's natural range.
    /// - `end`: Optional end timestamp to override the data's natural range.
    ///
    /// # Notes
    ///
    /// - Data is automatically separated by type before writing.
    /// - Each data type is written to its own directory structure.
    /// - Instrument data handling is not yet implemented (TODO).
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_model::data::Data;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let mixed_data: Vec<Data> = vec![/* mixed data types */];
    ///
    /// catalog.write_data_enum(mixed_data, None, None)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_data_enum(
        &self,
        data: Vec<Data>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()> {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();

        // Consume the owned vector directly to avoid cloning each element
        for d in data {
            match d {
                Data::Deltas(_) => continue,
                Data::Delta(d) => {
                    deltas.push(d);
                }
                Data::Depth10(d) => {
                    depth10s.push(*d);
                }
                Data::Quote(d) => {
                    quotes.push(d);
                }
                Data::Trade(d) => {
                    trades.push(d);
                }
                Data::Bar(d) => {
                    bars.push(d);
                }
                Data::MarkPriceUpdate(p) => {
                    mark_prices.push(p);
                }
                Data::IndexPriceUpdate(p) => {
                    index_prices.push(p);
                }
                Data::InstrumentClose(c) => {
                    closes.push(c);
                }
            }
        }

        // TODO: need to handle instruments here

        self.write_to_parquet(deltas, start, end, None)?;
        self.write_to_parquet(depth10s, start, end, None)?;
        self.write_to_parquet(quotes, start, end, None)?;
        self.write_to_parquet(trades, start, end, None)?;
        self.write_to_parquet(bars, start, end, None)?;
        self.write_to_parquet(mark_prices, start, end, None)?;
        self.write_to_parquet(index_prices, start, end, None)?;
        self.write_to_parquet(closes, start, end, None)?;

        Ok(())
    }

    /// Writes typed data to a Parquet file in the catalog.
    ///
    /// This is the core method for persisting market data to the catalog. It handles data
    /// validation, batching, compression, and ensures proper file organization with
    /// timestamp-based naming.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement required traits for serialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `start`: Optional start timestamp to override the natural data range.
    /// - `end`: Optional end timestamp to override the natural data range.
    /// - `skip_disjoint_check`: If `Some(true)`, skips the check that intervals in the target
    ///   directory remain disjoint after writing.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Data timestamps are not in ascending order.
    /// - Data serialization to Arrow record batches fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    /// - Timestamp interval validation fails after writing.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - Record batches are empty after conversion.
    /// - Required metadata is missing from the schema.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    ///
    /// let path = catalog.write_to_parquet(quotes, None, None, None)?;
    /// println!("Data written to: {:?}", path);
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        skip_disjoint_check: Option<bool>,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = start.unwrap_or(data.first().unwrap().ts_init());
        let end_ts = end.unwrap_or(data.last().unwrap().ts_init());

        let batches = self.data_to_record_batches(data)?;
        let schema = batches.first().expect("Batches are empty.").schema();
        let instrument_id = schema.metadata.get("instrument_id").cloned();

        let directory = self.make_path(T::path_prefix(), instrument_id)?;
        let filename = timestamps_to_filename(start_ts, end_ts);
        let path = PathBuf::from(format!("{directory}/{filename}"));

        // Write all batches to the Parquet file
        info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len()
        );

        // Convert the path to an object store path
        let object_path = self.to_object_path(&path.to_string_lossy());

        self.execute_async(async {
            write_batches_to_object_store(
                &batches,
                self.object_store.clone(),
                &object_path,
                Some(self.compression),
                Some(self.max_row_group_size),
            )
            .await
        })?;

        if !skip_disjoint_check.unwrap_or(false) {
            let intervals = self.get_directory_intervals(&directory)?;

            if !are_intervals_disjoint(&intervals) {
                anyhow::bail!("Intervals are not disjoint after writing a new file");
            }
        }

        Ok(path)
    }

    /// Writes typed data to a JSON file in the catalog.
    ///
    /// This method provides an alternative to Parquet format for data export and debugging.
    /// JSON files are human-readable but less efficient for large datasets.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement serialization and cataloging traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `path`: Optional custom directory path (defaults to catalog's standard structure).
    /// - `write_metadata`: Whether to write a separate metadata file alongside the data.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created JSON file.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Data timestamps are not in ascending order.
    /// - JSON serialization fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    ///
    /// # Panics
    ///
    /// Panics if the default directory path cannot be constructed when `path` is `None`.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use std::path::PathBuf;
    /// use nautilus_model::data::TradeTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let trades: Vec<TradeTick> = vec![/* trade data */];
    ///
    /// let path = catalog.write_to_json(
    ///     trades,
    ///     Some(PathBuf::from("/custom/path")),
    ///     true  // write metadata
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = data.first().unwrap().ts_init();
        let end_ts = data.last().unwrap().ts_init();

        let directory =
            path.unwrap_or_else(|| PathBuf::from(self.make_path(T::path_prefix(), None).unwrap()));
        let filename = timestamps_to_filename(start_ts, end_ts).replace(".parquet", ".json");
        let json_path = directory.join(&filename);

        info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len()
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            info!("Writing metadata to {metadata_path:?}");

            // Use the object store for the metadata file
            let metadata_object_path = ObjectPath::from(metadata_path.to_string_lossy().as_ref());
            let metadata_json = serde_json::to_vec_pretty(&metadata)?;
            self.execute_async(async {
                self.object_store
                    .put(&metadata_object_path, metadata_json.into())
                    .await
                    .map_err(anyhow::Error::from)
            })?;
        }

        // Use the object store for the main JSON file
        let json_object_path = ObjectPath::from(json_path.to_string_lossy().as_ref());
        let json_data = serde_json::to_vec_pretty(&serde_json::to_value(data)?)?;
        self.execute_async(async {
            self.object_store
                .put(&json_object_path, json_data.into())
                .await
                .map_err(anyhow::Error::from)
        })?;

        Ok(json_path)
    }

    /// Validates that data timestamps are in ascending order.
    ///
    /// # Parameters
    ///
    /// - `data`: Slice of data records to validate.
    /// - `type_name`: Name of the data type for error messages.
    ///
    /// # Errors
    ///
    /// Returns an error if any timestamp is less than the previous timestamp.
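    ///
    /// # Examples
    ///
    /// A minimal sketch of the validation (assumes a `quotes` vector of `QuoteTick` values in scope):
    ///
    /// ```rust,ignore
    /// // Succeeds for ascending (or equal) timestamps, returns an error otherwise
    /// ParquetDataCatalog::check_ascending_timestamps(&quotes, "quote_tick")?;
    /// ```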
    pub fn check_ascending_timestamps<T: HasTsInit>(
        data: &[T],
        type_name: &str,
    ) -> anyhow::Result<()> {
        if !data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()) {
            anyhow::bail!("{type_name} timestamps must be in ascending order");
        }

        Ok(())
    }

    /// Converts data into Arrow record batches for Parquet serialization.
    ///
    /// This method chunks the data according to the configured batch size and converts
    /// each chunk into an Arrow record batch with appropriate metadata.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to convert, must implement required encoding traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to convert.
    ///
    /// # Returns
    ///
    /// Returns a vector of Arrow [`RecordBatch`] instances ready for Parquet serialization.
    ///
    /// # Errors
    ///
    /// Returns an error if record batch encoding fails for any chunk.
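    ///
    /// # Examples
    ///
    /// A sketch of the chunking behavior (assumes a catalog with `batch_size = 5000` and a
    /// `quotes` vector in scope):
    ///
    /// ```rust,ignore
    /// // 12_000 records produce three batches of 5000, 5000, and 2000 rows
    /// let batches = catalog.data_to_record_batches(quotes)?;
    /// ```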
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: HasTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

    /// Extends the timestamp range of an existing Parquet file by renaming it.
    ///
    /// This method finds an existing file that is adjacent to the specified time range
    /// and renames it to include the new range. This is useful when appending data
    /// that extends the time coverage of existing files.
    ///
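    /// For example, a file covering `[100, 200]` is renamed to cover `[50, 200]` when
    /// called with `start = 50` and `end = 99`, because the file's first timestamp is
    /// exactly `end + 1` (intervals are inclusive, so adjacency means no gap).
    ///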
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Start timestamp of the new range to extend to.
    /// - `end`: End timestamp of the new range to extend to.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - No adjacent file is found to extend.
    /// - File rename operations fail.
    /// - Interval validation fails after extension.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Extend a file's range backwards or forwards
    /// catalog.extend_file_name(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     UnixNanos::from(1609459200000000000),
    ///     UnixNanos::from(1609545600000000000)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: UnixNanos,
        end: UnixNanos,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(data_cls, instrument_id)?;
        let intervals = self.get_directory_intervals(&directory)?;

        let start = start.as_u64();
        let end = end.as_u64();

        for interval in intervals {
            if interval.0 == end + 1 {
                // Extend backwards: new file covers [start, interval.1]
                self.rename_parquet_file(&directory, interval.0, interval.1, start, interval.1)?;
                break;
            } else if start > 0 && interval.1 == start - 1 {
                // Extend forwards: new file covers [interval.0, end]
                // (guard against u64 underflow when start == 0)
                self.rename_parquet_file(&directory, interval.0, interval.1, interval.0, end)?;
                break;
            }
        }

        let intervals = self.get_directory_intervals(&directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after extending a file");
        }

        Ok(())
    }

    /// Lists all Parquet files in a specified directory.
    ///
    /// This method scans a directory and returns the full paths of all files with the `.parquet`
    /// extension. It works with both local filesystems and remote object stores, making it
    /// suitable for various storage backends.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path to scan for Parquet files.
    ///
    /// # Returns
    ///
    /// Returns a vector of full file paths (as strings) for all Parquet files found in the directory.
    /// The paths are relative to the object store root and suitable for use with object store operations.
    /// Returns an empty vector if the directory doesn't exist or contains no Parquet files.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Object store listing operations fail.
    /// - Directory access is denied.
    /// - Network issues occur (for remote object stores).
    ///
    /// # Notes
    ///
    /// - Only files ending with `.parquet` are included.
    /// - Object store listing is recursive, so files in nested subdirectories are also included.
    /// - File paths are returned in the order provided by the object store.
    /// - Works with all supported object store backends (local, S3, GCS, Azure, etc.).
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let files = catalog.list_parquet_files("data/quotes/EURUSD")?;
    ///
    /// for file in files {
    ///     println!("Found Parquet file: {}", file);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn list_parquet_files(&self, directory: &str) -> anyhow::Result<Vec<String>> {
        self.execute_async(async {
            let prefix = ObjectPath::from(format!("{directory}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut files = Vec::new();

            while let Some(object) = stream.next().await {
                let object = object?;
                if object.location.as_ref().ends_with(".parquet") {
                    files.push(object.location.to_string());
                }
            }
            Ok::<Vec<String>, anyhow::Error>(files)
        })
    }

    /// Helper method to reconstruct the full URI for remote object store paths.
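    ///
    /// # Examples
    ///
    /// A sketch of the expected mapping (bucket and path are illustrative):
    ///
    /// ```rust,ignore
    /// // For a catalog created from "s3://my-bucket/nautilus-data", a relative
    /// // object store path is rejoined with the original scheme and host:
    /// let uri = catalog.reconstruct_full_uri("nautilus-data/data/quotes/EURUSD/file.parquet");
    /// // -> "s3://my-bucket/nautilus-data/data/quotes/EURUSD/file.parquet"
    /// ```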
    #[must_use]
    pub fn reconstruct_full_uri(&self, path_str: &str) -> String {
        // Check if this is a remote URI scheme that needs reconstruction
        if self.is_remote_uri() {
            // Extract the base URL (scheme + host) from the original URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Some(host) = url.host_str()
            {
                return format!("{}://{}/{}", url.scheme(), host, path_str);
            }
        }

        // For local paths, extract the directory from the original URI
        if self.original_uri.starts_with("file://") {
            // Extract the path from the file:// URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Ok(base_path) = url.to_file_path()
            {
                return format!("{}/{}", base_path.display(), path_str);
            }
        }

        // For local paths without a file:// prefix, use the original URI as base
        if self.base_path.is_empty() {
            // If base_path is empty and not a file URI, try using original_uri as base
            if self.original_uri.contains("://") {
                // Fallback: return the path as-is
                path_str.to_string()
            } else {
                format!("{}/{}", self.original_uri.trim_end_matches('/'), path_str)
            }
        } else {
            let base = self.base_path.trim_end_matches('/');
            format!("{base}/{path_str}")
        }
    }

    /// Helper method to check if the original URI uses a remote object store scheme.
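    ///
    /// # Examples
    ///
    /// A minimal sketch (URIs are illustrative):
    ///
    /// ```rust,ignore
    /// let remote = ParquetDataCatalog::from_uri("s3://bucket/data", None, None, None, None)?;
    /// assert!(remote.is_remote_uri());
    ///
    /// let local = ParquetDataCatalog::from_uri("/tmp/nautilus_data", None, None, None, None)?;
    /// assert!(!local.is_remote_uri());
    /// ```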
    #[must_use]
    pub fn is_remote_uri(&self) -> bool {
        self.original_uri.starts_with("s3://")
            || self.original_uri.starts_with("gs://")
            || self.original_uri.starts_with("gcs://")
            || self.original_uri.starts_with("azure://")
            || self.original_uri.starts_with("abfs://")
            || self.original_uri.starts_with("http://")
            || self.original_uri.starts_with("https://")
    }

    /// Executes a query against the catalog to retrieve market data of a specific type.
    ///
    /// This is the primary method for querying data from the catalog. It registers the appropriate
    /// object store with the DataFusion session, finds all relevant Parquet files, and executes
    /// the query across them. The method supports filtering by instrument IDs, time ranges, and
    /// custom SQL WHERE clauses.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to query, must implement required traits for deserialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering (e.g., "price > 100").
    /// - `files`: Optional explicit list of files to query, bypassing automatic file discovery.
    ///
    /// # Returns
    ///
    /// Returns a [`QueryResult`] containing the query execution context and data.
    /// Use [`QueryResult::collect()`] to retrieve the actual data records.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Object store registration fails for remote URIs.
    /// - File discovery fails.
    /// - DataFusion query execution fails.
    /// - Data deserialization fails.
    ///
    /// # Performance Notes
    ///
    /// - Files are automatically filtered by timestamp ranges before querying.
    /// - DataFusion optimizes queries across multiple Parquet files.
    /// - Use specific instrument IDs and time ranges to improve performance.
    /// - WHERE clauses are pushed down to the Parquet reader when possible.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quote data
    /// let result = catalog.query::<QuoteTick>(None, None, None, None, None)?;
    /// let quotes = result.collect();
    ///
    /// // Query specific instruments within a time range
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Query with custom WHERE clause
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000"),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query<T>(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        // Register the object store with the session for remote URIs
        if self.is_remote_uri() {
            let url = url::Url::parse(&self.original_uri)?;
            let host = url
                .host_str()
                .ok_or_else(|| anyhow::anyhow!("Remote URI missing host/bucket name"))?;
            let base_url = url::Url::parse(&format!("{}://{}", url.scheme(), host))?;
            self.session
                .register_object_store(&base_url, self.object_store.clone());
        }

        let files_list = if let Some(files) = files {
            files
        } else {
            self.query_files(T::path_prefix(), instrument_ids, start, end)?
        };

        // Use a unique timestamp-based suffix to avoid table name conflicts
        let unique_suffix = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();

        for (idx, file_uri) in files_list.iter().enumerate() {
            let table_name = format!("{}_{}_{}", T::path_prefix(), unique_suffix, idx);
            let query = build_query(&table_name, start, end, where_clause);

            // Convert the object store path to a filesystem path for DataFusion.
            // Only apply reconstruction if the path is not already absolute.
            let resolved_path = if file_uri.starts_with('/') {
                // Path is already absolute, use as-is
                file_uri.clone()
            } else {
                // Path is relative, reconstruct the full URI
                self.reconstruct_full_uri(file_uri)
            };
            self.session
                .add_file::<T>(&table_name, &resolved_path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }

    /// Queries typed data from the catalog and returns results as a strongly-typed vector.
    ///
    /// This is a convenience method that wraps the generic `query` method and automatically
    /// collects and converts the results into a vector of the specific data type. It handles
    /// the type conversion from the generic [`Data`] enum to the concrete type `T`.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The specific data type to query and return. Must implement required traits for
    ///   deserialization, cataloging, and conversion from the [`Data`] enum.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    ///   For exact matches, provide the full instrument ID. For bars, partial matches are supported.
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering. Use standard SQL syntax
    ///   with column names matching the Parquet schema (e.g., "`bid_price` > 1.2000", "volume > 1000").
    /// - `files`: Optional explicit list of files to query, bypassing automatic file discovery.
    ///
    /// # Returns
    ///
    /// Returns a vector of the specific data type `T`, sorted by timestamp. The vector will be
    /// empty if no data matches the query criteria.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The underlying query execution fails.
    /// - Data type conversion fails.
    /// - Object store access fails.
    /// - Invalid WHERE clause syntax is provided.
    ///
    /// # Performance Considerations
    ///
    /// - Use specific instrument IDs and time ranges to minimize data scanning.
    /// - WHERE clauses are pushed down to Parquet readers when possible.
    /// - Results are automatically sorted by timestamp during collection.
    /// - Memory usage scales with the amount of data returned.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_model::data::{QuoteTick, TradeTick, Bar};
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quotes for a specific instrument
    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
    ///     Some(vec!["EURUSD".to_string()]),
    ///     None,
    ///     None,
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Query trades within a specific time range
    /// let trades: Vec<TradeTick> = catalog.query_typed_data(
    ///     Some(vec!["BTCUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Query bars with volume filter
    /// let bars: Vec<Bar> = catalog.query_typed_data(
    ///     Some(vec!["AAPL".to_string()]),
    ///     None,
    ///     None,
    ///     Some("volume > 1000000"),
    ///     None
    /// )?;
    ///
    /// // Query multiple instruments with price filter
    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
    ///     Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000 AND ask_price < 1.3000"),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_typed_data<T>(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
    ) -> anyhow::Result<Vec<T>>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix + TryFrom<Data>,
    {
        let query_result = self.query::<T>(instrument_ids, start, end, where_clause, files)?;
        let all_data = query_result.collect();

        // Convert Data enum variants to the specific type T using to_variant
        Ok(to_variant::<T>(all_data))
    }

    /// Queries all Parquet files for a specific data type and optional instrument IDs.
    ///
    /// This method finds all Parquet files that match the specified criteria and returns
    /// their full URIs. The files are filtered by data type, instrument IDs (if provided),
    /// and timestamp range (if provided).
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_ids`: Optional list of instrument IDs to filter by.
    /// - `start`: Optional start timestamp to filter files by their time range.
    /// - `end`: Optional end timestamp to filter files by their time range.
    ///
    /// # Returns
    ///
    /// Returns a vector of file URI strings that match the query criteria,
    /// or an error if the query fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - Object store listing operations fail.
    /// - URI reconstruction fails.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quote files
    /// let files = catalog.query_files("quotes", None, None, None)?;
    ///
    /// // Query trade files for specific instruments within a time range
    /// let files = catalog.query_files(
    ///     "trades",
    ///     Some(vec!["BTCUSD".to_string(), "ETHUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_files(
        &self,
        data_cls: &str,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<Vec<String>> {
        let mut files = Vec::new();

        let start_u64 = start.map(|s| s.as_u64());
        let end_u64 = end.map(|e| e.as_u64());

        let base_dir = self.make_path(data_cls, None)?;

        // Use recursive listing to match Python's glob behavior
        let list_result = self.execute_async(async {
            let prefix = ObjectPath::from(format!("{base_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut objects = Vec::new();
            while let Some(object) = stream.next().await {
                objects.push(object?);
            }
            Ok::<Vec<_>, anyhow::Error>(objects)
        })?;

        let mut file_paths: Vec<String> = list_result
            .into_iter()
            .filter_map(|object| {
                let path_str = object.location.to_string();
                if path_str.ends_with(".parquet") {
                    Some(path_str)
                } else {
                    None
                }
            })
            .collect();

        // Apply identifier filtering if provided
        if let Some(identifiers) = instrument_ids {
            let safe_identifiers: Vec<String> = identifiers
                .iter()
                .map(|id| urisafe_instrument_id(id))
                .collect();

            // Exact match by default for instrument_ids or bar_types
            let exact_match_file_paths: Vec<String> = file_paths
                .iter()
                .filter(|file_path| {
                    // Extract the directory name (second to last path component)
                    let path_parts: Vec<&str> = file_path.split('/').collect();
                    if path_parts.len() >= 2 {
                        let dir_name = path_parts[path_parts.len() - 2];
                        safe_identifiers.iter().any(|safe_id| safe_id == dir_name)
                    } else {
                        false
                    }
                })
                .cloned()
                .collect();

            if exact_match_file_paths.is_empty() && data_cls == "bars" {
                // Partial match of instrument_ids in bar_types for bars
                file_paths.retain(|file_path| {
                    let path_parts: Vec<&str> = file_path.split('/').collect();
                    if path_parts.len() >= 2 {
                        let dir_name = path_parts[path_parts.len() - 2];
                        safe_identifiers
                            .iter()
                            .any(|safe_id| dir_name.starts_with(&format!("{safe_id}-")))
                    } else {
                        false
                    }
                });
            } else {
                file_paths = exact_match_file_paths;
            }
        }

        // Apply timestamp filtering
        file_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));

        // Convert to full URIs
        for file_path in file_paths {
            let full_uri = self.reconstruct_full_uri(&file_path);
            files.push(full_uri);
        }

        Ok(files)
    }

    /// Finds the missing time intervals for a specific data type and instrument ID.
    ///
    /// This method compares a requested time range against the existing data coverage
    /// and returns the gaps that need to be filled. This is useful for determining
    /// what data needs to be fetched or backfilled.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp of the requested range (Unix nanoseconds).
    /// - `end`: End timestamp of the requested range (Unix nanoseconds).
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the missing intervals,
    /// or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - Interval retrieval fails.
    /// - Gap calculation fails.
    ///
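    /// For example (assuming inclusive nanosecond bounds): with existing coverage
    /// `[(100, 200)]`, a request for `(50, 300)` yields the missing intervals
    /// `[(50, 99), (201, 300)]`.
    ///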
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Find missing intervals for quote data
    /// let missing = catalog.get_missing_intervals_for_request(
    ///     1609459200000000000,  // start
    ///     1609545600000000000,  // end
    ///     "quotes",
    ///     Some("BTCUSD".to_string())
    /// )?;
    ///
    /// for (start, end) in missing {
    ///     println!("Missing data from {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<(u64, u64)>> {
        let intervals = self.get_intervals(data_cls, instrument_id)?;

        Ok(query_interval_diff(start, end, &intervals))
    }

    /// Gets the last (most recent) timestamp for a specific data type and instrument ID.
    ///
    /// This method finds the latest timestamp covered by existing data files for
    /// the specified data type and instrument. This is useful for determining
    /// the most recent data available or for incremental data updates.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
    /// or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - Interval retrieval fails.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Get the last timestamp for quote data
    /// if let Some(last_ts) = catalog.query_last_timestamp("quotes", Some("BTCUSD".to_string()))? {
    ///     println!("Last quote timestamp: {}", last_ts);
    /// } else {
    ///     println!("No quote data found");
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Option<u64>> {
        let intervals = self.get_intervals(data_cls, instrument_id)?;

        if intervals.is_empty() {
            return Ok(None);
        }

        Ok(Some(intervals.last().unwrap().1))
    }

    /// Gets the time intervals covered by Parquet files for a specific data type and instrument ID.
    ///
    /// This method returns all time intervals covered by existing data files for the
    /// specified data type and instrument. The intervals are sorted by start time and
    /// represent the complete data coverage available.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the covered intervals,
    /// sorted by start time, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - Directory listing fails.
    /// - Filename parsing fails.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Get all intervals for quote data
    /// let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
    /// for (start, end) in intervals {
    ///     println!("Data available from {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_intervals(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<(u64, u64)>> {
        let directory = self.make_path(data_cls, instrument_id)?;

        self.get_directory_intervals(&directory)
    }

1357    /// Gets the time intervals covered by Parquet files in a specific directory.
1358    ///
1359    /// This method scans a directory for Parquet files and extracts the timestamp ranges
1360    /// from their filenames. It's used internally by other methods to determine data coverage
1361    /// and is essential for interval-based operations like gap detection and consolidation.
1362    ///
1363    /// # Parameters
1364    ///
1365    /// - `directory`: The directory path to scan for Parquet files.
1366    ///
1367    /// # Returns
1368    ///
1369    /// Returns a vector of (start, end) tuples representing the time intervals covered
1370    /// by files in the directory, sorted by start timestamp. Returns an empty vector
1371    /// if the directory doesn't exist or contains no valid Parquet files.
1372    ///
1373    /// # Errors
1374    ///
1375    /// This function will return an error if:
1376    /// - Object store listing operations fail.
1377    /// - Directory access is denied.
1378    ///
1379    /// # Notes
1380    ///
1381    /// - Only files with valid timestamp-based filenames are included.
1382    /// - Files with unparseable names are silently ignored.
1383    /// - The method works with both local and remote object stores.
1384    /// - Results are automatically sorted by start timestamp.
1385    ///
1386    /// # Examples
1387    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let intervals = catalog.get_directory_intervals("data/quotes/EURUSD")?;
    ///
    /// for (start, end) in intervals {
    ///     println!("File covers {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_directory_intervals(&self, directory: &str) -> anyhow::Result<Vec<(u64, u64)>> {
        let mut intervals = Vec::new();

        // Use object store for all operations
        let list_result = self.execute_async(async {
            let path = object_store::path::Path::from(directory);
            Ok(self
                .object_store
                .list(Some(&path))
                .collect::<Vec<_>>()
                .await)
        })?;

        for result in list_result {
            match result {
                Ok(object) => {
                    let path_str = object.location.to_string();
                    if path_str.ends_with(".parquet")
                        && let Some(interval) = parse_filename_timestamps(&path_str)
                    {
                        intervals.push(interval);
                    }
                }
                Err(_) => {
                    // Directory doesn't exist or is empty, which is fine
                    break;
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        Ok(intervals)
    }

    /// Constructs a directory path for storing data of a specific type and instrument.
    ///
    /// This method builds the hierarchical directory structure used by the catalog to organize
    /// data by type and instrument. The path follows the pattern: `{base_path}/data/{type_name}/{instrument_id}`.
    /// Instrument IDs are automatically converted to URI-safe format by removing forward slashes.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `instrument_id`: Optional instrument ID. If provided, creates a subdirectory for the instrument.
    ///   If `None`, returns the path to the data type directory.
    ///
    /// # Returns
    ///
    /// Returns the constructed directory path as a string, or an error if path construction fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The instrument ID contains invalid characters that cannot be made URI-safe.
    /// - Path construction fails due to system limitations.
    ///
    /// # Path Structure
    ///
    /// - Without instrument ID: `{base_path}/data/{type_name}`.
    /// - With instrument ID: `{base_path}/data/{type_name}/{safe_instrument_id}`.
    /// - If `base_path` is empty: `data/{type_name}[/{safe_instrument_id}]`.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Path for all quote data
    /// let quotes_path = catalog.make_path("quotes", None)?;
    /// // Returns: "/base/path/data/quotes"
    ///
    /// // Path for specific instrument quotes
    /// let eurusd_quotes = catalog.make_path("quotes", Some("EUR/USD".to_string()))?;
    /// // Returns: "/base/path/data/quotes/EURUSD" (slash removed)
    ///
    /// // Path for bar data with complex instrument ID
    /// let bars_path = catalog.make_path("bars", Some("BTC/USD-1H".to_string()))?;
    /// // Returns: "/base/path/data/bars/BTCUSD-1H"
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn make_path(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<String> {
        let mut path = if self.base_path.is_empty() {
            format!("data/{type_name}")
        } else {
            // Remove trailing slash from base_path to avoid double slashes
            let base_path = self.base_path.trim_end_matches('/');
            format!("{base_path}/data/{type_name}")
        };

        if let Some(id) = instrument_id {
            path = format!("{}/{}", path, urisafe_instrument_id(&id));
        }

        Ok(path)
    }

    /// Renames a Parquet file by moving it to a new timestamp-based filename via object store operations.
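    ///
    /// The directory stays the same; only the interval encoded in the filename changes.
    /// A minimal sketch of the intended call, using hypothetical timestamps (this helper
    /// is private, so the example is illustrative only):
    ///
    /// ```rust,ignore
    /// // Widen the file's covered interval from [2021-01-01, 2021-01-02] to [2021-01-01, 2021-01-03]
    /// catalog.rename_parquet_file(
    ///     "data/quotes/EURUSD",
    ///     1609459200000000000, // old start: 2021-01-01T00:00:00Z
    ///     1609545600000000000, // old end:   2021-01-02T00:00:00Z
    ///     1609459200000000000, // new start: unchanged
    ///     1609632000000000000, // new end:   2021-01-03T00:00:00Z
    /// )?;
    /// ```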
    fn rename_parquet_file(
        &self,
        directory: &str,
        old_start: u64,
        old_end: u64,
        new_start: u64,
        new_end: u64,
    ) -> anyhow::Result<()> {
        let old_filename =
            timestamps_to_filename(UnixNanos::from(old_start), UnixNanos::from(old_end));
        let old_path = format!("{directory}/{old_filename}");
        let old_object_path = self.to_object_path(&old_path);

        let new_filename =
            timestamps_to_filename(UnixNanos::from(new_start), UnixNanos::from(new_end));
        let new_path = format!("{directory}/{new_filename}");
        let new_object_path = self.to_object_path(&new_path);

        self.move_file(&old_object_path, &new_object_path)
    }

    /// Converts a catalog path string to an [`ObjectPath`] for object store operations.
    ///
    /// This method handles the conversion between catalog-relative paths and object store paths,
    /// taking into account the catalog's base path configuration. It automatically strips the
    /// base path prefix when present to create the correct object store path.
    ///
    /// # Parameters
    ///
    /// - `path`: The catalog path string to convert. Can be absolute or relative.
    ///
    /// # Returns
    ///
    /// Returns an [`ObjectPath`] suitable for use with object store operations.
    ///
    /// # Path Handling
    ///
    /// - If `base_path` is empty, the path is used as-is.
    /// - If `base_path` is set, it's stripped from the path if present.
    /// - Trailing slashes are automatically handled.
    /// - The resulting path is relative to the object store root.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Convert a full catalog path
    /// let object_path = catalog.to_object_path("/base/data/quotes/file.parquet");
    /// // Returns: ObjectPath("data/quotes/file.parquet") if base_path is "/base"
    ///
    /// // Convert a relative path
    /// let object_path = catalog.to_object_path("data/trades/file.parquet");
    /// // Returns: ObjectPath("data/trades/file.parquet")
    /// ```
    #[must_use]
    pub fn to_object_path(&self, path: &str) -> ObjectPath {
        if self.base_path.is_empty() {
            return ObjectPath::from(path);
        }

        let base = self.base_path.trim_end_matches('/');

        // Remove the catalog base prefix if present
        let without_base = path
            .strip_prefix(&format!("{base}/"))
            .or_else(|| path.strip_prefix(base))
            .unwrap_or(path);

        ObjectPath::from(without_base)
    }

    /// Moves a file from `old_path` to `new_path` using the object store's rename operation.
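    ///
    /// # Errors
    ///
    /// Returns an error if the underlying object store rename fails (e.g., the source
    /// object does not exist).
    ///
    /// A minimal sketch of the intended usage (the paths shown are hypothetical):
    ///
    /// ```rust,ignore
    /// use object_store::path::Path as ObjectPath;
    ///
    /// let old = ObjectPath::from("data/quotes/EURUSD/old.parquet");
    /// let new = ObjectPath::from("data/quotes/EURUSD/new.parquet");
    /// catalog.move_file(&old, &new)?;
    /// ```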
    pub fn move_file(&self, old_path: &ObjectPath, new_path: &ObjectPath) -> anyhow::Result<()> {
        self.execute_async(async {
            self.object_store
                .rename(old_path, new_path)
                .await
                .map_err(anyhow::Error::from)
        })
    }

    /// Executes an async operation synchronously by blocking on the shared runtime.
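    ///
    /// # Errors
    ///
    /// Propagates any error returned by the provided future.
    ///
    /// A minimal sketch of the intended usage, assuming a hypothetical future that
    /// returns `anyhow::Result<T>`:
    ///
    /// ```rust,ignore
    /// let names: Vec<String> = catalog.execute_async(async {
    ///     // Any future with an anyhow::Result output can be driven to completion here
    ///     Ok(Vec::new())
    /// })?;
    /// ```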
    pub fn execute_async<F, R>(&self, future: F) -> anyhow::Result<R>
    where
        F: std::future::Future<Output = anyhow::Result<R>>,
    {
        let rt = nautilus_common::runtime::get_runtime();
        rt.block_on(future)
    }
}

/// Trait for providing catalog path prefixes for different data types.
///
/// This trait enables type-safe organization of data within the catalog by providing
/// a standardized way to determine the directory structure for each data type.
/// Each data type maps to a specific subdirectory within the catalog's data folder.
///
/// # Implementation
///
/// Types implementing this trait should return a static string that represents
/// the directory name where data of that type should be stored.
///
/// # Examples
///
/// ```rust
/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
/// use nautilus_model::data::QuoteTick;
///
/// assert_eq!(QuoteTick::path_prefix(), "quotes");
/// ```
pub trait CatalogPathPrefix {
    /// Returns the path prefix (directory name) for this data type.
    ///
    /// # Returns
    ///
    /// A static string representing the directory name where this data type is stored.
    fn path_prefix() -> &'static str;
}

/// Macro for implementing [`CatalogPathPrefix`] for data types.
///
/// This macro provides a convenient way to implement the trait for multiple types
/// with their corresponding path prefixes.
///
/// # Parameters
///
/// - `$type`: The data type to implement the trait for.
/// - `$path`: The path prefix string for that type.
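///
/// # Examples
///
/// A sketch of how a single invocation expands (mirroring the implementations below):
///
/// ```rust,ignore
/// impl_catalog_path_prefix!(QuoteTick, "quotes");
/// // expands to:
/// // impl CatalogPathPrefix for QuoteTick {
/// //     fn path_prefix() -> &'static str { "quotes" }
/// // }
/// ```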
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

// Standard implementations for financial data types
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");

////////////////////////////////////////////////////////////////////////////////
// Helper functions for filename operations
////////////////////////////////////////////////////////////////////////////////

/// Converts timestamps to a filename using ISO 8601 format.
///
/// This function converts two Unix nanosecond timestamps to a filename that uses
/// ISO 8601 format with filesystem-safe characters. The format matches the Python
/// implementation for consistency.
///
/// # Parameters
///
/// - `timestamp_1`: First timestamp in Unix nanoseconds.
/// - `timestamp_2`: Second timestamp in Unix nanoseconds.
///
/// # Returns
///
/// Returns a filename string in the format: "`iso_timestamp_1_iso_timestamp_2.parquet`".
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::timestamps_to_filename;
/// # use nautilus_core::UnixNanos;
/// let filename = timestamps_to_filename(
///     UnixNanos::from(1609459200000000000),
///     UnixNanos::from(1609545600000000000)
/// );
/// // Returns something like: "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet"
/// ```
#[must_use]
pub fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));

    format!("{datetime_1}_{datetime_2}.parquet")
}

/// Converts an ISO 8601 timestamp to a filesystem-safe format.
///
/// This function replaces colons and dots with hyphens to make the timestamp
/// safe for use in filenames across different filesystems.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns a filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Examples
///
/// ```rust,ignore
/// # use nautilus_persistence::backend::catalog::iso_timestamp_to_file_timestamp;
/// let safe_timestamp = iso_timestamp_to_file_timestamp("2023-10-26T07:30:50.123456789Z");
/// assert_eq!(safe_timestamp, "2023-10-26T07-30-50-123456789Z");
/// ```
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp.replace([':', '.'], "-")
}

/// Converts a filesystem-safe timestamp back to ISO 8601 format.
///
/// This function reverses the transformation done by `iso_timestamp_to_file_timestamp`,
/// converting filesystem-safe timestamps back to standard ISO 8601 format.
///
/// # Parameters
///
/// - `file_timestamp`: Filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Returns
///
/// Returns an ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Examples
///
/// ```rust,ignore
/// # use nautilus_persistence::backend::catalog::file_timestamp_to_iso_timestamp;
/// let iso_timestamp = file_timestamp_to_iso_timestamp("2023-10-26T07-30-50-123456789Z");
/// assert_eq!(iso_timestamp, "2023-10-26T07:30:50.123456789Z");
/// ```
fn file_timestamp_to_iso_timestamp(file_timestamp: &str) -> String {
    let (date_part, time_part) = file_timestamp
        .split_once('T')
        .unwrap_or((file_timestamp, ""));
    let time_part = time_part.strip_suffix('Z').unwrap_or(time_part);

    // Find the last hyphen to separate nanoseconds
    if let Some(last_hyphen_idx) = time_part.rfind('-') {
        let time_with_dot_for_nanos = format!(
            "{}.{}",
            &time_part[..last_hyphen_idx],
            &time_part[last_hyphen_idx + 1..]
        );
        let final_time_part = time_with_dot_for_nanos.replace('-', ":");
        format!("{date_part}T{final_time_part}Z")
    } else {
        // Fallback if no nanoseconds part found
        let final_time_part = time_part.replace('-', ":");
        format!("{date_part}T{final_time_part}Z")
    }
}

/// Converts an ISO 8601 timestamp string to Unix nanoseconds.
///
/// This function parses an ISO 8601 timestamp and converts it to Unix nanoseconds.
/// It's used to convert parsed timestamps back to the internal representation.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns `Ok(u64)` with the Unix nanoseconds timestamp, or an error if parsing fails.
///
/// # Examples
///
/// ```rust,ignore
/// # use nautilus_persistence::backend::catalog::iso_to_unix_nanos;
/// let nanos = iso_to_unix_nanos("2021-01-01T00:00:00.000000000Z").unwrap();
/// assert_eq!(nanos, 1609459200000000000);
/// ```
fn iso_to_unix_nanos(iso_timestamp: &str) -> anyhow::Result<u64> {
    Ok(iso8601_to_unix_nanos(iso_timestamp.to_string())?.into())
}

////////////////////////////////////////////////////////////////////////////////
// Helper functions for interval operations
////////////////////////////////////////////////////////////////////////////////

/// Converts an instrument ID to a URI-safe format by removing forward slashes.
///
/// Some instrument IDs contain forward slashes (e.g., "BTC/USD") which are not
/// suitable for use in file paths. This function removes these characters to
/// create a safe directory name.
///
/// # Parameters
///
/// - `instrument_id`: The original instrument ID string.
///
/// # Returns
///
/// A URI-safe version of the instrument ID with forward slashes removed.
///
/// # Examples
///
/// ```rust,ignore
/// # use nautilus_persistence::backend::catalog::urisafe_instrument_id;
/// assert_eq!(urisafe_instrument_id("BTC/USD"), "BTCUSD");
/// assert_eq!(urisafe_instrument_id("EUR-USD"), "EUR-USD");
/// ```
fn urisafe_instrument_id(instrument_id: &str) -> String {
    instrument_id.replace('/', "")
}

/// Checks if a filename's timestamp range intersects with a query interval.
///
/// This function determines whether a Parquet file (identified by its timestamp-based
/// filename) contains data that falls within the specified query time range.
///
/// # Parameters
///
/// - `filename`: The filename to check (format: "`iso_timestamp_1_iso_timestamp_2.parquet`").
/// - `start`: Optional start timestamp for the query range.
/// - `end`: Optional end timestamp for the query range.
///
/// # Returns
///
/// Returns `true` if the file's time range intersects with the query range,
/// `false` otherwise. Also returns `true` if the filename cannot be parsed, so that
/// files with unrecognized names are conservatively included rather than skipped.
///
/// # Examples
///
/// ```rust,ignore
/// # use nautilus_persistence::backend::catalog::query_intersects_filename;
/// // Example with ISO format filenames
/// assert!(query_intersects_filename(
///     "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
///     Some(1609459200000000000),
///     Some(1609545600000000000)
/// ));
/// ```
fn query_intersects_filename(filename: &str, start: Option<u64>, end: Option<u64>) -> bool {
    if let Some((file_start, file_end)) = parse_filename_timestamps(filename) {
        (start.is_none() || start.unwrap() <= file_end)
            && (end.is_none() || file_start <= end.unwrap())
    } else {
        true
    }
}

/// Parses timestamps from a Parquet filename.
///
/// Extracts the start and end timestamps from filenames that follow the ISO 8601 format:
/// "`iso_timestamp_1_iso_timestamp_2.parquet`" (e.g., "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet")
///
/// # Parameters
///
/// - `filename`: The filename to parse (can be a full path).
///
/// # Returns
///
/// Returns `Some((start_ts, end_ts))` if the filename matches the expected format,
/// `None` otherwise.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::parse_filename_timestamps;
/// assert!(parse_filename_timestamps("2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet").is_some());
/// assert_eq!(parse_filename_timestamps("invalid.parquet"), None);
/// ```
#[must_use]
pub fn parse_filename_timestamps(filename: &str) -> Option<(u64, u64)> {
    let path = Path::new(filename);
    let base_name = path.file_name()?.to_str()?;
    let base_filename = base_name.strip_suffix(".parquet")?;
    let (first_part, second_part) = base_filename.split_once('_')?;

    let first_iso = file_timestamp_to_iso_timestamp(first_part);
    let second_iso = file_timestamp_to_iso_timestamp(second_part);

    let first_ts = iso_to_unix_nanos(&first_iso).ok()?;
    let second_ts = iso_to_unix_nanos(&second_iso).ok()?;

    Some((first_ts, second_ts))
}

/// Checks if a list of closed integer intervals are all mutually disjoint.
///
/// Two intervals are disjoint if they do not overlap. This function validates that
/// all intervals in the list are non-overlapping, which is a requirement for
/// maintaining data integrity in the catalog.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are disjoint, `false` if any overlap is found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_disjoint;
/// // Disjoint intervals
/// assert!(are_intervals_disjoint(&[(1, 5), (10, 15), (20, 25)]));
///
/// // Overlapping intervals
/// assert!(!are_intervals_disjoint(&[(1, 10), (5, 15)]));
/// ```
#[must_use]
pub fn are_intervals_disjoint(intervals: &[(u64, u64)]) -> bool {
    let n = intervals.len();

    if n <= 1 {
        return true;
    }

    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    for i in 0..(n - 1) {
        let (_, end1) = sorted_intervals[i];
        let (start2, _) = sorted_intervals[i + 1];

        if end1 >= start2 {
            return false;
        }
    }

    true
}

/// Checks if intervals are contiguous (adjacent with no gaps).
///
/// Intervals are contiguous if, when sorted by start time, each interval's start
/// timestamp is exactly one more than the previous interval's end timestamp.
/// This ensures complete coverage of a time range with no gaps.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are contiguous, `false` if any gaps are found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_contiguous;
/// // Contiguous intervals
/// assert!(are_intervals_contiguous(&[(1, 5), (6, 10), (11, 15)]));
///
/// // Non-contiguous intervals (gap between 5 and 8)
/// assert!(!are_intervals_contiguous(&[(1, 5), (8, 10)]));
/// ```
#[must_use]
pub fn are_intervals_contiguous(intervals: &[(u64, u64)]) -> bool {
    let n = intervals.len();
    if n <= 1 {
        return true;
    }

    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    for i in 0..(n - 1) {
        let (_, end1) = sorted_intervals[i];
        let (start2, _) = sorted_intervals[i + 1];

        // Saturating add avoids overflow when an interval ends at u64::MAX
        if end1.saturating_add(1) != start2 {
            return false;
        }
    }

    true
}

/// Finds the parts of a query interval that are not covered by existing data intervals.
///
/// This function calculates the "gaps" in data coverage by comparing a requested
/// time range against the intervals covered by existing data files. It's used to
/// determine what data needs to be fetched or backfilled.
///
/// # Parameters
///
/// - `start`: Start timestamp of the query interval (inclusive).
/// - `end`: End timestamp of the query interval (inclusive).
/// - `closed_intervals`: Existing data intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns a vector of (start, end) tuples representing the gaps in coverage.
/// Returns an empty vector if the query range is invalid or fully covered.
///
/// # Examples
///
/// ```rust,ignore
/// # use nautilus_persistence::backend::catalog::query_interval_diff;
/// // Query 1-100, have data for 10-30 and 60-80
/// let gaps = query_interval_diff(1, 100, &[(10, 30), (60, 80)]);
/// assert_eq!(gaps, vec![(1, 9), (31, 59), (81, 100)]);
/// ```
fn query_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
    if start > end {
        return Vec::new();
    }

    let interval_set = get_interval_set(closed_intervals);
    let query_range = (Bound::Included(start), Bound::Included(end));
    let query_diff = interval_set.get_interval_difference(&query_range);
    let mut result: Vec<(u64, u64)> = Vec::new();

    for interval in query_diff {
        if let Some(tuple) = interval_to_tuple(interval, start, end) {
            result.push(tuple);
        }
    }

    result
}

/// Creates an interval tree from closed integer intervals.
///
/// This function converts closed intervals [a, b] into half-open intervals [a, b+1)
/// for use with the interval tree data structure, which is used for efficient
/// interval operations and gap detection.
///
/// # Parameters
///
/// - `intervals`: A slice of closed intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns an [`IntervalTree`] containing the converted intervals.
///
/// # Notes
///
/// - Invalid intervals (where start > end) are skipped.
/// - Uses saturating addition to prevent overflow when converting to half-open intervals.
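///
/// # Examples
///
/// A sketch of the closed-to-half-open conversion this function performs (illustrative
/// only, since the function is private):
///
/// ```rust,ignore
/// // The closed interval [3, 7] is stored as the half-open interval [3, 8)
/// let tree = get_interval_set(&[(3, 7)]);
/// ```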
fn get_interval_set(intervals: &[(u64, u64)]) -> IntervalTree<u64> {
    let mut tree = IntervalTree::default();

    if intervals.is_empty() {
        return tree;
    }

    for &(start, end) in intervals {
        if start > end {
            continue;
        }

        tree.insert((
            Bound::Included(start),
            Bound::Excluded(end.saturating_add(1)),
        ));
    }

    tree
}

/// Converts an interval tree result back to a closed interval tuple.
///
/// This helper function converts the bounded interval representation used by
/// the interval tree back into the (start, end) tuple format used throughout
/// the catalog.
///
/// # Parameters
///
/// - `interval`: The bounded interval from the interval tree.
/// - `query_start`: The start of the original query range.
/// - `query_end`: The end of the original query range.
///
/// # Returns
///
/// Returns `Some((start, end))` for valid intervals, `None` for empty intervals.
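///
/// # Examples
///
/// A sketch of the bound conversion (illustrative only, since the function is private):
///
/// ```rust,ignore
/// use std::ops::Bound;
///
/// // The half-open interval [5, 10) maps back to the closed interval (5, 9)
/// assert_eq!(
///     interval_to_tuple((Bound::Included(&5), Bound::Excluded(&10)), 0, 100),
///     Some((5, 9))
/// );
/// ```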
fn interval_to_tuple(
    interval: (Bound<&u64>, Bound<&u64>),
    query_start: u64,
    query_end: u64,
) -> Option<(u64, u64)> {
    let (bound_start, bound_end) = interval;

    let start = match bound_start {
        Bound::Included(val) => *val,
        Bound::Excluded(val) => val.saturating_add(1),
        Bound::Unbounded => query_start,
    };

    let end = match bound_end {
        Bound::Included(val) => *val,
        Bound::Excluded(val) => {
            if *val == 0 {
                return None; // Empty interval
            }
            val - 1
        }
        Bound::Unbounded => query_end,
    };

    if start <= end {
        Some((start, end))
    } else {
        None
    }
}