nautilus_persistence/backend/catalog.rs
// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Parquet data catalog for efficient storage and retrieval of financial market data.
//!
//! This module provides a comprehensive data catalog implementation that uses Apache Parquet
//! format for storing financial market data with object store backends. The catalog supports
//! various data types including quotes, trades, bars, order book data, and other market events.
//!
//! # Key Features
//!
//! - **Object Store Integration**: Works with local filesystems, S3, and other object stores.
//! - **Data Type Support**: Handles all major financial data types (quotes, trades, bars, etc.).
//! - **Time-based Organization**: Organizes data by timestamp ranges for efficient querying.
//! - **Consolidation**: Merges multiple files to optimize storage and query performance.
//! - **Validation**: Ensures data integrity with timestamp ordering and interval validation.
//!
//! # Architecture
//!
//! The catalog organizes data in a hierarchical structure:
//! ```text
//! data/
//! ├── quotes/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! ├── trades/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! └── bars/
//!     └── INSTRUMENT_ID/
//!         └── start_ts-end_ts.parquet
//! ```
//!
//! # Usage
//!
//! ```rust,no_run
//! use std::path::PathBuf;
//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//!
//! // Create a new catalog
//! let catalog = ParquetDataCatalog::new(
//!     PathBuf::from("/path/to/data"),
//!     None,       // storage_options
//!     Some(5000), // batch_size
//!     None,       // compression (defaults to SNAPPY)
//!     None,       // max_row_group_size (defaults to 5000)
//! );
//!
//! // Write data to the catalog
//! // catalog.write_to_parquet(data, None, None, None)?;
//! ```

use std::{
    fmt::Debug,
    ops::Bound,
    path::{Path, PathBuf},
    sync::Arc,
};

use ahash::AHashMap;
use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use heck::ToSnakeCase;
use itertools::Itertools;
use nautilus_common::live::get_runtime;
use nautilus_core::{
    UnixNanos,
    datetime::{iso8601_to_unix_nanos, unix_nanos_to_iso8601},
};
use nautilus_model::data::{
    Bar, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose, to_variant,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::{ObjectStore, path::Path as ObjectPath};
use serde::Serialize;
use unbounded_interval_tree::interval_tree::IntervalTree;

use super::session::{self, DataBackendSession, QueryResult, build_query};
use crate::parquet::write_batches_to_object_store;

/// A high-performance data catalog for storing and retrieving financial market data using Apache Parquet format.
///
/// The `ParquetDataCatalog` provides a comprehensive solution for managing large volumes of financial
/// market data with efficient storage, querying, and consolidation capabilities. It supports various
/// object store backends including local filesystems, AWS S3, and other cloud storage providers.
///
/// # Features
///
/// - **Efficient Storage**: Uses Apache Parquet format with configurable compression.
/// - **Object Store Backend**: Supports multiple storage backends through the `object_store` crate.
/// - **Time-based Organization**: Organizes data by timestamp ranges for optimal query performance.
/// - **Data Validation**: Ensures timestamp ordering and interval consistency.
/// - **Consolidation**: Merges multiple files to reduce storage overhead and improve query speed.
/// - **Type Safety**: Strongly typed data handling with compile-time guarantees.
///
/// # Data Organization
///
/// Data is organized hierarchically by data type and instrument:
/// - `data/{data_type}/{instrument_id}/{start_ts}-{end_ts}.parquet`.
/// - Files are named with their timestamp ranges for efficient range queries.
/// - Intervals are validated to be disjoint to prevent data overlap.
///
/// # Performance Considerations
///
/// - **Batch Size**: Controls memory usage during data processing.
/// - **Compression**: SNAPPY compression provides a good balance of speed and size.
/// - **Row Group Size**: Affects query performance and memory usage.
/// - **File Consolidation**: Reduces the number of files for better query performance.
pub struct ParquetDataCatalog {
    /// The base path for data storage within the object store.
    pub base_path: String,
    /// The original URI provided when creating the catalog.
    pub original_uri: String,
    /// The object store backend for data persistence.
    pub object_store: Arc<dyn ObjectStore>,
    /// The DataFusion session for query execution.
    pub session: DataBackendSession,
    /// The number of records to process in each batch.
    pub batch_size: usize,
    /// The compression algorithm used for Parquet files.
    pub compression: parquet::basic::Compression,
    /// The maximum number of rows in each Parquet row group.
    pub max_row_group_size: usize,
}

impl Debug for ParquetDataCatalog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(ParquetDataCatalog))
            .field("base_path", &self.base_path)
            .finish()
    }
}

impl ParquetDataCatalog {
    /// Creates a new [`ParquetDataCatalog`] instance from a local file path.
    ///
    /// This is a convenience constructor that converts a local path to a URI format
    /// and delegates to [`Self::from_uri`].
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base directory path for data storage.
    /// - `storage_options`: Optional `AHashMap` containing storage-specific configuration options.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Panics
    ///
    /// Panics if the path cannot be converted to a valid URI or if the object store
    /// cannot be created from the path.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     PathBuf::from("/tmp/nautilus_data"),
    ///     None,       // no storage options
    ///     Some(1000), // smaller batch size
    ///     None,       // default compression
    ///     None,       // default row group size
    /// );
    /// ```
    #[must_use]
    pub fn new(
        base_path: PathBuf,
        storage_options: Option<AHashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let path_str = base_path.to_string_lossy().to_string();
        Self::from_uri(
            &path_str,
            storage_options,
            batch_size,
            compression,
            max_row_group_size,
        )
        .expect("Failed to create catalog from path")
    }

    /// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
    ///
    /// Supports various URI schemes including local file paths and multiple cloud storage backends
    /// supported by the `object_store` crate.
    ///
    /// # Supported URI Schemes
    ///
    /// - **AWS S3**: `s3://bucket/path`.
    /// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`.
    /// - **Azure Blob Storage**: `az://container/path` or `abfs://container@account.dfs.core.windows.net/path`.
    /// - **HTTP/WebDAV**: `http://` or `https://`.
    /// - **Local files**: `file://path` or plain paths.
    ///
    /// # Parameters
    ///
    /// - `uri`: The URI for the data storage location.
    /// - `storage_options`: Optional `AHashMap` containing storage-specific configuration options:
    ///   - For S3: `endpoint_url`, `region`, `access_key_id`, `secret_access_key`, `session_token`, etc.
    ///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
    ///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URI format is invalid or unsupported.
    /// - The object store cannot be created or accessed.
    /// - Authentication fails for cloud storage backends.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use ahash::AHashMap;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// // Local filesystem
    /// let local_catalog = ParquetDataCatalog::from_uri(
    ///     "/tmp/nautilus_data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 bucket
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Google Cloud Storage
    /// let gcs_catalog = ParquetDataCatalog::from_uri(
    ///     "gs://my-bucket/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // Azure Blob Storage
    /// let azure_catalog = ParquetDataCatalog::from_uri(
    ///     "az://container/nautilus-data",
    ///     None, None, None, None
    /// )?;
    ///
    /// // S3 with custom endpoint and credentials
    /// let mut storage_options = AHashMap::new();
    /// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
    /// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
    /// storage_options.insert("secret_access_key".to_string(), "my-secret".to_string());
    ///
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     Some(storage_options),
    ///     None, None, None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn from_uri(
        uri: &str,
        storage_options: Option<AHashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> anyhow::Result<Self> {
        let batch_size = batch_size.unwrap_or(5000);
        let compression = compression.unwrap_or(parquet::basic::Compression::SNAPPY);
        let max_row_group_size = max_row_group_size.unwrap_or(5000);

        let (object_store, base_path, original_uri) =
            crate::parquet::create_object_store_from_path(uri, storage_options)?;

        Ok(Self {
            base_path,
            original_uri,
            object_store,
            session: session::DataBackendSession::new(batch_size),
            batch_size,
            compression,
            max_row_group_size,
        })
    }

    /// Returns the base path of the catalog for testing purposes.
    #[must_use]
    pub fn get_base_path(&self) -> String {
        self.base_path.clone()
    }

    /// Resets the backend session to clear any cached table registrations.
    ///
    /// This is useful during catalog operations when files are being modified
    /// and we need to ensure fresh data is loaded.
    pub fn reset_session(&mut self) {
        self.session.clear_registered_tables();
    }

    /// Writes mixed data types to the catalog by separating them into type-specific collections.
    ///
    /// This method takes a heterogeneous collection of market data and separates it by type,
    /// then writes each type to its appropriate location in the catalog. This is useful when
    /// processing mixed data streams or bulk data imports.
    ///
    /// # Parameters
    ///
    /// - `data`: A vector of mixed [`Data`] enum variants.
    /// - `start`: Optional start timestamp to override the data's natural range.
    /// - `end`: Optional end timestamp to override the data's natural range.
    ///
    /// # Notes
    ///
    /// - Data is automatically sorted by type before writing.
    /// - Each data type is written to its own directory structure.
    /// - Instrument data handling is not yet implemented (TODO).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::Data;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let mixed_data: Vec<Data> = vec![/* mixed data types */];
    ///
    /// catalog.write_data_enum(mixed_data, None, None)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_data_enum(
        &self,
        data: Vec<Data>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()> {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();

        for d in data {
            match d {
                Data::Deltas(_) => continue,
                Data::Delta(d) => {
                    deltas.push(d);
                }
                Data::Depth10(d) => {
                    depth10s.push(*d);
                }
                Data::Quote(d) => {
                    quotes.push(d);
                }
                Data::Trade(d) => {
                    trades.push(d);
                }
                Data::Bar(d) => {
                    bars.push(d);
                }
                Data::MarkPriceUpdate(p) => {
                    mark_prices.push(p);
                }
                Data::IndexPriceUpdate(p) => {
                    index_prices.push(p);
                }
                Data::InstrumentClose(c) => {
                    closes.push(c);
                }
            }
        }

        // TODO: need to handle instruments here

        self.write_to_parquet(deltas, start, end, None)?;
        self.write_to_parquet(depth10s, start, end, None)?;
        self.write_to_parquet(quotes, start, end, None)?;
        self.write_to_parquet(trades, start, end, None)?;
        self.write_to_parquet(bars, start, end, None)?;
        self.write_to_parquet(mark_prices, start, end, None)?;
        self.write_to_parquet(index_prices, start, end, None)?;
        self.write_to_parquet(closes, start, end, None)?;

        Ok(())
    }

    /// Writes typed data to a Parquet file in the catalog.
    ///
    /// This is the core method for persisting market data to the catalog. It handles data
    /// validation, batching, compression, and ensures proper file organization with
    /// timestamp-based naming.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement required traits for serialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `start`: Optional start timestamp to override the natural data range.
    /// - `end`: Optional end timestamp to override the natural data range.
    /// - `skip_disjoint_check`: If `Some(true)`, skips the validation that intervals in the
    ///   target directory remain disjoint after the write.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Data timestamps are not in ascending order.
    /// - Data serialization to Arrow record batches fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    /// - Timestamp interval validation fails after writing.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - Record batches are empty after conversion.
    /// - Required metadata is missing from the schema.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    ///
    /// let path = catalog.write_to_parquet(quotes, None, None, None)?;
    /// println!("Data written to: {:?}", path);
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        skip_disjoint_check: Option<bool>,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = start.unwrap_or(data.first().unwrap().ts_init());
        let end_ts = end.unwrap_or(data.last().unwrap().ts_init());

        let batches = self.data_to_record_batches(data)?;
        let schema = batches.first().expect("Batches are empty.").schema();
        let instrument_id = schema.metadata.get("instrument_id").cloned();

        let directory = self.make_path(T::path_prefix(), instrument_id)?;
        let filename = timestamps_to_filename(start_ts, end_ts);
        let path = PathBuf::from(format!("{directory}/{filename}"));

        // Write all batches to parquet file
        log::info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len()
        );

        // Convert path to object store path
        let object_path = self.to_object_path(&path.to_string_lossy());

        self.execute_async(async {
            write_batches_to_object_store(
                &batches,
                self.object_store.clone(),
                &object_path,
                Some(self.compression),
                Some(self.max_row_group_size),
            )
            .await
        })?;

        if !skip_disjoint_check.unwrap_or(false) {
            let intervals = self.get_directory_intervals(&directory)?;

            if !are_intervals_disjoint(&intervals) {
                anyhow::bail!("Intervals are not disjoint after writing a new file");
            }
        }

        Ok(path)
    }

    /// Writes typed data to a JSON file in the catalog.
    ///
    /// This method provides an alternative to Parquet format for data export and debugging.
    /// JSON files are human-readable but less efficient for large datasets.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement serialization and cataloging traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `path`: Optional custom directory path (defaults to the catalog's standard structure).
    /// - `write_metadata`: Whether to write a separate metadata file alongside the data.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created JSON file.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Data timestamps are not in ascending order.
    /// - JSON serialization fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    ///
    /// # Panics
    ///
    /// Panics if no custom path is given and the default directory path cannot be constructed.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_model::data::TradeTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let trades: Vec<TradeTick> = vec![/* trade data */];
    ///
    /// let path = catalog.write_to_json(
    ///     trades,
    ///     Some(PathBuf::from("/custom/path")),
    ///     true, // write metadata
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = data.first().unwrap().ts_init();
        let end_ts = data.last().unwrap().ts_init();

        let directory =
            path.unwrap_or_else(|| PathBuf::from(self.make_path(T::path_prefix(), None).unwrap()));
        let filename = timestamps_to_filename(start_ts, end_ts).replace(".parquet", ".json");
        let json_path = directory.join(&filename);

        log::info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len()
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            log::info!("Writing metadata to {metadata_path:?}");

            // Use object store for metadata file
            let metadata_object_path = ObjectPath::from(metadata_path.to_string_lossy().as_ref());
            let metadata_json = serde_json::to_vec_pretty(&metadata)?;
            self.execute_async(async {
                self.object_store
                    .put(&metadata_object_path, metadata_json.into())
                    .await
                    .map_err(anyhow::Error::from)
            })?;
        }

        // Use object store for main JSON file
        let json_object_path = ObjectPath::from(json_path.to_string_lossy().as_ref());
        let json_data = serde_json::to_vec_pretty(&serde_json::to_value(data)?)?;
        self.execute_async(async {
            self.object_store
                .put(&json_object_path, json_data.into())
                .await
                .map_err(anyhow::Error::from)
        })?;

        Ok(json_path)
    }

    /// Validates that data timestamps are in ascending order.
    ///
    /// # Parameters
    ///
    /// - `data`: Slice of data records to validate.
    /// - `type_name`: Name of the data type for error messages.
    ///
    /// # Errors
    ///
    /// Returns an error if any timestamp is less than the previous timestamp.
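    ///
    /// # Examples
    ///
    /// A minimal sketch; the data vector here is a stand-in for real records:
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let quotes: Vec<QuoteTick> = vec![/* quote data in ascending ts_init order */];
    /// ParquetDataCatalog::check_ascending_timestamps(&quotes, "quotes")?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```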
    pub fn check_ascending_timestamps<T: HasTsInit>(
        data: &[T],
        type_name: &str,
    ) -> anyhow::Result<()> {
        if !data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()) {
            anyhow::bail!("{type_name} timestamps must be in ascending order");
        }

        Ok(())
    }

    /// Converts data into Arrow record batches for Parquet serialization.
    ///
    /// This method chunks the data according to the configured batch size and converts
    /// each chunk into an Arrow record batch with appropriate metadata.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to convert, must implement required encoding traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to convert.
    ///
    /// # Returns
    ///
    /// Returns a vector of Arrow [`RecordBatch`] instances ready for Parquet serialization.
    ///
    /// # Errors
    ///
    /// Returns an error if record batch encoding fails for any chunk.
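    ///
    /// # Examples
    ///
    /// An illustrative sketch; with the default `batch_size` of 5000, 12000 records
    /// would encode into three record batches:
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    ///
    /// let batches = catalog.data_to_record_batches(quotes)?;
    /// println!("Encoded {} record batches", batches.len());
    /// # Ok::<(), anyhow::Error>(())
    /// ```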
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: HasTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

    /// Extends the timestamp range of an existing Parquet file by renaming it.
    ///
    /// This method finds an existing file that is adjacent to the specified time range
    /// and renames it to include the new range. This is useful when appending data
    /// that extends the time coverage of existing files. If no adjacent file is found,
    /// the method is a no-op.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    /// - `start`: Start timestamp of the new range to extend to.
    /// - `end`: End timestamp of the new range to extend to.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File rename operations fail.
    /// - Interval validation fails after extension.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Extend a file's range backwards or forwards
    /// catalog.extend_file_name(
    ///     "quotes",
    ///     Some("BTCUSD".to_string()),
    ///     UnixNanos::from(1609459200000000000),
    ///     UnixNanos::from(1609545600000000000)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: UnixNanos,
        end: UnixNanos,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(data_cls, instrument_id)?;
        let intervals = self.get_directory_intervals(&directory)?;

        let start = start.as_u64();
        let end = end.as_u64();

        for interval in intervals {
            if interval.0 == end + 1 {
                // Extend backwards: new file covers [start, interval.1]
                self.rename_parquet_file(&directory, interval.0, interval.1, start, interval.1)?;
                break;
            } else if interval.1 == start - 1 {
                // Extend forwards: new file covers [interval.0, end]
                self.rename_parquet_file(&directory, interval.0, interval.1, interval.0, end)?;
                break;
            }
        }

        let intervals = self.get_directory_intervals(&directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after extending a file");
        }

        Ok(())
    }

    /// Lists all Parquet files in a specified directory.
    ///
    /// This method scans a directory and returns the full paths of all files with the `.parquet`
    /// extension. It works with both local filesystems and remote object stores, making it
    /// suitable for various storage backends.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path to scan for Parquet files.
    ///
    /// # Returns
    ///
    /// Returns a vector of full file paths (as strings) for all Parquet files found in the directory.
    /// The paths are relative to the object store root and suitable for use with object store operations.
    /// Returns an empty vector if the directory doesn't exist or contains no Parquet files.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store listing operations fail.
    /// - Directory access is denied.
    /// - Network issues occur (for remote object stores).
    ///
    /// # Notes
    ///
    /// - Only files ending with `.parquet` are included.
    /// - Listing is recursive over the directory prefix, so nested `.parquet` files are also included.
    /// - File paths are returned in the order provided by the object store.
    /// - Works with all supported object store backends (local, S3, GCS, Azure, etc.).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let files = catalog.list_parquet_files("data/quotes/EURUSD")?;
    ///
    /// for file in files {
    ///     println!("Found Parquet file: {}", file);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn list_parquet_files(&self, directory: &str) -> anyhow::Result<Vec<String>> {
        self.execute_async(async {
            let prefix = ObjectPath::from(format!("{directory}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut files = Vec::new();

            while let Some(object) = stream.next().await {
                let object = object?;
                if object.location.as_ref().ends_with(".parquet") {
                    files.push(object.location.to_string());
                }
            }
            Ok::<Vec<String>, anyhow::Error>(files)
        })
    }

    /// Helper method to reconstruct the full URI for remote object store paths
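    ///
    /// # Examples
    ///
    /// An illustrative sketch (bucket and file names are made up); for a catalog created
    /// from a remote URI, a relative object store path is expanded back into a full URI:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     None, None, None, None,
    /// )?;
    ///
    /// let uri = catalog.reconstruct_full_uri("nautilus-data/data/quotes/EURUSD/file.parquet");
    /// assert_eq!(uri, "s3://my-bucket/nautilus-data/data/quotes/EURUSD/file.parquet");
    /// # Ok::<(), anyhow::Error>(())
    /// ```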
    #[must_use]
    pub fn reconstruct_full_uri(&self, path_str: &str) -> String {
        // Check if this is a remote URI scheme that needs reconstruction
        if self.is_remote_uri() {
            // Extract the base URL (scheme + host) from the original URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Some(host) = url.host_str()
            {
                return format!("{}://{}/{}", url.scheme(), host, path_str);
            }
        }

        // For local paths, extract the directory from the original URI
        if self.original_uri.starts_with("file://") {
            // Extract the path from the file:// URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Ok(base_path) = url.to_file_path()
            {
                // Use platform-appropriate path separator for display
                // but object store paths always use forward slashes
                let base_str = base_path.to_string_lossy();
                return self.join_paths(&base_str, path_str);
            }
        }

        // For local paths without file:// prefix, use the original URI as base
        if self.base_path.is_empty() {
            // If base_path is empty and not a file URI, try using original_uri as base
            if self.original_uri.contains("://") {
                // Fallback: return the path as-is
                path_str.to_string()
            } else {
                self.join_paths(self.original_uri.trim_end_matches('/'), path_str)
            }
        } else {
            let base = self.base_path.trim_end_matches('/');
            self.join_paths(base, path_str)
        }
    }

    /// Helper method to join paths using forward slashes (object store convention)
    #[must_use]
    fn join_paths(&self, base: &str, path: &str) -> String {
        make_object_store_path(base, &[path])
    }

    /// Helper method to check if the original URI uses a remote object store scheme
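    ///
    /// # Examples
    ///
    /// Illustrative only (constructing a remote-backed catalog may require credentials
    /// at runtime):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let remote = ParquetDataCatalog::from_uri("s3://my-bucket/data", None, None, None, None)?;
    /// assert!(remote.is_remote_uri());
    ///
    /// let local = ParquetDataCatalog::from_uri("/tmp/nautilus_data", None, None, None, None)?;
    /// assert!(!local.is_remote_uri());
    /// # Ok::<(), anyhow::Error>(())
    /// ```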
    #[must_use]
    pub fn is_remote_uri(&self) -> bool {
        self.original_uri.starts_with("s3://")
            || self.original_uri.starts_with("gs://")
            || self.original_uri.starts_with("gcs://")
            || self.original_uri.starts_with("az://")
            || self.original_uri.starts_with("abfs://")
            || self.original_uri.starts_with("http://")
            || self.original_uri.starts_with("https://")
    }

    /// Executes a query against the catalog to retrieve market data of a specific type.
    ///
    /// This is the primary method for querying data from the catalog. It registers the appropriate
    /// object store with the DataFusion session, finds all relevant Parquet files, and executes
    /// the query across them. The method supports filtering by instrument IDs, time ranges, and
    /// custom SQL WHERE clauses.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to query, must implement required traits for deserialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering (e.g., "price > 100").
    /// - `files`: Optional explicit list of file URIs to query; if `None`, files are discovered
    ///   via [`Self::query_files`].
    ///
    /// # Returns
    ///
    /// Returns a [`QueryResult`] containing the query execution context and data.
    /// Use [`QueryResult::collect()`] to retrieve the actual data records.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store registration fails for remote URIs.
    /// - File discovery fails.
    /// - DataFusion query execution fails.
    /// - Data deserialization fails.
    ///
    /// # Performance Notes
    ///
    /// - Files are automatically filtered by timestamp ranges before querying.
    /// - DataFusion optimizes queries across multiple Parquet files.
    /// - Use specific instrument IDs and time ranges to improve performance.
    /// - WHERE clauses are pushed down to the Parquet reader when possible.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quote data
    /// let result = catalog.query::<QuoteTick>(None, None, None, None, None)?;
    /// let quotes = result.collect();
    ///
    /// // Query specific instruments within a time range
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None,
    /// )?;
    ///
    /// // Query with custom WHERE clause
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EURUSD".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000"),
    ///     None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query<T>(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        // Register the object store with the session for remote URIs
        if self.is_remote_uri() {
            let url = url::Url::parse(&self.original_uri)?;
            let host = url
                .host_str()
                .ok_or_else(|| anyhow::anyhow!("Remote URI missing host/bucket name"))?;
            let base_url = url::Url::parse(&format!("{}://{}", url.scheme(), host))?;
            self.session
                .register_object_store(&base_url, self.object_store.clone());
        }

        let files_list = if let Some(files) = files {
            files
        } else {
            self.query_files(T::path_prefix(), instrument_ids, start, end)?
        };

        for file_uri in &files_list {
            // Extract identifier from file path and filename to create meaningful table names
            let identifier = extract_identifier_from_path(file_uri);
            let safe_sql_identifier = make_sql_safe_identifier(&identifier);
            let safe_filename = extract_sql_safe_filename(file_uri);

            // Create table name from path_prefix, identifier, and filename
            let table_name = format!(
                "{}_{}_{}",
                T::path_prefix(),
                safe_sql_identifier,
                safe_filename
            );
            let query = build_query(&table_name, start, end, where_clause);

            // Convert object store path to filesystem path for DataFusion.
            // Only apply reconstruction if the path is not already absolute.
            let resolved_path = if file_uri.starts_with('/') {
                // Path is already absolute, use as-is
                file_uri.clone()
            } else {
                // Path is relative, reconstruct full URI
                self.reconstruct_full_uri(file_uri)
            };
            self.session
                .add_file::<T>(&table_name, &resolved_path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }

    /// Queries typed data from the catalog and returns results as a strongly-typed vector.
    ///
    /// This is a convenience method that wraps the generic `query` method and automatically
    /// collects and converts the results into a vector of the specific data type. It handles
    /// the type conversion from the generic [`Data`] enum to the concrete type `T`.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The specific data type to query and return. Must implement required traits for
    ///   deserialization, cataloging, and conversion from the [`Data`] enum.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    ///   For exact matches, provide the full instrument ID. For bars, partial matches are supported.
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering. Use standard SQL syntax
    ///   with column names matching the Parquet schema (e.g., "`bid_price` > 1.2000", "volume > 1000").
    /// - `files`: Optional explicit list of file URIs to query; if `None`, files are discovered
    ///   automatically.
    ///
    /// # Returns
    ///
    /// Returns a vector of the specific data type `T`, sorted by timestamp. The vector will be
    /// empty if no data matches the query criteria.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The underlying query execution fails.
    /// - Data type conversion fails.
    /// - Object store access fails.
    /// - Invalid WHERE clause syntax is provided.
    ///
    /// # Performance Considerations
    ///
    /// - Use specific instrument IDs and time ranges to minimize data scanning.
    /// - WHERE clauses are pushed down to Parquet readers when possible.
    /// - Results are automatically sorted by timestamp during collection.
    /// - Memory usage scales with the amount of data returned.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::{QuoteTick, TradeTick, Bar};
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quotes for a specific instrument
    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
    ///     Some(vec!["EURUSD".to_string()]),
    ///     None,
    ///     None,
    ///     None,
    ///     None,
    /// )?;
    ///
    /// // Query trades within a specific time range
    /// let trades: Vec<TradeTick> = catalog.query_typed_data(
    ///     Some(vec!["BTCUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None,
    /// )?;
    ///
    /// // Query bars with volume filter
    /// let bars: Vec<Bar> = catalog.query_typed_data(
    ///     Some(vec!["AAPL".to_string()]),
    ///     None,
    ///     None,
    ///     Some("volume > 1000000"),
    ///     None,
    /// )?;
    ///
    /// // Query multiple instruments with price filter
    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
    ///     Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000 AND ask_price < 1.3000"),
    ///     None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_typed_data<T>(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
    ) -> anyhow::Result<Vec<T>>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix + TryFrom<Data>,
    {
        // Reset session to allow repeated queries (streams are consumed on each query)
        self.reset_session();

        let query_result = self.query::<T>(instrument_ids, start, end, where_clause, files)?;
        let all_data = query_result.collect();

        // Convert Data enum variants to specific type T using to_variant
        Ok(to_variant::<T>(all_data))
    }

    /// Queries all Parquet files for a specific data type and optional instrument IDs.
    ///
    /// This method finds all Parquet files that match the specified criteria and returns
    /// their full URIs. The files are filtered by data type, instrument IDs (if provided),
    /// and timestamp range (if provided).
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_ids`: Optional list of instrument IDs to filter by.
    /// - `start`: Optional start timestamp to filter files by their time range.
    /// - `end`: Optional end timestamp to filter files by their time range.
    ///
    /// # Returns
    ///
    /// Returns a vector of file URI strings that match the query criteria,
    /// or an error if the query fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Object store listing operations fail.
    /// - URI reconstruction fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quote files
    /// let files = catalog.query_files("quotes", None, None, None)?;
    ///
    /// // Query trade files for specific instruments within a time range
    /// let files = catalog.query_files(
    ///     "trades",
    ///     Some(vec!["BTCUSD".to_string(), "ETHUSD".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_files(
        &self,
        data_cls: &str,
        instrument_ids: Option<Vec<String>>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<Vec<String>> {
        let mut files = Vec::new();

        let start_u64 = start.map(|s| s.as_u64());
        let end_u64 = end.map(|e| e.as_u64());

        let base_dir = self.make_path(data_cls, None)?;

        // Use recursive listing to match Python's glob behavior
        let list_result = self.execute_async(async {
            let prefix = ObjectPath::from(format!("{base_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut objects = Vec::new();
            while let Some(object) = stream.next().await {
                objects.push(object?);
            }
            Ok::<Vec<_>, anyhow::Error>(objects)
        })?;

        let mut file_paths: Vec<String> = list_result
            .into_iter()
            .filter_map(|object| {
                let path_str = object.location.to_string();
                if path_str.ends_with(".parquet") {
                    Some(path_str)
                } else {
                    None
                }
            })
            .collect();

        // Apply identifier filtering if provided
        if let Some(identifiers) = instrument_ids {
            let safe_identifiers: Vec<String> = identifiers
                .iter()
                .map(|id| urisafe_instrument_id(id))
                .collect();

            // Exact match by default for instrument_ids or bar_types
            let exact_match_file_paths: Vec<String> = file_paths
                .iter()
                .filter(|file_path| {
                    // Extract the directory name (second to last path component)
                    let path_parts: Vec<&str> = file_path.split('/').collect();
                    if path_parts.len() >= 2 {
                        let dir_name = path_parts[path_parts.len() - 2];
                        safe_identifiers.iter().any(|safe_id| safe_id == dir_name)
                    } else {
                        false
                    }
                })
                .cloned()
                .collect();

            if exact_match_file_paths.is_empty() && data_cls == "bars" {
                // Partial match of instrument_ids in bar_types for bars
                file_paths.retain(|file_path| {
                    let path_parts: Vec<&str> = file_path.split('/').collect();
                    if path_parts.len() >= 2 {
                        let dir_name = path_parts[path_parts.len() - 2];
                        safe_identifiers
                            .iter()
                            .any(|safe_id| dir_name.starts_with(&format!("{safe_id}-")))
                    } else {
                        false
                    }
                });
            } else {
                file_paths = exact_match_file_paths;
            }
        }

        // Apply timestamp filtering
        file_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));

        // Convert to full URIs
        for file_path in file_paths {
            let full_uri = self.reconstruct_full_uri(&file_path);
            files.push(full_uri);
        }

        Ok(files)
    }

    /// Finds the missing time intervals for a specific data type and instrument ID.
    ///
    /// This method compares a requested time range against the existing data coverage
    /// and returns the gaps that need to be filled. This is useful for determining
    /// what data needs to be fetched or backfilled.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp of the requested range (Unix nanoseconds).
    /// - `end`: End timestamp of the requested range (Unix nanoseconds).
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the missing intervals,
    /// or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Interval retrieval fails.
    /// - Gap calculation fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Find missing intervals for quote data
    /// let missing = catalog.get_missing_intervals_for_request(
    ///     1609459200000000000, // start
    ///     1609545600000000000, // end
    ///     "quotes",
    ///     Some("BTCUSD".to_string())
    /// )?;
    ///
    /// for (start, end) in missing {
    ///     println!("Missing data from {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<(u64, u64)>> {
        let intervals = self.get_intervals(data_cls, instrument_id)?;

        Ok(query_interval_diff(start, end, &intervals))
    }

    /// Gets the last (most recent) timestamp for a specific data type and instrument ID.
    ///
    /// This method finds the latest timestamp covered by existing data files for
    /// the specified data type and instrument. This is useful for determining
    /// the most recent data available or for incremental data updates.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
    /// or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Interval retrieval fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Get the last timestamp for quote data
    /// if let Some(last_ts) = catalog.query_last_timestamp("quotes", Some("BTCUSD".to_string()))? {
    ///     println!("Last quote timestamp: {}", last_ts);
    /// } else {
    ///     println!("No quote data found");
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Option<u64>> {
        let intervals = self.get_intervals(data_cls, instrument_id)?;

        if intervals.is_empty() {
            return Ok(None);
        }

        Ok(Some(intervals.last().unwrap().1))
    }

    /// Gets the time intervals covered by Parquet files for a specific data type and instrument ID.
    ///
    /// This method returns all time intervals covered by existing data files for the
    /// specified data type and instrument. The intervals are sorted by start time and
    /// represent the complete data coverage available.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the covered intervals,
    /// sorted by start time, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - Directory listing fails.
    /// - Filename parsing fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Get all intervals for quote data
    /// let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
    /// for (start, end) in intervals {
    ///     println!("Data available from {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_intervals(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<(u64, u64)>> {
        let directory = self.make_path(data_cls, instrument_id)?;

        self.get_directory_intervals(&directory)
    }

    /// Gets the time intervals covered by Parquet files in a specific directory.
    ///
    /// This method scans a directory for Parquet files and extracts the timestamp ranges
    /// from their filenames. It's used internally by other methods to determine data coverage
    /// and is essential for interval-based operations like gap detection and consolidation.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path to scan for Parquet files.
    ///
    /// # Returns
    ///
    /// Returns a vector of (start, end) tuples representing the time intervals covered
    /// by files in the directory, sorted by start timestamp. Returns an empty vector
    /// if the directory doesn't exist or contains no valid Parquet files.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store listing operations fail.
    /// - Directory access is denied.
    ///
    /// # Notes
    ///
    /// - Only files with valid timestamp-based filenames are included.
    /// - Files with unparsable names are silently ignored.
    /// - The method works with both local and remote object stores.
    /// - Results are automatically sorted by start timestamp.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let intervals = catalog.get_directory_intervals("data/quotes/EURUSD")?;
    ///
    /// for (start, end) in intervals {
    ///     println!("File covers {} to {}", start, end);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn get_directory_intervals(&self, directory: &str) -> anyhow::Result<Vec<(u64, u64)>> {
        let mut intervals = Vec::new();

        // Use object store for all operations
        let list_result = self.execute_async(async {
            let path = object_store::path::Path::from(directory);
            Ok(self
                .object_store
                .list(Some(&path))
                .collect::<Vec<_>>()
                .await)
        })?;

        for result in list_result {
            match result {
                Ok(object) => {
                    let path_str = object.location.to_string();
                    if path_str.ends_with(".parquet")
                        && let Some(interval) = parse_filename_timestamps(&path_str)
                    {
                        intervals.push(interval);
                    }
                }
                Err(_) => {
                    // Directory doesn't exist or is empty, which is fine
                    break;
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        Ok(intervals)
    }

    /// Constructs a directory path for storing data of a specific type and instrument.
    ///
    /// This method builds the hierarchical directory structure used by the catalog to organize
    /// data by type and instrument. The path follows the pattern: `{base_path}/data/{type_name}/{instrument_id}`.
    /// Instrument IDs are automatically converted to URI-safe format by removing forward slashes.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `instrument_id`: Optional instrument ID. If provided, creates a subdirectory for the instrument.
    ///   If `None`, returns the path to the data type directory.
    ///
    /// # Returns
    ///
    /// Returns the constructed directory path as a string, or an error if path construction fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The instrument ID contains invalid characters that cannot be made URI-safe.
    /// - Path construction fails due to system limitations.
    ///
    /// # Path Structure
    ///
    /// - Without instrument ID: `{base_path}/data/{type_name}`.
    /// - With instrument ID: `{base_path}/data/{type_name}/{safe_instrument_id}`.
    /// - If `base_path` is empty: `data/{type_name}[/{safe_instrument_id}]`.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Path for all quote data
    /// let quotes_path = catalog.make_path("quotes", None)?;
    /// // Returns: "/base/path/data/quotes"
    ///
    /// // Path for specific instrument quotes
    /// let eurusd_quotes = catalog.make_path("quotes", Some("EUR/USD".to_string()))?;
    /// // Returns: "/base/path/data/quotes/EURUSD" (slash removed)
    ///
    /// // Path for bar data with complex instrument ID
    /// let bars_path = catalog.make_path("bars", Some("BTC/USD-1H".to_string()))?;
    /// // Returns: "/base/path/data/bars/BTCUSD-1H"
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn make_path(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<String> {
        let mut components = vec!["data".to_string(), type_name.to_string()];

        if let Some(id) = instrument_id {
            let safe_id = urisafe_instrument_id(&id);
            components.push(safe_id);
        }

        let path = make_object_store_path_owned(&self.base_path, components);
        Ok(path)
    }

    /// Helper method to rename a parquet file by moving it via object store operations
    fn rename_parquet_file(
        &self,
        directory: &str,
        old_start: u64,
        old_end: u64,
        new_start: u64,
        new_end: u64,
    ) -> anyhow::Result<()> {
        let old_filename =
            timestamps_to_filename(UnixNanos::from(old_start), UnixNanos::from(old_end));
        let old_path = format!("{directory}/{old_filename}");
        let old_object_path = self.to_object_path(&old_path);

        let new_filename =
            timestamps_to_filename(UnixNanos::from(new_start), UnixNanos::from(new_end));
        let new_path = format!("{directory}/{new_filename}");
        let new_object_path = self.to_object_path(&new_path);

        self.move_file(&old_object_path, &new_object_path)
    }
1545
1546 /// Converts a catalog path string to an [`ObjectPath`] for object store operations.
1547 ///
1548 /// This method handles the conversion between catalog-relative paths and object store paths,
1549 /// taking into account the catalog's base path configuration. It automatically strips the
1550 /// base path prefix when present to create the correct object store path.
1551 ///
1552 /// # Parameters
1553 ///
1554 /// - `path`: The catalog path string to convert. Can be absolute or relative.
1555 ///
1556 /// # Returns
1557 ///
1558 /// Returns an [`ObjectPath`] suitable for use with object store operations.
1559 ///
1560 /// # Path Handling
1561 ///
1562 /// - If `base_path` is empty, the path is used as-is.
1563 /// - If `base_path` is set, it's stripped from the path if present.
1564 /// - Trailing slashes and backslashes are automatically handled.
1565 /// - The resulting path is relative to the object store root.
1566 /// - All paths are normalized to use forward slashes (object store convention).
1567 ///
1568 /// # Examples
1569 ///
1570 /// ```rust,no_run
1571 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1572 ///
1573 /// let catalog = ParquetDataCatalog::new(/* ... */);
1574 ///
1575 /// // Convert a full catalog path
1576 /// let object_path = catalog.to_object_path("/base/data/quotes/file.parquet");
1577 /// // Returns: ObjectPath("data/quotes/file.parquet") if base_path is "/base"
1578 ///
1579 /// // Convert a relative path
1580 /// let object_path = catalog.to_object_path("data/trades/file.parquet");
1581 /// // Returns: ObjectPath("data/trades/file.parquet")
1582 /// ```
1583 #[must_use]
1584 pub fn to_object_path(&self, path: &str) -> ObjectPath {
1585 // Normalize path separators to forward slashes for object store
1586 let normalized_path = path.replace('\\', "/");
1587
1588 if self.base_path.is_empty() {
1589 return ObjectPath::from(normalized_path);
1590 }
1591
1592 // Normalize base path separators as well
1593 let normalized_base = self.base_path.replace('\\', "/");
1594 let base = normalized_base.trim_end_matches('/');
1595
1596 // Remove the catalog base prefix if present
1597 let without_base = normalized_path
1598 .strip_prefix(&format!("{base}/"))
1599 .or_else(|| normalized_path.strip_prefix(base))
1600 .unwrap_or(&normalized_path);
1601
1602 ObjectPath::from(without_base)
1603 }
1604
1605 /// Helper method to move a file using object store rename operation
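    ///
    /// # Examples
    ///
    /// A minimal sketch (the catalog base path and file paths are illustrative):
    ///
    /// ```rust,no_run
    /// # use std::path::PathBuf;
    /// # use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// # let catalog = ParquetDataCatalog::new(PathBuf::from("/base"), None, None, None, None);
    /// let old_path = catalog.to_object_path("data/quotes/EURUSD/old.parquet");
    /// let new_path = catalog.to_object_path("data/quotes/EURUSD/new.parquet");
    /// catalog.move_file(&old_path, &new_path)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```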
    pub fn move_file(&self, old_path: &ObjectPath, new_path: &ObjectPath) -> anyhow::Result<()> {
        self.execute_async(async {
            self.object_store
                .rename(old_path, new_path)
                .await
                .map_err(anyhow::Error::from)
        })
    }

    /// Helper method to execute an async operation by blocking on the shared runtime.
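    ///
    /// # Examples
    ///
    /// A minimal sketch; the async block here is a stand-in for any object store
    /// operation returning `anyhow::Result<R>`:
    ///
    /// ```rust,no_run
    /// # use std::path::PathBuf;
    /// # use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// # let catalog = ParquetDataCatalog::new(PathBuf::from("/base"), None, None, None, None);
    /// let value = catalog.execute_async(async { Ok(42) })?;
    /// assert_eq!(value, 42);
    /// # Ok::<(), anyhow::Error>(())
    /// ```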
    pub fn execute_async<F, R>(&self, future: F) -> anyhow::Result<R>
    where
        F: std::future::Future<Output = anyhow::Result<R>>,
    {
        let rt = get_runtime();
        rt.block_on(future)
    }
}

/// Trait for providing catalog path prefixes for different data types.
///
/// This trait enables type-safe organization of data within the catalog by providing
/// a standardized way to determine the directory structure for each data type.
/// Each data type maps to a specific subdirectory within the catalog's data folder.
///
/// # Implementation
///
/// Types implementing this trait should return a static string that represents
/// the directory name where data of that type should be stored.
///
/// # Examples
///
/// ```rust
/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
/// use nautilus_model::data::QuoteTick;
///
/// assert_eq!(QuoteTick::path_prefix(), "quotes");
/// ```
pub trait CatalogPathPrefix {
    /// Returns the path prefix (directory name) for this data type.
    ///
    /// # Returns
    ///
    /// A static string representing the directory name where this data type is stored.
    fn path_prefix() -> &'static str;
}

/// Macro for implementing [`CatalogPathPrefix`] for data types.
///
/// This macro provides a convenient way to implement the trait for multiple types
/// with their corresponding path prefixes.
///
/// # Parameters
///
/// - `$type`: The data type to implement the trait for.
/// - `$path`: The path prefix string for that type.
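///
/// # Examples
///
/// Invoked as below (marked `ignore` since the macro is module-private); this is
/// exactly how the standard implementations further down are generated:
///
/// ```rust,ignore
/// impl_catalog_path_prefix!(QuoteTick, "quotes");
/// ```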
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

// Standard implementations for financial data types
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");

/// Converts timestamps to a filename using ISO 8601 format.
///
/// This function converts two Unix nanosecond timestamps to a filename that uses
/// ISO 8601 format with filesystem-safe characters. The format matches the Python
/// implementation for consistency.
///
/// # Parameters
///
/// - `timestamp_1`: First timestamp in Unix nanoseconds.
/// - `timestamp_2`: Second timestamp in Unix nanoseconds.
///
/// # Returns
///
/// Returns a filename string in the format: "`iso_timestamp_1_iso_timestamp_2.parquet`".
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::timestamps_to_filename;
/// # use nautilus_core::UnixNanos;
/// let filename = timestamps_to_filename(
///     UnixNanos::from(1609459200000000000),
///     UnixNanos::from(1609545600000000000)
/// );
/// // Returns something like: "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet"
/// ```
#[must_use]
pub fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));

    format!("{datetime_1}_{datetime_2}.parquet")
}

/// Converts an ISO 8601 timestamp to a filesystem-safe format.
///
/// This function replaces colons and dots with hyphens to make the timestamp
/// safe for use in filenames across different filesystems.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns a filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::iso_timestamp_to_file_timestamp;
/// let safe_timestamp = iso_timestamp_to_file_timestamp("2023-10-26T07:30:50.123456789Z");
/// assert_eq!(safe_timestamp, "2023-10-26T07-30-50-123456789Z");
/// ```
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp.replace([':', '.'], "-")
}

/// Converts a filesystem-safe timestamp back to ISO 8601 format.
///
/// This function reverses the transformation done by `iso_timestamp_to_file_timestamp`,
/// converting filesystem-safe timestamps back to standard ISO 8601 format.
///
/// # Parameters
///
/// - `file_timestamp`: Filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Returns
///
/// Returns an ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::file_timestamp_to_iso_timestamp;
/// let iso_timestamp = file_timestamp_to_iso_timestamp("2023-10-26T07-30-50-123456789Z");
/// assert_eq!(iso_timestamp, "2023-10-26T07:30:50.123456789Z");
/// ```
fn file_timestamp_to_iso_timestamp(file_timestamp: &str) -> String {
    let (date_part, time_part) = file_timestamp
        .split_once('T')
        .unwrap_or((file_timestamp, ""));
    let time_part = time_part.strip_suffix('Z').unwrap_or(time_part);

    // Find the last hyphen to separate nanoseconds
    if let Some(last_hyphen_idx) = time_part.rfind('-') {
        let time_with_dot_for_nanos = format!(
            "{}.{}",
            &time_part[..last_hyphen_idx],
            &time_part[last_hyphen_idx + 1..]
        );
        let final_time_part = time_with_dot_for_nanos.replace('-', ":");
        format!("{date_part}T{final_time_part}Z")
    } else {
        // Fallback if no nanoseconds part found
        let final_time_part = time_part.replace('-', ":");
        format!("{date_part}T{final_time_part}Z")
    }
}

/// Converts an ISO 8601 timestamp string to Unix nanoseconds.
///
/// This function parses an ISO 8601 timestamp and converts it to Unix nanoseconds.
/// It's used to convert parsed timestamps back to the internal representation.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns `Ok(u64)` with the Unix nanoseconds timestamp, or an error if parsing fails.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::iso_to_unix_nanos;
/// let nanos = iso_to_unix_nanos("2021-01-01T00:00:00.000000000Z").unwrap();
/// assert_eq!(nanos, 1609459200000000000);
/// ```
fn iso_to_unix_nanos(iso_timestamp: &str) -> anyhow::Result<u64> {
    Ok(iso8601_to_unix_nanos(iso_timestamp.to_string())?.into())
}

/// Converts an instrument ID to a URI-safe format by removing forward slashes.
///
/// Some instrument IDs contain forward slashes (e.g., "BTC/USD") which are not
/// suitable for use in file paths. This function removes these characters to
/// create a safe directory name.
///
/// # Parameters
///
/// - `instrument_id`: The original instrument ID string.
///
/// # Returns
///
/// A URI-safe version of the instrument ID with forward slashes removed.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::urisafe_instrument_id;
/// assert_eq!(urisafe_instrument_id("BTC/USD"), "BTCUSD");
/// assert_eq!(urisafe_instrument_id("EUR-USD"), "EUR-USD");
/// ```
fn urisafe_instrument_id(instrument_id: &str) -> String {
    instrument_id.replace('/', "")
}

/// Extracts the identifier from a file path.
///
/// The identifier is typically the second-to-last path component (directory name).
/// For example, from "`data/quote_tick/EURUSD/file.parquet`", extracts "EURUSD".
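///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_identifier_from_path;
/// assert_eq!(extract_identifier_from_path("data/quote_tick/EURUSD/file.parquet"), "EURUSD");
/// assert_eq!(extract_identifier_from_path("file.parquet"), "unknown");
/// ```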
#[must_use]
pub fn extract_identifier_from_path(file_path: &str) -> String {
    let path_parts: Vec<&str> = file_path.split('/').collect();
    if path_parts.len() >= 2 {
        path_parts[path_parts.len() - 2].to_string()
    } else {
        "unknown".to_string()
    }
}

/// Makes an identifier safe for use in SQL table names.
///
/// Removes forward slashes; replaces dots, hyphens, spaces, and percent signs with
/// underscores; and converts to lowercase.
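///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_sql_safe_identifier;
/// assert_eq!(make_sql_safe_identifier("EUR/USD.SIM"), "eurusd_sim");
/// ```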
#[must_use]
pub fn make_sql_safe_identifier(identifier: &str) -> String {
    urisafe_instrument_id(identifier)
        .replace(['.', '-', ' ', '%'], "_")
        .to_lowercase()
}

/// Extracts the filename from a file path and makes it SQL-safe.
///
/// For example, from "data/quote_tick/EURUSD/2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
/// extracts "`2021_01_01t00_00_00_000000000z_2021_01_02t00_00_00_000000000z`".
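///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_sql_safe_filename;
/// let name = extract_sql_safe_filename(
///     "data/quote_tick/EURUSD/2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
/// );
/// assert_eq!(name, "2021_01_01t00_00_00_000000000z_2021_01_02t00_00_00_000000000z");
/// ```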
#[must_use]
pub fn extract_sql_safe_filename(file_path: &str) -> String {
    if file_path.is_empty() {
        return "unknown_file".to_string();
    }

    let filename = file_path.split('/').next_back().unwrap_or("unknown_file");

    // Remove the .parquet extension if present
    let name_without_ext = if let Some(dot_pos) = filename.rfind(".parquet") {
        &filename[..dot_pos]
    } else {
        filename
    };

    // Replace characters that can pose problems in SQL identifiers (hyphens, colons, dots)
    name_without_ext
        .replace(['-', ':', '.'], "_")
        .to_lowercase()
}

/// Creates a platform-appropriate local path using `PathBuf`.
///
/// This function constructs file system paths using the platform's native path separators.
/// Use this for local file operations that need to work with the actual file system.
///
/// # Arguments
///
/// * `base_path` - The base directory path
/// * `components` - Path components to join
///
/// # Returns
///
/// A `PathBuf` with platform-appropriate separators
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_local_path;
/// let path = make_local_path("/base", &["data", "quotes", "EURUSD"]);
/// // On Unix: "/base/data/quotes/EURUSD"
/// // On Windows the pushed components are joined with backslashes: "/base\data\quotes\EURUSD"
/// ```
#[must_use]
pub fn make_local_path<P: AsRef<Path>>(base_path: P, components: &[&str]) -> PathBuf {
    let mut path = PathBuf::from(base_path.as_ref());
    for component in components {
        path.push(component);
    }
    path
}

/// Creates an object store path using forward slashes.
///
/// Object stores (S3, GCS, etc.) always expect forward slashes regardless of platform.
/// Use this when creating paths for object store operations.
///
/// # Arguments
///
/// * `base_path` - The base path (can be empty)
/// * `components` - Path components to join
///
/// # Returns
///
/// A string path with forward slash separators
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_object_store_path;
/// let path = make_object_store_path("base", &["data", "quotes", "EURUSD"]);
/// assert_eq!(path, "base/data/quotes/EURUSD");
/// ```
#[must_use]
pub fn make_object_store_path(base_path: &str, components: &[&str]) -> String {
    let mut parts = Vec::new();

    if !base_path.is_empty() {
        let normalized_base = base_path
            .replace('\\', "/")
            .trim_end_matches('/')
            .to_string();
        if !normalized_base.is_empty() {
            parts.push(normalized_base);
        }
    }

    for component in components {
        let normalized_component = component
            .replace('\\', "/")
            .trim_start_matches('/')
            .trim_end_matches('/')
            .to_string();
        if !normalized_component.is_empty() {
            parts.push(normalized_component);
        }
    }

    parts.join("/")
}

/// Creates an object store path using forward slashes with owned strings.
///
/// This variant accepts owned strings to avoid lifetime issues.
///
/// # Arguments
///
/// * `base_path` - The base path (can be empty)
/// * `components` - Path components to join (owned strings)
///
/// # Returns
///
/// A string path with forward slash separators
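///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_object_store_path_owned;
/// let path = make_object_store_path_owned("base/", vec!["data".to_string(), "quotes".to_string()]);
/// assert_eq!(path, "base/data/quotes");
/// ```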
#[must_use]
pub fn make_object_store_path_owned(base_path: &str, components: Vec<String>) -> String {
    let mut parts = Vec::new();

    if !base_path.is_empty() {
        let normalized_base = base_path
            .replace('\\', "/")
            .trim_end_matches('/')
            .to_string();
        if !normalized_base.is_empty() {
            parts.push(normalized_base);
        }
    }

    for component in components {
        let normalized_component = component
            .replace('\\', "/")
            .trim_start_matches('/')
            .trim_end_matches('/')
            .to_string();
        if !normalized_component.is_empty() {
            parts.push(normalized_component);
        }
    }

    parts.join("/")
}

/// Converts a local `PathBuf` to an object store path string.
///
/// This function normalizes a local file system path to the forward-slash format
/// expected by object stores, handling platform differences.
///
/// # Arguments
///
/// * `local_path` - The local `PathBuf` to convert
///
/// # Returns
///
/// A string with forward slash separators suitable for object store operations
///
/// # Examples
///
/// ```rust
/// # use std::path::PathBuf;
/// # use nautilus_persistence::backend::catalog::local_to_object_store_path;
/// let local_path = PathBuf::from("data").join("quotes").join("EURUSD");
/// let object_path = local_to_object_store_path(&local_path);
/// assert_eq!(object_path, "data/quotes/EURUSD");
/// ```
#[must_use]
pub fn local_to_object_store_path(local_path: &Path) -> String {
    local_path.to_string_lossy().replace('\\', "/")
}

/// Extracts path components using platform-appropriate path parsing.
///
/// This function safely parses a path into its components, handling both
/// local file system paths and object store paths correctly.
///
/// # Arguments
///
/// * `path_str` - The path string to parse
///
/// # Returns
///
/// A vector of path components
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_path_components;
/// let components = extract_path_components("data/quotes/EURUSD");
/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
///
/// // Works with both separators
/// let components = extract_path_components("data\\quotes\\EURUSD");
/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
/// ```
#[must_use]
pub fn extract_path_components(path_str: &str) -> Vec<String> {
    // Normalize separators and split
    let normalized = path_str.replace('\\', "/");
    normalized
        .split('/')
        .filter(|s| !s.is_empty())
        .map(ToString::to_string)
        .collect()
}

/// Checks if a filename's timestamp range intersects with a query interval.
///
/// This function determines whether a Parquet file (identified by its timestamp-based
/// filename) contains data that falls within the specified query time range.
///
/// # Parameters
///
/// - `filename`: The filename to check (format: "`iso_timestamp_1_iso_timestamp_2.parquet`").
/// - `start`: Optional start timestamp for the query range.
/// - `end`: Optional end timestamp for the query range.
///
/// # Returns
///
/// Returns `true` if the file's time range intersects with the query range,
/// `false` otherwise. Returns `true` if the filename cannot be parsed, so that
/// unparsable files are conservatively included in query results.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::query_intersects_filename;
/// // Example with ISO format filenames
/// assert!(query_intersects_filename(
///     "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
///     Some(1609459200000000000),
///     Some(1609545600000000000)
/// ));
/// ```
fn query_intersects_filename(filename: &str, start: Option<u64>, end: Option<u64>) -> bool {
    if let Some((file_start, file_end)) = parse_filename_timestamps(filename) {
        (start.is_none() || start.unwrap() <= file_end)
            && (end.is_none() || file_start <= end.unwrap())
    } else {
        true
    }
}

/// Parses timestamps from a Parquet filename.
///
/// Extracts the start and end timestamps from filenames that follow the ISO 8601 format:
/// "`iso_timestamp_1_iso_timestamp_2.parquet`" (e.g., "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet").
///
/// # Parameters
///
/// - `filename`: The filename to parse (can be a full path).
///
/// # Returns
///
/// Returns `Some((start_ts, end_ts))` if the filename matches the expected format,
/// `None` otherwise.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::parse_filename_timestamps;
/// assert!(parse_filename_timestamps("2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet").is_some());
/// assert_eq!(parse_filename_timestamps("invalid.parquet"), None);
/// ```
#[must_use]
pub fn parse_filename_timestamps(filename: &str) -> Option<(u64, u64)> {
    let path = Path::new(filename);
    let base_name = path.file_name()?.to_str()?;
    let base_filename = base_name.strip_suffix(".parquet")?;
    let (first_part, second_part) = base_filename.split_once('_')?;

    let first_iso = file_timestamp_to_iso_timestamp(first_part);
    let second_iso = file_timestamp_to_iso_timestamp(second_part);

    let first_ts = iso_to_unix_nanos(&first_iso).ok()?;
    let second_ts = iso_to_unix_nanos(&second_iso).ok()?;

    Some((first_ts, second_ts))
}

/// Checks whether all closed integer intervals in a list are mutually disjoint.
///
/// Two intervals are disjoint if they do not overlap. This function validates that
/// all intervals in the list are non-overlapping, which is a requirement for
/// maintaining data integrity in the catalog.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are disjoint, `false` if any overlap is found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_disjoint;
/// // Disjoint intervals
/// assert!(are_intervals_disjoint(&[(1, 5), (10, 15), (20, 25)]));
///
/// // Overlapping intervals
/// assert!(!are_intervals_disjoint(&[(1, 10), (5, 15)]));
/// ```
#[must_use]
pub fn are_intervals_disjoint(intervals: &[(u64, u64)]) -> bool {
    let n = intervals.len();

    if n <= 1 {
        return true;
    }

    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    for i in 0..(n - 1) {
        let (_, end1) = sorted_intervals[i];
        let (start2, _) = sorted_intervals[i + 1];

        if end1 >= start2 {
            return false;
        }
    }

    true
}

/// Checks if intervals are contiguous (adjacent with no gaps).
///
/// Intervals are contiguous if, when sorted by start time, each interval's start
/// timestamp is exactly one more than the previous interval's end timestamp.
/// This ensures complete coverage of a time range with no gaps.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are contiguous, `false` if any gaps are found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_contiguous;
/// // Contiguous intervals
/// assert!(are_intervals_contiguous(&[(1, 5), (6, 10), (11, 15)]));
///
/// // Non-contiguous intervals (gap between 5 and 8)
/// assert!(!are_intervals_contiguous(&[(1, 5), (8, 10)]));
/// ```
#[must_use]
pub fn are_intervals_contiguous(intervals: &[(u64, u64)]) -> bool {
    let n = intervals.len();
    if n <= 1 {
        return true;
    }

    let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    for i in 0..(n - 1) {
        let (_, end1) = sorted_intervals[i];
        let (start2, _) = sorted_intervals[i + 1];

        if end1 + 1 != start2 {
            return false;
        }
    }

    true
}

/// Finds the parts of a query interval that are not covered by existing data intervals.
///
/// This function calculates the "gaps" in data coverage by comparing a requested
/// time range against the intervals covered by existing data files. It's used to
/// determine what data needs to be fetched or backfilled.
///
/// # Parameters
///
/// - `start`: Start timestamp of the query interval (inclusive).
/// - `end`: End timestamp of the query interval (inclusive).
/// - `closed_intervals`: Existing data intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns a vector of (start, end) tuples representing the gaps in coverage.
/// Returns an empty vector if the query range is invalid or fully covered.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::query_interval_diff;
/// // Query 1-100, have data for 10-30 and 60-80
/// let gaps = query_interval_diff(1, 100, &[(10, 30), (60, 80)]);
/// assert_eq!(gaps, vec![(1, 9), (31, 59), (81, 100)]);
/// ```
fn query_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
    if start > end {
        return Vec::new();
    }

    let interval_set = get_interval_set(closed_intervals);
    let query_range = (Bound::Included(start), Bound::Included(end));
    let query_diff = interval_set.get_interval_difference(&query_range);
    let mut result: Vec<(u64, u64)> = Vec::new();

    for interval in query_diff {
        if let Some(tuple) = interval_to_tuple(interval, start, end) {
            result.push(tuple);
        }
    }

    result
}

/// Creates an interval tree from closed integer intervals.
///
/// This function converts closed intervals [a, b] into half-open intervals [a, b+1)
/// for use with the interval tree data structure, which is used for efficient
/// interval operations and gap detection.
///
/// # Parameters
///
/// - `intervals`: A slice of closed intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns an [`IntervalTree`] containing the converted intervals.
///
/// # Notes
///
/// - Invalid intervals (where start > end) are skipped.
/// - Uses saturating addition to prevent overflow when converting to half-open intervals.
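///
/// # Examples
///
/// Illustration of the closed-to-half-open conversion:
///
/// ```text
/// [1, 5]   -> [1, 6)
/// [10, 10] -> [10, 11)
/// [7, 3]   -> skipped (start > end)
/// ```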
fn get_interval_set(intervals: &[(u64, u64)]) -> IntervalTree<u64> {
    let mut tree = IntervalTree::default();

    if intervals.is_empty() {
        return tree;
    }

    for &(start, end) in intervals {
        if start > end {
            continue;
        }

        tree.insert((
            Bound::Included(start),
            Bound::Excluded(end.saturating_add(1)),
        ));
    }

    tree
}

/// Converts an interval tree result back to a closed interval tuple.
///
/// This helper function converts the bounded interval representation used by
/// the interval tree back into the (start, end) tuple format used throughout
/// the catalog.
///
/// # Parameters
///
/// - `interval`: The bounded interval from the interval tree.
/// - `query_start`: The start of the original query range.
/// - `query_end`: The end of the original query range.
///
/// # Returns
///
/// Returns `Some((start, end))` for valid intervals, `None` for empty intervals.
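///
/// # Examples
///
/// Illustration of the half-open-to-closed mapping:
///
/// ```text
/// (Included(5), Excluded(10)) -> Some((5, 9))
/// (Included(5), Included(9))  -> Some((5, 9))
/// (Included(5), Excluded(5))  -> None (empty)
/// ```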
fn interval_to_tuple(
    interval: (Bound<&u64>, Bound<&u64>),
    query_start: u64,
    query_end: u64,
) -> Option<(u64, u64)> {
    let (bound_start, bound_end) = interval;

    let start = match bound_start {
        Bound::Included(val) => *val,
        Bound::Excluded(val) => val.saturating_add(1),
        Bound::Unbounded => query_start,
    };

    let end = match bound_end {
        Bound::Included(val) => *val,
        Bound::Excluded(val) => {
            if *val == 0 {
                return None; // Empty interval
            }
            val - 1
        }
        Bound::Unbounded => query_end,
    };

    if start <= end {
        Some((start, end))
    } else {
        None
    }
}