nautilus_persistence/backend/
catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Parquet data catalog for efficient storage and retrieval of financial market data.
//!
//! This module provides a comprehensive data catalog implementation that uses Apache Parquet
//! format for storing financial market data with object store backends. The catalog supports
//! various data types including quotes, trades, bars, order book data, and other market events.
//!
//! # Key Features
//!
//! - **Object Store Integration**: Works with local filesystems, S3, and other object stores.
//! - **Data Type Support**: Handles all major financial data types (quotes, trades, bars, etc.).
//! - **Time-based Organization**: Organizes data by timestamp ranges for efficient querying.
//! - **Consolidation**: Merges multiple files to optimize storage and query performance.
//! - **Validation**: Ensures data integrity with timestamp ordering and interval validation.
//!
//! # Architecture
//!
//! The catalog organizes data in a hierarchical structure:
//! ```text
//! data/
//! ├── quotes/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! ├── trades/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! └── bars/
//!     └── INSTRUMENT_ID/
//!         └── start_ts-end_ts.parquet
//! ```
//!
//! # Usage
//!
//! ```rust,no_run
//! use std::path::PathBuf;
//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//!
//! // Create a new catalog
//! let catalog = ParquetDataCatalog::new(
//!     PathBuf::from("/path/to/data"),
//!     None,        // storage_options
//!     Some(5000),  // batch_size
//!     None,        // compression (defaults to SNAPPY)
//!     None,        // max_row_group_size (defaults to 5000)
//! );
//!
//! // Write data to the catalog
//! // catalog.write_to_parquet(data, None, None, None)?;
//! ```

use std::{
    fmt::Debug,
    ops::Bound,
    path::{Path, PathBuf},
    sync::Arc,
};

use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use heck::ToSnakeCase;
use itertools::Itertools;
use nautilus_core::{
    UnixNanos,
    datetime::{iso8601_to_unix_nanos, unix_nanos_to_iso8601},
};
use nautilus_model::data::{
    Bar, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose, to_variant,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::{ObjectStore, path::Path as ObjectPath};
use serde::Serialize;
use unbounded_interval_tree::interval_tree::IntervalTree;

use super::session::{self, DataBackendSession, QueryResult, build_query};
use crate::parquet::write_batches_to_object_store;

/// A high-performance data catalog for storing and retrieving financial market data using Apache Parquet format.
///
/// The `ParquetDataCatalog` provides a comprehensive solution for managing large volumes of financial
/// market data with efficient storage, querying, and consolidation capabilities. It supports various
/// object store backends including local filesystems, AWS S3, and other cloud storage providers.
///
/// # Features
///
/// - **Efficient Storage**: Uses Apache Parquet format with configurable compression.
/// - **Object Store Backend**: Supports multiple storage backends through the `object_store` crate.
/// - **Time-based Organization**: Organizes data by timestamp ranges for optimal query performance.
/// - **Data Validation**: Ensures timestamp ordering and interval consistency.
/// - **Consolidation**: Merges multiple files to reduce storage overhead and improve query speed.
/// - **Type Safety**: Strongly typed data handling with compile-time guarantees.
///
/// # Data Organization
///
/// Data is organized hierarchically by data type and instrument:
/// - `data/{data_type}/{instrument_id}/{start_ts}-{end_ts}.parquet`.
/// - Files are named with their timestamp ranges for efficient range queries.
/// - Intervals are validated to be disjoint to prevent data overlap.
///
/// # Performance Considerations
///
/// - **Batch Size**: Controls memory usage during data processing.
/// - **Compression**: SNAPPY compression provides a good balance of speed and size.
/// - **Row Group Size**: Affects query performance and memory usage.
/// - **File Consolidation**: Reduces the number of files for better query performance.
pub struct ParquetDataCatalog {
    /// The base path for data storage within the object store.
    pub base_path: String,
    /// The original URI provided when creating the catalog.
    pub original_uri: String,
    /// The object store backend for data persistence.
    pub object_store: Arc<dyn ObjectStore>,
    /// The DataFusion session for query execution.
    pub session: DataBackendSession,
    /// The number of records to process in each batch.
    pub batch_size: usize,
    /// The compression algorithm used for Parquet files.
    pub compression: parquet::basic::Compression,
    /// The maximum number of rows in each Parquet row group.
    pub max_row_group_size: usize,
}

impl Debug for ParquetDataCatalog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(ParquetDataCatalog))
            .field("base_path", &self.base_path)
            .finish()
    }
}

impl ParquetDataCatalog {
    /// Creates a new [`ParquetDataCatalog`] instance from a local file path.
    ///
    /// This is a convenience constructor that converts a local path to a URI format
    /// and delegates to [`Self::from_uri`].
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base directory path for data storage.
    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Panics
    ///
    /// Panics if the path cannot be converted to a valid URI or if the object store
    /// cannot be created from the path.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None,        // no storage options
    ///     Some(1000),  // smaller batch size
    ///     None,        // default compression
    ///     None,        // default row group size
    /// );
    /// ```
    #[must_use]
    pub fn new(
        base_path: PathBuf,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let path_str = base_path.to_string_lossy().to_string();
        Self::from_uri(
            &path_str,
            storage_options,
            batch_size,
            compression,
            max_row_group_size,
        )
        .expect("Failed to create catalog from path")
    }

    /// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
    ///
    /// Supports various URI schemes including local file paths and multiple cloud storage backends
    /// supported by the `object_store` crate.
    ///
    /// # Supported URI Schemes
    ///
    /// - **AWS S3**: `s3://bucket/path`.
    /// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`.
    /// - **Azure Blob Storage**: `azure://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`.
    /// - **HTTP/WebDAV**: `http://` or `https://`.
    /// - **Local files**: `file://path` or plain paths.
    ///
    /// # Parameters
    ///
    /// - `uri`: The URI for the data storage location.
    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
    ///   - For S3: `endpoint_url`, `region`, `access_key_id`, `secret_access_key`, `session_token`, etc.
    ///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
    ///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URI format is invalid or unsupported.
    /// - The object store cannot be created or accessed.
    /// - Authentication fails for cloud storage backends.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::collections::HashMap;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// // Local filesystem
    /// let local_catalog = ParquetDataCatalog::from_uri(
    ///     "/tmp/nautilus_data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 bucket
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Google Cloud Storage
    /// let gcs_catalog = ParquetDataCatalog::from_uri(
    ///     "gs://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Azure Blob Storage
    /// let azure_catalog = ParquetDataCatalog::from_uri(
    ///     "azure://account/container/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 with custom endpoint and credentials
    /// let mut storage_options = HashMap::new();
    /// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
    /// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
    /// storage_options.insert("secret_access_key".to_string(), "my-secret".to_string());
    ///
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     Some(storage_options),
    ///     None, None, None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn from_uri(
        uri: &str,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> anyhow::Result<Self> {
        let batch_size = batch_size.unwrap_or(5000);
        let compression = compression.unwrap_or(parquet::basic::Compression::SNAPPY);
        let max_row_group_size = max_row_group_size.unwrap_or(5000);

        let (object_store, base_path, original_uri) =
            crate::parquet::create_object_store_from_path(uri, storage_options)?;

        Ok(Self {
            base_path,
            original_uri,
            object_store,
            session: session::DataBackendSession::new(batch_size),
            batch_size,
            compression,
            max_row_group_size,
        })
    }

    /// Returns the base path of the catalog for testing purposes.
    #[must_use]
    pub fn get_base_path(&self) -> String {
        self.base_path.clone()
    }

    /// Resets the backend session to clear any cached table registrations.
    ///
    /// This is useful during catalog operations when files are being modified
    /// and we need to ensure fresh data is loaded.
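    ///
    /// # Examples
    ///
    /// A sketch (catalog construction elided, as in the other examples in this module):
    ///
    /// ```rust,ignore
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    /// // After files have been consolidated or renamed, drop stale table registrations:
    /// catalog.reset_session();
    /// ```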
    pub fn reset_session(&mut self) {
        self.session.clear_registered_tables();
    }

    /// Writes mixed data types to the catalog by separating them into type-specific collections.
    ///
    /// This method takes a heterogeneous collection of market data and separates it by type,
    /// then writes each type to its appropriate location in the catalog. This is useful when
    /// processing mixed data streams or bulk data imports.
    ///
    /// # Parameters
    ///
    /// - `data`: A vector of mixed [`Data`] enum variants.
    /// - `start`: Optional start timestamp to override the data's natural range.
    /// - `end`: Optional end timestamp to override the data's natural range.
    ///
    /// # Notes
    ///
    /// - Data is automatically separated by type before writing.
    /// - Each data type is written to its own directory structure.
    /// - `Data::Deltas` batch variants are currently skipped; only individual `Data::Delta` records are written.
    /// - Instrument data handling is not yet implemented (TODO).
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_model::data::Data;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let mixed_data: Vec<Data> = vec![/* mixed data types */];
    ///
    /// catalog.write_data_enum(mixed_data, None, None)?;
    /// ```
    pub fn write_data_enum(
        &self,
        data: Vec<Data>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()> {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();

        for d in data.iter().cloned() {
            match d {
                Data::Deltas(_) => continue,
                Data::Delta(d) => {
                    deltas.push(d);
                }
                Data::Depth10(d) => {
                    depth10s.push(*d);
                }
                Data::Quote(d) => {
                    quotes.push(d);
                }
                Data::Trade(d) => {
                    trades.push(d);
                }
                Data::Bar(d) => {
                    bars.push(d);
                }
                Data::MarkPriceUpdate(p) => {
                    mark_prices.push(p);
                }
                Data::IndexPriceUpdate(p) => {
                    index_prices.push(p);
                }
                Data::InstrumentClose(c) => {
                    closes.push(c);
                }
            }
        }

        // TODO: need to handle instruments here

        self.write_to_parquet(deltas, start, end, None)?;
        self.write_to_parquet(depth10s, start, end, None)?;
        self.write_to_parquet(quotes, start, end, None)?;
        self.write_to_parquet(trades, start, end, None)?;
        self.write_to_parquet(bars, start, end, None)?;
        self.write_to_parquet(mark_prices, start, end, None)?;
        self.write_to_parquet(index_prices, start, end, None)?;
        self.write_to_parquet(closes, start, end, None)?;

        Ok(())
    }

    /// Writes typed data to a Parquet file in the catalog.
    ///
    /// This is the core method for persisting market data to the catalog. It handles data
    /// validation, batching, compression, and ensures proper file organization with
    /// timestamp-based naming.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement required traits for serialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `start`: Optional start timestamp to override the natural data range.
    /// - `end`: Optional end timestamp to override the natural data range.
    /// - `skip_disjoint_check`: Optional flag to skip the disjoint-intervals validation after writing (default: `false`).
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Data timestamps are not in ascending order.
    /// - Data serialization to Arrow record batches fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    /// - Timestamp interval validation fails after writing.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - Record batches are empty after conversion.
    /// - Required metadata is missing from the schema.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    ///
    /// let path = catalog.write_to_parquet(quotes, None, None, None)?;
    /// println!("Data written to: {:?}", path);
    /// ```
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        skip_disjoint_check: Option<bool>,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = start.unwrap_or(data.first().unwrap().ts_init());
        let end_ts = end.unwrap_or(data.last().unwrap().ts_init());

        let batches = self.data_to_record_batches(data)?;
        let schema = batches.first().expect("Batches are empty.").schema();
        let instrument_id = schema.metadata.get("instrument_id").cloned();

        let directory = self.make_path(T::path_prefix(), instrument_id)?;
        let filename = timestamps_to_filename(start_ts, end_ts);
        let path = PathBuf::from(format!("{directory}/{filename}"));

        // Write all batches to parquet file
        log::info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len()
        );

        // Convert path to object store path
        let object_path = self.to_object_path(&path.to_string_lossy());

        self.execute_async(async {
            write_batches_to_object_store(
                &batches,
                self.object_store.clone(),
                &object_path,
                Some(self.compression),
                Some(self.max_row_group_size),
            )
            .await
        })?;

        if !skip_disjoint_check.unwrap_or(false) {
            let intervals = self.get_directory_intervals(&directory)?;

            if !are_intervals_disjoint(&intervals) {
                anyhow::bail!("Intervals are not disjoint after writing a new file");
            }
        }

        Ok(path)
    }

    /// Writes typed data to a JSON file in the catalog.
    ///
    /// This method provides an alternative to Parquet format for data export and debugging.
    /// JSON files are human-readable but less efficient for large datasets.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement serialization and cataloging traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `path`: Optional custom directory path (defaults to catalog's standard structure).
    /// - `write_metadata`: Whether to write a separate metadata file alongside the data.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created JSON file.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Data timestamps are not in ascending order.
    /// - JSON serialization fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    ///
    /// # Panics
    ///
    /// Panics if the default directory path cannot be constructed when `path` is `None`.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use std::path::PathBuf;
    /// use nautilus_model::data::TradeTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let trades: Vec<TradeTick> = vec![/* trade data */];
    ///
    /// let path = catalog.write_to_json(
    ///     trades,
    ///     Some(PathBuf::from("/custom/path")),
    ///     true  // write metadata
    /// )?;
    /// ```
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = data.first().unwrap().ts_init();
        let end_ts = data.last().unwrap().ts_init();

        let directory =
            path.unwrap_or_else(|| PathBuf::from(self.make_path(T::path_prefix(), None).unwrap()));
        let filename = timestamps_to_filename(start_ts, end_ts).replace(".parquet", ".json");
        let json_path = directory.join(&filename);

        log::info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len()
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            log::info!("Writing metadata to {metadata_path:?}");

            // Use object store for metadata file
            let metadata_object_path = ObjectPath::from(metadata_path.to_string_lossy().as_ref());
            let metadata_json = serde_json::to_vec_pretty(&metadata)?;
            self.execute_async(async {
                self.object_store
                    .put(&metadata_object_path, metadata_json.into())
                    .await
                    .map_err(anyhow::Error::from)
            })?;
        }

        // Use object store for main JSON file
        let json_object_path = ObjectPath::from(json_path.to_string_lossy().as_ref());
        let json_data = serde_json::to_vec_pretty(&serde_json::to_value(data)?)?;
        self.execute_async(async {
            self.object_store
                .put(&json_object_path, json_data.into())
                .await
                .map_err(anyhow::Error::from)
        })?;

        Ok(json_path)
    }

    /// Validates that data timestamps are in ascending order.
    ///
    /// # Parameters
    ///
    /// - `data`: Slice of data records to validate.
    /// - `type_name`: Name of the data type for error messages.
    ///
    /// # Errors
    ///
    /// Returns an error if any timestamp is less than the previous timestamp.
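    ///
    /// # Examples
    ///
    /// A minimal sketch of validating data before a write (an empty or
    /// single-element slice always passes):
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let quotes: Vec<QuoteTick> = vec![];
    /// ParquetDataCatalog::check_ascending_timestamps(&quotes, "quotes")?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```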
    pub fn check_ascending_timestamps<T: HasTsInit>(
        data: &[T],
        type_name: &str,
    ) -> anyhow::Result<()> {
        if !data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()) {
            anyhow::bail!("{type_name} timestamps must be in ascending order");
        }

        Ok(())
    }

    /// Converts data into Arrow record batches for Parquet serialization.
    ///
    /// This method chunks the data according to the configured batch size and converts
    /// each chunk into an Arrow record batch with appropriate metadata.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to convert, must implement required encoding traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to convert.
    ///
    /// # Returns
    ///
    /// Returns a vector of Arrow [`RecordBatch`] instances ready for Parquet serialization.
    ///
    /// # Errors
    ///
    /// Returns an error if record batch encoding fails for any chunk.
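    ///
    /// # Examples
    ///
    /// A sketch of encoding a vector of quotes into batches (catalog
    /// construction elided, as in the other examples in this module):
    ///
    /// ```rust,ignore
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    /// let batches = catalog.data_to_record_batches(quotes)?;
    /// println!("Encoded {} record batches", batches.len());
    /// ```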
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: HasTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

    /// Extends the timestamp range of an existing Parquet file by renaming it.
    ///
    /// This method finds an existing file that is adjacent to the specified time range
    /// and renames it to include the new range. This is useful when appending data
    /// that extends the time coverage of existing files.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Start timestamp of the new range to extend to.
    /// - `end`: End timestamp of the new range to extend to.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File rename operations fail.
    /// - Interval validation fails after extension.
    ///
    /// # Notes
    ///
    /// If no file adjacent to the given range is found, no rename is performed and
    /// the method returns `Ok(())`.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Extend a file's range backwards or forwards
    /// catalog.extend_file_name(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     UnixNanos::from(1609459200000000000),
    ///     UnixNanos::from(1609545600000000000)
    /// )?;
    /// ```
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: UnixNanos,
        end: UnixNanos,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(data_cls, instrument_id)?;
        let intervals = self.get_directory_intervals(&directory)?;

        let start = start.as_u64();
        let end = end.as_u64();

        for interval in intervals {
            if interval.0 == end + 1 {
                // Extend backwards: the adjacent file now covers [start, interval.1]
                self.rename_parquet_file(&directory, interval.0, interval.1, start, interval.1)?;
                break;
            } else if interval.1 + 1 == start {
                // Extend forwards: the adjacent file now covers [interval.0, end]
                // (written as `interval.1 + 1 == start` to avoid underflow when `start` is 0)
                self.rename_parquet_file(&directory, interval.0, interval.1, interval.0, end)?;
                break;
            }
        }

        let intervals = self.get_directory_intervals(&directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after extending a file");
        }

        Ok(())
    }

    /// Lists all Parquet files in a specified directory.
    ///
    /// This method scans a directory and returns the full paths of all files with the `.parquet`
    /// extension. It works with both local filesystems and remote object stores, making it
    /// suitable for various storage backends.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path to scan for Parquet files.
    ///
    /// # Returns
    ///
    /// Returns a vector of full file paths (as strings) for all Parquet files found in the directory.
    /// The paths are relative to the object store root and suitable for use with object store operations.
    /// Returns an empty vector if the directory doesn't exist or contains no Parquet files.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store listing operations fail.
    /// - Directory access is denied.
    /// - Network issues occur (for remote object stores).
    ///
    /// # Notes
    ///
    /// - Only files ending with `.parquet` are included.
    /// - Listing is recursive: Parquet files under nested prefixes are also included.
    /// - File paths are returned in the order provided by the object store.
    /// - Works with all supported object store backends (local, S3, GCS, Azure, etc.).
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let files = catalog.list_parquet_files("data/quotes/EURUSD")?;
    ///
    /// for file in files {
    ///     println!("Found Parquet file: {}", file);
    /// }
    /// ```
    pub fn list_parquet_files(&self, directory: &str) -> anyhow::Result<Vec<String>> {
        self.execute_async(async {
            let prefix = ObjectPath::from(format!("{directory}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut files = Vec::new();

            while let Some(object) = stream.next().await {
                let object = object?;
                if object.location.as_ref().ends_with(".parquet") {
                    files.push(object.location.to_string());
                }
            }
            Ok::<Vec<String>, anyhow::Error>(files)
        })
    }

    /// Helper method to reconstruct the full URI for remote object store paths.
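    ///
    /// # Examples
    ///
    /// A sketch of the reconstruction, assuming a catalog created from the
    /// hypothetical URI `s3://my-bucket/nautilus-data` and a hypothetical file path:
    ///
    /// ```rust,ignore
    /// let uri = catalog.reconstruct_full_uri("data/quotes/EURUSD/part-0.parquet");
    /// assert_eq!(uri, "s3://my-bucket/data/quotes/EURUSD/part-0.parquet");
    /// ```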
    #[must_use]
    pub fn reconstruct_full_uri(&self, path_str: &str) -> String {
        // Check if this is a remote URI scheme that needs reconstruction
        if self.is_remote_uri() {
            // Extract the base URL (scheme + host) from the original URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Some(host) = url.host_str()
            {
                return format!("{}://{}/{}", url.scheme(), host, path_str);
            }
        }

        // For local paths, extract the directory from the original URI
        if self.original_uri.starts_with("file://") {
            // Extract the path from the file:// URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Ok(base_path) = url.to_file_path()
            {
                // Use platform-appropriate path separator for display
                // but object store paths always use forward slashes
                let base_str = base_path.to_string_lossy();
                return self.join_paths(&base_str, path_str);
            }
        }

        // For local paths without file:// prefix, use the original URI as base
        if self.base_path.is_empty() {
            // If base_path is empty and not a file URI, try using original_uri as base
            if self.original_uri.contains("://") {
                // Fallback: return the path as-is
                path_str.to_string()
            } else {
                self.join_paths(self.original_uri.trim_end_matches('/'), path_str)
            }
        } else {
            let base = self.base_path.trim_end_matches('/');
            self.join_paths(base, path_str)
        }
    }

    /// Helper method to join paths using forward slashes (object store convention).
    #[must_use]
    fn join_paths(&self, base: &str, path: &str) -> String {
        make_object_store_path(base, &[path])
    }

    /// Helper method to check if the original URI uses a remote object store scheme.
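    ///
    /// # Examples
    ///
    /// A sketch (catalog construction elided): a catalog created from
    /// `s3://bucket/path` reports `true`, one created from a local path reports `false`.
    ///
    /// ```rust,ignore
    /// assert!(s3_catalog.is_remote_uri());
    /// assert!(!local_catalog.is_remote_uri());
    /// ```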
    #[must_use]
    pub fn is_remote_uri(&self) -> bool {
        self.original_uri.starts_with("s3://")
            || self.original_uri.starts_with("gs://")
            || self.original_uri.starts_with("gcs://")
            || self.original_uri.starts_with("azure://")
            || self.original_uri.starts_with("abfs://")
            || self.original_uri.starts_with("http://")
            || self.original_uri.starts_with("https://")
    }

    /// Executes a query against the catalog to retrieve market data of a specific type.
    ///
    /// This is the primary method for querying data from the catalog. It registers the appropriate
    /// object store with the DataFusion session, finds all relevant Parquet files, and executes
    /// the query across them. The method supports filtering by instrument IDs, time ranges, and
    /// custom SQL WHERE clauses.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to query, must implement required traits for deserialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering (e.g., "price > 100").
    /// - `files`: Optional explicit list of file URIs to query, bypassing automatic file discovery.
    ///
    /// # Returns
    ///
    /// Returns a [`QueryResult`] containing the query execution context and data.
    /// Use [`QueryResult::collect()`] to retrieve the actual data records.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store registration fails for remote URIs.
    /// - File discovery fails.
    /// - DataFusion query execution fails.
    /// - Data deserialization fails.
    ///
    /// # Performance Notes
    ///
    /// - Files are automatically filtered by timestamp ranges before querying.
    /// - DataFusion optimizes queries across multiple Parquet files.
    /// - Use specific instrument IDs and time ranges to improve performance.
    /// - WHERE clauses are pushed down to the Parquet reader when possible.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quote data
    /// let result = catalog.query::<QuoteTick>(None, None, None, None, None)?;
    /// let quotes = result.collect();
    ///
    /// // Query specific instruments within a time range
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Query with custom WHERE clause
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000"),
    ///     None
    /// )?;
    /// ```
    pub fn query<T>(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        // Register the object store with the session for remote URIs
        if self.is_remote_uri() {
            let url = url::Url::parse(&self.original_uri)?;
            let host = url
                .host_str()
                .ok_or_else(|| anyhow::anyhow!("Remote URI missing host/bucket name"))?;
            let base_url = url::Url::parse(&format!("{}://{}", url.scheme(), host))?;
            self.session
                .register_object_store(&base_url, self.object_store.clone());
        }

        let files_list = if let Some(files) = files {
            files
        } else {
            self.query_files(T::path_prefix(), instrument_ids, start, end)?
        };

        for file_uri in &files_list {
            // Extract identifier from file path and filename to create meaningful table names
            let identifier = extract_identifier_from_path(file_uri);
            let safe_sql_identifier = make_sql_safe_identifier(&identifier);
            let safe_filename = extract_sql_safe_filename(file_uri);

            // Create table name from path_prefix, identifier, and filename
            let table_name = format!(
                "{}_{}_{}",
                T::path_prefix(),
                safe_sql_identifier,
                safe_filename
            );
            let query = build_query(&table_name, start, end, where_clause);

            // Convert object store path to filesystem path for DataFusion
            // Only apply reconstruction if the path is not already absolute
            let resolved_path = if file_uri.starts_with('/') {
                // Path is already absolute, use as-is
                file_uri.clone()
            } else {
                // Path is relative, reconstruct full URI
                self.reconstruct_full_uri(file_uri)
            };
            self.session
                .add_file::<T>(&table_name, &resolved_path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }

    /// Queries typed data from the catalog and returns results as a strongly-typed vector.
    ///
    /// This is a convenience method that wraps the generic `query` method and automatically
    /// collects and converts the results into a vector of the specific data type. It handles
    /// the type conversion from the generic [`Data`] enum to the concrete type `T`.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The specific data type to query and return. Must implement required traits for
    ///   deserialization, cataloging, and conversion from the [`Data`] enum.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    ///   For exact matches, provide the full instrument ID. For bars, partial matches are supported.
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering. Use standard SQL syntax
    ///   with column names matching the Parquet schema (e.g., "`bid_price` > 1.2000", "volume > 1000").
    /// - `files`: Optional explicit list of file URIs to query, bypassing automatic file discovery.
    ///
    /// # Returns
    ///
    /// Returns a vector of the specific data type `T`, sorted by timestamp. The vector will be
    /// empty if no data matches the query criteria.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The underlying query execution fails.
    /// - Data type conversion fails.
    /// - Object store access fails.
    /// - Invalid WHERE clause syntax is provided.
    ///
    /// # Performance Considerations
    ///
    /// - Use specific instrument IDs and time ranges to minimize data scanning.
    /// - WHERE clauses are pushed down to Parquet readers when possible.
    /// - Results are automatically sorted by timestamp during collection.
    /// - Memory usage scales with the amount of data returned.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_model::data::{QuoteTick, TradeTick, Bar};
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quotes for a specific instrument
    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
    ///     Some(vec!["EURUSD".to_string()]),
    ///     None,
    ///     None,
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Query trades within a specific time range
    /// let trades: Vec<TradeTick> = catalog.query_typed_data(
    ///     Some(vec!["BTCUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Query bars with volume filter
    /// let bars: Vec<Bar> = catalog.query_typed_data(
    ///     Some(vec!["AAPL".to_string()]),
    ///     None,
    ///     None,
    ///     Some("volume > 1000000"),
    ///     None
    /// )?;
    ///
    /// // Query multiple instruments with price filter
    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
    ///     Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000 AND ask_price < 1.3000"),
    ///     None
    /// )?;
    /// ```
    pub fn query_typed_data<T>(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
    ) -> anyhow::Result<Vec<T>>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix + TryFrom<Data>,
    {
        let query_result = self.query::<T>(instrument_ids, start, end, where_clause, files)?;
        let all_data = query_result.collect();

        // Convert Data enum variants to specific type T using to_variant
        Ok(to_variant::<T>(all_data))
    }

    /// Queries all Parquet files for a specific data type and optional instrument IDs.
    ///
    /// This method finds all Parquet files that match the specified criteria and returns
    /// their full URIs. The files are filtered by data type, instrument IDs (if provided),
    /// and timestamp range (if provided).
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_ids`: Optional list of instrument IDs to filter by.
    /// - `start`: Optional start timestamp to filter files by their time range.
    /// - `end`: Optional end timestamp to filter files by their time range.
    ///
    /// # Returns
    ///
    /// Returns a vector of file URI strings that match the query criteria,
    /// or an error if the query fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Object store listing operations fail.
    /// - URI reconstruction fails.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quote files
    /// let files = catalog.query_files("quotes", None, None, None)?;
    ///
    /// // Query trade files for specific instruments within a time range
    /// let files = catalog.query_files(
    ///     "trades",
    ///     Some(vec!["BTCUSD".to_string(), "ETHUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
    /// ```
    pub fn query_files(
        &self,
        data_cls: &str,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<Vec<String>> {
        let mut files = Vec::new();

        let start_u64 = start.map(|s| s.as_u64());
        let end_u64 = end.map(|e| e.as_u64());

        let base_dir = self.make_path(data_cls, None)?;

        // Use recursive listing to match Python's glob behavior
        let list_result = self.execute_async(async {
            let prefix = ObjectPath::from(format!("{base_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut objects = Vec::new();
            while let Some(object) = stream.next().await {
                objects.push(object?);
            }
            Ok::<Vec<_>, anyhow::Error>(objects)
        })?;

        let mut file_paths: Vec<String> = list_result
            .into_iter()
            .filter_map(|object| {
                let path_str = object.location.to_string();
                if path_str.ends_with(".parquet") {
                    Some(path_str)
                } else {
                    None
                }
            })
            .collect();

        // Apply identifier filtering if provided
        if let Some(identifiers) = instrument_ids {
            let safe_identifiers: Vec<String> = identifiers
                .iter()
                .map(|id| urisafe_instrument_id(id))
                .collect();

            // Exact match by default for instrument_ids or bar_types
            let exact_match_file_paths: Vec<String> = file_paths
                .iter()
                .filter(|file_path| {
                    // Extract the directory name (second to last path component)
                    let path_parts: Vec<&str> = file_path.split('/').collect();
                    if path_parts.len() >= 2 {
                        let dir_name = path_parts[path_parts.len() - 2];
                        safe_identifiers.iter().any(|safe_id| safe_id == dir_name)
                    } else {
                        false
                    }
                })
                .cloned()
                .collect();

            if exact_match_file_paths.is_empty() && data_cls == "bars" {
                // Partial match of instrument_ids in bar_types for bars
                file_paths.retain(|file_path| {
                    let path_parts: Vec<&str> = file_path.split('/').collect();
                    if path_parts.len() >= 2 {
                        let dir_name = path_parts[path_parts.len() - 2];
                        safe_identifiers
                            .iter()
                            .any(|safe_id| dir_name.starts_with(&format!("{safe_id}-")))
                    } else {
                        false
                    }
                });
            } else {
                file_paths = exact_match_file_paths;
            }
        }

        // Apply timestamp filtering
        file_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));

        // Convert to full URIs
        for file_path in file_paths {
            let full_uri = self.reconstruct_full_uri(&file_path);
            files.push(full_uri);
        }

        Ok(files)
    }

    /// Finds the missing time intervals for a specific data type and instrument ID.
    ///
    /// This method compares a requested time range against the existing data coverage
    /// and returns the gaps that need to be filled. This is useful for determining
    /// what data needs to be fetched or backfilled.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp of the requested range (Unix nanoseconds).
    /// - `end`: End timestamp of the requested range (Unix nanoseconds).
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the missing intervals,
    /// or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Interval retrieval fails.
    /// - Gap calculation fails.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Find missing intervals for quote data
    /// let missing = catalog.get_missing_intervals_for_request(
    ///     1609459200000000000,  // start
    ///     1609545600000000000,  // end
    ///     "quotes",
    ///     Some("BTCUSD".to_string())
    /// )?;
    ///
    /// for (start, end) in missing {
    ///     println!("Missing data from {} to {}", start, end);
    /// }
    /// ```
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<(u64, u64)>> {
        let intervals = self.get_intervals(data_cls, instrument_id)?;

        Ok(query_interval_diff(start, end, &intervals))
    }

    /// Gets the last (most recent) timestamp for a specific data type and instrument ID.
    ///
    /// This method finds the latest timestamp covered by existing data files for
    /// the specified data type and instrument. This is useful for determining
    /// the most recent data available or for incremental data updates.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
    /// or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Interval retrieval fails.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Get the last timestamp for quote data
    /// if let Some(last_ts) = catalog.query_last_timestamp("quotes", Some("BTCUSD".to_string()))? {
    ///     println!("Last quote timestamp: {}", last_ts);
    /// } else {
    ///     println!("No quote data found");
    /// }
    /// ```
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Option<u64>> {
        let intervals = self.get_intervals(data_cls, instrument_id)?;

        if intervals.is_empty() {
            return Ok(None);
        }

        Ok(Some(intervals.last().unwrap().1))
    }

    /// Gets the time intervals covered by Parquet files for a specific data type and instrument ID.
    ///
    /// This method returns all time intervals covered by existing data files for the
    /// specified data type and instrument. The intervals are sorted by start time and
    /// represent the complete data coverage available.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the covered intervals,
    /// sorted by start time, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Directory listing fails.
    /// - Filename parsing fails.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Get all intervals for quote data
    /// let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
    /// for (start, end) in intervals {
    ///     println!("Data available from {} to {}", start, end);
    /// }
    /// ```
    pub fn get_intervals(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<(u64, u64)>> {
        let directory = self.make_path(data_cls, instrument_id)?;

        self.get_directory_intervals(&directory)
    }

    /// Gets the time intervals covered by Parquet files in a specific directory.
    ///
    /// This method scans a directory for Parquet files and extracts the timestamp ranges
    /// from their filenames. It's used internally by other methods to determine data coverage
    /// and is essential for interval-based operations like gap detection and consolidation.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path to scan for Parquet files.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the time intervals covered
    /// by files in the directory, sorted by start timestamp. Returns an empty vector
    /// if the directory doesn't exist or contains no valid Parquet files.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store listing operations fail.
    /// - Directory access is denied.
    ///
    /// # Notes
    ///
    /// - Only files with valid timestamp-based filenames are included.
    /// - Files with unparseable names are silently ignored.
    /// - The method works with both local and remote object stores.
    /// - Results are automatically sorted by start timestamp.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let intervals = catalog.get_directory_intervals("data/quotes/EURUSD")?;
    ///
    /// for (start, end) in intervals {
    ///     println!("File covers {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_directory_intervals(&self, directory: &str) -> anyhow::Result<Vec<(u64, u64)>> {
        let mut intervals = Vec::new();

        // Use object store for all operations
        let list_result = self.execute_async(async {
            let path = object_store::path::Path::from(directory);
            Ok(self
                .object_store
                .list(Some(&path))
                .collect::<Vec<_>>()
                .await)
        })?;

        for result in list_result {
            match result {
                Ok(object) => {
                    let path_str = object.location.to_string();
                    if path_str.ends_with(".parquet")
                        && let Some(interval) = parse_filename_timestamps(&path_str)
                    {
                        intervals.push(interval);
                    }
                }
                Err(_) => {
                    // Directory doesn't exist or is empty, which is fine
                    break;
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        Ok(intervals)
    }

    /// Constructs a directory path for storing data of a specific type and instrument.
    ///
    /// This method builds the hierarchical directory structure used by the catalog to organize
    /// data by type and instrument. The path follows the pattern: `{base_path}/data/{type_name}/{instrument_id}`.
    /// Instrument IDs are automatically converted to URI-safe format by removing forward slashes.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `instrument_id`: Optional instrument ID. If provided, creates a subdirectory for the instrument.
    ///   If `None`, returns the path to the data type directory.
    ///
    /// # Returns
    ///
    /// Returns the constructed directory path as a string, or an error if path construction fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The instrument ID contains invalid characters that cannot be made URI-safe.
    /// - Path construction fails due to system limitations.
    ///
    /// # Path Structure
    ///
    /// - Without instrument ID: `{base_path}/data/{type_name}`.
    /// - With instrument ID: `{base_path}/data/{type_name}/{safe_instrument_id}`.
    /// - If `base_path` is empty: `data/{type_name}[/{safe_instrument_id}]`.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Path for all quote data
    /// let quotes_path = catalog.make_path("quotes", None)?;
    /// // Returns: "/base/path/data/quotes"
    ///
    /// // Path for specific instrument quotes
    /// let eurusd_quotes = catalog.make_path("quotes", Some("EUR/USD".to_string()))?;
    /// // Returns: "/base/path/data/quotes/EURUSD" (slash removed)
    ///
    /// // Path for bar data with complex instrument ID
    /// let bars_path = catalog.make_path("bars", Some("BTC/USD-1H".to_string()))?;
    /// // Returns: "/base/path/data/bars/BTCUSD-1H"
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn make_path(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<String> {
        let mut components = vec!["data".to_string(), type_name.to_string()];

        if let Some(id) = instrument_id {
            let safe_id = urisafe_instrument_id(&id);
            components.push(safe_id);
        }

        let path = make_object_store_path_owned(&self.base_path, components);
        Ok(path)
    }

    /// Helper method to rename a Parquet file by moving it via object store operations.
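    ///
    /// A sketch of the effect (directory and timestamps are illustrative; the filenames
    /// are derived via `timestamps_to_filename`):
    ///
    /// ```ignore
    /// // Moves "<directory>/<old_start>_<old_end>.parquet"
    /// // to    "<directory>/<new_start>_<new_end>.parquet"
    /// self.rename_parquet_file("data/quotes/EURUSD", 1, 100, 1, 150)?;
    /// ```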
    fn rename_parquet_file(
        &self,
        directory: &str,
        old_start: u64,
        old_end: u64,
        new_start: u64,
        new_end: u64,
    ) -> anyhow::Result<()> {
        let old_filename =
            timestamps_to_filename(UnixNanos::from(old_start), UnixNanos::from(old_end));
        let old_path = format!("{directory}/{old_filename}");
        let old_object_path = self.to_object_path(&old_path);

        let new_filename =
            timestamps_to_filename(UnixNanos::from(new_start), UnixNanos::from(new_end));
        let new_path = format!("{directory}/{new_filename}");
        let new_object_path = self.to_object_path(&new_path);

        self.move_file(&old_object_path, &new_object_path)
    }

    /// Converts a catalog path string to an [`ObjectPath`] for object store operations.
    ///
    /// This method handles the conversion between catalog-relative paths and object store paths,
    /// taking into account the catalog's base path configuration. It automatically strips the
    /// base path prefix when present to create the correct object store path.
    ///
    /// # Parameters
    ///
    /// - `path`: The catalog path string to convert. Can be absolute or relative.
    ///
    /// # Returns
    ///
    /// Returns an [`ObjectPath`] suitable for use with object store operations.
    ///
    /// # Path Handling
    ///
    /// - If `base_path` is empty, the path is used as-is.
    /// - If `base_path` is set, it's stripped from the path if present.
    /// - Trailing slashes and backslashes are automatically handled.
    /// - The resulting path is relative to the object store root.
    /// - All paths are normalized to use forward slashes (object store convention).
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Convert a full catalog path
    /// let object_path = catalog.to_object_path("/base/data/quotes/file.parquet");
    /// // Returns: ObjectPath("data/quotes/file.parquet") if base_path is "/base"
    ///
    /// // Convert a relative path
    /// let object_path = catalog.to_object_path("data/trades/file.parquet");
    /// // Returns: ObjectPath("data/trades/file.parquet")
    /// ```
    #[must_use]
    pub fn to_object_path(&self, path: &str) -> ObjectPath {
        // Normalize path separators to forward slashes for object store
        let normalized_path = path.replace('\\', "/");

        if self.base_path.is_empty() {
            return ObjectPath::from(normalized_path);
        }

        // Normalize base path separators as well
        let normalized_base = self.base_path.replace('\\', "/");
        let base = normalized_base.trim_end_matches('/');

        // Remove the catalog base prefix if present
        let without_base = normalized_path
            .strip_prefix(&format!("{base}/"))
            .or_else(|| normalized_path.strip_prefix(base))
            .unwrap_or(&normalized_path);

        ObjectPath::from(without_base)
    }

    /// Helper method to move a file using the object store rename operation.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying object store rename fails.
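    ///
    /// # Examples
    ///
    /// A minimal sketch (paths are illustrative):
    ///
    /// ```ignore
    /// let old_path = catalog.to_object_path("data/quotes/old.parquet");
    /// let new_path = catalog.to_object_path("data/quotes/new.parquet");
    /// catalog.move_file(&old_path, &new_path)?;
    /// ```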
    pub fn move_file(&self, old_path: &ObjectPath, new_path: &ObjectPath) -> anyhow::Result<()> {
        self.execute_async(async {
            self.object_store
                .rename(old_path, new_path)
                .await
                .map_err(anyhow::Error::from)
        })
    }

    /// Helper method to execute an async operation by blocking on the common runtime.
    ///
    /// # Panics
    ///
    /// Panics if called from within an async context, since this blocks on the runtime.
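    ///
    /// # Examples
    ///
    /// A minimal sketch (the future shown is trivial):
    ///
    /// ```ignore
    /// let value = catalog.execute_async(async { Ok(42u64) })?;
    /// assert_eq!(value, 42);
    /// ```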
    pub fn execute_async<F, R>(&self, future: F) -> anyhow::Result<R>
    where
        F: std::future::Future<Output = anyhow::Result<R>>,
    {
        let rt = nautilus_common::runtime::get_runtime();
        rt.block_on(future)
    }
}

/// Trait for providing catalog path prefixes for different data types.
///
/// This trait enables type-safe organization of data within the catalog by providing
/// a standardized way to determine the directory structure for each data type.
/// Each data type maps to a specific subdirectory within the catalog's data folder.
///
/// # Implementation
///
/// Types implementing this trait should return a static string that represents
/// the directory name where data of that type should be stored.
///
/// # Examples
///
/// ```rust
/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
/// use nautilus_model::data::QuoteTick;
///
/// assert_eq!(QuoteTick::path_prefix(), "quotes");
/// ```
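///
/// A custom data type could opt in the same way (`MyEvent` is hypothetical):
///
/// ```ignore
/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
///
/// struct MyEvent;
///
/// impl CatalogPathPrefix for MyEvent {
///     fn path_prefix() -> &'static str {
///         "my_events"
///     }
/// }
/// ```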
pub trait CatalogPathPrefix {
    /// Returns the path prefix (directory name) for this data type.
    ///
    /// # Returns
    ///
    /// A static string representing the directory name where this data type is stored.
    fn path_prefix() -> &'static str;
}

/// Macro for implementing [`CatalogPathPrefix`] for data types.
///
/// This macro provides a convenient way to implement the trait for multiple types
/// with their corresponding path prefixes.
///
/// # Parameters
///
/// - `$type`: The data type to implement the trait for.
/// - `$path`: The path prefix string for that type.
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

// Standard implementations for financial data types
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");

/// Converts timestamps to a filename using ISO 8601 format.
///
/// This function converts two Unix nanosecond timestamps to a filename that uses
/// ISO 8601 format with filesystem-safe characters. The format matches the Python
/// implementation for consistency.
///
/// # Parameters
///
/// - `timestamp_1`: First timestamp in Unix nanoseconds.
/// - `timestamp_2`: Second timestamp in Unix nanoseconds.
///
/// # Returns
///
/// Returns a filename string in the format: "`iso_timestamp_1_iso_timestamp_2.parquet`".
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::timestamps_to_filename;
/// # use nautilus_core::UnixNanos;
/// let filename = timestamps_to_filename(
///     UnixNanos::from(1609459200000000000),
///     UnixNanos::from(1609545600000000000)
/// );
/// // Returns something like: "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet"
/// ```
#[must_use]
pub fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));

    format!("{datetime_1}_{datetime_2}.parquet")
}

/// Converts an ISO 8601 timestamp to a filesystem-safe format.
///
/// This function replaces colons and dots with hyphens to make the timestamp
/// safe for use in filenames across different filesystems.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns a filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Examples
///
/// ```ignore
/// # use nautilus_persistence::backend::catalog::iso_timestamp_to_file_timestamp;
/// let safe_timestamp = iso_timestamp_to_file_timestamp("2023-10-26T07:30:50.123456789Z");
/// assert_eq!(safe_timestamp, "2023-10-26T07-30-50-123456789Z");
/// ```
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp.replace([':', '.'], "-")
}

/// Converts a filesystem-safe timestamp back to ISO 8601 format.
///
/// This function reverses the transformation done by `iso_timestamp_to_file_timestamp`,
/// converting filesystem-safe timestamps back to standard ISO 8601 format.
///
/// # Parameters
///
/// - `file_timestamp`: Filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Returns
///
/// Returns an ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Examples
///
/// ```ignore
/// # use nautilus_persistence::backend::catalog::file_timestamp_to_iso_timestamp;
/// let iso_timestamp = file_timestamp_to_iso_timestamp("2023-10-26T07-30-50-123456789Z");
/// assert_eq!(iso_timestamp, "2023-10-26T07:30:50.123456789Z");
/// ```
fn file_timestamp_to_iso_timestamp(file_timestamp: &str) -> String {
    let (date_part, time_part) = file_timestamp
        .split_once('T')
        .unwrap_or((file_timestamp, ""));
    let time_part = time_part.strip_suffix('Z').unwrap_or(time_part);

    // Find the last hyphen to separate nanoseconds
    if let Some(last_hyphen_idx) = time_part.rfind('-') {
        let time_with_dot_for_nanos = format!(
            "{}.{}",
            &time_part[..last_hyphen_idx],
            &time_part[last_hyphen_idx + 1..]
        );
        let final_time_part = time_with_dot_for_nanos.replace('-', ":");
        format!("{date_part}T{final_time_part}Z")
    } else {
        // Fallback if no nanoseconds part found
        let final_time_part = time_part.replace('-', ":");
        format!("{date_part}T{final_time_part}Z")
    }
}

/// Converts an ISO 8601 timestamp string to Unix nanoseconds.
///
/// This function parses an ISO 8601 timestamp and converts it to Unix nanoseconds.
/// It's used to convert parsed timestamps back to the internal representation.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns `Ok(u64)` with the Unix nanoseconds timestamp, or an error if parsing fails.
///
/// # Examples
///
/// ```ignore
/// # use nautilus_persistence::backend::catalog::iso_to_unix_nanos;
/// let nanos = iso_to_unix_nanos("2021-01-01T00:00:00.000000000Z").unwrap();
/// assert_eq!(nanos, 1609459200000000000);
/// ```
fn iso_to_unix_nanos(iso_timestamp: &str) -> anyhow::Result<u64> {
    Ok(iso8601_to_unix_nanos(iso_timestamp.to_string())?.into())
}

/// Converts an instrument ID to a URI-safe format by removing forward slashes.
///
/// Some instrument IDs contain forward slashes (e.g., "BTC/USD") which are not
/// suitable for use in file paths. This function removes these characters to
/// create a safe directory name.
///
/// # Parameters
///
/// - `instrument_id`: The original instrument ID string.
///
/// # Returns
///
/// A URI-safe version of the instrument ID with forward slashes removed.
///
/// # Examples
///
/// ```ignore
/// # use nautilus_persistence::backend::catalog::urisafe_instrument_id;
/// assert_eq!(urisafe_instrument_id("BTC/USD"), "BTCUSD");
/// assert_eq!(urisafe_instrument_id("EUR-USD"), "EUR-USD");
/// ```
fn urisafe_instrument_id(instrument_id: &str) -> String {
    instrument_id.replace('/', "")
}

/// Extracts the identifier from a file path.
///
/// The identifier is typically the second-to-last path component (directory name).
/// For example, from "`data/quote_tick/EURUSD/file.parquet`", extracts "EURUSD".
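///
/// # Examples
///
/// A quick check of the normal and degenerate cases:
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_identifier_from_path;
/// assert_eq!(extract_identifier_from_path("data/quote_tick/EURUSD/file.parquet"), "EURUSD");
/// assert_eq!(extract_identifier_from_path("file.parquet"), "unknown");
/// ```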
#[must_use]
pub fn extract_identifier_from_path(file_path: &str) -> String {
    let path_parts: Vec<&str> = file_path.split('/').collect();
    if path_parts.len() >= 2 {
        path_parts[path_parts.len() - 2].to_string()
    } else {
        "unknown".to_string()
    }
}

/// Makes an identifier safe for use in SQL table names.
///
/// Removes forward slashes; replaces dots, hyphens, spaces, and percent signs with underscores; and converts to lowercase.
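///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_sql_safe_identifier;
/// assert_eq!(make_sql_safe_identifier("EUR/USD.SIM"), "eurusd_sim");
/// ```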
#[must_use]
pub fn make_sql_safe_identifier(identifier: &str) -> String {
    urisafe_instrument_id(identifier)
        .replace(['.', '-', ' ', '%'], "_")
        .to_lowercase()
}

/// Extracts the filename from a file path and makes it SQL-safe.
///
/// For example, from "data/quote_tick/EURUSD/2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
/// extracts "`2021_01_01t00_00_00_000000000z_2021_01_02t00_00_00_000000000z`".
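///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_sql_safe_filename;
/// let name = extract_sql_safe_filename(
///     "data/quote_tick/EURUSD/2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
/// );
/// assert_eq!(name, "2021_01_01t00_00_00_000000000z_2021_01_02t00_00_00_000000000z");
/// ```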
#[must_use]
pub fn extract_sql_safe_filename(file_path: &str) -> String {
    if file_path.is_empty() {
        return "unknown_file".to_string();
    }

    let filename = file_path.split('/').next_back().unwrap_or("unknown_file");

    // Remove the .parquet extension
    let name_without_ext = if let Some(dot_pos) = filename.rfind(".parquet") {
        &filename[..dot_pos]
    } else {
        filename
    };

    // Replace characters that can pose problems in SQL identifiers (hyphens, colons, dots)
    name_without_ext
        .replace(['-', ':', '.'], "_")
        .to_lowercase()
}

/// Creates a platform-appropriate local path using `PathBuf`.
///
/// This function constructs file system paths using the platform's native path separators.
/// Use this for local file operations that need to work with the actual file system.
///
/// # Arguments
///
/// * `base_path` - The base directory path
/// * `components` - Path components to join
///
/// # Returns
///
/// A `PathBuf` with platform-appropriate separators
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_local_path;
/// let path = make_local_path("/base", &["data", "quotes", "EURUSD"]);
/// // On Unix: "/base/data/quotes/EURUSD"
/// // On Windows: "/base\data\quotes\EURUSD" (pushed components use backslashes)
/// ```
pub fn make_local_path<P: AsRef<Path>>(base_path: P, components: &[&str]) -> PathBuf {
    let mut path = PathBuf::from(base_path.as_ref());
    for component in components {
        path.push(component);
    }
    path
}

/// Creates an object store path using forward slashes.
///
/// Object stores (S3, GCS, etc.) always expect forward slashes regardless of platform.
/// Use this when creating paths for object store operations.
///
/// # Arguments
///
/// * `base_path` - The base path (can be empty)
/// * `components` - Path components to join
///
/// # Returns
///
/// A string path with forward slash separators
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_object_store_path;
/// let path = make_object_store_path("base", &["data", "quotes", "EURUSD"]);
/// assert_eq!(path, "base/data/quotes/EURUSD");
/// ```
#[must_use]
pub fn make_object_store_path(base_path: &str, components: &[&str]) -> String {
    let mut parts = Vec::new();

    if !base_path.is_empty() {
        let normalized_base = base_path
            .replace('\\', "/")
            .trim_end_matches('/')
            .to_string();
        if !normalized_base.is_empty() {
            parts.push(normalized_base);
        }
    }

    for component in components {
        let normalized_component = component
            .replace('\\', "/")
            .trim_start_matches('/')
            .trim_end_matches('/')
            .to_string();
        if !normalized_component.is_empty() {
            parts.push(normalized_component);
        }
    }

    parts.join("/")
}

/// Creates an object store path using forward slashes with owned strings.
///
/// This variant accepts owned strings to avoid lifetime issues.
///
/// # Arguments
///
/// * `base_path` - The base path (can be empty)
/// * `components` - Path components to join (owned strings)
///
/// # Returns
///
/// A string path with forward slash separators
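///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_object_store_path_owned;
/// let path = make_object_store_path_owned("base", vec!["data".to_string(), "quotes".to_string()]);
/// assert_eq!(path, "base/data/quotes");
/// ```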
#[must_use]
pub fn make_object_store_path_owned(base_path: &str, components: Vec<String>) -> String {
    let mut parts = Vec::new();

    if !base_path.is_empty() {
        let normalized_base = base_path
            .replace('\\', "/")
            .trim_end_matches('/')
            .to_string();
        if !normalized_base.is_empty() {
            parts.push(normalized_base);
        }
    }

    for component in components {
        let normalized_component = component
            .replace('\\', "/")
            .trim_start_matches('/')
            .trim_end_matches('/')
            .to_string();
        if !normalized_component.is_empty() {
            parts.push(normalized_component);
        }
    }

    parts.join("/")
}

/// Converts a local `PathBuf` to an object store path string.
///
/// This function normalizes a local file system path to the forward-slash format
/// expected by object stores, handling platform differences.
///
/// # Arguments
///
/// * `local_path` - The local `PathBuf` to convert
///
/// # Returns
///
/// A string with forward slash separators suitable for object store operations
///
/// # Examples
///
/// ```rust
/// # use std::path::PathBuf;
/// # use nautilus_persistence::backend::catalog::local_to_object_store_path;
/// let local_path = PathBuf::from("data").join("quotes").join("EURUSD");
/// let object_path = local_to_object_store_path(&local_path);
/// assert_eq!(object_path, "data/quotes/EURUSD");
/// ```
#[must_use]
pub fn local_to_object_store_path(local_path: &Path) -> String {
    local_path.to_string_lossy().replace('\\', "/")
}

/// Extracts path components using platform-appropriate path parsing.
///
/// This function safely parses a path into its components, handling both
/// local file system paths and object store paths correctly.
///
/// # Arguments
///
/// * `path_str` - The path string to parse
///
/// # Returns
///
/// A vector of path components
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_path_components;
/// let components = extract_path_components("data/quotes/EURUSD");
/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
///
/// // Works with both separators
/// let components = extract_path_components("data\\quotes\\EURUSD");
/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
/// ```
#[must_use]
pub fn extract_path_components(path_str: &str) -> Vec<String> {
    // Normalize separators and split
    let normalized = path_str.replace('\\', "/");
    normalized
        .split('/')
        .filter(|s| !s.is_empty())
        .map(ToString::to_string)
        .collect()
}

/// Checks if a filename's timestamp range intersects with a query interval.
///
/// This function determines whether a Parquet file (identified by its timestamp-based
/// filename) contains data that falls within the specified query time range.
///
/// # Parameters
///
/// - `filename`: The filename to check (format: "`iso_timestamp_1_iso_timestamp_2.parquet`").
/// - `start`: Optional start timestamp for the query range.
/// - `end`: Optional end timestamp for the query range.
///
/// # Returns
///
/// Returns `true` if the file's time range intersects with the query range,
/// `false` otherwise. Returns `true` if the filename cannot be parsed.
///
/// # Examples
///
/// ```ignore
/// // Example with ISO format filenames
/// assert!(query_intersects_filename(
///     "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
///     Some(1609459200000000000),
///     Some(1609545600000000000)
/// ));
/// ```
fn query_intersects_filename(filename: &str, start: Option<u64>, end: Option<u64>) -> bool {
    if let Some((file_start, file_end)) = parse_filename_timestamps(filename) {
        (start.is_none() || start.unwrap() <= file_end)
            && (end.is_none() || file_start <= end.unwrap())
    } else {
        true
    }
}

/// Parses timestamps from a Parquet filename.
///
/// Extracts the start and end timestamps from filenames that follow the ISO 8601 format:
/// "`iso_timestamp_1_iso_timestamp_2.parquet`" (e.g., "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet")
///
/// # Parameters
///
/// - `filename`: The filename to parse (can be a full path).
///
/// # Returns
///
/// Returns `Some((start_ts, end_ts))` if the filename matches the expected format,
/// `None` otherwise.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::parse_filename_timestamps;
/// assert!(parse_filename_timestamps("2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet").is_some());
/// assert_eq!(parse_filename_timestamps("invalid.parquet"), None);
/// ```
#[must_use]
pub fn parse_filename_timestamps(filename: &str) -> Option<(u64, u64)> {
    let path = Path::new(filename);
    let base_name = path.file_name()?.to_str()?;
    let base_filename = base_name.strip_suffix(".parquet")?;
    let (first_part, second_part) = base_filename.split_once('_')?;

    let first_iso = file_timestamp_to_iso_timestamp(first_part);
    let second_iso = file_timestamp_to_iso_timestamp(second_part);

    let first_ts = iso_to_unix_nanos(&first_iso).ok()?;
    let second_ts = iso_to_unix_nanos(&second_iso).ok()?;

    Some((first_ts, second_ts))
}

/// Checks if a list of closed integer intervals are all mutually disjoint.
///
/// Two intervals are disjoint if they do not overlap. This function validates that
/// all intervals in the list are non-overlapping, which is a requirement for
/// maintaining data integrity in the catalog.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are disjoint, `false` if any overlap is found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_disjoint;
/// // Disjoint intervals
/// assert!(are_intervals_disjoint(&[(1, 5), (10, 15), (20, 25)]));
///
/// // Overlapping intervals
/// assert!(!are_intervals_disjoint(&[(1, 10), (5, 15)]));
/// ```
#[must_use]
pub fn are_intervals_disjoint(intervals: &[(u64, u64)]) -> bool {
    let n = intervals.len();

    if n <= 1 {
        return true;
    }

    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    for i in 0..(n - 1) {
        let (_, end1) = sorted_intervals[i];
        let (start2, _) = sorted_intervals[i + 1];

        if end1 >= start2 {
            return false;
        }
    }

    true
}

/// Checks if intervals are contiguous (adjacent with no gaps).
///
/// Intervals are contiguous if, when sorted by start time, each interval's start
/// timestamp is exactly one more than the previous interval's end timestamp.
/// This ensures complete coverage of a time range with no gaps.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are contiguous, `false` if any gaps are found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_contiguous;
/// // Contiguous intervals
/// assert!(are_intervals_contiguous(&[(1, 5), (6, 10), (11, 15)]));
///
/// // Non-contiguous intervals (gap between 5 and 8)
/// assert!(!are_intervals_contiguous(&[(1, 5), (8, 10)]));
/// ```
#[must_use]
pub fn are_intervals_contiguous(intervals: &[(u64, u64)]) -> bool {
    let n = intervals.len();
    if n <= 1 {
        return true;
    }

    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    for i in 0..(n - 1) {
        let (_, end1) = sorted_intervals[i];
        let (start2, _) = sorted_intervals[i + 1];

        if end1 + 1 != start2 {
            return false;
        }
    }

    true
}

/// Finds the parts of a query interval that are not covered by existing data intervals.
///
/// This function calculates the "gaps" in data coverage by comparing a requested
/// time range against the intervals covered by existing data files. It's used to
/// determine what data needs to be fetched or backfilled.
///
/// # Parameters
///
/// - `start`: Start timestamp of the query interval (inclusive).
/// - `end`: End timestamp of the query interval (inclusive).
/// - `closed_intervals`: Existing data intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns a vector of (start, end) tuples representing the gaps in coverage.
/// Returns an empty vector if the query range is invalid or fully covered.
///
/// # Examples
///
/// ```ignore
/// # use nautilus_persistence::backend::catalog::query_interval_diff;
/// // Query 1-100, have data for 10-30 and 60-80
/// let gaps = query_interval_diff(1, 100, &[(10, 30), (60, 80)]);
/// assert_eq!(gaps, vec![(1, 9), (31, 59), (81, 100)]);
/// ```
fn query_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
    if start > end {
        return Vec::new();
    }

    let interval_set = get_interval_set(closed_intervals);
    let query_range = (Bound::Included(start), Bound::Included(end));
    let query_diff = interval_set.get_interval_difference(&query_range);
    let mut result: Vec<(u64, u64)> = Vec::new();

    for interval in query_diff {
        if let Some(tuple) = interval_to_tuple(interval, start, end) {
            result.push(tuple);
        }
    }

    result
}

/// Creates an interval tree from closed integer intervals.
///
/// This function converts closed intervals [a, b] into half-open intervals [a, b+1)
/// for use with the interval tree data structure, which is used for efficient
/// interval operations and gap detection.
///
/// # Parameters
///
/// - `intervals`: A slice of closed intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns an [`IntervalTree`] containing the converted intervals.
///
/// # Notes
///
/// - Invalid intervals (where start > end) are skipped.
/// - Uses saturating addition to prevent overflow when converting to half-open intervals.
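///
/// # Examples
///
/// A sketch of the closed-to-half-open conversion:
///
/// ```ignore
/// // Closed [1, 5] is stored as the half-open [1, 6).
/// let tree = get_interval_set(&[(1, 5), (10, 15)]);
/// ```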
fn get_interval_set(intervals: &[(u64, u64)]) -> IntervalTree<u64> {
    let mut tree = IntervalTree::default();

    if intervals.is_empty() {
        return tree;
    }

    for &(start, end) in intervals {
        if start > end {
            continue;
        }

        tree.insert((
            Bound::Included(start),
            Bound::Excluded(end.saturating_add(1)),
        ));
    }

    tree
}

/// Converts an interval tree result back to a closed interval tuple.
///
/// This helper function converts the bounded interval representation used by
/// the interval tree back into the (start, end) tuple format used throughout
/// the catalog.
///
/// # Parameters
///
/// - `interval`: The bounded interval from the interval tree.
/// - `query_start`: The start of the original query range.
/// - `query_end`: The end of the original query range.
///
/// # Returns
///
/// Returns `Some((start, end))` for valid intervals, `None` for empty intervals.
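///
/// # Examples
///
/// A sketch of the mapping back to closed intervals:
///
/// ```ignore
/// // [Included(5), Excluded(11)) maps back to the closed interval (5, 10).
/// assert_eq!(interval_to_tuple((Bound::Included(&5), Bound::Excluded(&11)), 0, 100), Some((5, 10)));
/// ```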
fn interval_to_tuple(
    interval: (Bound<&u64>, Bound<&u64>),
    query_start: u64,
    query_end: u64,
) -> Option<(u64, u64)> {
    let (bound_start, bound_end) = interval;

    let start = match bound_start {
        Bound::Included(val) => *val,
        Bound::Excluded(val) => val.saturating_add(1),
        Bound::Unbounded => query_start,
    };

    let end = match bound_end {
        Bound::Included(val) => *val,
        Bound::Excluded(val) => {
            if *val == 0 {
                return None; // Empty interval
            }
            val - 1
        }
        Bound::Unbounded => query_end,
    };

    if start <= end {
        Some((start, end))
    } else {
        None
    }
}