nautilus_persistence/backend/catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Parquet data catalog for efficient storage and retrieval of financial market data.
//!
//! This module provides a comprehensive data catalog implementation that uses Apache Parquet
//! format for storing financial market data with object store backends. The catalog supports
//! various data types including quotes, trades, bars, order book data, and other market events.
//!
//! # Key Features
//!
//! - **Object Store Integration**: Works with local filesystems, S3, and other object stores.
//! - **Data Type Support**: Handles all major financial data types (quotes, trades, bars, etc.).
//! - **Time-based Organization**: Organizes data by timestamp ranges for efficient querying.
//! - **Consolidation**: Merges multiple files to optimize storage and query performance.
//! - **Validation**: Ensures data integrity with timestamp ordering and interval validation.
//!
//! # Architecture
//!
//! The catalog organizes data in a hierarchical structure:
//! ```text
//! data/
//! ├── quotes/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! ├── trades/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! └── bars/
//!     └── INSTRUMENT_ID/
//!         └── start_ts-end_ts.parquet
//! ```
//!
//! # Usage
//!
//! ```rust,no_run
//! use std::path::PathBuf;
//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//!
//! // Create a new catalog
//! let catalog = ParquetDataCatalog::new(
//!     PathBuf::from("/path/to/data"),
//!     None,        // storage_options
//!     Some(5000),  // batch_size
//!     None,        // compression (defaults to SNAPPY)
//!     None,        // max_row_group_size (defaults to 5000)
//! );
//!
//! // Write data to the catalog
//! // catalog.write_to_parquet(data, None, None, None)?;
//! ```

use std::{
    fmt::Debug,
    ops::Bound,
    path::{Path, PathBuf},
    sync::Arc,
};

use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use heck::ToSnakeCase;
use itertools::Itertools;
use log::info;
use nautilus_core::{
    UnixNanos,
    datetime::{iso8601_to_unix_nanos, unix_nanos_to_iso8601},
};
use nautilus_model::data::{
    Bar, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose, to_variant,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::{ObjectStore, path::Path as ObjectPath};
use serde::Serialize;
use unbounded_interval_tree::interval_tree::IntervalTree;

use super::session::{self, DataBackendSession, QueryResult, build_query};
use crate::parquet::write_batches_to_object_store;

/// A high-performance data catalog for storing and retrieving financial market data using Apache Parquet format.
///
/// The `ParquetDataCatalog` provides a comprehensive solution for managing large volumes of financial
/// market data with efficient storage, querying, and consolidation capabilities. It supports various
/// object store backends including local filesystems, AWS S3, and other cloud storage providers.
///
/// # Features
///
/// - **Efficient Storage**: Uses Apache Parquet format with configurable compression.
/// - **Object Store Backend**: Supports multiple storage backends through the `object_store` crate.
/// - **Time-based Organization**: Organizes data by timestamp ranges for optimal query performance.
/// - **Data Validation**: Ensures timestamp ordering and interval consistency.
/// - **Consolidation**: Merges multiple files to reduce storage overhead and improve query speed.
/// - **Type Safety**: Strongly typed data handling with compile-time guarantees.
///
/// # Data Organization
///
/// Data is organized hierarchically by data type and instrument:
/// - `data/{data_type}/{instrument_id}/{start_ts}-{end_ts}.parquet`.
/// - Files are named with their timestamp ranges for efficient range queries.
/// - Intervals are validated to be disjoint to prevent data overlap.
///
/// # Performance Considerations
///
/// - **Batch Size**: Controls memory usage during data processing.
/// - **Compression**: SNAPPY compression provides a good balance of speed and size.
/// - **Row Group Size**: Affects query performance and memory usage.
/// - **File Consolidation**: Reduces the number of files for better query performance.
pub struct ParquetDataCatalog {
    /// The base path for data storage within the object store.
    pub base_path: String,
    /// The original URI provided when creating the catalog.
    pub original_uri: String,
    /// The object store backend for data persistence.
    pub object_store: Arc<dyn ObjectStore>,
    /// The DataFusion session for query execution.
    pub session: DataBackendSession,
    /// The number of records to process in each batch.
    pub batch_size: usize,
    /// The compression algorithm used for Parquet files.
    pub compression: parquet::basic::Compression,
    /// The maximum number of rows in each Parquet row group.
    pub max_row_group_size: usize,
}

impl Debug for ParquetDataCatalog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(ParquetDataCatalog))
            .field("base_path", &self.base_path)
            .finish()
    }
}

impl ParquetDataCatalog {
    /// Creates a new [`ParquetDataCatalog`] instance from a local file path.
    ///
    /// This is a convenience constructor that converts a local path to a URI format
    /// and delegates to [`Self::from_uri`].
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base directory path for data storage.
    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Panics
    ///
    /// Panics if the path cannot be converted to a valid URI or if the object store
    /// cannot be created from the path.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None,        // no storage options
    ///     Some(1000),  // smaller batch size
    ///     None,        // default compression
    ///     None,        // default row group size
    /// );
    /// ```
    #[must_use]
    pub fn new(
        base_path: PathBuf,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let path_str = base_path.to_string_lossy().to_string();
        Self::from_uri(
            &path_str,
            storage_options,
            batch_size,
            compression,
            max_row_group_size,
        )
        .expect("Failed to create catalog from path")
    }

    /// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
    ///
    /// Supports various URI schemes including local file paths and multiple cloud storage backends
    /// supported by the `object_store` crate.
    ///
    /// # Supported URI Schemes
    ///
    /// - **AWS S3**: `s3://bucket/path`.
    /// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`.
    /// - **Azure Blob Storage**: `azure://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`.
    /// - **HTTP/WebDAV**: `http://` or `https://`.
    /// - **Local files**: `file://path` or plain paths.
    ///
    /// # Parameters
    ///
    /// - `uri`: The URI for the data storage location.
    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
    ///   - For S3: `endpoint_url`, `region`, `access_key_id`, `secret_access_key`, `session_token`, etc.
    ///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
    ///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URI format is invalid or unsupported.
    /// - The object store cannot be created or accessed.
    /// - Authentication fails for cloud storage backends.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::collections::HashMap;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// // Local filesystem
    /// let local_catalog = ParquetDataCatalog::from_uri(
    ///     "/tmp/nautilus_data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 bucket
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Google Cloud Storage
    /// let gcs_catalog = ParquetDataCatalog::from_uri(
    ///     "gs://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Azure Blob Storage
    /// let azure_catalog = ParquetDataCatalog::from_uri(
    ///     "azure://account/container/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 with custom endpoint and credentials
    /// let mut storage_options = HashMap::new();
    /// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
    /// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
    /// storage_options.insert("secret_access_key".to_string(), "my-secret".to_string());
    ///
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     Some(storage_options),
    ///     None, None, None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn from_uri(
        uri: &str,
        storage_options: Option<std::collections::HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> anyhow::Result<Self> {
        let batch_size = batch_size.unwrap_or(5000);
        let compression = compression.unwrap_or(parquet::basic::Compression::SNAPPY);
        let max_row_group_size = max_row_group_size.unwrap_or(5000);

        let (object_store, base_path, original_uri) =
            crate::parquet::create_object_store_from_path(uri, storage_options)?;

        Ok(Self {
            base_path,
            original_uri,
            object_store,
            session: session::DataBackendSession::new(batch_size),
            batch_size,
            compression,
            max_row_group_size,
        })
    }

    /// Returns the base path of the catalog for testing purposes.
    #[must_use]
    pub fn get_base_path(&self) -> String {
        self.base_path.clone()
    }

    /// Resets the backend session to clear any cached table registrations.
    ///
    /// This is useful during catalog operations when files are being modified
    /// and we need to ensure fresh data is loaded.
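    ///
    /// # Examples
    ///
    /// A minimal usage sketch (catalog construction elided as in the other examples):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    /// // After files have been consolidated or renamed, clear cached table
    /// // registrations so subsequent queries see the new file layout.
    /// catalog.reset_session();
    /// ```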
    pub fn reset_session(&mut self) {
        self.session.clear_registered_tables();
    }

    /// Writes mixed data types to the catalog by separating them into type-specific collections.
    ///
    /// This method takes a heterogeneous collection of market data and separates it by type,
    /// then writes each type to its appropriate location in the catalog. This is useful when
    /// processing mixed data streams or bulk data imports.
    ///
    /// # Parameters
    ///
    /// - `data`: A vector of mixed [`Data`] enum variants.
    /// - `start`: Optional start timestamp to override the data's natural range.
    /// - `end`: Optional end timestamp to override the data's natural range.
    ///
    /// # Notes
    ///
    /// - Data is automatically sorted by type before writing.
    /// - Each data type is written to its own directory structure.
    /// - Instrument data handling is not yet implemented (TODO).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::Data;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let mixed_data: Vec<Data> = vec![/* mixed data types */];
    ///
    /// catalog.write_data_enum(mixed_data, None, None)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_data_enum(
        &self,
        data: Vec<Data>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()> {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();

        for d in data.iter().cloned() {
            match d {
                Data::Deltas(_) => continue,
                Data::Delta(d) => {
                    deltas.push(d);
                }
                Data::Depth10(d) => {
                    depth10s.push(*d);
                }
                Data::Quote(d) => {
                    quotes.push(d);
                }
                Data::Trade(d) => {
                    trades.push(d);
                }
                Data::Bar(d) => {
                    bars.push(d);
                }
                Data::MarkPriceUpdate(p) => {
                    mark_prices.push(p);
                }
                Data::IndexPriceUpdate(p) => {
                    index_prices.push(p);
                }
                Data::InstrumentClose(c) => {
                    closes.push(c);
                }
            }
        }

        // TODO: need to handle instruments here

        self.write_to_parquet(deltas, start, end, None)?;
        self.write_to_parquet(depth10s, start, end, None)?;
        self.write_to_parquet(quotes, start, end, None)?;
        self.write_to_parquet(trades, start, end, None)?;
        self.write_to_parquet(bars, start, end, None)?;
        self.write_to_parquet(mark_prices, start, end, None)?;
        self.write_to_parquet(index_prices, start, end, None)?;
        self.write_to_parquet(closes, start, end, None)?;

        Ok(())
    }

    /// Writes typed data to a Parquet file in the catalog.
    ///
    /// This is the core method for persisting market data to the catalog. It handles data
    /// validation, batching, compression, and ensures proper file organization with
    /// timestamp-based naming.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement required traits for serialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `start`: Optional start timestamp to override the natural data range.
    /// - `end`: Optional end timestamp to override the natural data range.
    /// - `skip_disjoint_check`: If `Some(true)`, skips validation that intervals in the target
    ///   directory remain disjoint after writing.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Data timestamps are not in ascending order.
    /// - Data serialization to Arrow record batches fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    /// - Timestamp interval validation fails after writing.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - Record batches are empty after conversion.
    /// - Required metadata is missing from the schema.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    ///
    /// let path = catalog.write_to_parquet(quotes, None, None, None)?;
    /// println!("Data written to: {:?}", path);
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        skip_disjoint_check: Option<bool>,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = start.unwrap_or(data.first().unwrap().ts_init());
        let end_ts = end.unwrap_or(data.last().unwrap().ts_init());

        let batches = self.data_to_record_batches(data)?;
        let schema = batches.first().expect("Batches are empty.").schema();
        let instrument_id = schema.metadata.get("instrument_id").cloned();

        let directory = self.make_path(T::path_prefix(), instrument_id)?;
        let filename = timestamps_to_filename(start_ts, end_ts);
        let path = PathBuf::from(format!("{directory}/{filename}"));

        // Write all batches to parquet file
        info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len()
        );

        // Convert path to object store path
        let object_path = self.to_object_path(&path.to_string_lossy());

        self.execute_async(async {
            write_batches_to_object_store(
                &batches,
                self.object_store.clone(),
                &object_path,
                Some(self.compression),
                Some(self.max_row_group_size),
            )
            .await
        })?;

        if !skip_disjoint_check.unwrap_or(false) {
            let intervals = self.get_directory_intervals(&directory)?;

            if !are_intervals_disjoint(&intervals) {
                anyhow::bail!("Intervals are not disjoint after writing a new file");
            }
        }

        Ok(path)
    }

    /// Writes typed data to a JSON file in the catalog.
    ///
    /// This method provides an alternative to Parquet format for data export and debugging.
    /// JSON files are human-readable but less efficient for large datasets.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement serialization and cataloging traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `path`: Optional custom directory path (defaults to catalog's standard structure).
    /// - `write_metadata`: Whether to write a separate metadata file alongside the data.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created JSON file.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Data timestamps are not in ascending order.
    /// - JSON serialization fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    ///
    /// # Panics
    ///
    /// Panics if the default directory path cannot be constructed when `path` is `None`.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_model::data::TradeTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let trades: Vec<TradeTick> = vec![/* trade data */];
    ///
    /// let path = catalog.write_to_json(
    ///     trades,
    ///     Some(PathBuf::from("/custom/path")),
    ///     true  // write metadata
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = data.first().unwrap().ts_init();
        let end_ts = data.last().unwrap().ts_init();

        let directory =
            path.unwrap_or_else(|| PathBuf::from(self.make_path(T::path_prefix(), None).unwrap()));
        let filename = timestamps_to_filename(start_ts, end_ts).replace(".parquet", ".json");
        let json_path = directory.join(&filename);

        info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len()
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            info!("Writing metadata to {metadata_path:?}");

            // Use object store for metadata file
            let metadata_object_path = ObjectPath::from(metadata_path.to_string_lossy().as_ref());
            let metadata_json = serde_json::to_vec_pretty(&metadata)?;
            self.execute_async(async {
                self.object_store
                    .put(&metadata_object_path, metadata_json.into())
                    .await
                    .map_err(anyhow::Error::from)
            })?;
        }

        // Use object store for main JSON file
        let json_object_path = ObjectPath::from(json_path.to_string_lossy().as_ref());
        let json_data = serde_json::to_vec_pretty(&serde_json::to_value(data)?)?;
        self.execute_async(async {
            self.object_store
                .put(&json_object_path, json_data.into())
                .await
                .map_err(anyhow::Error::from)
        })?;

        Ok(json_path)
    }

    /// Validates that data timestamps are in ascending order.
    ///
    /// # Parameters
    ///
    /// - `data`: Slice of data records to validate.
    /// - `type_name`: Name of the data type for error messages.
    ///
    /// # Errors
    ///
    /// Returns an error if any timestamp is less than the previous timestamp.
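    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming the records are already sorted by `ts_init`:
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let quotes: Vec<QuoteTick> = vec![/* quote data sorted by ts_init */];
    /// ParquetDataCatalog::check_ascending_timestamps(&quotes, "quote_tick")?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```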
    pub fn check_ascending_timestamps<T: HasTsInit>(
        data: &[T],
        type_name: &str,
    ) -> anyhow::Result<()> {
        if !data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()) {
            anyhow::bail!("{type_name} timestamps must be in ascending order");
        }

        Ok(())
    }

    /// Converts data into Arrow record batches for Parquet serialization.
    ///
    /// This method chunks the data according to the configured batch size and converts
    /// each chunk into an Arrow record batch with appropriate metadata.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to convert, must implement required encoding traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to convert.
    ///
    /// # Returns
    ///
    /// Returns a vector of Arrow [`RecordBatch`] instances ready for Parquet serialization.
    ///
    /// # Errors
    ///
    /// Returns an error if record batch encoding fails for any chunk.
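    ///
    /// # Examples
    ///
    /// A minimal sketch (catalog construction elided as in the other examples):
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    /// let batches = catalog.data_to_record_batches(quotes)?;
    /// println!("Encoded {} record batches", batches.len());
    /// # Ok::<(), anyhow::Error>(())
    /// ```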
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: HasTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

    /// Extends the timestamp range of an existing Parquet file by renaming it.
    ///
    /// This method finds an existing file that is adjacent to the specified time range
    /// and renames it to include the new range. This is useful when appending data
    /// that extends the time coverage of existing files.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Start timestamp of the new range to extend to.
    /// - `end`: End timestamp of the new range to extend to.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - No adjacent file is found to extend.
    /// - File rename operations fail.
    /// - Interval validation fails after extension.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Extend a file's range backwards or forwards
    /// catalog.extend_file_name(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     UnixNanos::from(1609459200000000000),
    ///     UnixNanos::from(1609545600000000000)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: UnixNanos,
        end: UnixNanos,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(data_cls, instrument_id)?;
        let intervals = self.get_directory_intervals(&directory)?;

        let start = start.as_u64();
        let end = end.as_u64();

        for interval in intervals {
            if interval.0 == end + 1 {
                // Extend backwards: new file covers [start, interval.1]
                self.rename_parquet_file(&directory, interval.0, interval.1, start, interval.1)?;
                break;
            } else if interval.1 == start - 1 {
                // Extend forwards: new file covers [interval.0, end]
                self.rename_parquet_file(&directory, interval.0, interval.1, interval.0, end)?;
                break;
            }
        }

        let intervals = self.get_directory_intervals(&directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after extending a file");
        }

        Ok(())
    }

    /// Lists all Parquet files in a specified directory.
    ///
    /// This method scans a directory and returns the full paths of all files with the `.parquet`
    /// extension. It works with both local filesystems and remote object stores, making it
    /// suitable for various storage backends.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path to scan for Parquet files.
    ///
    /// # Returns
    ///
    /// Returns a vector of full file paths (as strings) for all Parquet files found in the directory.
    /// The paths are relative to the object store root and suitable for use with object store operations.
    /// Returns an empty vector if the directory doesn't exist or contains no Parquet files.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Object store listing operations fail.
    /// - Directory access is denied.
    /// - Network issues occur (for remote object stores).
    ///
    /// # Notes
    ///
    /// - Only files ending with `.parquet` are included.
    /// - Subdirectories are not recursively scanned.
    /// - File paths are returned in the order provided by the object store.
    /// - Works with all supported object store backends (local, S3, GCS, Azure, etc.).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let files = catalog.list_parquet_files("data/quotes/EURUSD")?;
    ///
    /// for file in files {
    ///     println!("Found Parquet file: {}", file);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn list_parquet_files(&self, directory: &str) -> anyhow::Result<Vec<String>> {
        self.execute_async(async {
            let prefix = ObjectPath::from(format!("{directory}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut files = Vec::new();

            while let Some(object) = stream.next().await {
                let object = object?;
                if object.location.as_ref().ends_with(".parquet") {
                    files.push(object.location.to_string());
                }
            }
            Ok::<Vec<String>, anyhow::Error>(files)
        })
    }

    /// Helper method to reconstruct the full URI for remote and local object store paths.
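    ///
    /// # Examples
    ///
    /// A sketch of the expected behavior for an S3-backed catalog (bucket name assumed):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::from_uri("s3://my-bucket/nautilus-data", None, None, None, None)?;
    /// let uri = catalog.reconstruct_full_uri("data/quotes/EURUSD/file.parquet");
    /// // Expected: "s3://my-bucket/data/quotes/EURUSD/file.parquet"
    /// # Ok::<(), anyhow::Error>(())
    /// ```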
    #[must_use]
    pub fn reconstruct_full_uri(&self, path_str: &str) -> String {
        // Check if this is a remote URI scheme that needs reconstruction
        if self.is_remote_uri() {
            // Extract the base URL (scheme + host) from the original URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Some(host) = url.host_str()
            {
                return format!("{}://{}/{}", url.scheme(), host, path_str);
            }
        }

        // For local paths, extract the directory from the original URI
        if self.original_uri.starts_with("file://") {
            // Extract the path from the file:// URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Ok(base_path) = url.to_file_path()
            {
                // Use platform-appropriate path separator for display
                // but object store paths always use forward slashes
                let base_str = base_path.to_string_lossy();
                return self.join_paths(&base_str, path_str);
            }
        }

        // For local paths without file:// prefix, use the original URI as base
        if self.base_path.is_empty() {
            // If base_path is empty and not a file URI, try using original_uri as base
            if self.original_uri.contains("://") {
                // Fallback: return the path as-is
                path_str.to_string()
            } else {
                self.join_paths(self.original_uri.trim_end_matches('/'), path_str)
            }
        } else {
            let base = self.base_path.trim_end_matches('/');
            self.join_paths(base, path_str)
        }
    }

    /// Helper method to join paths using forward slashes (the object store convention).
    #[must_use]
    fn join_paths(&self, base: &str, path: &str) -> String {
        make_object_store_path(base, &[path])
    }

    /// Helper method to check whether the original URI uses a remote object store scheme.
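    ///
    /// # Examples
    ///
    /// A minimal sketch (bucket name assumed):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::from_uri("s3://my-bucket/data", None, None, None, None)?;
    /// assert!(catalog.is_remote_uri());
    /// # Ok::<(), anyhow::Error>(())
    /// ```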
    #[must_use]
    pub fn is_remote_uri(&self) -> bool {
        self.original_uri.starts_with("s3://")
            || self.original_uri.starts_with("gs://")
            || self.original_uri.starts_with("gcs://")
            || self.original_uri.starts_with("azure://")
            || self.original_uri.starts_with("abfs://")
            || self.original_uri.starts_with("http://")
            || self.original_uri.starts_with("https://")
    }

    /// Executes a query against the catalog to retrieve market data of a specific type.
    ///
    /// This is the primary method for querying data from the catalog. It registers the appropriate
    /// object store with the DataFusion session, finds all relevant Parquet files, and executes
    /// the query across them. The method supports filtering by instrument IDs, time ranges, and
    /// custom SQL WHERE clauses.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to query, must implement required traits for deserialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering (e.g., "price > 100").
    /// - `files`: Optional pre-computed list of file URIs to query, bypassing file discovery.
    ///
    /// # Returns
    ///
    /// Returns a [`QueryResult`] containing the query execution context and data.
    /// Use [`QueryResult::collect()`] to retrieve the actual data records.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Object store registration fails for remote URIs.
    /// - File discovery fails.
    /// - DataFusion query execution fails.
    /// - Data deserialization fails.
    ///
    /// # Performance Notes
    ///
    /// - Files are automatically filtered by timestamp ranges before querying.
    /// - DataFusion optimizes queries across multiple Parquet files.
    /// - Use specific instrument IDs and time ranges to improve performance.
    /// - WHERE clauses are pushed down to the Parquet reader when possible.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quote data
    /// let result = catalog.query::<QuoteTick>(None, None, None, None, None)?;
    /// let quotes = result.collect();
    ///
    /// // Query specific instruments within a time range
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Query with custom WHERE clause
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000"),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query<T>(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        // Register the object store with the session for remote URIs
        if self.is_remote_uri() {
            let url = url::Url::parse(&self.original_uri)?;
            let host = url
                .host_str()
                .ok_or_else(|| anyhow::anyhow!("Remote URI missing host/bucket name"))?;
            let base_url = url::Url::parse(&format!("{}://{}", url.scheme(), host))?;
            self.session
                .register_object_store(&base_url, self.object_store.clone());
        }

        let files_list = if let Some(files) = files {
            files
        } else {
            self.query_files(T::path_prefix(), instrument_ids, start, end)?
        };

        for file_uri in &files_list {
            // Extract identifier from file path and filename to create meaningful table names
            let identifier = extract_identifier_from_path(file_uri);
            let safe_sql_identifier = make_sql_safe_identifier(&identifier);
            let safe_filename = extract_sql_safe_filename(file_uri);

            // Create table name from path_prefix, identifier, and filename
            let table_name = format!(
                "{}_{}_{}",
                T::path_prefix(),
                safe_sql_identifier,
                safe_filename
            );
            let query = build_query(&table_name, start, end, where_clause);

            // Convert object store path to filesystem path for DataFusion
            // Only apply reconstruction if the path is not already absolute
            let resolved_path = if file_uri.starts_with('/') {
                // Path is already absolute, use as-is
                file_uri.clone()
            } else {
                // Path is relative, reconstruct full URI
                self.reconstruct_full_uri(file_uri)
            };
            self.session
                .add_file::<T>(&table_name, &resolved_path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }

    /// Queries typed data from the catalog and returns results as a strongly-typed vector.
    ///
    /// This is a convenience method that wraps the generic `query` method and automatically
    /// collects and converts the results into a vector of the specific data type. It handles
    /// the type conversion from the generic [`Data`] enum to the concrete type `T`.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The specific data type to query and return. Must implement required traits for
    ///   deserialization, cataloging, and conversion from the [`Data`] enum.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    ///   For exact matches, provide the full instrument ID. For bars, partial matches are supported.
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering. Use standard SQL syntax
    ///   with column names matching the Parquet schema (e.g., "`bid_price` > 1.2000", "volume > 1000").
    /// - `files`: Optional pre-computed list of file URIs to query, bypassing file discovery.
    ///
    /// # Returns
    ///
    /// Returns a vector of the specific data type `T`, sorted by timestamp. The vector will be
    /// empty if no data matches the query criteria.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The underlying query execution fails.
    /// - Data type conversion fails.
    /// - Object store access fails.
    /// - Invalid WHERE clause syntax is provided.
    ///
    /// # Performance Considerations
    ///
    /// - Use specific instrument IDs and time ranges to minimize data scanning.
    /// - WHERE clauses are pushed down to Parquet readers when possible.
    /// - Results are automatically sorted by timestamp during collection.
    /// - Memory usage scales with the amount of data returned.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::{QuoteTick, TradeTick, Bar};
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quotes for a specific instrument
    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
    ///     Some(vec!["EURUSD".to_string()]),
    ///     None,
    ///     None,
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Query trades within a specific time range
    /// let trades: Vec<TradeTick> = catalog.query_typed_data(
    ///     Some(vec!["BTCUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Query bars with volume filter
    /// let bars: Vec<Bar> = catalog.query_typed_data(
    ///     Some(vec!["AAPL".to_string()]),
    ///     None,
    ///     None,
    ///     Some("volume > 1000000"),
    ///     None
    /// )?;
    ///
    /// // Query multiple instruments with price filter
    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
    ///     Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000 AND ask_price < 1.3000"),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_typed_data<T>(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
    ) -> anyhow::Result<Vec<T>>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix + TryFrom<Data>,
    {
        let query_result = self.query::<T>(instrument_ids, start, end, where_clause, files)?;
        let all_data = query_result.collect();

        // Convert Data enum variants to specific type T using to_variant
        Ok(to_variant::<T>(all_data))
    }

    /// Queries all Parquet files for a specific data type and optional instrument IDs.
    ///
    /// This method finds all Parquet files that match the specified criteria and returns
    /// their full URIs. The files are filtered by data type, instrument IDs (if provided),
    /// and timestamp range (if provided).
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_ids`: Optional list of instrument IDs to filter by.
    /// - `start`: Optional start timestamp to filter files by their time range.
    /// - `end`: Optional end timestamp to filter files by their time range.
    ///
    /// # Returns
    ///
    /// Returns a vector of file URI strings that match the query criteria,
    /// or an error if the query fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - Object store listing operations fail.
    /// - URI reconstruction fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quote files
    /// let files = catalog.query_files("quotes", None, None, None)?;
    ///
    /// // Query trade files for specific instruments within a time range
    /// let files = catalog.query_files(
    ///     "trades",
    ///     Some(vec!["BTCUSD".to_string(), "ETHUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_files(
        &self,
        data_cls: &str,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<Vec<String>> {
        let mut files = Vec::new();

        let start_u64 = start.map(|s| s.as_u64());
        let end_u64 = end.map(|e| e.as_u64());

        let base_dir = self.make_path(data_cls, None)?;

        // Use recursive listing to match Python's glob behavior
        let list_result = self.execute_async(async {
            let prefix = ObjectPath::from(format!("{base_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut objects = Vec::new();
            while let Some(object) = stream.next().await {
                objects.push(object?);
            }
            Ok::<Vec<_>, anyhow::Error>(objects)
        })?;

        let mut file_paths: Vec<String> = list_result
            .into_iter()
            .filter_map(|object| {
                let path_str = object.location.to_string();
                if path_str.ends_with(".parquet") {
                    Some(path_str)
                } else {
                    None
                }
            })
            .collect();

        // Apply identifier filtering if provided
        if let Some(identifiers) = instrument_ids {
            let safe_identifiers: Vec<String> = identifiers
                .iter()
                .map(|id| urisafe_instrument_id(id))
                .collect();

            // Exact match by default for instrument_ids or bar_types
            let exact_match_file_paths: Vec<String> = file_paths
                .iter()
                .filter(|file_path| {
                    // Extract the directory name (second to last path component)
                    let path_parts: Vec<&str> = file_path.split('/').collect();
                    if path_parts.len() >= 2 {
                        let dir_name = path_parts[path_parts.len() - 2];
                        safe_identifiers.iter().any(|safe_id| safe_id == dir_name)
                    } else {
                        false
                    }
                })
                .cloned()
                .collect();

            if exact_match_file_paths.is_empty() && data_cls == "bars" {
                // Partial match of instrument_ids in bar_types for bars
                file_paths.retain(|file_path| {
                    let path_parts: Vec<&str> = file_path.split('/').collect();
                    if path_parts.len() >= 2 {
                        let dir_name = path_parts[path_parts.len() - 2];
                        safe_identifiers
                            .iter()
                            .any(|safe_id| dir_name.starts_with(&format!("{safe_id}-")))
                    } else {
                        false
                    }
                });
            } else {
                file_paths = exact_match_file_paths;
            }
        }

        // Apply timestamp filtering
        file_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));

        // Convert to full URIs
        for file_path in file_paths {
            let full_uri = self.reconstruct_full_uri(&file_path);
            files.push(full_uri);
        }

        Ok(files)
    }

    /// Finds the missing time intervals for a specific data type and instrument ID.
    ///
    /// This method compares a requested time range against the existing data coverage
    /// and returns the gaps that need to be filled. This is useful for determining
    /// what data needs to be fetched or backfilled.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp of the requested range (Unix nanoseconds).
    /// - `end`: End timestamp of the requested range (Unix nanoseconds).
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the missing intervals,
    /// or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - Interval retrieval fails.
    /// - Gap calculation fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Find missing intervals for quote data
    /// let missing = catalog.get_missing_intervals_for_request(
    ///     1609459200000000000,  // start
    ///     1609545600000000000,  // end
    ///     "quotes",
    ///     Some("BTCUSD".to_string())
    /// )?;
    ///
    /// for (start, end) in missing {
    ///     println!("Missing data from {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<(u64, u64)>> {
        let intervals = self.get_intervals(data_cls, instrument_id)?;

        Ok(query_interval_diff(start, end, &intervals))
    }

    /// Gets the last (most recent) timestamp for a specific data type and instrument ID.
    ///
    /// This method finds the latest timestamp covered by existing data files for
    /// the specified data type and instrument. This is useful for determining
    /// the most recent data available or for incremental data updates.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
    /// or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - Interval retrieval fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Get the last timestamp for quote data
    /// if let Some(last_ts) = catalog.query_last_timestamp("quotes", Some("BTCUSD".to_string()))? {
    ///     println!("Last quote timestamp: {}", last_ts);
    /// } else {
    ///     println!("No quote data found");
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Option<u64>> {
        let intervals = self.get_intervals(data_cls, instrument_id)?;

        if intervals.is_empty() {
            return Ok(None);
        }

        Ok(Some(intervals.last().unwrap().1))
    }

    /// Gets the time intervals covered by Parquet files for a specific data type and instrument ID.
    ///
    /// This method returns all time intervals covered by existing data files for the
    /// specified data type and instrument. The intervals are sorted by start time and
    /// represent the complete data coverage available.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the covered intervals,
    /// sorted by start time, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - The directory path cannot be constructed.
    /// - Directory listing fails.
    /// - Filename parsing fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Get all intervals for quote data
    /// let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
    /// for (start, end) in intervals {
    ///     println!("Data available from {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_intervals(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<(u64, u64)>> {
        let directory = self.make_path(data_cls, instrument_id)?;

        self.get_directory_intervals(&directory)
    }

    /// Gets the time intervals covered by Parquet files in a specific directory.
    ///
    /// This method scans a directory for Parquet files and extracts the timestamp ranges
    /// from their filenames. It's used internally by other methods to determine data coverage
    /// and is essential for interval-based operations like gap detection and consolidation.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path to scan for Parquet files.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the time intervals covered
    /// by files in the directory, sorted by start timestamp. Returns an empty vector
    /// if the directory doesn't exist or contains no valid Parquet files.
    ///
    /// # Errors
    ///
    /// This function will return an error if:
    /// - Object store listing operations fail.
    /// - Directory access is denied.
    ///
    /// # Notes
    ///
    /// - Only files with valid timestamp-based filenames are included.
    /// - Files with unparseable names are silently ignored.
    /// - The method works with both local and remote object stores.
    /// - Results are automatically sorted by start timestamp.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let intervals = catalog.get_directory_intervals("data/quotes/EURUSD")?;
    ///
    /// for (start, end) in intervals {
    ///     println!("File covers {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_directory_intervals(&self, directory: &str) -> anyhow::Result<Vec<(u64, u64)>> {
        let mut intervals = Vec::new();

        // Use object store for all operations
        let list_result = self.execute_async(async {
            let path = object_store::path::Path::from(directory);
            Ok(self
                .object_store
                .list(Some(&path))
                .collect::<Vec<_>>()
                .await)
        })?;

        for result in list_result {
            match result {
                Ok(object) => {
                    let path_str = object.location.to_string();
                    if path_str.ends_with(".parquet")
                        && let Some(interval) = parse_filename_timestamps(&path_str)
                    {
                        intervals.push(interval);
                    }
                }
                Err(_) => {
                    // Directory doesn't exist or is empty, which is fine
                    break;
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        Ok(intervals)
    }

    /// Constructs a directory path for storing data of a specific type and instrument.
    ///
    /// This method builds the hierarchical directory structure used by the catalog to organize
    /// data by type and instrument. The path follows the pattern: `{base_path}/data/{type_name}/{instrument_id}`.
    /// Instrument IDs are automatically converted to URI-safe format by removing forward slashes.
1461    ///
1462    /// # Parameters
1463    ///
1464    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
1465    /// - `instrument_id`: Optional instrument ID. If provided, creates a subdirectory for the instrument.
1466    ///   If `None`, returns the path to the data type directory.
1467    ///
1468    /// # Returns
1469    ///
1470    /// Returns the constructed directory path as a string, or an error if path construction fails.
1471    ///
1472    /// # Errors
1473    ///
1474    /// This function will return an error if:
1475    /// - The instrument ID contains invalid characters that cannot be made URI-safe.
1476    /// - Path construction fails due to system limitations.
1477    ///
1478    /// # Path Structure
1479    ///
1480    /// - Without instrument ID: `{base_path}/data/{type_name}`.
1481    /// - With instrument ID: `{base_path}/data/{type_name}/{safe_instrument_id}`.
1482    /// - If `base_path` is empty: `data/{type_name}[/{safe_instrument_id}]`.
1483    ///
1484    /// # Examples
1485    ///
1486    /// ```rust,no_run
1487    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1488    ///
1489    /// let catalog = ParquetDataCatalog::new(/* ... */);
1490    ///
1491    /// // Path for all quote data
1492    /// let quotes_path = catalog.make_path("quotes", None)?;
1493    /// // Returns: "/base/path/data/quotes"
1494    ///
1495    /// // Path for specific instrument quotes
1496    /// let eurusd_quotes = catalog.make_path("quotes", Some("EUR/USD".to_string()))?;
1497    /// // Returns: "/base/path/data/quotes/EURUSD" (slash removed)
1498    ///
1499    /// // Path for bar data with complex instrument ID
1500    /// let bars_path = catalog.make_path("bars", Some("BTC/USD-1H".to_string()))?;
1501    /// // Returns: "/base/path/data/bars/BTCUSD-1H"
1502    /// # Ok::<(), anyhow::Error>(())
1503    /// ```
1504    pub fn make_path(
1505        &self,
1506        type_name: &str,
1507        instrument_id: Option<String>,
1508    ) -> anyhow::Result<String> {
1509        let mut components = vec!["data".to_string(), type_name.to_string()];
1510
1511        if let Some(id) = instrument_id {
1512            let safe_id = urisafe_instrument_id(&id);
1513            components.push(safe_id);
1514        }
1515
1516        let path = make_object_store_path_owned(&self.base_path, components);
1517        Ok(path)
1518    }
1519
1520    /// Helper method to rename a parquet file by moving it via object store operations
1521    fn rename_parquet_file(
1522        &self,
1523        directory: &str,
1524        old_start: u64,
1525        old_end: u64,
1526        new_start: u64,
1527        new_end: u64,
1528    ) -> anyhow::Result<()> {
1529        let old_filename =
1530            timestamps_to_filename(UnixNanos::from(old_start), UnixNanos::from(old_end));
1531        let old_path = format!("{directory}/{old_filename}");
1532        let old_object_path = self.to_object_path(&old_path);
1533
1534        let new_filename =
1535            timestamps_to_filename(UnixNanos::from(new_start), UnixNanos::from(new_end));
1536        let new_path = format!("{directory}/{new_filename}");
1537        let new_object_path = self.to_object_path(&new_path);
1538
1539        self.move_file(&old_object_path, &new_object_path)
1540    }
1541
1542    /// Converts a catalog path string to an [`ObjectPath`] for object store operations.
1543    ///
1544    /// This method handles the conversion between catalog-relative paths and object store paths,
1545    /// taking into account the catalog's base path configuration. It automatically strips the
1546    /// base path prefix when present to create the correct object store path.
1547    ///
1548    /// # Parameters
1549    ///
1550    /// - `path`: The catalog path string to convert. Can be absolute or relative.
1551    ///
1552    /// # Returns
1553    ///
1554    /// Returns an [`ObjectPath`] suitable for use with object store operations.
1555    ///
1556    /// # Path Handling
1557    ///
1558    /// - If `base_path` is empty, the path is used as-is.
1559    /// - If `base_path` is set, it's stripped from the path if present.
1560    /// - Trailing slashes and backslashes are automatically handled.
1561    /// - The resulting path is relative to the object store root.
1562    /// - All paths are normalized to use forward slashes (object store convention).
1563    ///
1564    /// # Examples
1565    ///
1566    /// ```rust,no_run
1567    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1568    ///
1569    /// let catalog = ParquetDataCatalog::new(/* ... */);
1570    ///
1571    /// // Convert a full catalog path
1572    /// let object_path = catalog.to_object_path("/base/data/quotes/file.parquet");
1573    /// // Returns: ObjectPath("data/quotes/file.parquet") if base_path is "/base"
1574    ///
1575    /// // Convert a relative path
1576    /// let object_path = catalog.to_object_path("data/trades/file.parquet");
1577    /// // Returns: ObjectPath("data/trades/file.parquet")
1578    /// ```
1579    #[must_use]
1580    pub fn to_object_path(&self, path: &str) -> ObjectPath {
1581        // Normalize path separators to forward slashes for object store
1582        let normalized_path = path.replace('\\', "/");
1583
1584        if self.base_path.is_empty() {
1585            return ObjectPath::from(normalized_path);
1586        }
1587
1588        // Normalize base path separators as well
1589        let normalized_base = self.base_path.replace('\\', "/");
1590        let base = normalized_base.trim_end_matches('/');
1591
1592        // Remove the catalog base prefix if present
1593        let without_base = normalized_path
1594            .strip_prefix(&format!("{base}/"))
1595            .or_else(|| normalized_path.strip_prefix(base))
1596            .unwrap_or(&normalized_path);
1597
1598        ObjectPath::from(without_base)
1599    }
1600
1601    /// Helper method to move a file using object store rename operation
1602    pub fn move_file(&self, old_path: &ObjectPath, new_path: &ObjectPath) -> anyhow::Result<()> {
1603        self.execute_async(async {
1604            self.object_store
1605                .rename(old_path, new_path)
1606                .await
1607                .map_err(anyhow::Error::from)
1608        })
1609    }
1610
1611    /// Helper method to execute async operations with a runtime
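    ///
    /// # Examples
    ///
    /// A minimal sketch; the provided future simply returns a value through the
    /// runtime (real callers await object store operations instead):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let value = catalog.execute_async(async { Ok(42) })?;
    /// assert_eq!(value, 42);
    /// # Ok::<(), anyhow::Error>(())
    /// ```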
    pub fn execute_async<F, R>(&self, future: F) -> anyhow::Result<R>
    where
        F: std::future::Future<Output = anyhow::Result<R>>,
    {
        let rt = nautilus_common::runtime::get_runtime();
        rt.block_on(future)
    }
}

/// Trait for providing catalog path prefixes for different data types.
///
/// This trait enables type-safe organization of data within the catalog by providing
/// a standardized way to determine the directory structure for each data type.
/// Each data type maps to a specific subdirectory within the catalog's data folder.
///
/// # Implementation
///
/// Types implementing this trait should return a static string that represents
/// the directory name where data of that type should be stored.
///
/// # Examples
///
/// ```rust
/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
/// use nautilus_model::data::QuoteTick;
///
/// assert_eq!(QuoteTick::path_prefix(), "quotes");
/// ```
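///
/// Generic code can use the prefix to build catalog paths. A minimal sketch
/// (`data_dir` is a hypothetical helper, not part of this crate):
///
/// ```rust
/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
/// use nautilus_model::data::{QuoteTick, TradeTick};
///
/// // Hypothetical helper: builds the catalog subdirectory for any data type.
/// fn data_dir<T: CatalogPathPrefix>() -> String {
///     format!("data/{}", T::path_prefix())
/// }
///
/// assert_eq!(data_dir::<QuoteTick>(), "data/quotes");
/// assert_eq!(data_dir::<TradeTick>(), "data/trades");
/// ```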
pub trait CatalogPathPrefix {
    /// Returns the path prefix (directory name) for this data type.
    ///
    /// # Returns
    ///
    /// A static string representing the directory name where this data type is stored.
    fn path_prefix() -> &'static str;
}

/// Macro for implementing [`CatalogPathPrefix`] for data types.
///
/// This macro provides a convenient way to implement the trait for multiple types
/// with their corresponding path prefixes.
///
/// # Parameters
///
/// - `$type`: The data type to implement the trait for.
/// - `$path`: The path prefix string for that type.
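///
/// # Examples
///
/// A hypothetical registration (the macro is private to this module, so the
/// example is illustrative only):
///
/// ```rust,ignore
/// impl_catalog_path_prefix!(MyCustomEvent, "custom_events");
/// // MyCustomEvent::path_prefix() now returns "custom_events"
/// ```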
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

// Standard implementations for financial data types
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");

/// Converts timestamps to a filename using ISO 8601 format.
///
/// This function converts two Unix nanosecond timestamps to a filename that uses
/// ISO 8601 format with filesystem-safe characters. The format matches the Python
/// implementation for consistency.
///
/// # Parameters
///
/// - `timestamp_1`: First timestamp in Unix nanoseconds.
/// - `timestamp_2`: Second timestamp in Unix nanoseconds.
///
/// # Returns
///
/// Returns a filename string in the format: "`iso_timestamp_1_iso_timestamp_2.parquet`".
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::timestamps_to_filename;
/// # use nautilus_core::UnixNanos;
/// let filename = timestamps_to_filename(
///     UnixNanos::from(1609459200000000000),
///     UnixNanos::from(1609545600000000000)
/// );
/// // Returns something like: "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet"
/// ```
#[must_use]
pub fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));

    format!("{datetime_1}_{datetime_2}.parquet")
}

/// Converts an ISO 8601 timestamp to a filesystem-safe format.
///
/// This function replaces colons and dots with hyphens to make the timestamp
/// safe for use in filenames across different filesystems.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns a filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::iso_timestamp_to_file_timestamp;
/// let safe_timestamp = iso_timestamp_to_file_timestamp("2023-10-26T07:30:50.123456789Z");
/// assert_eq!(safe_timestamp, "2023-10-26T07-30-50-123456789Z");
/// ```
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp.replace([':', '.'], "-")
}

/// Converts a filesystem-safe timestamp back to ISO 8601 format.
///
/// This function reverses the transformation done by `iso_timestamp_to_file_timestamp`,
/// converting filesystem-safe timestamps back to standard ISO 8601 format.
///
/// # Parameters
///
/// - `file_timestamp`: Filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Returns
///
/// Returns an ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::file_timestamp_to_iso_timestamp;
/// let iso_timestamp = file_timestamp_to_iso_timestamp("2023-10-26T07-30-50-123456789Z");
/// assert_eq!(iso_timestamp, "2023-10-26T07:30:50.123456789Z");
/// ```
fn file_timestamp_to_iso_timestamp(file_timestamp: &str) -> String {
    let (date_part, time_part) = file_timestamp
        .split_once('T')
        .unwrap_or((file_timestamp, ""));
    let time_part = time_part.strip_suffix('Z').unwrap_or(time_part);

    // Find the last hyphen to separate nanoseconds
    if let Some(last_hyphen_idx) = time_part.rfind('-') {
        let time_with_dot_for_nanos = format!(
            "{}.{}",
            &time_part[..last_hyphen_idx],
            &time_part[last_hyphen_idx + 1..]
        );
        let final_time_part = time_with_dot_for_nanos.replace('-', ":");
        format!("{date_part}T{final_time_part}Z")
    } else {
        // Fallback if no nanoseconds part found
        let final_time_part = time_part.replace('-', ":");
        format!("{date_part}T{final_time_part}Z")
    }
}

/// Converts an ISO 8601 timestamp string to Unix nanoseconds.
///
/// This function parses an ISO 8601 timestamp and converts it to Unix nanoseconds.
/// It's used to convert parsed timestamps back to the internal representation.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns `Ok(u64)` with the Unix nanoseconds timestamp, or an error if parsing fails.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::iso_to_unix_nanos;
/// let nanos = iso_to_unix_nanos("2021-01-01T00:00:00.000000000Z").unwrap();
/// assert_eq!(nanos, 1609459200000000000);
/// ```
fn iso_to_unix_nanos(iso_timestamp: &str) -> anyhow::Result<u64> {
    Ok(iso8601_to_unix_nanos(iso_timestamp.to_string())?.into())
}

/// Converts an instrument ID to a URI-safe format by removing forward slashes.
///
/// Some instrument IDs contain forward slashes (e.g., "BTC/USD") which are not
/// suitable for use in file paths. This function removes these characters to
/// create a safe directory name.
///
/// # Parameters
///
/// - `instrument_id`: The original instrument ID string.
///
/// # Returns
///
/// A URI-safe version of the instrument ID with forward slashes removed.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::urisafe_instrument_id;
/// assert_eq!(urisafe_instrument_id("BTC/USD"), "BTCUSD");
/// assert_eq!(urisafe_instrument_id("EUR-USD"), "EUR-USD");
/// ```
fn urisafe_instrument_id(instrument_id: &str) -> String {
    instrument_id.replace('/', "")
}

/// Extracts the identifier from a file path.
///
/// The identifier is typically the second-to-last path component (directory name).
/// For example, from "`data/quote_tick/EURUSD/file.parquet`", extracts "EURUSD".
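///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_identifier_from_path;
/// assert_eq!(extract_identifier_from_path("data/quote_tick/EURUSD/file.parquet"), "EURUSD");
///
/// // Paths without a parent directory fall back to "unknown"
/// assert_eq!(extract_identifier_from_path("file.parquet"), "unknown");
/// ```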
#[must_use]
pub fn extract_identifier_from_path(file_path: &str) -> String {
    let path_parts: Vec<&str> = file_path.split('/').collect();
    if path_parts.len() >= 2 {
        path_parts[path_parts.len() - 2].to_string()
    } else {
        "unknown".to_string()
    }
}

/// Makes an identifier safe for use in SQL table names.
///
/// Removes forward slashes, replaces dots, hyphens, and spaces with underscores, and converts to lowercase.
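///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_sql_safe_identifier;
/// assert_eq!(make_sql_safe_identifier("EUR/USD.SIM"), "eurusd_sim");
/// ```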
#[must_use]
pub fn make_sql_safe_identifier(identifier: &str) -> String {
    urisafe_instrument_id(identifier)
        .replace(['.', '-', ' '], "_")
        .to_lowercase()
}

/// Extracts the filename from a file path and makes it SQL-safe.
///
/// For example, from "data/quote_tick/EURUSD/2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
/// extracts "`2021_01_01t00_00_00_000000000z_2021_01_02t00_00_00_000000000z`".
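///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_sql_safe_filename;
/// let name = extract_sql_safe_filename(
///     "data/quotes/EURUSD/2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
/// );
/// assert_eq!(name, "2021_01_01t00_00_00_000000000z_2021_01_02t00_00_00_000000000z");
/// ```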
#[must_use]
pub fn extract_sql_safe_filename(file_path: &str) -> String {
    if file_path.is_empty() {
        return "unknown_file".to_string();
    }

    let filename = file_path.split('/').next_back().unwrap_or("unknown_file");

    // Remove the `.parquet` extension if present (only as a suffix, so a
    // `.parquet` appearing mid-name is left alone)
    let name_without_ext = filename.strip_suffix(".parquet").unwrap_or(filename);

    // Remove characters that can pose problems: hyphens, colons, etc.
    name_without_ext
        .replace(['-', ':', '.'], "_")
        .to_lowercase()
}

/// Creates a platform-appropriate local path using `PathBuf`.
///
/// This function constructs file system paths using the platform's native path separators.
/// Use this for local file operations that need to work with the actual file system.
///
/// # Arguments
///
/// * `base_path` - The base directory path
/// * `components` - Path components to join
///
/// # Returns
///
/// A `PathBuf` with platform-appropriate separators
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_local_path;
/// let path = make_local_path("/base", &["data", "quotes", "EURUSD"]);
/// // On Unix: "/base/data/quotes/EURUSD"
/// // On Windows, pushed components use backslashes: "/base\data\quotes\EURUSD"
/// ```
#[must_use]
pub fn make_local_path<P: AsRef<Path>>(base_path: P, components: &[&str]) -> PathBuf {
    let mut path = PathBuf::from(base_path.as_ref());
    for component in components {
        path.push(component);
    }
    path
}

/// Creates an object store path using forward slashes.
///
/// Object stores (S3, GCS, etc.) always expect forward slashes regardless of platform.
/// Use this when creating paths for object store operations.
///
/// # Arguments
///
/// * `base_path` - The base path (can be empty)
/// * `components` - Path components to join
///
/// # Returns
///
/// A string path with forward slash separators
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_object_store_path;
/// let path = make_object_store_path("base", &["data", "quotes", "EURUSD"]);
/// assert_eq!(path, "base/data/quotes/EURUSD");
/// ```
#[must_use]
pub fn make_object_store_path(base_path: &str, components: &[&str]) -> String {
    let mut parts = Vec::new();

    if !base_path.is_empty() {
        let normalized_base = base_path
            .replace('\\', "/")
            .trim_end_matches('/')
            .to_string();
        if !normalized_base.is_empty() {
            parts.push(normalized_base);
        }
    }

    for component in components {
        let normalized_component = component
            .replace('\\', "/")
            .trim_start_matches('/')
            .trim_end_matches('/')
            .to_string();
        if !normalized_component.is_empty() {
            parts.push(normalized_component);
        }
    }

    parts.join("/")
}

/// Creates an object store path using forward slashes with owned strings.
///
/// This variant accepts owned strings to avoid lifetime issues.
///
/// # Arguments
///
/// * `base_path` - The base path (can be empty)
/// * `components` - Path components to join (owned strings)
///
/// # Returns
///
/// A string path with forward slash separators
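///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_object_store_path_owned;
/// let path = make_object_store_path_owned("base", vec!["data".to_string(), "quotes".to_string()]);
/// assert_eq!(path, "base/data/quotes");
/// ```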
#[must_use]
pub fn make_object_store_path_owned(base_path: &str, components: Vec<String>) -> String {
    // Delegate to `make_object_store_path` so both variants share the same
    // normalization rules.
    let component_refs: Vec<&str> = components.iter().map(String::as_str).collect();
    make_object_store_path(base_path, &component_refs)
}

/// Converts a local `PathBuf` to an object store path string.
///
/// This function normalizes a local file system path to the forward-slash format
/// expected by object stores, handling platform differences.
///
/// # Arguments
///
/// * `local_path` - The local `PathBuf` to convert
///
/// # Returns
///
/// A string with forward slash separators suitable for object store operations
///
/// # Examples
///
/// ```rust
/// # use std::path::PathBuf;
/// # use nautilus_persistence::backend::catalog::local_to_object_store_path;
/// let local_path = PathBuf::from("data").join("quotes").join("EURUSD");
/// let object_path = local_to_object_store_path(&local_path);
/// assert_eq!(object_path, "data/quotes/EURUSD");
/// ```
#[must_use]
pub fn local_to_object_store_path(local_path: &Path) -> String {
    local_path.to_string_lossy().replace('\\', "/")
}

/// Extracts path components using platform-appropriate path parsing.
///
/// This function safely parses a path into its components, handling both
/// local file system paths and object store paths correctly.
///
/// # Arguments
///
/// * `path_str` - The path string to parse
///
/// # Returns
///
/// A vector of path components
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_path_components;
/// let components = extract_path_components("data/quotes/EURUSD");
/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
///
/// // Works with both separators
/// let components = extract_path_components("data\\quotes\\EURUSD");
/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
/// ```
#[must_use]
pub fn extract_path_components(path_str: &str) -> Vec<String> {
    // Normalize separators and split
    let normalized = path_str.replace('\\', "/");
    normalized
        .split('/')
        .filter(|s| !s.is_empty())
        .map(ToString::to_string)
        .collect()
}

/// Checks if a filename's timestamp range intersects with a query interval.
///
/// This function determines whether a Parquet file (identified by its timestamp-based
/// filename) contains data that falls within the specified query time range.
///
/// # Parameters
///
/// - `filename`: The filename to check (format: "`iso_timestamp_1_iso_timestamp_2.parquet`").
/// - `start`: Optional start timestamp for the query range.
/// - `end`: Optional end timestamp for the query range.
///
/// # Returns
///
/// Returns `true` if the file's time range intersects with the query range,
/// `false` otherwise. Returns `true` if the filename cannot be parsed.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::query_intersects_filename;
/// // Example with ISO format filenames
/// assert!(query_intersects_filename(
///     "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
///     Some(1609459200000000000),
///     Some(1609545600000000000)
/// ));
/// ```
fn query_intersects_filename(filename: &str, start: Option<u64>, end: Option<u64>) -> bool {
    if let Some((file_start, file_end)) = parse_filename_timestamps(filename) {
        (start.is_none() || start.unwrap() <= file_end)
            && (end.is_none() || file_start <= end.unwrap())
    } else {
        true
    }
}

/// Parses timestamps from a Parquet filename.
///
/// Extracts the start and end timestamps from filenames that follow the ISO 8601 format:
/// "`iso_timestamp_1_iso_timestamp_2.parquet`" (e.g., "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet")
///
/// # Parameters
///
/// - `filename`: The filename to parse (can be a full path).
///
/// # Returns
///
/// Returns `Some((start_ts, end_ts))` if the filename matches the expected format,
/// `None` otherwise.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::parse_filename_timestamps;
/// assert!(parse_filename_timestamps("2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet").is_some());
/// assert_eq!(parse_filename_timestamps("invalid.parquet"), None);
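///
/// // The parsed values are the Unix-nanosecond timestamps encoded in the name
/// // (the same values shown in the `timestamps_to_filename` example above):
/// let parsed = parse_filename_timestamps(
///     "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
/// );
/// assert_eq!(parsed, Some((1609459200000000000, 1609545600000000000)));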
/// ```
#[must_use]
pub fn parse_filename_timestamps(filename: &str) -> Option<(u64, u64)> {
    let path = Path::new(filename);
    let base_name = path.file_name()?.to_str()?;
    let base_filename = base_name.strip_suffix(".parquet")?;
    let (first_part, second_part) = base_filename.split_once('_')?;

    let first_iso = file_timestamp_to_iso_timestamp(first_part);
    let second_iso = file_timestamp_to_iso_timestamp(second_part);

    let first_ts = iso_to_unix_nanos(&first_iso).ok()?;
    let second_ts = iso_to_unix_nanos(&second_iso).ok()?;

    Some((first_ts, second_ts))
}

/// Checks if a list of closed integer intervals are all mutually disjoint.
///
/// Two intervals are disjoint if they do not overlap. This function validates that
/// all intervals in the list are non-overlapping, which is a requirement for
/// maintaining data integrity in the catalog.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are disjoint, `false` if any overlap is found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_disjoint;
/// // Disjoint intervals
/// assert!(are_intervals_disjoint(&[(1, 5), (10, 15), (20, 25)]));
///
/// // Overlapping intervals
/// assert!(!are_intervals_disjoint(&[(1, 10), (5, 15)]));
/// ```
#[must_use]
pub fn are_intervals_disjoint(intervals: &[(u64, u64)]) -> bool {
    let n = intervals.len();

    if n <= 1 {
        return true;
    }

    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    for i in 0..(n - 1) {
        let (_, end1) = sorted_intervals[i];
        let (start2, _) = sorted_intervals[i + 1];

        if end1 >= start2 {
            return false;
        }
    }

    true
}

/// Checks if intervals are contiguous (adjacent with no gaps).
///
/// Intervals are contiguous if, when sorted by start time, each interval's start
/// timestamp is exactly one more than the previous interval's end timestamp.
/// This ensures complete coverage of a time range with no gaps.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are contiguous, `false` if any gaps are found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_contiguous;
/// // Contiguous intervals
/// assert!(are_intervals_contiguous(&[(1, 5), (6, 10), (11, 15)]));
///
/// // Non-contiguous intervals (gap between 5 and 8)
/// assert!(!are_intervals_contiguous(&[(1, 5), (8, 10)]));
/// ```
#[must_use]
pub fn are_intervals_contiguous(intervals: &[(u64, u64)]) -> bool {
    let n = intervals.len();
    if n <= 1 {
        return true;
    }

    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    for i in 0..(n - 1) {
        let (_, end1) = sorted_intervals[i];
        let (start2, _) = sorted_intervals[i + 1];

        if end1 + 1 != start2 {
            return false;
        }
    }

    true
}

/// Finds the parts of a query interval that are not covered by existing data intervals.
///
/// This function calculates the "gaps" in data coverage by comparing a requested
/// time range against the intervals covered by existing data files. It's used to
/// determine what data needs to be fetched or backfilled.
///
/// # Parameters
///
/// - `start`: Start timestamp of the query interval (inclusive).
/// - `end`: End timestamp of the query interval (inclusive).
/// - `closed_intervals`: Existing data intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns a vector of (start, end) tuples representing the gaps in coverage.
/// Returns an empty vector if the query range is invalid or fully covered.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::query_interval_diff;
/// // Query 1-100, have data for 10-30 and 60-80
/// let gaps = query_interval_diff(1, 100, &[(10, 30), (60, 80)]);
/// assert_eq!(gaps, vec![(1, 9), (31, 59), (81, 100)]);
/// ```
fn query_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
    if start > end {
        return Vec::new();
    }

    let interval_set = get_interval_set(closed_intervals);
    let query_range = (Bound::Included(start), Bound::Included(end));
    let query_diff = interval_set.get_interval_difference(&query_range);
    let mut result: Vec<(u64, u64)> = Vec::new();

    for interval in query_diff {
        if let Some(tuple) = interval_to_tuple(interval, start, end) {
            result.push(tuple);
        }
    }

    result
}

/// Creates an interval tree from closed integer intervals.
///
/// This function converts closed intervals [a, b] into half-open intervals [a, b+1)
/// for use with the interval tree data structure, which is used for efficient
/// interval operations and gap detection.
///
/// # Parameters
///
/// - `intervals`: A slice of closed intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns an [`IntervalTree`] containing the converted intervals.
///
/// # Notes
///
/// - Invalid intervals (where start > end) are skipped.
/// - Uses saturating addition to prevent overflow when converting to half-open intervals.
/// - For example, the closed interval `(3, 5)` is inserted as the half-open interval `[3, 6)`.
fn get_interval_set(intervals: &[(u64, u64)]) -> IntervalTree<u64> {
    let mut tree = IntervalTree::default();

    if intervals.is_empty() {
        return tree;
    }

    for &(start, end) in intervals {
        if start > end {
            continue;
        }

        tree.insert((
            Bound::Included(start),
            Bound::Excluded(end.saturating_add(1)),
        ));
    }

    tree
}

/// Converts an interval tree result back to a closed interval tuple.
///
/// This helper function converts the bounded interval representation used by
/// the interval tree back into the (start, end) tuple format used throughout
/// the catalog.
///
/// # Parameters
///
/// - `interval`: The bounded interval from the interval tree.
/// - `query_start`: The start of the original query range.
/// - `query_end`: The end of the original query range.
///
/// # Returns
///
/// Returns `Some((start, end))` for valid intervals, `None` for empty intervals.
fn interval_to_tuple(
    interval: (Bound<&u64>, Bound<&u64>),
    query_start: u64,
    query_end: u64,
) -> Option<(u64, u64)> {
    let (bound_start, bound_end) = interval;

    let start = match bound_start {
        Bound::Included(val) => *val,
        Bound::Excluded(val) => val.saturating_add(1),
        Bound::Unbounded => query_start,
    };

    let end = match bound_end {
        Bound::Included(val) => *val,
        Bound::Excluded(val) => {
            if *val == 0 {
                return None; // Empty interval
            }
            val - 1
        }
        Bound::Unbounded => query_end,
    };

    if start <= end {
        Some((start, end))
    } else {
        None
    }
}