nautilus_persistence/backend/catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Parquet data catalog for efficient storage and retrieval of financial market data.
//!
//! This module provides a comprehensive data catalog implementation that uses Apache Parquet
//! format for storing financial market data with object store backends. The catalog supports
//! various data types including quotes, trades, bars, order book data, and other market events.
//!
//! # Key Features
//!
//! - **Object Store Integration**: Works with local filesystems, S3, and other object stores.
//! - **Data Type Support**: Handles all major financial data types (quotes, trades, bars, etc.).
//! - **Time-based Organization**: Organizes data by timestamp ranges for efficient querying.
//! - **Consolidation**: Merges multiple files to optimize storage and query performance.
//! - **Validation**: Ensures data integrity with timestamp ordering and interval validation.
//!
//! # Architecture
//!
//! The catalog organizes data in a hierarchical structure:
//! ```text
//! data/
//! ├── quotes/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! ├── trades/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! └── bars/
//!     └── INSTRUMENT_ID/
//!         └── start_ts-end_ts.parquet
//! ```
//!
//! # Usage
//!
//! ```rust,no_run
//! use std::path::PathBuf;
//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//!
//! // Create a new catalog
//! let catalog = ParquetDataCatalog::new(
//!     PathBuf::from("/path/to/data"),
//!     None,        // storage_options
//!     Some(5000),  // batch_size
//!     None,        // compression (defaults to SNAPPY)
//!     None,        // max_row_group_size (defaults to 5000)
//! );
//!
//! // Write data to the catalog
//! // catalog.write_to_parquet(data, None, None, None)?;
//! ```
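//!
//! Reading data back mirrors writing. A minimal sketch (the instrument ID is
//! illustrative), using the same catalog:
//!
//! ```rust,no_run
//! use std::path::PathBuf;
//! use nautilus_model::data::QuoteTick;
//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//!
//! let mut catalog = ParquetDataCatalog::new(
//!     PathBuf::from("/path/to/data"),
//!     None, None, None, None,
//! );
//!
//! // Collect typed records, optionally filtered by instrument and time range
//! let quotes: Vec<QuoteTick> = catalog.query_typed_data(
//!     Some(vec!["EURUSD".to_string()]),
//!     None, // start
//!     None, // end
//!     None, // where_clause
//!     None, // files
//! )?;
//! # Ok::<(), anyhow::Error>(())
//! ```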

use std::{
    fmt::Debug,
    ops::Bound,
    path::{Path, PathBuf},
    sync::Arc,
};

use ahash::AHashMap;
use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use heck::ToSnakeCase;
use itertools::Itertools;
use nautilus_common::live::get_runtime;
use nautilus_core::{
    UnixNanos,
    datetime::{iso8601_to_unix_nanos, unix_nanos_to_iso8601},
};
use nautilus_model::data::{
    Bar, Data, FundingRateUpdate, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
    OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose, to_variant,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::{ObjectStore, path::Path as ObjectPath};
use serde::Serialize;
use unbounded_interval_tree::interval_tree::IntervalTree;

use super::session::{self, DataBackendSession, QueryResult, build_query};
use crate::parquet::write_batches_to_object_store;

/// A high-performance data catalog for storing and retrieving financial market data using Apache Parquet format.
///
/// The `ParquetDataCatalog` provides a comprehensive solution for managing large volumes of financial
/// market data with efficient storage, querying, and consolidation capabilities. It supports various
/// object store backends including local filesystems, AWS S3, and other cloud storage providers.
///
/// # Features
///
/// - **Efficient Storage**: Uses Apache Parquet format with configurable compression.
/// - **Object Store Backend**: Supports multiple storage backends through the `object_store` crate.
/// - **Time-based Organization**: Organizes data by timestamp ranges for optimal query performance.
/// - **Data Validation**: Ensures timestamp ordering and interval consistency.
/// - **Consolidation**: Merges multiple files to reduce storage overhead and improve query speed.
/// - **Type Safety**: Strongly typed data handling with compile-time guarantees.
///
/// # Data Organization
///
/// Data is organized hierarchically by data type and instrument:
/// - `data/{data_type}/{instrument_id}/{start_ts}-{end_ts}.parquet`.
/// - Files are named with their timestamp ranges for efficient range queries.
/// - Intervals are validated to be disjoint to prevent data overlap.
///
/// # Performance Considerations
///
/// - **Batch Size**: Controls memory usage during data processing.
/// - **Compression**: SNAPPY compression provides a good balance of speed and size.
/// - **Row Group Size**: Affects query performance and memory usage.
/// - **File Consolidation**: Reduces the number of files for better query performance.
pub struct ParquetDataCatalog {
    /// The base path for data storage within the object store.
    pub base_path: String,
    /// The original URI provided when creating the catalog.
    pub original_uri: String,
    /// The object store backend for data persistence.
    pub object_store: Arc<dyn ObjectStore>,
    /// The DataFusion session for query execution.
    pub session: DataBackendSession,
    /// The number of records to process in each batch.
    pub batch_size: usize,
    /// The compression algorithm used for Parquet files.
    pub compression: parquet::basic::Compression,
    /// The maximum number of rows in each Parquet row group.
    pub max_row_group_size: usize,
}

impl Debug for ParquetDataCatalog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(ParquetDataCatalog))
            .field("base_path", &self.base_path)
            .finish()
    }
}

impl ParquetDataCatalog {
    /// Creates a new [`ParquetDataCatalog`] instance from a local file path.
    ///
    /// This is a convenience constructor that converts a local path to a URI format
    /// and delegates to [`Self::from_uri`].
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base directory path for data storage.
    /// - `storage_options`: Optional `AHashMap` of storage-specific configuration options.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Panics
    ///
    /// Panics if the path cannot be converted to a valid URI or if the object store
    /// cannot be created from the path.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None,        // no storage options
    ///     Some(1000),  // smaller batch size
    ///     None,        // default compression
    ///     None,        // default row group size
    /// );
    /// ```
    #[must_use]
    pub fn new(
        base_path: PathBuf,
        storage_options: Option<AHashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let path_str = base_path.to_string_lossy().to_string();
        Self::from_uri(
            &path_str,
            storage_options,
            batch_size,
            compression,
            max_row_group_size,
        )
        .expect("Failed to create catalog from path")
    }

    /// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
    ///
    /// Supports various URI schemes including local file paths and multiple cloud storage backends
    /// supported by the `object_store` crate.
    ///
    /// # Supported URI Schemes
    ///
    /// - **AWS S3**: `s3://bucket/path`.
    /// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`.
    /// - **Azure Blob Storage**: `az://container/path` or `abfs://container@account.dfs.core.windows.net/path`.
    /// - **HTTP/WebDAV**: `http://` or `https://`.
    /// - **Local files**: `file://path` or plain paths.
    ///
    /// # Parameters
    ///
    /// - `uri`: The URI for the data storage location.
    /// - `storage_options`: Optional `AHashMap` of storage-specific configuration options:
    ///   - For S3: `endpoint_url`, `region`, `access_key_id`, `secret_access_key`, `session_token`, etc.
    ///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
    ///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URI format is invalid or unsupported.
    /// - The object store cannot be created or accessed.
    /// - Authentication fails for cloud storage backends.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use ahash::AHashMap;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// // Local filesystem
    /// let local_catalog = ParquetDataCatalog::from_uri(
    ///     "/tmp/nautilus_data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 bucket
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Google Cloud Storage
    /// let gcs_catalog = ParquetDataCatalog::from_uri(
    ///     "gs://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Azure Blob Storage
    /// let azure_catalog = ParquetDataCatalog::from_uri(
    ///     "az://container/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 with custom endpoint and credentials
    /// let mut storage_options = AHashMap::new();
    /// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
    /// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
    /// storage_options.insert("secret_access_key".to_string(), "my-secret".to_string());
    ///
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     Some(storage_options),
    ///     None, None, None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn from_uri(
        uri: &str,
        storage_options: Option<AHashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> anyhow::Result<Self> {
        let batch_size = batch_size.unwrap_or(5000);
        let compression = compression.unwrap_or(parquet::basic::Compression::SNAPPY);
        let max_row_group_size = max_row_group_size.unwrap_or(5000);

        let (object_store, base_path, original_uri) =
            crate::parquet::create_object_store_from_path(uri, storage_options)?;

        Ok(Self {
            base_path,
            original_uri,
            object_store,
            session: session::DataBackendSession::new(batch_size),
            batch_size,
            compression,
            max_row_group_size,
        })
    }

    /// Returns the base path of the catalog for testing purposes.
    #[must_use]
    pub fn get_base_path(&self) -> String {
        self.base_path.clone()
    }

    /// Resets the backend session to clear any cached table registrations.
    ///
    /// This is useful during catalog operations when files are being modified
    /// and we need to ensure fresh data is loaded.
    pub fn reset_session(&mut self) {
        self.session.clear_registered_tables();
    }

    /// Writes mixed data types to the catalog by separating them into type-specific collections.
    ///
    /// This method takes a heterogeneous collection of market data and separates it by type,
    /// then writes each type to its appropriate location in the catalog. This is useful when
    /// processing mixed data streams or bulk data imports.
    ///
    /// # Parameters
    ///
    /// - `data`: A vector of mixed [`Data`] enum variants.
    /// - `start`: Optional start timestamp to override the data's natural range.
    /// - `end`: Optional end timestamp to override the data's natural range.
    ///
    /// # Notes
    ///
    /// - Data is automatically separated by type before writing.
    /// - Each data type is written to its own directory structure.
    /// - Instrument data handling is not yet implemented (TODO).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_model::data::Data;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    /// let mixed_data: Vec<Data> = vec![/* mixed data types */];
    ///
    /// catalog.write_data_enum(mixed_data, None, None)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_data_enum(
        &self,
        data: Vec<Data>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()> {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();

        for d in data {
            match d {
                Data::Deltas(_) => continue,
                Data::Delta(d) => {
                    deltas.push(d);
                }
                Data::Depth10(d) => {
                    depth10s.push(*d);
                }
                Data::Quote(d) => {
                    quotes.push(d);
                }
                Data::Trade(d) => {
                    trades.push(d);
                }
                Data::Bar(d) => {
                    bars.push(d);
                }
                Data::MarkPriceUpdate(p) => {
                    mark_prices.push(p);
                }
                Data::IndexPriceUpdate(p) => {
                    index_prices.push(p);
                }
                Data::InstrumentClose(c) => {
                    closes.push(c);
                }
            }
        }

        // TODO: need to handle instruments here

        self.write_to_parquet(deltas, start, end, None)?;
        self.write_to_parquet(depth10s, start, end, None)?;
        self.write_to_parquet(quotes, start, end, None)?;
        self.write_to_parquet(trades, start, end, None)?;
        self.write_to_parquet(bars, start, end, None)?;
        self.write_to_parquet(mark_prices, start, end, None)?;
        self.write_to_parquet(index_prices, start, end, None)?;
        self.write_to_parquet(closes, start, end, None)?;

        Ok(())
    }

    /// Writes typed data to a Parquet file in the catalog.
    ///
    /// This is the core method for persisting market data to the catalog. It handles data
    /// validation, batching, compression, and ensures proper file organization with
    /// timestamp-based naming.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement required traits for serialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `start`: Optional start timestamp to override the natural data range.
    /// - `end`: Optional end timestamp to override the natural data range.
    /// - `skip_disjoint_check`: If `Some(true)`, skips validation that the new file's
    ///   interval is disjoint from existing files in the same directory.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
    /// If the target file already exists, returns the path without writing.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Data timestamps are not in ascending order.
    /// - Data serialization to Arrow record batches fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    /// - Writing would create non-disjoint timestamp intervals.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - Record batches are empty after conversion.
    /// - Required metadata is missing from the schema.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    ///
    /// let path = catalog.write_to_parquet(quotes, None, None, None)?;
    /// println!("Data written to: {:?}", path);
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        skip_disjoint_check: Option<bool>,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = start.unwrap_or(data.first().unwrap().ts_init());
        let end_ts = end.unwrap_or(data.last().unwrap().ts_init());

        let batches = self.data_to_record_batches(data)?;
        let schema = batches.first().expect("Batches are empty.").schema();
        let instrument_id = schema.metadata.get("instrument_id").cloned();

        let directory = self.make_path(T::path_prefix(), instrument_id)?;
        let filename = timestamps_to_filename(start_ts, end_ts);
        let path = PathBuf::from(format!("{directory}/{filename}"));
        let object_path = self.to_object_path(&path.to_string_lossy());

        let file_exists =
            self.execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
        if file_exists {
            log::info!("File {path:?} already exists, skipping write");
            return Ok(path);
        }

        if !skip_disjoint_check.unwrap_or(false) {
            let current_intervals = self.get_directory_intervals(&directory)?;
            let new_interval = (start_ts.as_u64(), end_ts.as_u64());
            let mut new_intervals = current_intervals.clone();
            new_intervals.push(new_interval);

            if !are_intervals_disjoint(&new_intervals) {
                anyhow::bail!(
                    "Writing file {filename} with interval ({start_ts}, {end_ts}) would create \
                    non-disjoint intervals. Existing intervals: {current_intervals:?}"
                );
            }
        }

        log::info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len()
        );

        self.execute_async(async {
            write_batches_to_object_store(
                &batches,
                self.object_store.clone(),
                &object_path,
                Some(self.compression),
                Some(self.max_row_group_size),
            )
            .await
        })?;

        Ok(path)
    }

    /// Writes typed data to a JSON file in the catalog.
    ///
    /// This method provides an alternative to Parquet format for data export and debugging.
    /// JSON files are human-readable but less efficient for large datasets.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement serialization and cataloging traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `path`: Optional custom directory path (defaults to catalog's standard structure).
    /// - `write_metadata`: Whether to write a separate metadata file alongside the data.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created JSON file.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Data timestamps are not in ascending order.
    /// - JSON serialization fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    ///
    /// # Panics
    ///
    /// Panics if `path` is `None` and the default directory path cannot be constructed.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_model::data::TradeTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    /// let trades: Vec<TradeTick> = vec![/* trade data */];
    ///
    /// let path = catalog.write_to_json(
    ///     trades,
    ///     Some(PathBuf::from("/custom/path")),
    ///     true  // write metadata
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = data.first().unwrap().ts_init();
        let end_ts = data.last().unwrap().ts_init();

        let directory =
            path.unwrap_or_else(|| PathBuf::from(self.make_path(T::path_prefix(), None).unwrap()));
        let filename = timestamps_to_filename(start_ts, end_ts).replace(".parquet", ".json");
        let json_path = directory.join(&filename);

        log::info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len()
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            log::info!("Writing metadata to {metadata_path:?}");

            // Use object store for metadata file
            let metadata_object_path = ObjectPath::from(metadata_path.to_string_lossy().as_ref());
            let metadata_json = serde_json::to_vec_pretty(&metadata)?;
            self.execute_async(async {
                self.object_store
                    .put(&metadata_object_path, metadata_json.into())
                    .await
                    .map_err(anyhow::Error::from)
            })?;
        }

        // Use object store for main JSON file
        let json_object_path = ObjectPath::from(json_path.to_string_lossy().as_ref());
        let json_data = serde_json::to_vec_pretty(&serde_json::to_value(data)?)?;
        self.execute_async(async {
            self.object_store
                .put(&json_object_path, json_data.into())
                .await
                .map_err(anyhow::Error::from)
        })?;

        Ok(json_path)
    }

    /// Validates that data timestamps are in ascending order.
    ///
    /// # Parameters
    ///
    /// - `data`: Slice of data records to validate.
    /// - `type_name`: Name of the data type for error messages.
    ///
    /// # Errors
    ///
    /// Returns an error if any timestamp is less than the previous timestamp.
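    ///
    /// # Examples
    ///
    /// A minimal sketch (the type and label are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// // Empty and single-element slices trivially pass the check
    /// let quotes: Vec<QuoteTick> = vec![];
    /// ParquetDataCatalog::check_ascending_timestamps(&quotes, "quote_tick")?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```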
    pub fn check_ascending_timestamps<T: HasTsInit>(
        data: &[T],
        type_name: &str,
    ) -> anyhow::Result<()> {
        if !data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()) {
            anyhow::bail!("{type_name} timestamps must be in ascending order");
        }

        Ok(())
    }

    /// Converts data into Arrow record batches for Parquet serialization.
    ///
    /// This method chunks the data according to the configured batch size and converts
    /// each chunk into an Arrow record batch with appropriate metadata.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to convert, must implement required encoding traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to convert.
    ///
    /// # Returns
    ///
    /// Returns a vector of Arrow [`RecordBatch`] instances ready for Parquet serialization.
    ///
    /// # Errors
    ///
    /// Returns an error if record batch encoding fails for any chunk.
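    ///
    /// # Examples
    ///
    /// A sketch (the path and data are illustrative): with `batch_size = 5000`, a vector
    /// of 12,000 records would yield three batches of 5,000, 5,000, and 2,000 rows.
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None,
    ///     Some(5000), // batch_size
    ///     None, None,
    /// );
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    /// let batches = catalog.data_to_record_batches(quotes)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```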
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: HasTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

    /// Extends the timestamp range of an existing Parquet file by renaming it.
    ///
    /// This method finds an existing file that is adjacent to the specified time range
    /// and renames it to include the new range. This is useful when appending data
    /// that extends the time coverage of existing files. If no adjacent file is found,
    /// the method is a no-op.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Start timestamp of the new range to extend to.
    /// - `end`: End timestamp of the new range to extend to.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File rename operations fail.
    /// - Intervals are not disjoint after the extension.
    ///
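    /// # Notes
    ///
    /// Intervals are closed `[start, end]` ranges in UNIX nanoseconds: an existing file
    /// is considered adjacent to the new range when its interval starts at `end + 1` or
    /// ends at `start - 1`.
    ///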
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    ///
    /// // Extend a file's range backwards or forwards
    /// catalog.extend_file_name(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     UnixNanos::from(1609459200000000000),
    ///     UnixNanos::from(1609545600000000000)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: UnixNanos,
        end: UnixNanos,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(data_cls, instrument_id)?;
        let intervals = self.get_directory_intervals(&directory)?;

        let start = start.as_u64();
        let end = end.as_u64();

        for interval in intervals {
            if interval.0 == end + 1 {
                // Extend backwards: the renamed file covers [start, interval.1]
                self.rename_parquet_file(&directory, interval.0, interval.1, start, interval.1)?;
                break;
            } else if interval.1 + 1 == start {
                // Extend forwards: the renamed file covers [interval.0, end]
                // (written as `interval.1 + 1 == start` to avoid u64 underflow when `start` is 0)
                self.rename_parquet_file(&directory, interval.0, interval.1, interval.0, end)?;
                break;
            }
        }

        let intervals = self.get_directory_intervals(&directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after extending a file");
        }

        Ok(())
    }

    /// Lists all Parquet files in a specified directory.
    ///
    /// This method scans a directory and returns the full paths of all files with the `.parquet`
    /// extension. It works with both local filesystems and remote object stores, making it
    /// suitable for various storage backends.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path to scan for Parquet files.
    ///
    /// # Returns
    ///
    /// Returns a vector of full file paths (as strings) for all Parquet files found in the directory.
    /// The paths are relative to the object store root and suitable for use with object store operations.
    /// Returns an empty vector if the directory doesn't exist or contains no Parquet files.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store listing operations fail.
    /// - Directory access is denied.
    /// - Network issues occur (for remote object stores).
    ///
    /// # Notes
    ///
    /// - Only files ending with `.parquet` are included.
    /// - The object store listing is recursive over the directory prefix.
    /// - File paths are returned in the order provided by the object store.
    /// - Works with all supported object store backends (local, S3, GCS, Azure, etc.).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    /// let files = catalog.list_parquet_files("data/quotes/EURUSD")?;
    ///
    /// for file in files {
    ///     println!("Found Parquet file: {}", file);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn list_parquet_files(&self, directory: &str) -> anyhow::Result<Vec<String>> {
        self.execute_async(async {
            let prefix = ObjectPath::from(format!("{directory}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut files = Vec::new();

            while let Some(object) = stream.next().await {
                let object = object?;
                if object.location.as_ref().ends_with(".parquet") {
                    files.push(object.location.to_string());
                }
            }
            Ok::<Vec<String>, anyhow::Error>(files)
        })
    }

    /// Reconstructs the full URI for a file path, handling remote schemes, `file://` URIs, and plain local paths.
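    ///
    /// # Examples
    ///
    /// Illustrative mappings (the bucket and paths are assumptions, not fixed behavior):
    ///
    /// - Remote: catalog URI `s3://my-bucket/data` and path `data/quotes/EURUSD/file.parquet`
    ///   yield `s3://my-bucket/data/quotes/EURUSD/file.parquet`.
    /// - Local: base path `/tmp/nautilus_data` and the same relative path are joined with
    ///   forward slashes to `/tmp/nautilus_data/data/quotes/EURUSD/file.parquet`.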
    #[must_use]
    pub fn reconstruct_full_uri(&self, path_str: &str) -> String {
        // Check if this is a remote URI scheme that needs reconstruction
        if self.is_remote_uri() {
            // Extract the base URL (scheme + host) from the original URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Some(host) = url.host_str()
            {
                return format!("{}://{}/{}", url.scheme(), host, path_str);
            }
        }

        // For local paths, extract the directory from the original URI
        if self.original_uri.starts_with("file://") {
            // Extract the path from the file:// URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Ok(base_path) = url.to_file_path()
            {
                // Use platform-appropriate path separator for display
                // but object store paths always use forward slashes
                let base_str = base_path.to_string_lossy();
                return self.join_paths(&base_str, path_str);
            }
        }

        // For local paths without file:// prefix, use the original URI as base
        if self.base_path.is_empty() {
            // If base_path is empty and not a file URI, try using original_uri as base
            if self.original_uri.contains("://") {
                // Fallback: return the path as-is
                path_str.to_string()
            } else {
                self.join_paths(self.original_uri.trim_end_matches('/'), path_str)
            }
        } else {
            let base = self.base_path.trim_end_matches('/');
            self.join_paths(base, path_str)
        }
    }

    /// Helper method to join paths using forward slashes (the object store convention).
    #[must_use]
    fn join_paths(&self, base: &str, path: &str) -> String {
        make_object_store_path(base, &[path])
    }

    /// Helper method to check if the original URI uses a remote object store scheme.
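    ///
    /// For example, `s3://bucket/path` and `https://host/path` are remote, while
    /// `/tmp/data` and `file:///tmp/data` are not.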
    #[must_use]
    pub fn is_remote_uri(&self) -> bool {
        self.original_uri.starts_with("s3://")
            || self.original_uri.starts_with("gs://")
            || self.original_uri.starts_with("gcs://")
            || self.original_uri.starts_with("az://")
            || self.original_uri.starts_with("abfs://")
            || self.original_uri.starts_with("http://")
            || self.original_uri.starts_with("https://")
    }

    /// Executes a query against the catalog to retrieve market data of a specific type.
    ///
    /// This is the primary method for querying data from the catalog. It registers the appropriate
    /// object store with the DataFusion session, finds all relevant Parquet files, and executes
    /// the query across them. The method supports filtering by instrument IDs, time ranges, and
    /// custom SQL WHERE clauses.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to query, must implement required traits for deserialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering (e.g., "price > 100").
    /// - `files`: Optional explicit list of file URIs to query, bypassing file discovery.
    ///
    /// # Returns
    ///
    /// Returns a [`QueryResult`] containing the query execution context and data.
    /// Use [`QueryResult::collect()`] to retrieve the actual data records.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store registration fails for remote URIs.
    /// - File discovery fails.
    /// - DataFusion query execution fails.
    /// - Data deserialization fails.
    ///
    /// # Performance Notes
    ///
    /// - Files are automatically filtered by timestamp ranges before querying.
    /// - DataFusion optimizes queries across multiple Parquet files.
    /// - Use specific instrument IDs and time ranges to improve performance.
    /// - WHERE clauses are pushed down to the Parquet reader when possible.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    ///
    /// // Query all quote data
    /// let result = catalog.query::<QuoteTick>(None, None, None, None, None)?;
    /// let quotes = result.collect();
    ///
    /// // Query specific instruments within a time range
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None,
    /// )?;
    ///
    /// // Query with custom WHERE clause
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000"),
    ///     None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query<T>(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        // Register the object store with the session for remote URIs
        if self.is_remote_uri() {
            let url = url::Url::parse(&self.original_uri)?;
            let host = url
                .host_str()
                .ok_or_else(|| anyhow::anyhow!("Remote URI missing host/bucket name"))?;
            let base_url = url::Url::parse(&format!("{}://{}", url.scheme(), host))?;
            self.session
                .register_object_store(&base_url, self.object_store.clone());
        }

        let files_list = if let Some(files) = files {
            files
        } else {
            self.query_files(T::path_prefix(), instrument_ids, start, end)?
        };

        for file_uri in &files_list {
            // Extract identifier from file path and filename to create meaningful table names
            let identifier = extract_identifier_from_path(file_uri);
            let safe_sql_identifier = make_sql_safe_identifier(&identifier);
            let safe_filename = extract_sql_safe_filename(file_uri);

            // Create table name from path_prefix, identifier, and filename
            let table_name = format!(
                "{}_{}_{}",
                T::path_prefix(),
                safe_sql_identifier,
                safe_filename
            );
            let query = build_query(&table_name, start, end, where_clause);

            // Convert object store path to filesystem path for DataFusion
            // Only apply reconstruction if the path is not already absolute
            let resolved_path = if file_uri.starts_with('/') {
                // Path is already absolute, use as-is
                file_uri.clone()
            } else {
                // Path is relative, reconstruct full URI
                self.reconstruct_full_uri(file_uri)
            };
            self.session
                .add_file::<T>(&table_name, &resolved_path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }

    /// Queries typed data from the catalog and returns results as a strongly-typed vector.
    ///
    /// This is a convenience method that wraps the generic `query` method and automatically
    /// collects and converts the results into a vector of the specific data type. It handles
    /// the type conversion from the generic [`Data`] enum to the concrete type `T`.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The specific data type to query and return. Must implement required traits for
    ///   deserialization, cataloging, and conversion from the [`Data`] enum.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    ///   For exact matches, provide the full instrument ID. For bars, partial matches are supported.
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering. Use standard SQL syntax
    ///   with column names matching the Parquet schema (e.g., "`bid_price` > 1.2000", "volume > 1000").
    /// - `files`: Optional explicit list of file URIs to query, bypassing file discovery.
    ///
    /// # Returns
    ///
    /// Returns a vector of the specific data type `T`, sorted by timestamp. The vector will be
    /// empty if no data matches the query criteria.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The underlying query execution fails.
    /// - Data type conversion fails.
    /// - Object store access fails.
    /// - Invalid WHERE clause syntax is provided.
    ///
    /// # Performance Considerations
    ///
    /// - Use specific instrument IDs and time ranges to minimize data scanning.
    /// - WHERE clauses are pushed down to Parquet readers when possible.
    /// - Results are automatically sorted by timestamp during collection.
    /// - Memory usage scales with the amount of data returned.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_model::data::{QuoteTick, TradeTick, Bar};
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    ///
    /// // Query all quotes for a specific instrument
    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
    ///     Some(vec!["EURUSD".to_string()]),
    ///     None,
    ///     None,
    ///     None,
    ///     None,
    /// )?;
    ///
    /// // Query trades within a specific time range
    /// let trades: Vec<TradeTick> = catalog.query_typed_data(
    ///     Some(vec!["BTCUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None,
    /// )?;
    ///
    /// // Query bars with volume filter
    /// let bars: Vec<Bar> = catalog.query_typed_data(
    ///     Some(vec!["AAPL".to_string()]),
    ///     None,
    ///     None,
    ///     Some("volume > 1000000"),
    ///     None,
    /// )?;
    ///
    /// // Query multiple instruments with price filter
    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
    ///     Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000 AND ask_price < 1.3000"),
    ///     None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_typed_data<T>(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
    ) -> anyhow::Result<Vec<T>>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix + TryFrom<Data>,
    {
        // Reset session to allow repeated queries (streams are consumed on each query)
        self.reset_session();

        let query_result = self.query::<T>(instrument_ids, start, end, where_clause, files)?;
        let all_data = query_result.collect();

        // Convert Data enum variants to specific type T using to_variant
        Ok(to_variant::<T>(all_data))
    }

    /// Queries all Parquet files for a specific data type and optional instrument IDs.
    ///
    /// This method finds all Parquet files that match the specified criteria and returns
    /// their full URIs. The files are filtered by data type, instrument IDs (if provided),
    /// and timestamp range (if provided).
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_ids`: Optional list of instrument IDs to filter by.
    /// - `start`: Optional start timestamp to filter files by their time range.
    /// - `end`: Optional end timestamp to filter files by their time range.
    ///
    /// # Returns
    ///
    /// Returns a vector of file URI strings that match the query criteria,
    /// or an error if the query fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Object store listing operations fail.
    /// - URI reconstruction fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    ///
    /// // Query all quote files
    /// let files = catalog.query_files("quotes", None, None, None)?;
    ///
    /// // Query trade files for specific instruments within a time range
    /// let files = catalog.query_files(
    ///     "trades",
    ///     Some(vec!["BTCUSD".to_string(), "ETHUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_files(
        &self,
        data_cls: &str,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<Vec<String>> {
        let mut files = Vec::new();

        let start_u64 = start.map(|s| s.as_u64());
        let end_u64 = end.map(|e| e.as_u64());

        let base_dir = self.make_path(data_cls, None)?;

        // Use recursive listing to match Python's glob behavior
        let list_result = self.execute_async(async {
            let prefix = ObjectPath::from(format!("{base_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut objects = Vec::new();
            while let Some(object) = stream.next().await {
                objects.push(object?);
            }
            Ok::<Vec<_>, anyhow::Error>(objects)
        })?;

        let mut file_paths: Vec<String> = list_result
            .into_iter()
            .filter_map(|object| {
                let path_str = object.location.to_string();
                if path_str.ends_with(".parquet") {
                    Some(path_str)
                } else {
                    None
                }
            })
            .collect();

        // Apply identifier filtering if provided
        if let Some(identifiers) = instrument_ids {
            let safe_identifiers: Vec<String> = identifiers
                .iter()
                .map(|id| urisafe_instrument_id(id))
                .collect();

            // Exact match by default for instrument_ids or bar_types
            let exact_match_file_paths: Vec<String> = file_paths
                .iter()
                .filter(|file_path| {
                    // Extract the directory name (second to last path component)
                    let path_parts: Vec<&str> = file_path.split('/').collect();
                    if path_parts.len() >= 2 {
                        let dir_name = path_parts[path_parts.len() - 2];
                        safe_identifiers.iter().any(|safe_id| safe_id == dir_name)
                    } else {
                        false
                    }
                })
                .cloned()
                .collect();

            if exact_match_file_paths.is_empty() && data_cls == "bars" {
                // Partial match of instrument_ids in bar_types for bars
                file_paths.retain(|file_path| {
                    let path_parts: Vec<&str> = file_path.split('/').collect();
                    if path_parts.len() >= 2 {
                        let dir_name = path_parts[path_parts.len() - 2];
                        safe_identifiers
                            .iter()
                            .any(|safe_id| dir_name.starts_with(&format!("{safe_id}-")))
                    } else {
                        false
                    }
                });
            } else {
                file_paths = exact_match_file_paths;
            }
        }

        // Apply timestamp filtering
        file_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));

        // Convert to full URIs
        for file_path in file_paths {
            let full_uri = self.reconstruct_full_uri(&file_path);
            files.push(full_uri);
        }

        Ok(files)
    }

    /// Finds the missing time intervals for a specific data type and instrument ID.
    ///
    /// This method compares a requested time range against the existing data coverage
    /// and returns the gaps that need to be filled. This is useful for determining
    /// what data needs to be fetched or backfilled.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp of the requested range (Unix nanoseconds).
    /// - `end`: End timestamp of the requested range (Unix nanoseconds).
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the missing intervals,
    /// or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Interval retrieval fails.
    /// - Gap calculation fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    ///
    /// // Find missing intervals for quote data
    /// let missing = catalog.get_missing_intervals_for_request(
    ///     1609459200000000000,  // start
    ///     1609545600000000000,  // end
    ///     "quotes",
    ///     Some("BTCUSD".to_string())
    /// )?;
    ///
    /// for (start, end) in missing {
    ///     println!("Missing data from {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<(u64, u64)>> {
        let intervals = self.get_intervals(data_cls, instrument_id)?;

        Ok(query_interval_diff(start, end, &intervals))
    }

    /// Gets the last (most recent) timestamp for a specific data type and instrument ID.
    ///
    /// This method finds the latest timestamp covered by existing data files for
    /// the specified data type and instrument. This is useful for determining
    /// the most recent data available or for incremental data updates.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
    /// or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Interval retrieval fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    ///
    /// // Get the last timestamp for quote data
    /// if let Some(last_ts) = catalog.query_last_timestamp("quotes", Some("BTCUSD".to_string()))? {
    ///     println!("Last quote timestamp: {}", last_ts);
    /// } else {
    ///     println!("No quote data found");
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Option<u64>> {
        let intervals = self.get_intervals(data_cls, instrument_id)?;

        // Intervals are sorted by start time, so the last interval holds the latest end timestamp
        Ok(intervals.last().map(|interval| interval.1))
    }

    /// Gets the time intervals covered by Parquet files for a specific data type and instrument ID.
    ///
    /// This method returns all time intervals covered by existing data files for the
    /// specified data type and instrument. The intervals are sorted by start time and
    /// represent the complete data coverage available.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the covered intervals,
    /// sorted by start time, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Directory listing fails.
    /// - Filename parsing fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None, None, None, None,
    /// );
    ///
    /// // Get all intervals for quote data
    /// let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
    /// for (start, end) in intervals {
    ///     println!("Data available from {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
1384    pub fn get_intervals(
1385        &self,
1386        data_cls: &str,
1387        instrument_id: Option<String>,
1388    ) -> anyhow::Result<Vec<(u64, u64)>> {
1389        let directory = self.make_path(data_cls, instrument_id)?;
1390
1391        self.get_directory_intervals(&directory)
1392    }
1393
1394    /// Gets the time intervals covered by Parquet files in a specific directory.
1395    ///
1396    /// This method scans a directory for Parquet files and extracts the timestamp ranges
1397    /// from their filenames. It's used internally by other methods to determine data coverage
1398    /// and is essential for interval-based operations like gap detection and consolidation.
1399    ///
1400    /// # Parameters
1401    ///
1402    /// - `directory`: The directory path to scan for Parquet files.
1403    ///
1404    /// # Returns
1405    ///
1406    /// Returns a vector of (start, end) tuples representing the time intervals covered
1407    /// by files in the directory, sorted by start timestamp. Returns an empty vector
1408    /// if the directory doesn't exist or contains no valid Parquet files.
1409    ///
1410    /// # Errors
1411    ///
1412    /// Returns an error if:
1413    /// - Object store listing operations fail.
1414    /// - Directory access is denied.
1415    ///
1416    /// # Notes
1417    ///
1418    /// - Only files with valid timestamp-based filenames are included.
1419    /// - Files with unparsable names are silently ignored.
1420    /// - The method works with both local and remote object stores.
1421    /// - Results are automatically sorted by start timestamp.
1422    ///
1423    /// # Examples
1424    ///
1425    /// ```rust,no_run
1426    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1427    ///
1428    /// let catalog = ParquetDataCatalog::new(/* ... */);
1429    /// let intervals = catalog.get_directory_intervals("data/quotes/EURUSD")?;
1430    ///
1431    /// for (start, end) in intervals {
1432    ///     println!("File covers {} to {}", start, end);
1433    /// }
1434    /// # Ok::<(), anyhow::Error>(())
1435    /// ```
1436    pub fn get_directory_intervals(&self, directory: &str) -> anyhow::Result<Vec<(u64, u64)>> {
1437        let mut intervals = Vec::new();
1438
1439        // Use object store for all operations
1440        let list_result = self.execute_async(async {
1441            let path = object_store::path::Path::from(directory);
1442            Ok(self
1443                .object_store
1444                .list(Some(&path))
1445                .collect::<Vec<_>>()
1446                .await)
1447        })?;
1448
1449        for result in list_result {
1450            match result {
1451                Ok(object) => {
1452                    let path_str = object.location.to_string();
1453                    if path_str.ends_with(".parquet")
1454                        && let Some(interval) = parse_filename_timestamps(&path_str)
1455                    {
1456                        intervals.push(interval);
1457                    }
1458                }
1459                Err(_) => {
1460                    // Treat listing errors as a missing or empty directory, which is fine
1461                    break;
1462                }
1463            }
1464        }
1465
1466        intervals.sort_by_key(|&(start, _)| start);
1467
1468        Ok(intervals)
1469    }
1470
1471    /// Constructs a directory path for storing data of a specific type and instrument.
1472    ///
1473    /// This method builds the hierarchical directory structure used by the catalog to organize
1474    /// data by type and instrument. The path follows the pattern: `{base_path}/data/{type_name}/{instrument_id}`.
1475    /// Instrument IDs are automatically converted to URI-safe format by removing forward slashes.
1476    ///
1477    /// # Parameters
1478    ///
1479    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
1480    /// - `instrument_id`: Optional instrument ID. If provided, creates a subdirectory for the instrument.
1481    ///   If `None`, returns the path to the data type directory.
1482    ///
1483    /// # Returns
1484    ///
1485    /// Returns the constructed directory path as a string, or an error if path construction fails.
1486    ///
1487    /// # Errors
1488    ///
1489    /// Returns an error if:
1490    /// - The instrument ID contains invalid characters that cannot be made URI-safe.
1491    /// - Path construction fails due to system limitations.
1492    ///
1493    /// # Path Structure
1494    ///
1495    /// - Without instrument ID: `{base_path}/data/{type_name}`.
1496    /// - With instrument ID: `{base_path}/data/{type_name}/{safe_instrument_id}`.
1497    /// - If `base_path` is empty: `data/{type_name}[/{safe_instrument_id}]`.
1498    ///
1499    /// # Examples
1500    ///
1501    /// ```rust,no_run
1502    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1503    ///
1504    /// let catalog = ParquetDataCatalog::new(/* ... */);
1505    ///
1506    /// // Path for all quote data
1507    /// let quotes_path = catalog.make_path("quotes", None)?;
1508    /// // Returns: "/base/path/data/quotes"
1509    ///
1510    /// // Path for specific instrument quotes
1511    /// let eurusd_quotes = catalog.make_path("quotes", Some("EUR/USD".to_string()))?;
1512    /// // Returns: "/base/path/data/quotes/EURUSD" (slash removed)
1513    ///
1514    /// // Path for bar data with complex instrument ID
1515    /// let bars_path = catalog.make_path("bars", Some("BTC/USD-1H".to_string()))?;
1516    /// // Returns: "/base/path/data/bars/BTCUSD-1H"
1517    /// # Ok::<(), anyhow::Error>(())
1518    /// ```
1519    pub fn make_path(
1520        &self,
1521        type_name: &str,
1522        instrument_id: Option<String>,
1523    ) -> anyhow::Result<String> {
1524        let mut components = vec!["data".to_string(), type_name.to_string()];
1525
1526        if let Some(id) = instrument_id {
1527            let safe_id = urisafe_instrument_id(&id);
1528            components.push(safe_id);
1529        }
1530
1531        let path = make_object_store_path_owned(&self.base_path, components);
1532        Ok(path)
1533    }
1534
1535    /// Helper method to rename a Parquet file by moving it via object store operations.
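    ///
    /// Both the old and new filenames are derived with [`timestamps_to_filename`]; a
    /// minimal sketch of that mapping (the timestamps are illustrative):
    ///
    /// ```rust
    /// # use nautilus_core::UnixNanos;
    /// # use nautilus_persistence::backend::catalog::timestamps_to_filename;
    /// // The helper moves `{directory}/{old_filename}` to `{directory}/{new_filename}`
    /// let old_filename = timestamps_to_filename(UnixNanos::from(100), UnixNanos::from(200));
    /// let new_filename = timestamps_to_filename(UnixNanos::from(100), UnixNanos::from(300));
    /// ```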
1536    fn rename_parquet_file(
1537        &self,
1538        directory: &str,
1539        old_start: u64,
1540        old_end: u64,
1541        new_start: u64,
1542        new_end: u64,
1543    ) -> anyhow::Result<()> {
1544        let old_filename =
1545            timestamps_to_filename(UnixNanos::from(old_start), UnixNanos::from(old_end));
1546        let old_path = format!("{directory}/{old_filename}");
1547        let old_object_path = self.to_object_path(&old_path);
1548
1549        let new_filename =
1550            timestamps_to_filename(UnixNanos::from(new_start), UnixNanos::from(new_end));
1551        let new_path = format!("{directory}/{new_filename}");
1552        let new_object_path = self.to_object_path(&new_path);
1553
1554        self.move_file(&old_object_path, &new_object_path)
1555    }
1556
1557    /// Converts a catalog path string to an [`ObjectPath`] for object store operations.
1558    ///
1559    /// This method handles the conversion between catalog-relative paths and object store paths,
1560    /// taking into account the catalog's base path configuration. It automatically strips the
1561    /// base path prefix when present to create the correct object store path.
1562    ///
1563    /// # Parameters
1564    ///
1565    /// - `path`: The catalog path string to convert. Can be absolute or relative.
1566    ///
1567    /// # Returns
1568    ///
1569    /// Returns an [`ObjectPath`] suitable for use with object store operations.
1570    ///
1571    /// # Path Handling
1572    ///
1573    /// - If `base_path` is empty, the path is used as-is.
1574    /// - If `base_path` is set, it's stripped from the path if present.
1575    /// - Trailing slashes and backslashes are automatically handled.
1576    /// - The resulting path is relative to the object store root.
1577    /// - All paths are normalized to use forward slashes (object store convention).
1578    ///
1579    /// # Examples
1580    ///
1581    /// ```rust,no_run
1582    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1583    ///
1584    /// let catalog = ParquetDataCatalog::new(/* ... */);
1585    ///
1586    /// // Convert a full catalog path
1587    /// let object_path = catalog.to_object_path("/base/data/quotes/file.parquet");
1588    /// // Returns: ObjectPath("data/quotes/file.parquet") if base_path is "/base"
1589    ///
1590    /// // Convert a relative path
1591    /// let object_path = catalog.to_object_path("data/trades/file.parquet");
1592    /// // Returns: ObjectPath("data/trades/file.parquet")
1593    /// ```
1594    #[must_use]
1595    pub fn to_object_path(&self, path: &str) -> ObjectPath {
1596        // Normalize path separators to forward slashes for object store
1597        let normalized_path = path.replace('\\', "/");
1598
1599        if self.base_path.is_empty() {
1600            return ObjectPath::from(normalized_path);
1601        }
1602
1603        // Normalize base path separators as well
1604        let normalized_base = self.base_path.replace('\\', "/");
1605        let base = normalized_base.trim_end_matches('/');
1606
1607        // Remove the catalog base prefix if present
1608        let without_base = normalized_path
1609            .strip_prefix(&format!("{base}/"))
1610            .or_else(|| normalized_path.strip_prefix(base))
1611            .unwrap_or(&normalized_path);
1612
1613        ObjectPath::from(without_base)
1614    }
1615
1616    /// Helper method to move a file using the object store rename operation.
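    ///
    /// # Errors
    ///
    /// Returns an error if the underlying object store rename operation fails.
    ///
    /// # Examples
    ///
    /// A minimal sketch (the constructor arguments and paths are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let old = catalog.to_object_path("data/quotes/EURUSD/old.parquet");
    /// let new = catalog.to_object_path("data/quotes/EURUSD/new.parquet");
    /// catalog.move_file(&old, &new)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```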
1617    pub fn move_file(&self, old_path: &ObjectPath, new_path: &ObjectPath) -> anyhow::Result<()> {
1618        self.execute_async(async {
1619            self.object_store
1620                .rename(old_path, new_path)
1621                .await
1622                .map_err(anyhow::Error::from)
1623        })
1624    }
1625
1626    /// Helper method to execute an async operation by blocking on the shared runtime.
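    ///
    /// This blocks the calling thread until the future completes, so it should not be
    /// called from inside an async context (assuming `get_runtime` returns a shared
    /// Tokio runtime, a nested `block_on` would panic).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the provided future.
    ///
    /// # Examples
    ///
    /// A minimal sketch (the constructor arguments are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// // Drive a trivial async block to completion on the shared runtime
    /// let result: anyhow::Result<()> = catalog.execute_async(async { Ok(()) });
    /// ```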
1627    pub fn execute_async<F, R>(&self, future: F) -> anyhow::Result<R>
1628    where
1629        F: std::future::Future<Output = anyhow::Result<R>>,
1630    {
1631        let rt = get_runtime();
1632        rt.block_on(future)
1633    }
1634}
1635
1636/// Trait for providing catalog path prefixes for different data types.
1637///
1638/// This trait enables type-safe organization of data within the catalog by providing
1639/// a standardized way to determine the directory structure for each data type.
1640/// Each data type maps to a specific subdirectory within the catalog's data folder.
1641///
1642/// # Implementation
1643///
1644/// Types implementing this trait should return a static string that represents
1645/// the directory name where data of that type should be stored.
1646///
1647/// # Examples
1648///
1649/// ```rust
1650/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
1651/// use nautilus_model::data::QuoteTick;
1652///
1653/// assert_eq!(QuoteTick::path_prefix(), "quotes");
1654/// ```
1655pub trait CatalogPathPrefix {
1656    /// Returns the path prefix (directory name) for this data type.
1657    ///
1658    /// # Returns
1659    ///
1660    /// A static string representing the directory name where this data type is stored.
1661    fn path_prefix() -> &'static str;
1662}
1663
1664/// Macro for implementing [`CatalogPathPrefix`] for data types.
1665///
1666/// This macro provides a convenient way to implement the trait for multiple types
1667/// with their corresponding path prefixes.
1668///
1669/// # Parameters
1670///
1671/// - `$type`: The data type to implement the trait for.
1672/// - `$path`: The path prefix string for that type.
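///
/// # Examples
///
/// A sketch of the expansion for one invocation (shown in comments):
///
/// ```rust,ignore
/// impl_catalog_path_prefix!(QuoteTick, "quotes");
/// // Expands to:
/// // impl CatalogPathPrefix for QuoteTick {
/// //     fn path_prefix() -> &'static str {
/// //         "quotes"
/// //     }
/// // }
/// ```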
1673macro_rules! impl_catalog_path_prefix {
1674    ($type:ty, $path:expr) => {
1675        impl CatalogPathPrefix for $type {
1676            fn path_prefix() -> &'static str {
1677                $path
1678            }
1679        }
1680    };
1681}
1682
1683// Standard implementations for financial data types
1684impl_catalog_path_prefix!(QuoteTick, "quotes");
1685impl_catalog_path_prefix!(TradeTick, "trades");
1686impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
1687impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
1688impl_catalog_path_prefix!(Bar, "bars");
1689impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
1690impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
1691impl_catalog_path_prefix!(FundingRateUpdate, "funding_rates");
1692impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");
1693
1694/// Converts timestamps to a filename using ISO 8601 format.
1695///
1696/// This function converts two Unix nanosecond timestamps to a filename that uses
1697/// ISO 8601 format with filesystem-safe characters. The format matches the Python
1698/// implementation for consistency.
1699///
1700/// # Parameters
1701///
1702/// - `timestamp_1`: First timestamp in Unix nanoseconds.
1703/// - `timestamp_2`: Second timestamp in Unix nanoseconds.
1704///
1705/// # Returns
1706///
1707/// Returns a filename string in the format: "`iso_timestamp_1_iso_timestamp_2.parquet`".
1708///
1709/// # Examples
1710///
1711/// ```rust
1712/// # use nautilus_persistence::backend::catalog::timestamps_to_filename;
1713/// # use nautilus_core::UnixNanos;
1714/// let filename = timestamps_to_filename(
1715///     UnixNanos::from(1609459200000000000),
1716///     UnixNanos::from(1609545600000000000)
1717/// );
1718/// // Returns something like: "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet"
1719/// ```
1720#[must_use]
1721pub fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
1722    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
1723    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));
1724
1725    format!("{datetime_1}_{datetime_2}.parquet")
1726}
1727
1728/// Converts an ISO 8601 timestamp to a filesystem-safe format.
1729///
1730/// This function replaces colons and dots with hyphens to make the timestamp
1731/// safe for use in filenames across different filesystems.
1732///
1733/// # Parameters
1734///
1735/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
1736///
1737/// # Returns
1738///
1739/// Returns a filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
1740///
1741/// # Examples
1742///
1743/// ```rust
1744/// # use nautilus_persistence::backend::catalog::iso_timestamp_to_file_timestamp;
1745/// let safe_timestamp = iso_timestamp_to_file_timestamp("2023-10-26T07:30:50.123456789Z");
1746/// assert_eq!(safe_timestamp, "2023-10-26T07-30-50-123456789Z");
1747/// ```
1748fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
1749    iso_timestamp.replace([':', '.'], "-")
1750}
1751
1752/// Converts a filesystem-safe timestamp back to ISO 8601 format.
1753///
1754/// This function reverses the transformation done by `iso_timestamp_to_file_timestamp`,
1755/// converting filesystem-safe timestamps back to standard ISO 8601 format.
1756///
1757/// # Parameters
1758///
1759/// - `file_timestamp`: Filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
1760///
1761/// # Returns
1762///
1763/// Returns an ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
1764///
1765/// # Examples
1766///
1767/// ```rust
1768/// # use nautilus_persistence::backend::catalog::file_timestamp_to_iso_timestamp;
1769/// let iso_timestamp = file_timestamp_to_iso_timestamp("2023-10-26T07-30-50-123456789Z");
1770/// assert_eq!(iso_timestamp, "2023-10-26T07:30:50.123456789Z");
1771/// ```
1772fn file_timestamp_to_iso_timestamp(file_timestamp: &str) -> String {
1773    let (date_part, time_part) = file_timestamp
1774        .split_once('T')
1775        .unwrap_or((file_timestamp, ""));
1776    let time_part = time_part.strip_suffix('Z').unwrap_or(time_part);
1777
1778    // Find the last hyphen to separate nanoseconds
1779    if let Some(last_hyphen_idx) = time_part.rfind('-') {
1780        let time_with_dot_for_nanos = format!(
1781            "{}.{}",
1782            &time_part[..last_hyphen_idx],
1783            &time_part[last_hyphen_idx + 1..]
1784        );
1785        let final_time_part = time_with_dot_for_nanos.replace('-', ":");
1786        format!("{date_part}T{final_time_part}Z")
1787    } else {
1788        // Fallback if no nanoseconds part found
1789        let final_time_part = time_part.replace('-', ":");
1790        format!("{date_part}T{final_time_part}Z")
1791    }
1792}
1793
1794/// Converts an ISO 8601 timestamp string to Unix nanoseconds.
1795///
1796/// This function parses an ISO 8601 timestamp and converts it to Unix nanoseconds.
1797/// It's used to convert parsed timestamps back to the internal representation.
1798///
1799/// # Parameters
1800///
1801/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
1802///
1803/// # Returns
1804///
1805/// Returns `Ok(u64)` with the Unix nanoseconds timestamp, or an error if parsing fails.
1806///
1807/// # Examples
1808///
1809/// ```rust
1810/// # use nautilus_persistence::backend::catalog::iso_to_unix_nanos;
1811/// let nanos = iso_to_unix_nanos("2021-01-01T00:00:00.000000000Z").unwrap();
1812/// assert_eq!(nanos, 1609459200000000000);
1813/// ```
1814fn iso_to_unix_nanos(iso_timestamp: &str) -> anyhow::Result<u64> {
1815    Ok(iso8601_to_unix_nanos(iso_timestamp.to_string())?.into())
1816}
1817
1818/// Converts an instrument ID to a URI-safe format by removing forward slashes.
1819///
1820/// Some instrument IDs contain forward slashes (e.g., "BTC/USD") which are not
1821/// suitable for use in file paths. This function removes these characters to
1822/// create a safe directory name.
1823///
1824/// # Parameters
1825///
1826/// - `instrument_id`: The original instrument ID string.
1827///
1828/// # Returns
1829///
1830/// A URI-safe version of the instrument ID with forward slashes removed.
1831///
1832/// # Examples
1833///
1834/// ```rust
1835/// # use nautilus_persistence::backend::catalog::urisafe_instrument_id;
1836/// assert_eq!(urisafe_instrument_id("BTC/USD"), "BTCUSD");
1837/// assert_eq!(urisafe_instrument_id("EUR-USD"), "EUR-USD");
1838/// ```
1839fn urisafe_instrument_id(instrument_id: &str) -> String {
1840    instrument_id.replace('/', "")
1841}
1842
1843/// Extracts the identifier from a file path.
1844///
1845/// The identifier is typically the second-to-last path component (directory name).
1846/// For example, from "`data/quote_tick/EURUSD/file.parquet`", extracts "EURUSD".
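///
/// Falls back to `"unknown"` when the path has fewer than two components.
///
/// # Examples
///
/// Covering both the normal and fallback cases:
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_identifier_from_path;
/// assert_eq!(extract_identifier_from_path("data/quote_tick/EURUSD/file.parquet"), "EURUSD");
/// assert_eq!(extract_identifier_from_path("file.parquet"), "unknown");
/// ```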
1847#[must_use]
1848pub fn extract_identifier_from_path(file_path: &str) -> String {
1849    let path_parts: Vec<&str> = file_path.split('/').collect();
1850    if path_parts.len() >= 2 {
1851        path_parts[path_parts.len() - 2].to_string()
1852    } else {
1853        "unknown".to_string()
1854    }
1855}
1856
1857/// Makes an identifier safe for use in SQL table names.
1858///
1859/// Removes forward slashes; replaces dots, hyphens, spaces, and percent signs with underscores; and converts to lowercase.
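///
/// # Examples
///
/// Both assertions follow directly from the replacement rules above:
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_sql_safe_identifier;
/// assert_eq!(make_sql_safe_identifier("EUR/USD"), "eurusd");
/// assert_eq!(make_sql_safe_identifier("AAPL.XNAS"), "aapl_xnas");
/// ```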
1860#[must_use]
1861pub fn make_sql_safe_identifier(identifier: &str) -> String {
1862    urisafe_instrument_id(identifier)
1863        .replace(['.', '-', ' ', '%'], "_")
1864        .to_lowercase()
1865}
1866
1867/// Extracts the filename from a file path and makes it SQL-safe.
1868///
1869/// For example, from "data/quote_tick/EURUSD/2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
1870/// extracts "`2021_01_01t00_00_00_000000000z_2021_01_02t00_00_00_000000000z`".
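///
/// # Examples
///
/// The doctest below mirrors the mapping described above:
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_sql_safe_filename;
/// assert_eq!(
///     extract_sql_safe_filename("data/quote_tick/EURUSD/2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet"),
///     "2021_01_01t00_00_00_000000000z_2021_01_02t00_00_00_000000000z"
/// );
/// assert_eq!(extract_sql_safe_filename(""), "unknown_file");
/// ```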
1871#[must_use]
1872pub fn extract_sql_safe_filename(file_path: &str) -> String {
1873    if file_path.is_empty() {
1874        return "unknown_file".to_string();
1875    }
1876
1877    let filename = file_path.split('/').next_back().unwrap_or("unknown_file");
1878
1879    // Remove .parquet extension
1880    let name_without_ext = if let Some(dot_pos) = filename.rfind(".parquet") {
1881        &filename[..dot_pos]
1882    } else {
1883        filename
1884    };
1885
1886    // Replace problematic characters (hyphens, colons, dots) with underscores
1887    name_without_ext
1888        .replace(['-', ':', '.'], "_")
1889        .to_lowercase()
1890}
1891
1892/// Creates a platform-appropriate local path using `PathBuf`.
1893///
1894/// This function constructs file system paths using the platform's native path separators.
1895/// Use this for local file operations that need to work with the actual file system.
1896///
1897/// # Arguments
1898///
1899/// * `base_path` - The base directory path
1900/// * `components` - Path components to join
1901///
1902/// # Returns
1903///
1904/// A `PathBuf` with platform-appropriate separators
1905///
1906/// # Examples
1907///
1908/// ```rust
1909/// # use nautilus_persistence::backend::catalog::make_local_path;
1910/// let path = make_local_path("/base", &["data", "quotes", "EURUSD"]);
1911/// // On Unix: "/base/data/quotes/EURUSD"
1912/// // On Windows: "/base\data\quotes\EURUSD" (pushed components use backslashes)
1913/// ```
1914pub fn make_local_path<P: AsRef<Path>>(base_path: P, components: &[&str]) -> PathBuf {
1915    let mut path = PathBuf::from(base_path.as_ref());
1916    for component in components {
1917        path.push(component);
1918    }
1919    path
1920}
1921
1922/// Creates an object store path using forward slashes.
1923///
1924/// Object stores (S3, GCS, etc.) always expect forward slashes regardless of platform.
1925/// Use this when creating paths for object store operations.
1926///
1927/// # Arguments
1928///
1929/// * `base_path` - The base path (can be empty)
1930/// * `components` - Path components to join
1931///
1932/// # Returns
1933///
1934/// A string path with forward slash separators
1935///
1936/// # Examples
1937///
1938/// ```rust
1939/// # use nautilus_persistence::backend::catalog::make_object_store_path;
1940/// let path = make_object_store_path("base", &["data", "quotes", "EURUSD"]);
1941/// assert_eq!(path, "base/data/quotes/EURUSD");
1942/// ```
1943#[must_use]
1944pub fn make_object_store_path(base_path: &str, components: &[&str]) -> String {
1945    let mut parts = Vec::new();
1946
1947    if !base_path.is_empty() {
1948        let normalized_base = base_path
1949            .replace('\\', "/")
1950            .trim_end_matches('/')
1951            .to_string();
1952        if !normalized_base.is_empty() {
1953            parts.push(normalized_base);
1954        }
1955    }
1956
1957    for component in components {
1958        let normalized_component = component
1959            .replace('\\', "/")
1960            .trim_start_matches('/')
1961            .trim_end_matches('/')
1962            .to_string();
1963        if !normalized_component.is_empty() {
1964            parts.push(normalized_component);
1965        }
1966    }
1967
1968    parts.join("/")
1969}
1970
1971/// Creates an object store path using forward slashes with owned strings.
1972///
1973/// This variant accepts owned strings to avoid lifetime issues.
1974///
1975/// # Arguments
1976///
1977/// * `base_path` - The base path (can be empty)
1978/// * `components` - Path components to join (owned strings)
1979///
1980/// # Returns
1981///
1982/// A string path with forward slash separators
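///
/// # Examples
///
/// The joining behavior mirrors [`make_object_store_path`]:
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_object_store_path_owned;
/// let path = make_object_store_path_owned("base", vec!["data".to_string(), "quotes".to_string()]);
/// assert_eq!(path, "base/data/quotes");
/// ```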
1983#[must_use]
1984pub fn make_object_store_path_owned(base_path: &str, components: Vec<String>) -> String {
1985    let mut parts = Vec::new();
1986
1987    if !base_path.is_empty() {
1988        let normalized_base = base_path
1989            .replace('\\', "/")
1990            .trim_end_matches('/')
1991            .to_string();
1992        if !normalized_base.is_empty() {
1993            parts.push(normalized_base);
1994        }
1995    }
1996
1997    for component in components {
1998        let normalized_component = component
1999            .replace('\\', "/")
2000            .trim_start_matches('/')
2001            .trim_end_matches('/')
2002            .to_string();
2003        if !normalized_component.is_empty() {
2004            parts.push(normalized_component);
2005        }
2006    }
2007
2008    parts.join("/")
2009}
2010
2011/// Converts a local `PathBuf` to an object store path string.
2012///
2013/// This function normalizes a local file system path to the forward-slash format
2014/// expected by object stores, handling platform differences.
2015///
2016/// # Arguments
2017///
2018/// * `local_path` - The local `PathBuf` to convert
2019///
2020/// # Returns
2021///
2022/// A string with forward slash separators suitable for object store operations
2023///
2024/// # Examples
2025///
2026/// ```rust
2027/// # use std::path::PathBuf;
2028/// # use nautilus_persistence::backend::catalog::local_to_object_store_path;
2029/// let local_path = PathBuf::from("data").join("quotes").join("EURUSD");
2030/// let object_path = local_to_object_store_path(&local_path);
2031/// assert_eq!(object_path, "data/quotes/EURUSD");
2032/// ```
2033#[must_use]
2034pub fn local_to_object_store_path(local_path: &Path) -> String {
2035    local_path.to_string_lossy().replace('\\', "/")
2036}
2037
2038/// Extracts path components using platform-appropriate path parsing.
2039///
2040/// This function safely parses a path into its components, handling both
2041/// local file system paths and object store paths correctly.
2042///
2043/// # Arguments
2044///
2045/// * `path_str` - The path string to parse
2046///
2047/// # Returns
2048///
2049/// A vector of path components
2050///
2051/// # Examples
2052///
2053/// ```rust
2054/// # use nautilus_persistence::backend::catalog::extract_path_components;
2055/// let components = extract_path_components("data/quotes/EURUSD");
2056/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
2057///
2058/// // Works with both separators
2059/// let components = extract_path_components("data\\quotes\\EURUSD");
2060/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
2061/// ```
2062#[must_use]
2063pub fn extract_path_components(path_str: &str) -> Vec<String> {
2064    // Normalize separators and split
2065    let normalized = path_str.replace('\\', "/");
2066    normalized
2067        .split('/')
2068        .filter(|s| !s.is_empty())
2069        .map(ToString::to_string)
2070        .collect()
2071}
2072
2073/// Checks if a filename's timestamp range intersects with a query interval.
2074///
2075/// This function determines whether a Parquet file (identified by its timestamp-based
2076/// filename) contains data that falls within the specified query time range.
2077///
2078/// # Parameters
2079///
2080/// - `filename`: The filename to check (format: "`iso_timestamp_1_iso_timestamp_2.parquet`").
2081/// - `start`: Optional start timestamp for the query range.
2082/// - `end`: Optional end timestamp for the query range.
2083///
2084/// # Returns
2085///
2086/// Returns `true` if the file's time range intersects with the query range,
2087/// `false` otherwise. Returns `true` if the filename cannot be parsed.
2088///
2089/// # Examples
2090///
2091/// ```rust
2092/// # use nautilus_persistence::backend::catalog::query_intersects_filename;
2093/// // Example with ISO format filenames
2094/// assert!(query_intersects_filename(
2095///     "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
2096///     Some(1609459200000000000),
2097///     Some(1609545600000000000)
2098/// ));
2099/// ```
2100fn query_intersects_filename(filename: &str, start: Option<u64>, end: Option<u64>) -> bool {
2101    if let Some((file_start, file_end)) = parse_filename_timestamps(filename) {
2102        (start.is_none() || start.unwrap() <= file_end)
2103            && (end.is_none() || file_start <= end.unwrap())
2104    } else {
2105        true
2106    }
2107}
2108
2109/// Parses timestamps from a Parquet filename.
2110///
2111/// Extracts the start and end timestamps from filenames that follow the ISO 8601 format:
2112/// "`iso_timestamp_1_iso_timestamp_2.parquet`" (e.g., "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet")
2113///
2114/// # Parameters
2115///
2116/// - `filename`: The filename to parse (can be a full path).
2117///
2118/// # Returns
2119///
2120/// Returns `Some((start_ts, end_ts))` if the filename matches the expected format,
2121/// `None` otherwise.
2122///
2123/// # Examples
2124///
2125/// ```rust
2126/// # use nautilus_persistence::backend::catalog::parse_filename_timestamps;
2127/// assert!(parse_filename_timestamps("2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet").is_some());
2128/// assert_eq!(parse_filename_timestamps("invalid.parquet"), None);
2129/// ```
2130#[must_use]
2131pub fn parse_filename_timestamps(filename: &str) -> Option<(u64, u64)> {
2132    let path = Path::new(filename);
2133    let base_name = path.file_name()?.to_str()?;
2134    let base_filename = base_name.strip_suffix(".parquet")?;
2135    let (first_part, second_part) = base_filename.split_once('_')?;
2136
2137    let first_iso = file_timestamp_to_iso_timestamp(first_part);
2138    let second_iso = file_timestamp_to_iso_timestamp(second_part);
2139
2140    let first_ts = iso_to_unix_nanos(&first_iso).ok()?;
2141    let second_ts = iso_to_unix_nanos(&second_iso).ok()?;
2142
2143    Some((first_ts, second_ts))
2144}
2145
2146/// Checks whether the closed integer intervals in a list are all mutually disjoint.
2147///
2148/// Two intervals are disjoint if they do not overlap. This function validates that
2149/// all intervals in the list are non-overlapping, which is a requirement for
2150/// maintaining data integrity in the catalog.
2151///
2152/// # Parameters
2153///
2154/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
2155///
2156/// # Returns
2157///
2158/// Returns `true` if all intervals are disjoint, `false` if any overlap is found.
2159/// Returns `true` for empty lists or lists with a single interval.
2160///
2161/// # Examples
2162///
2163/// ```rust
2164/// # use nautilus_persistence::backend::catalog::are_intervals_disjoint;
2165/// // Disjoint intervals
2166/// assert!(are_intervals_disjoint(&[(1, 5), (10, 15), (20, 25)]));
2167///
2168/// // Overlapping intervals
2169/// assert!(!are_intervals_disjoint(&[(1, 10), (5, 15)]));
2170/// ```
2171#[must_use]
2172pub fn are_intervals_disjoint(intervals: &[(u64, u64)]) -> bool {
2173    let n = intervals.len();
2174
2175    if n <= 1 {
2176        return true;
2177    }
2178
2179    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
2180    sorted_intervals.sort_by_key(|&(start, _)| start);
2181
2182    for i in 0..(n - 1) {
2183        let (_, end1) = sorted_intervals[i];
2184        let (start2, _) = sorted_intervals[i + 1];
2185
2186        if end1 >= start2 {
2187            return false;
2188        }
2189    }
2190
2191    true
2192}
2193
2194/// Checks if intervals are contiguous (adjacent with no gaps).
2195///
2196/// Intervals are contiguous if, when sorted by start time, each interval's start
2197/// timestamp is exactly one more than the previous interval's end timestamp.
2198/// This ensures complete coverage of a time range with no gaps.
2199///
2200/// # Parameters
2201///
2202/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
2203///
2204/// # Returns
2205///
2206/// Returns `true` if all intervals are contiguous, `false` if any gaps are found.
2207/// Returns `true` for empty lists or lists with a single interval.
2208///
2209/// # Examples
2210///
2211/// ```rust
2212/// # use nautilus_persistence::backend::catalog::are_intervals_contiguous;
2213/// // Contiguous intervals
2214/// assert!(are_intervals_contiguous(&[(1, 5), (6, 10), (11, 15)]));
2215///
2216/// // Non-contiguous intervals (gap between 5 and 8)
2217/// assert!(!are_intervals_contiguous(&[(1, 5), (8, 10)]));
2218/// ```
2219#[must_use]
2220pub fn are_intervals_contiguous(intervals: &[(u64, u64)]) -> bool {
2221    let n = intervals.len();
2222    if n <= 1 {
2223        return true;
2224    }
2225
2226    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
2227    sorted_intervals.sort_by_key(|&(start, _)| start);
2228
2229    for i in 0..(n - 1) {
2230        let (_, end1) = sorted_intervals[i];
2231        let (start2, _) = sorted_intervals[i + 1];
2232
2233        if end1 + 1 != start2 {
2234            return false;
2235        }
2236    }
2237
2238    true
2239}
2240
2241/// Finds the parts of a query interval that are not covered by existing data intervals.
2242///
2243/// This function calculates the "gaps" in data coverage by comparing a requested
2244/// time range against the intervals covered by existing data files. It's used to
2245/// determine what data needs to be fetched or backfilled.
2246///
2247/// # Parameters
2248///
2249/// - `start`: Start timestamp of the query interval (inclusive).
2250/// - `end`: End timestamp of the query interval (inclusive).
2251/// - `closed_intervals`: Existing data intervals as (start, end) tuples.
2252///
2253/// # Returns
2254///
2255/// Returns a vector of (start, end) tuples representing the gaps in coverage.
2256/// Returns an empty vector if the query range is invalid or fully covered.
2257///
2258/// # Examples
2259///
2260/// ```rust
2261/// # use nautilus_persistence::backend::catalog::query_interval_diff;
2262/// // Query 1-100, have data for 10-30 and 60-80
2263/// let gaps = query_interval_diff(1, 100, &[(10, 30), (60, 80)]);
2264/// assert_eq!(gaps, vec![(1, 9), (31, 59), (81, 100)]);
2265/// ```
2266fn query_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
2267    if start > end {
2268        return Vec::new();
2269    }
2270
2271    let interval_set = get_interval_set(closed_intervals);
2272    let query_range = (Bound::Included(start), Bound::Included(end));
2273    let query_diff = interval_set.get_interval_difference(&query_range);
2274    let mut result: Vec<(u64, u64)> = Vec::new();
2275
2276    for interval in query_diff {
2277        if let Some(tuple) = interval_to_tuple(interval, start, end) {
2278            result.push(tuple);
2279        }
2280    }
2281
2282    result
2283}
2284
2285/// Creates an interval tree from closed integer intervals.
2286///
2287/// This function converts closed intervals [a, b] into half-open intervals [a, b+1)
2288/// for use with the interval tree data structure, which is used for efficient
2289/// interval operations and gap detection.
2290///
2291/// # Parameters
2292///
2293/// - `intervals`: A slice of closed intervals as (start, end) tuples.
2294///
2295/// # Returns
2296///
2297/// Returns an [`IntervalTree`] containing the converted intervals.
2298///
2299/// # Notes
2300///
2301/// - Invalid intervals (where start > end) are skipped.
2302/// - Uses saturating addition to prevent overflow when converting to half-open intervals.
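///
/// For example, the closed interval `(10, 30)` is inserted into the tree as the
/// half-open interval `[10, 31)`.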
2303fn get_interval_set(intervals: &[(u64, u64)]) -> IntervalTree<u64> {
2304    let mut tree = IntervalTree::default();
2305
2306    if intervals.is_empty() {
2307        return tree;
2308    }
2309
2310    for &(start, end) in intervals {
2311        if start > end {
2312            continue;
2313        }
2314
2315        tree.insert((
2316            Bound::Included(start),
2317            Bound::Excluded(end.saturating_add(1)),
2318        ));
2319    }
2320
2321    tree
2322}
2323
2324/// Converts an interval tree result back to a closed interval tuple.
2325///
2326/// This helper function converts the bounded interval representation used by
2327/// the interval tree back into the (start, end) tuple format used throughout
2328/// the catalog.
2329///
2330/// # Parameters
2331///
2332/// - `interval`: The bounded interval from the interval tree.
2333/// - `query_start`: The start of the original query range.
2334/// - `query_end`: The end of the original query range.
2335///
2336/// # Returns
2337///
2338/// Returns `Some((start, end))` for valid intervals, `None` for empty intervals.
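///
/// For example, the half-open tree interval `[31, 60)` converts back to the closed
/// tuple `(31, 59)`.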
2339fn interval_to_tuple(
2340    interval: (Bound<&u64>, Bound<&u64>),
2341    query_start: u64,
2342    query_end: u64,
2343) -> Option<(u64, u64)> {
2344    let (bound_start, bound_end) = interval;
2345
2346    let start = match bound_start {
2347        Bound::Included(val) => *val,
2348        Bound::Excluded(val) => val.saturating_add(1),
2349        Bound::Unbounded => query_start,
2350    };
2351
2352    let end = match bound_end {
2353        Bound::Included(val) => *val,
2354        Bound::Excluded(val) => {
2355            if *val == 0 {
2356                return None; // Empty interval
2357            }
2358            val - 1
2359        }
2360        Bound::Unbounded => query_end,
2361    };
2362
2363    if start <= end {
2364        Some((start, end))
2365    } else {
2366        None
2367    }
2368}