pub struct ParquetDataCatalog {
pub base_path: String,
pub original_uri: String,
pub object_store: Arc<dyn ObjectStore>,
pub session: DataBackendSession,
pub batch_size: usize,
pub compression: Compression,
pub max_row_group_size: usize,
}
A high-performance data catalog for storing and retrieving financial market data using Apache Parquet format.
The ParquetDataCatalog provides a comprehensive solution for managing large volumes of financial market data with efficient storage, querying, and consolidation capabilities. It supports various object store backends including local filesystems, AWS S3, and other cloud storage providers.
§Features
- Efficient Storage: Uses Apache Parquet format with configurable compression.
- Object Store Backend: Supports multiple storage backends through the object_store crate.
- Time-based Organization: Organizes data by timestamp ranges for optimal query performance.
- Data Validation: Ensures timestamp ordering and interval consistency.
- Consolidation: Merges multiple files to reduce storage overhead and improve query speed.
- Type Safety: Strongly typed data handling with compile-time guarantees.
§Data Organization
Data is organized hierarchically by data type and instrument:
data/{data_type}/{instrument_id}/{start_ts}-{end_ts}.parquet
- Files are named with their timestamp ranges for efficient range queries.
- Intervals are validated to be disjoint to prevent data overlap.
§Performance Considerations
- Batch Size: Controls memory usage during data processing.
- Compression: SNAPPY compression provides good balance of speed and size.
- Row Group Size: Affects query performance and memory usage.
- File Consolidation: Reduces the number of files for better query performance.
Fields§
base_path: String
The base path for data storage within the object store.
original_uri: String
The original URI provided when creating the catalog.
object_store: Arc<dyn ObjectStore>
The object store backend for data persistence.
session: DataBackendSession
The DataFusion session for query execution.
batch_size: usize
The number of records to process in each batch.
compression: Compression
The compression algorithm used for Parquet files.
max_row_group_size: usize
The maximum number of rows in each Parquet row group.
Implementations§
impl ParquetDataCatalog
pub fn new(
    base_path: PathBuf,
    storage_options: Option<HashMap<String, String>>,
    batch_size: Option<usize>,
    compression: Option<Compression>,
    max_row_group_size: Option<usize>,
) -> Self
Creates a new ParquetDataCatalog instance from a local file path.
This is a convenience constructor that converts a local path to a URI format and delegates to Self::from_uri.
§Parameters
- base_path: The base directory path for data storage.
- storage_options: Optional HashMap containing storage-specific configuration options.
- batch_size: Number of records to process in each batch (default: 5000).
- compression: Parquet compression algorithm (default: SNAPPY).
- max_row_group_size: Maximum rows per Parquet row group (default: 5000).
§Panics
Panics if the path cannot be converted to a valid URI or if the object store cannot be created from the path.
§Examples
use std::path::PathBuf;
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(
PathBuf::from("/tmp/nautilus_data"),
None, // no storage options
Some(1000), // smaller batch size
None, // default compression
None, // default row group size
);
pub fn from_uri(
    uri: &str,
    storage_options: Option<HashMap<String, String>>,
    batch_size: Option<usize>,
    compression: Option<Compression>,
    max_row_group_size: Option<usize>,
) -> Result<Self>
Creates a new ParquetDataCatalog instance from a URI with optional storage options.
Supports various URI schemes including local file paths and multiple cloud storage backends supported by the object_store crate.
§Supported URI Schemes
- AWS S3: s3://bucket/path.
- Google Cloud Storage: gs://bucket/path or gcs://bucket/path.
- Azure Blob Storage: azure://account/container/path or abfs://container@account.dfs.core.windows.net/path.
- HTTP/WebDAV: http:// or https://.
- Local files: file://path or plain paths.
§Parameters
- uri: The URI for the data storage location.
- storage_options: Optional HashMap containing storage-specific configuration options:
  - For S3: endpoint_url, region, access_key_id, secret_access_key, session_token, etc.
  - For GCS: service_account_path, service_account_key, project_id, etc.
  - For Azure: account_name, account_key, sas_token, etc.
- batch_size: Number of records to process in each batch (default: 5000).
- compression: Parquet compression algorithm (default: SNAPPY).
- max_row_group_size: Maximum rows per Parquet row group (default: 5000).
§Errors
Returns an error if:
- The URI format is invalid or unsupported.
- The object store cannot be created or accessed.
- Authentication fails for cloud storage backends.
§Examples
use std::collections::HashMap;
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
// Local filesystem
let local_catalog = ParquetDataCatalog::from_uri(
"/tmp/nautilus_data",
None, None, None, None
)?;
// S3 bucket
let s3_catalog = ParquetDataCatalog::from_uri(
"s3://my-bucket/nautilus-data",
None, None, None, None
)?;
// Google Cloud Storage
let gcs_catalog = ParquetDataCatalog::from_uri(
"gs://my-bucket/nautilus-data",
None, None, None, None
)?;
// Azure Blob Storage
let azure_catalog = ParquetDataCatalog::from_uri(
"azure://account/container/nautilus-data",
None, None, None, None
)?;
// S3 with custom endpoint and credentials
let mut storage_options = HashMap::new();
storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
storage_options.insert("access_key_id".to_string(), "my-key".to_string());
storage_options.insert("secret_access_key".to_string(), "my-secret".to_string());
let s3_catalog = ParquetDataCatalog::from_uri(
"s3://my-bucket/nautilus-data",
Some(storage_options),
None, None, None,
)?;
pub fn get_base_path(&self) -> String
Returns the base path of the catalog for testing purposes.
pub fn write_data_enum(
    &self,
    data: Vec<Data>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
) -> Result<()>
Writes mixed data types to the catalog by separating them into type-specific collections.
This method takes a heterogeneous collection of market data and separates it by type, then writes each type to its appropriate location in the catalog. This is useful when processing mixed data streams or bulk data imports.
§Parameters
- data: A vector of mixed [Data] enum variants.
- start: Optional start timestamp to override the data’s natural range.
- end: Optional end timestamp to override the data’s natural range.
§Notes
- Data is automatically sorted by type before writing.
- Each data type is written to its own directory structure.
- Instrument data handling is not yet implemented (TODO).
§Examples
use nautilus_model::data::Data;
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let mixed_data: Vec<Data> = vec![/* mixed data types */];
catalog.write_data_enum(mixed_data, None, None)?;
pub fn write_to_parquet<T>(
    &self,
    data: Vec<T>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    skip_disjoint_check: Option<bool>,
) -> Result<PathBuf>
where
    T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
Writes typed data to a Parquet file in the catalog.
This is the core method for persisting market data to the catalog. It handles data validation, batching, compression, and ensures proper file organization with timestamp-based naming.
§Type Parameters
- T: The data type to write; must implement required traits for serialization and cataloging.
§Parameters
- data: Vector of data records to write (must be in ascending timestamp order).
- start: Optional start timestamp to override the natural data range.
- end: Optional end timestamp to override the natural data range.
§Returns
Returns the PathBuf of the created file, or an empty path if no data was provided.
§Errors
This function will return an error if:
- Data serialization to Arrow record batches fails.
- Object store write operations fail.
- File path construction fails.
- Timestamp interval validation fails after writing.
§Panics
Panics if:
- Data timestamps are not in ascending order.
- Record batches are empty after conversion.
- Required metadata is missing from the schema.
§Examples
use nautilus_model::data::QuoteTick;
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let quotes: Vec<QuoteTick> = vec![/* quote data */];
let path = catalog.write_to_parquet(quotes, None, None)?;
println!("Data written to: {:?}", path);
pub fn write_to_json<T>(
    &self,
    data: Vec<T>,
    path: Option<PathBuf>,
    write_metadata: bool,
) -> Result<PathBuf>
where
    T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
Writes typed data to a JSON file in the catalog.
This method provides an alternative to Parquet format for data export and debugging. JSON files are human-readable but less efficient for large datasets.
§Type Parameters
- T: The data type to write; must implement serialization and cataloging traits.
§Parameters
- data: Vector of data records to write (must be in ascending timestamp order).
- path: Optional custom directory path (defaults to the catalog’s standard structure).
- write_metadata: Whether to write a separate metadata file alongside the data.
§Returns
Returns the PathBuf of the created JSON file.
§Errors
This function will return an error if:
- JSON serialization fails.
- Object store write operations fail.
- File path construction fails.
§Panics
Panics if data timestamps are not in ascending order.
§Examples
use std::path::PathBuf;
use nautilus_model::data::TradeTick;
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let trades: Vec<TradeTick> = vec![/* trade data */];
let path = catalog.write_to_json(
trades,
Some(PathBuf::from("/custom/path")),
true // write metadata
)?;
pub fn data_to_record_batches<T>(
    &self,
    data: Vec<T>,
) -> Result<Vec<RecordBatch>>
where
    T: HasTsInit + EncodeToRecordBatch,
Converts data into Arrow record batches for Parquet serialization.
This method chunks the data according to the configured batch size and converts each chunk into an Arrow record batch with appropriate metadata.
§Type Parameters
- T: The data type to convert; must implement required encoding traits.
§Parameters
- data: Vector of data records to convert.
§Returns
Returns a vector of Arrow [RecordBatch] instances ready for Parquet serialization.
§Errors
Returns an error if record batch encoding fails for any chunk.
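§Examples
A minimal usage sketch, assuming a constructed catalog and a pre-sorted vector of quotes:
use nautilus_model::data::QuoteTick;
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let quotes: Vec<QuoteTick> = vec![/* quote data */];
let batches = catalog.data_to_record_batches(quotes)?;
println!("Encoded {} record batches", batches.len());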
pub fn extend_file_name(
    &self,
    data_cls: &str,
    instrument_id: Option<String>,
    start: UnixNanos,
    end: UnixNanos,
) -> Result<()>
Extends the timestamp range of an existing Parquet file by renaming it.
This method finds an existing file that is adjacent to the specified time range and renames it to include the new range. This is useful when appending data that extends the time coverage of existing files.
§Parameters
- data_cls: The data type directory name (e.g., “quotes”, “trades”).
- instrument_id: Optional instrument ID to target a specific instrument’s data.
- start: Start timestamp of the new range to extend to.
- end: End timestamp of the new range to extend to.
§Returns
Returns Ok(()) on success, or an error if the operation fails.
§Errors
This function will return an error if:
- The directory path cannot be constructed.
- No adjacent file is found to extend.
- File rename operations fail.
- Interval validation fails after extension.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let catalog = ParquetDataCatalog::new(/* ... */);
// Extend a file's range backwards or forwards
catalog.extend_file_name(
"quotes",
Some("BTCUSD".to_string()),
UnixNanos::from(1609459200000000000),
UnixNanos::from(1609545600000000000)
)?;
pub fn list_parquet_files(&self, directory: &str) -> Result<Vec<String>>
Lists all Parquet files in a specified directory.
This method scans a directory and returns the full paths of all files with the .parquet
extension. It works with both local filesystems and remote object stores, making it
suitable for various storage backends.
§Parameters
- directory: The directory path to scan for Parquet files.
§Returns
Returns a vector of full file paths (as strings) for all Parquet files found in the directory. The paths are relative to the object store root and suitable for use with object store operations. Returns an empty vector if the directory doesn’t exist or contains no Parquet files.
§Errors
This function will return an error if:
- Object store listing operations fail.
- Directory access is denied.
- Network issues occur (for remote object stores).
§Notes
- Only files ending with .parquet are included.
- Subdirectories are not recursively scanned.
- File paths are returned in the order provided by the object store.
- Works with all supported object store backends (local, S3, GCS, Azure, etc.).
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let files = catalog.list_parquet_files("data/quotes/EURUSD")?;
for file in files {
println!("Found Parquet file: {}", file);
}
pub fn reconstruct_full_uri(&self, path_str: &str) -> String
Helper method to reconstruct the full URI for remote object store paths.
pub fn is_remote_uri(&self) -> bool
Helper method to check whether the original URI uses a remote object store scheme.
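§Examples
A minimal sketch covering both helpers, assuming a catalog created from an S3 URI; the file path and the exact reconstructed URI are illustrative only:
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::from_uri("s3://my-bucket/nautilus-data", None, None, None, None)?;
if catalog.is_remote_uri() {
    let uri = catalog.reconstruct_full_uri("data/quotes/EURUSD/file.parquet");
    println!("Full URI: {uri}");
}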
pub fn query<T>(
    &mut self,
    instrument_ids: Option<Vec<String>>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    where_clause: Option<&str>,
    files: Option<Vec<String>>,
) -> Result<QueryResult>
where
    T: DecodeDataFromRecordBatch + CatalogPathPrefix,
Executes a query against the catalog to retrieve market data of a specific type.
This is the primary method for querying data from the catalog. It registers the appropriate object store with the DataFusion session, finds all relevant Parquet files, and executes the query across them. The method supports filtering by instrument IDs, time ranges, and custom SQL WHERE clauses.
§Type Parameters
- T: The data type to query; must implement required traits for deserialization and cataloging.
§Parameters
- instrument_ids: Optional list of instrument IDs to filter by. If None, queries all instruments.
- start: Optional start timestamp for filtering (inclusive). If None, queries from the beginning.
- end: Optional end timestamp for filtering (inclusive). If None, queries to the end.
- where_clause: Optional SQL WHERE clause for additional filtering (e.g., “price > 100”).
- files: Optional list of specific Parquet files to query. If None, files are discovered from the catalog.
§Returns
Returns a QueryResult containing the query execution context and data.
Use QueryResult::collect() to retrieve the actual data records.
§Errors
This function will return an error if:
- Object store registration fails for remote URIs.
- File discovery fails.
- DataFusion query execution fails.
- Data deserialization fails.
§Performance Notes
- Files are automatically filtered by timestamp ranges before querying.
- DataFusion optimizes queries across multiple Parquet files.
- Use specific instrument IDs and time ranges to improve performance.
- WHERE clauses are pushed down to the Parquet reader when possible.
§Examples
use nautilus_model::data::QuoteTick;
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let mut catalog = ParquetDataCatalog::new(/* ... */);
// Query all quote data
let result = catalog.query::<QuoteTick>(None, None, None, None, None)?;
let quotes = result.collect();
// Query specific instruments within a time range
let result = catalog.query::<QuoteTick>(
Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
Some(UnixNanos::from(1609459200000000000)),
Some(UnixNanos::from(1609545600000000000)),
None,
None
)?;
// Query with custom WHERE clause
let result = catalog.query::<QuoteTick>(
Some(vec!["EURUSD".to_string()]),
None,
None,
Some("bid_price > 1.2000")
)?;
pub fn query_typed_data<T>(
    &mut self,
    instrument_ids: Option<Vec<String>>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    where_clause: Option<&str>,
    files: Option<Vec<String>>,
) -> Result<Vec<T>>
where
    T: DecodeDataFromRecordBatch + CatalogPathPrefix + TryFrom<Data>,
Queries typed data from the catalog and returns results as a strongly-typed vector.
This is a convenience method that wraps the generic query method and automatically collects and converts the results into a vector of the specific data type. It handles the type conversion from the generic [Data] enum to the concrete type T.
§Type Parameters
- T: The specific data type to query and return. Must implement required traits for deserialization, cataloging, and conversion from the [Data] enum.
§Parameters
- instrument_ids: Optional list of instrument IDs to filter by. If None, queries all instruments. For exact matches, provide the full instrument ID. For bars, partial matches are supported.
- start: Optional start timestamp for filtering (inclusive). If None, queries from the beginning.
- end: Optional end timestamp for filtering (inclusive). If None, queries to the end.
- where_clause: Optional SQL WHERE clause for additional filtering. Use standard SQL syntax with column names matching the Parquet schema (e.g., “bid_price > 1.2000”, “volume > 1000”).
- files: Optional list of specific Parquet files to query. If None, files are discovered from the catalog.
§Returns
Returns a vector of the specific data type T, sorted by timestamp. The vector will be empty if no data matches the query criteria.
§Errors
This function will return an error if:
- The underlying query execution fails.
- Data type conversion fails.
- Object store access fails.
- Invalid WHERE clause syntax is provided.
§Performance Considerations
- Use specific instrument IDs and time ranges to minimize data scanning.
- WHERE clauses are pushed down to Parquet readers when possible.
- Results are automatically sorted by timestamp during collection.
- Memory usage scales with the amount of data returned.
§Examples
use nautilus_model::data::{QuoteTick, TradeTick, Bar};
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let mut catalog = ParquetDataCatalog::new(/* ... */);
// Query all quotes for a specific instrument
let quotes: Vec<QuoteTick> = catalog.query_typed_data(
Some(vec!["EURUSD".to_string()]),
None,
None,
None,
None
)?;
// Query trades within a specific time range
let trades: Vec<TradeTick> = catalog.query_typed_data(
Some(vec!["BTCUSD".to_string()]),
Some(UnixNanos::from(1609459200000000000)),
Some(UnixNanos::from(1609545600000000000)),
None,
None
)?;
// Query bars with volume filter
let bars: Vec<Bar> = catalog.query_typed_data(
Some(vec!["AAPL".to_string()]),
None,
None,
Some("volume > 1000000")
)?;
// Query multiple instruments with price filter
let quotes: Vec<QuoteTick> = catalog.query_typed_data(
Some(vec!["EURUSD".to_string(), "GBPUSD".to_string()]),
None,
None,
Some("bid_price > 1.2000 AND ask_price < 1.3000")
)?;
pub fn query_files(
    &self,
    data_cls: &str,
    instrument_ids: Option<Vec<String>>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
) -> Result<Vec<String>>
Queries all Parquet files for a specific data type and optional instrument IDs.
This method finds all Parquet files that match the specified criteria and returns their full URIs. The files are filtered by data type, instrument IDs (if provided), and timestamp range (if provided).
§Parameters
- data_cls: The data type directory name (e.g., “quotes”, “trades”).
- instrument_ids: Optional list of instrument IDs to filter by.
- start: Optional start timestamp to filter files by their time range.
- end: Optional end timestamp to filter files by their time range.
§Returns
Returns a vector of file URI strings that match the query criteria, or an error if the query fails.
§Errors
This function will return an error if:
- The directory path cannot be constructed.
- Object store listing operations fail.
- URI reconstruction fails.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let catalog = ParquetDataCatalog::new(/* ... */);
// Query all quote files
let files = catalog.query_files("quotes", None, None, None)?;
// Query trade files for specific instruments within a time range
let files = catalog.query_files(
"trades",
Some(vec!["BTCUSD".to_string(), "ETHUSD".to_string()]),
Some(UnixNanos::from(1609459200000000000)),
Some(UnixNanos::from(1609545600000000000))
)?;
pub fn get_missing_intervals_for_request(
    &self,
    start: u64,
    end: u64,
    data_cls: &str,
    instrument_id: Option<String>,
) -> Result<Vec<(u64, u64)>>
Finds the missing time intervals for a specific data type and instrument ID.
This method compares a requested time range against the existing data coverage and returns the gaps that need to be filled. This is useful for determining what data needs to be fetched or backfilled.
§Parameters
- start: Start timestamp of the requested range (Unix nanoseconds).
- end: End timestamp of the requested range (Unix nanoseconds).
- data_cls: The data type directory name (e.g., “quotes”, “trades”).
- instrument_id: Optional instrument ID to target a specific instrument’s data.
§Returns
Returns a vector of (start, end) tuples representing the missing intervals, or an error if the operation fails.
§Errors
This function will return an error if:
- The directory path cannot be constructed.
- Interval retrieval fails.
- Gap calculation fails.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
// Find missing intervals for quote data
let missing = catalog.get_missing_intervals_for_request(
1609459200000000000, // start
1609545600000000000, // end
"quotes",
Some("BTCUSD".to_string())
)?;
for (start, end) in missing {
println!("Missing data from {} to {}", start, end);
}
pub fn query_last_timestamp(
    &self,
    data_cls: &str,
    instrument_id: Option<String>,
) -> Result<Option<u64>>
Gets the last (most recent) timestamp for a specific data type and instrument ID.
This method finds the latest timestamp covered by existing data files for the specified data type and instrument. This is useful for determining the most recent data available or for incremental data updates.
§Parameters
- data_cls: The data type directory name (e.g., “quotes”, “trades”).
- instrument_id: Optional instrument ID to target a specific instrument’s data.
§Returns
Returns Some(timestamp) if data exists, None if no data is found, or an error if the operation fails.
§Errors
This function will return an error if:
- The directory path cannot be constructed.
- Interval retrieval fails.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
// Get the last timestamp for quote data
if let Some(last_ts) = catalog.query_last_timestamp("quotes", Some("BTCUSD".to_string()))? {
println!("Last quote timestamp: {}", last_ts);
} else {
println!("No quote data found");
}
pub fn get_intervals(
    &self,
    data_cls: &str,
    instrument_id: Option<String>,
) -> Result<Vec<(u64, u64)>>
Gets the time intervals covered by Parquet files for a specific data type and instrument ID.
This method returns all time intervals covered by existing data files for the specified data type and instrument. The intervals are sorted by start time and represent the complete data coverage available.
§Parameters
- data_cls: The data type directory name (e.g., “quotes”, “trades”).
- instrument_id: Optional instrument ID to target a specific instrument’s data.
§Returns
Returns a vector of (start, end) tuples representing the covered intervals, sorted by start time, or an error if the operation fails.
§Errors
This function will return an error if:
- The directory path cannot be constructed.
- Directory listing fails.
- Filename parsing fails.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
// Get all intervals for quote data
let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
for (start, end) in intervals {
println!("Data available from {} to {}", start, end);
}
pub fn get_directory_intervals(
    &self,
    directory: &str,
) -> Result<Vec<(u64, u64)>>
Gets the time intervals covered by Parquet files in a specific directory.
This method scans a directory for Parquet files and extracts the timestamp ranges from their filenames. It’s used internally by other methods to determine data coverage and is essential for interval-based operations like gap detection and consolidation.
§Parameters
- directory: The directory path to scan for Parquet files.
§Returns
Returns a vector of (start, end) tuples representing the time intervals covered by files in the directory, sorted by start timestamp. Returns an empty vector if the directory doesn’t exist or contains no valid Parquet files.
§Errors
This function will return an error if:
- Object store listing operations fail.
- Directory access is denied.
§Notes
- Only files with valid timestamp-based filenames are included.
- Files with unparseable names are silently ignored.
- The method works with both local and remote object stores.
- Results are automatically sorted by start timestamp.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let intervals = catalog.get_directory_intervals("data/quotes/EURUSD")?;
for (start, end) in intervals {
println!("File covers {} to {}", start, end);
}
pub fn make_path(
    &self,
    type_name: &str,
    instrument_id: Option<String>,
) -> Result<String>
Constructs a directory path for storing data of a specific type and instrument.
This method builds the hierarchical directory structure used by the catalog to organize
data by type and instrument. The path follows the pattern: {base_path}/data/{type_name}/{instrument_id}
.
Instrument IDs are automatically converted to URI-safe format by removing forward slashes.
§Parameters
- type_name: The data type directory name (e.g., “quotes”, “trades”, “bars”).
- instrument_id: Optional instrument ID. If provided, creates a subdirectory for the instrument. If None, returns the path to the data type directory.
§Returns
Returns the constructed directory path as a string, or an error if path construction fails.
§Errors
This function will return an error if:
- The instrument ID contains invalid characters that cannot be made URI-safe.
- Path construction fails due to system limitations.
§Path Structure
- Without instrument ID: {base_path}/data/{type_name}.
- With instrument ID: {base_path}/data/{type_name}/{safe_instrument_id}.
- If base_path is empty: data/{type_name}[/{safe_instrument_id}].
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
// Path for all quote data
let quotes_path = catalog.make_path("quotes", None)?;
// Returns: "/base/path/data/quotes"
// Path for specific instrument quotes
let eurusd_quotes = catalog.make_path("quotes", Some("EUR/USD".to_string()))?;
// Returns: "/base/path/data/quotes/EURUSD" (slash removed)
// Path for bar data with complex instrument ID
let bars_path = catalog.make_path("bars", Some("BTC/USD-1H".to_string()))?;
// Returns: "/base/path/data/bars/BTCUSD-1H"
pub fn to_object_path(&self, path: &str) -> ObjectPath
Converts a catalog path string to an [ObjectPath] for object store operations.
This method handles the conversion between catalog-relative paths and object store paths, taking into account the catalog’s base path configuration. It automatically strips the base path prefix when present to create the correct object store path.
§Parameters
- path: The catalog path string to convert. Can be absolute or relative.
§Returns
Returns an [ObjectPath] suitable for use with object store operations.
§Path Handling
- If base_path is empty, the path is used as-is.
- If base_path is set, it’s stripped from the path if present.
- Trailing slashes are automatically handled.
- The resulting path is relative to the object store root.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
// Convert a full catalog path
let object_path = catalog.to_object_path("/base/data/quotes/file.parquet");
// Returns: ObjectPath("data/quotes/file.parquet") if base_path is "/base"
// Convert a relative path
let object_path = catalog.to_object_path("data/trades/file.parquet");
// Returns: ObjectPath("data/trades/file.parquet")
pub fn move_file(
    &self,
    old_path: &ObjectPath,
    new_path: &ObjectPath,
) -> Result<()>
Helper method to move a file using the object store rename operation.
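§Examples
A minimal sketch, assuming both paths refer to files in the catalog’s object store; the filenames are illustrative only:
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let old_path = catalog.to_object_path("data/quotes/EURUSD/1609459200000000000-1609545600000000000.parquet");
let new_path = catalog.to_object_path("data/quotes/EURUSD/1609459200000000000-1609632000000000000.parquet");
catalog.move_file(&old_path, &new_path)?;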
pub fn execute_async<F, R>(&self, future: F) -> Result<R>
Helper method to execute async operations with a runtime.
impl ParquetDataCatalog
pub fn consolidate_catalog(
    &self,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    ensure_contiguous_files: Option<bool>,
) -> Result<()>
Consolidates all data files in the catalog.
This method identifies all leaf directories in the catalog that contain Parquet files and consolidates them. A leaf directory is one that contains files but no subdirectories. This is a convenience method that effectively calls consolidate_data for all data types and instrument IDs in the catalog.
§Parameters
- start: Optional start timestamp for the consolidation range. Only files with timestamps greater than or equal to this value will be consolidated. If None, all files from the beginning of time will be considered.
- end: Optional end timestamp for the consolidation range. Only files with timestamps less than or equal to this value will be consolidated. If None, all files up to the end of time will be considered.
- ensure_contiguous_files: Whether to validate that consolidated intervals are contiguous (default: true).
§Returns
Returns Ok(()) on success, or an error if consolidation fails for any directory.
§Errors
This function will return an error if:
- Directory listing fails.
- File consolidation operations fail.
- Interval validation fails (when ensure_contiguous_files is true).
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let catalog = ParquetDataCatalog::new(/* ... */);
// Consolidate all files in the catalog
catalog.consolidate_catalog(None, None, None)?;
// Consolidate only files within a specific time range
catalog.consolidate_catalog(
Some(UnixNanos::from(1609459200000000000)),
Some(UnixNanos::from(1609545600000000000)),
Some(true)
)?;
pub fn consolidate_data(
    &self,
    type_name: &str,
    instrument_id: Option<String>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    ensure_contiguous_files: Option<bool>,
) -> Result<()>
Consolidates data files for a specific data type and instrument.
This method consolidates Parquet files within a specific directory (defined by data type and optional instrument ID) by merging multiple files into a single file. This improves query performance and can reduce storage overhead.
§Parameters
- type_name: The data type directory name (e.g., “quotes”, “trades”, “bars”).
- instrument_id: Optional instrument ID to target a specific instrument’s data.
- start: Optional start timestamp to limit consolidation to files within this range.
- end: Optional end timestamp to limit consolidation to files within this range.
- ensure_contiguous_files: Whether to validate that consolidated intervals are contiguous (default: true).
§Returns
Returns Ok(()) on success, or an error if consolidation fails.
§Errors
This function will return an error if:
- The directory path cannot be constructed.
- File consolidation operations fail.
- Interval validation fails (when ensure_contiguous_files is true).
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let catalog = ParquetDataCatalog::new(/* ... */);
// Consolidate all quote files for a specific instrument
catalog.consolidate_data(
"quotes",
Some("BTCUSD".to_string()),
None,
None,
None
)?;
// Consolidate trade files within a time range
catalog.consolidate_data(
"trades",
None,
Some(UnixNanos::from(1609459200000000000)),
Some(UnixNanos::from(1609545600000000000)),
Some(true)
)?;
pub fn consolidate_catalog_by_period(
    &mut self,
    period_nanos: Option<u64>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    ensure_contiguous_files: Option<bool>,
) -> Result<()>
Consolidates all data files in the catalog by splitting them into fixed time periods.
This method identifies all leaf directories in the catalog that contain Parquet files and consolidates them by period. A leaf directory is one that contains files but no subdirectories. This is a convenience method that effectively calls consolidate_data_by_period for all data types and instrument IDs in the catalog.
§Parameters
- period_nanos: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000). Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes).
- start: Optional start timestamp for the consolidation range. Only files with timestamps greater than or equal to this value will be consolidated. If None, all files from the beginning of time will be considered.
- end: Optional end timestamp for the consolidation range. Only files with timestamps less than or equal to this value will be consolidated. If None, all files up to the end of time will be considered.
- ensure_contiguous_files: If true, uses period boundaries for file naming. If false, uses actual data timestamps for file naming.
§Returns
Returns Ok(()) on success, or an error if consolidation fails for any directory.
§Errors
This function will return an error if:
- Directory listing fails.
- Data type extraction from path fails.
- Period-based consolidation operations fail.
§Notes
- This operation can be resource-intensive for large catalogs with many data types and instruments.
- The consolidation process splits data into fixed time periods rather than combining all files into a single file per directory.
- Uses the same period-based consolidation logic as consolidate_data_by_period.
- Original files are removed and replaced with period-based consolidated files.
- This method is useful for periodic maintenance of the catalog to standardize file organization by time periods.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let mut catalog = ParquetDataCatalog::new(/* ... */);
// Consolidate all files in the catalog by 1-day periods
catalog.consolidate_catalog_by_period(
Some(86400000000000), // 1 day in nanoseconds
None,
None,
Some(true)
)?;
// Consolidate only files within a specific time range by 1-hour periods
catalog.consolidate_catalog_by_period(
Some(3600000000000), // 1 hour in nanoseconds
Some(UnixNanos::from(1609459200000000000)),
Some(UnixNanos::from(1609545600000000000)),
Some(false)
)?;
pub fn extract_data_cls_and_identifier_from_path(
    &self,
    path: &str,
) -> Result<(Option<String>, Option<String>)>
Extracts data class and identifier from a directory path.
This method parses a directory path to extract the data type and optional instrument identifier. It’s used to determine what type of data consolidation to perform for each directory.
§Parameters
- path: The directory path to parse.
§Returns
Returns a tuple of (data_class, identifier) where both are optional strings.
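§Examples
A minimal sketch, assuming the standard data/{data_type}/{instrument_id} layout; the exact values returned depend on the catalog’s path parsing rules:
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let (data_cls, identifier) =
    catalog.extract_data_cls_and_identifier_from_path("data/quotes/EURUSD")?;
// Expected to yield something like (Some("quotes"), Some("EURUSD"))
println!("{data_cls:?} {identifier:?}");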
pub fn consolidate_data_by_period(
    &mut self,
    type_name: &str,
    identifier: Option<String>,
    period_nanos: Option<u64>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    ensure_contiguous_files: Option<bool>,
) -> Result<()>
Consolidates data files by splitting them into fixed time periods.
This method queries data by period and writes consolidated files immediately, using efficient period-based consolidation logic. When start/end boundaries intersect existing files, the function automatically splits those files to preserve all data.
§Parameters
- type_name: The data type directory name (e.g., “quotes”, “trades”, “bars”).
- identifier: Optional instrument ID to consolidate. If None, consolidates all instruments.
- period_nanos: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000). Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes).
- start: Optional start timestamp for the consolidation range. If None, uses the earliest available data. If specified and intersects existing files, those files will be split to preserve data outside the consolidation range.
- end: Optional end timestamp for the consolidation range. If None, uses the latest available data. If specified and intersects existing files, those files will be split to preserve data outside the consolidation range.
- ensure_contiguous_files: If true, uses period boundaries for file naming. If false, uses actual data timestamps for file naming.
§Returns
Returns Ok(()) on success, or an error if consolidation fails.
§Errors
This function will return an error if:
- The directory path cannot be constructed.
- File operations fail.
- Data querying or writing fails.
§Notes
- Uses two-phase approach: first determines all queries, then executes them.
- Groups intervals into contiguous groups to preserve holes between groups.
- Allows consolidation across multiple files within each contiguous group.
- Skips queries if target files already exist for efficiency.
- Original files are removed immediately after querying each period.
- When ensure_contiguous_files=false, file timestamps match the actual data range.
- When ensure_contiguous_files=true, file timestamps use period boundaries.
- Uses modulo arithmetic for efficient period boundary calculation.
- Preserves holes in data by preventing queries from spanning across gaps.
- Automatically splits files at start/end boundaries to preserve all data.
- Split operations are executed before consolidation to ensure data preservation.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let mut catalog = ParquetDataCatalog::new(/* ... */);
// Consolidate all quote files by 1-day periods
catalog.consolidate_data_by_period(
"quotes",
None,
Some(86400000000000), // 1 day in nanoseconds
None,
None,
Some(true)
)?;
// Consolidate specific instrument by 1-hour periods
catalog.consolidate_data_by_period(
"trades",
Some("BTCUSD".to_string()),
Some(3600000000000), // 1 hour in nanoseconds
Some(UnixNanos::from(1609459200000000000)),
Some(UnixNanos::from(1609545600000000000)),
Some(false)
)?;
pub fn consolidate_data_by_period_generic<T>(
    &mut self,
    identifier: Option<String>,
    period_nanos: Option<u64>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    ensure_contiguous_files: Option<bool>,
) -> Result<()>
where
    T: DecodeDataFromRecordBatch + CatalogPathPrefix + EncodeToRecordBatch + HasTsInit + TryFrom<Data> + Clone,
Generic method for consolidating data files by splitting them into fixed time periods.
This is a type-safe version of consolidate_data_by_period that uses generic types to ensure compile-time correctness and enable reuse across different data types.
§Type Parameters
- T: The data type to consolidate; must implement required traits for serialization.
§Parameters
- identifier: Optional instrument ID to target a specific instrument’s data.
- period_nanos: Optional period size in nanoseconds (default: 1 day).
- start: Optional start timestamp for the consolidation range.
- end: Optional end timestamp for the consolidation range.
- ensure_contiguous_files: Optional flag to control the file naming strategy.
§Returns
Returns Ok(()) on success, or an error if consolidation fails.
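§Examples
A minimal sketch, assuming QuoteTick satisfies the trait bounds listed above:
use nautilus_model::data::QuoteTick;
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let mut catalog = ParquetDataCatalog::new(/* ... */);
// Consolidate all quote files into 1-hour periods
catalog.consolidate_data_by_period_generic::<QuoteTick>(
    None,
    Some(3600000000000), // 1 hour in nanoseconds
    None,
    None,
    Some(true)
)?;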
pub fn prepare_consolidation_queries(
    &self,
    type_name: &str,
    identifier: Option<String>,
    intervals: &[(u64, u64)],
    period_nanos: u64,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    ensure_contiguous_files: bool,
) -> Result<Vec<ConsolidationQuery>>
Prepares all queries for consolidation by filtering, grouping, and handling splits.
This auxiliary function handles all the preparation logic for consolidation:
- Filters intervals by time range.
- Groups intervals into contiguous groups.
- Identifies and creates split operations for data preservation.
- Generates period-based consolidation queries.
- Checks for existing target files.
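§Examples
A minimal sketch, assuming existing file intervals obtained from get_intervals; the fields of ConsolidationQuery are not shown here:
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
let queries = catalog.prepare_consolidation_queries(
    "quotes",
    Some("BTCUSD".to_string()),
    &intervals,
    86400000000000, // 1-day periods
    None,
    None,
    true
)?;
println!("Prepared {} consolidation queries", queries.len());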
pub fn group_contiguous_intervals(
    &self,
    intervals: &[(u64, u64)],
) -> Vec<Vec<(u64, u64)>>
Groups intervals into contiguous groups for efficient consolidation.
This method analyzes a list of time intervals and groups them into contiguous sequences. Intervals are considered contiguous if the end of one interval is exactly one nanosecond before the start of the next interval. This grouping preserves data gaps while allowing consolidation within each contiguous group.
§Parameters
- intervals: A slice of timestamp intervals as (start, end) tuples.
§Returns
Returns a vector of groups, where each group is a vector of contiguous intervals. Returns an empty vector if the input is empty.
§Algorithm
- Starts with the first interval in a new group.
- For each subsequent interval, checks if it’s contiguous with the previous.
- If contiguous (prev_end + 1 == curr_start), adds to the current group.
- If not contiguous, starts a new group.
- Returns all groups.
§Examples
Contiguous intervals: [(1,5), (6,10), (11,15)]
Returns: [[(1,5), (6,10), (11,15)]]
Non-contiguous intervals: [(1,5), (8,10), (12,15)]
Returns: [[(1,5)], [(8,10)], [(12,15)]]
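A runnable sketch of the same grouping behavior, assuming a constructed catalog:
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let groups = catalog.group_contiguous_intervals(&[(1, 5), (6, 10), (11, 15)]);
assert_eq!(groups, vec![vec![(1, 5), (6, 10), (11, 15)]]);
let groups = catalog.group_contiguous_intervals(&[(1, 5), (8, 10), (12, 15)]);
assert_eq!(groups, vec![vec![(1, 5)], vec![(8, 10)], vec![(12, 15)]]);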
§Notes
- Input intervals should be sorted by start timestamp.
- Gaps between groups are preserved and not consolidated.
- Used internally by period-based consolidation methods.
pub fn reset_catalog_file_names(&self) -> Result<()>
Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
This method scans all leaf data directories in the catalog and renames files based on the actual timestamp range of their content. This is useful when files have been modified or when filename conventions have changed.
§Returns
Returns Ok(()) on success, or an error if the operation fails.
§Errors
This function will return an error if:
- Directory listing fails.
- File metadata reading fails.
- File rename operations fail.
- Interval validation fails after renaming.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
// Reset all filenames in the catalog
catalog.reset_catalog_file_names()?;
pub fn reset_data_file_names(
    &self,
    data_cls: &str,
    instrument_id: Option<String>,
) -> Result<()>
Resets the filenames of Parquet files for a specific data type and instrument ID.
This method renames files in a specific directory based on the actual timestamp range of their content. This is useful for correcting filenames after data modifications or when filename conventions have changed.
§Parameters
- data_cls: The data type directory name (e.g., “quotes”, “trades”).
- instrument_id: Optional instrument ID to target a specific instrument’s data.
§Returns
Returns Ok(()) on success, or an error if the operation fails.
§Errors
This function will return an error if:
- The directory path cannot be constructed.
- File metadata reading fails.
- File rename operations fail.
- Interval validation fails after renaming.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
// Reset filenames for all quote files
catalog.reset_data_file_names("quotes", None)?;
// Reset filenames for a specific instrument's trade files
catalog.reset_data_file_names("trades", Some("BTCUSD".to_string()))?;
pub fn find_leaf_data_directories(&self) -> Result<Vec<String>>
Finds all leaf data directories in the catalog.
A leaf directory is one that contains data files but no subdirectories. This method is used to identify directories that can be processed for consolidation or other operations.
§Returns
Returns a vector of directory path strings representing leaf directories, or an error if directory traversal fails.
§Errors
This function will return an error if:
- Object store listing operations fail.
- Directory structure cannot be analyzed.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
let catalog = ParquetDataCatalog::new(/* ... */);
let leaf_dirs = catalog.find_leaf_data_directories()?;
for dir in leaf_dirs {
println!("Found leaf directory: {}", dir);
}
pub fn delete_data_range(
    &mut self,
    type_name: &str,
    identifier: Option<String>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
) -> Result<()>
Deletes data within a specified time range for a specific data type and instrument.
This method identifies all parquet files that intersect with the specified time range and handles them appropriately:
- Files completely within the range are deleted.
- Files partially overlapping the range are split to preserve data outside the range.
- The original intersecting files are removed after processing.
§Parameters
- type_name: The data type directory name (e.g., “quotes”, “trades”, “bars”).
- identifier: Optional instrument ID to delete data for. If None, deletes data across all instruments.
- start: Optional start timestamp for the deletion range. If None, deletes from the beginning.
- end: Optional end timestamp for the deletion range. If None, deletes to the end.
§Returns
Returns Ok(()) on success, or an error if deletion fails.
§Errors
This function will return an error if:
- The directory path cannot be constructed.
- File operations fail.
- Data querying or writing fails.
§Notes
- This operation permanently removes data and cannot be undone.
- Files that partially overlap the deletion range are split to preserve data outside the range.
- The method ensures data integrity by using atomic operations where possible.
- Empty directories are not automatically removed after deletion.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let mut catalog = ParquetDataCatalog::new(/* ... */);
// Delete all quote data for a specific instrument
catalog.delete_data_range(
"quotes",
Some("BTCUSD".to_string()),
None,
None
)?;
// Delete trade data within a specific time range
catalog.delete_data_range(
"trades",
None,
Some(UnixNanos::from(1609459200000000000)),
Some(UnixNanos::from(1609545600000000000))
)?;
pub fn delete_catalog_range(
    &mut self,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
) -> Result<()>
Deletes data within a specified time range across the entire catalog.
This method identifies all leaf directories in the catalog that contain Parquet files and deletes data within the specified time range from each directory. A leaf directory is one that contains files but no subdirectories. This is a convenience method that effectively calls delete_data_range for all data types and instrument IDs in the catalog.
§Parameters
- start: Optional start timestamp for the deletion range. If None, deletes from the beginning.
- end: Optional end timestamp for the deletion range. If None, deletes to the end.
§Returns
Returns Ok(()) on success, or an error if deletion fails.
§Errors
This function will return an error if:
- Directory traversal fails.
- Data class extraction from paths fails.
- Individual delete operations fail.
§Notes
- This operation permanently removes data and cannot be undone.
- The deletion process handles file intersections intelligently by splitting files when they partially overlap with the deletion range.
- Files completely within the deletion range are removed entirely.
- Files partially overlapping the deletion range are split to preserve data outside the range.
- This method is useful for bulk data cleanup operations across the entire catalog.
- Empty directories are not automatically removed after deletion.
§Examples
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let mut catalog = ParquetDataCatalog::new(/* ... */);
// Delete all data before a specific date across entire catalog
catalog.delete_catalog_range(
None,
Some(UnixNanos::from(1609459200000000000))
)?;
// Delete all data within a specific range across entire catalog
catalog.delete_catalog_range(
Some(UnixNanos::from(1609459200000000000)),
Some(UnixNanos::from(1609545600000000000))
)?;
// Delete all data after a specific date across entire catalog
catalog.delete_catalog_range(
Some(UnixNanos::from(1609459200000000000)),
None
)?;
pub fn delete_data_range_generic<T>(
    &mut self,
    identifier: Option<String>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
) -> Result<()>
where
    T: DecodeDataFromRecordBatch + CatalogPathPrefix + EncodeToRecordBatch + HasTsInit + TryFrom<Data> + Clone,
Generic implementation for deleting data within a specified time range.
This method provides the core deletion logic that works with any data type that implements the required traits. It handles file intersection analysis, data splitting for partial overlaps, and file cleanup.
§Type Parameters
- T: The data type that implements required traits for catalog operations.
§Parameters
- identifier: Optional instrument ID to delete data for.
- start: Optional start timestamp for the deletion range.
- end: Optional end timestamp for the deletion range.
§Returns
Returns Ok(()) on success, or an error if deletion fails.
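§Examples
A minimal sketch, assuming TradeTick satisfies the trait bounds listed above:
use nautilus_model::data::TradeTick;
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let mut catalog = ParquetDataCatalog::new(/* ... */);
// Delete all trade data for one instrument before a cutoff timestamp
catalog.delete_data_range_generic::<TradeTick>(
    Some("BTCUSD".to_string()),
    None,
    Some(UnixNanos::from(1609459200000000000))
)?;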
pub fn prepare_delete_operations(
    &self,
    type_name: &str,
    identifier: Option<String>,
    intervals: &[(u64, u64)],
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
) -> Result<Vec<DeleteOperation>>
Prepares all operations for data deletion by identifying files that need to be split or removed.
This auxiliary function handles all the preparation logic for deletion:
- Filters intervals by time range.
- Identifies files that intersect with the deletion range.
- Creates split operations for files that partially overlap.
- Generates removal operations for files completely within the range.
§Parameters
- type_name: The data type directory name for path generation.
- identifier: Optional instrument identifier for path generation.
- intervals: List of (start_ts, end_ts) tuples representing existing file intervals.
- start: Optional start timestamp for the deletion range.
- end: Optional end timestamp for the deletion range.
§Returns
Returns a vector of DeleteOperation structs ready for execution.
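§Examples
A minimal sketch, assuming existing file intervals obtained from get_intervals; the fields of DeleteOperation are not shown here:
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use nautilus_core::UnixNanos;
let catalog = ParquetDataCatalog::new(/* ... */);
let intervals = catalog.get_intervals("trades", Some("BTCUSD".to_string()))?;
let operations = catalog.prepare_delete_operations(
    "trades",
    Some("BTCUSD".to_string()),
    &intervals,
    Some(UnixNanos::from(1609459200000000000)),
    Some(UnixNanos::from(1609545600000000000))
)?;
println!("Prepared {} delete operations", operations.len());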
Trait Implementations§
Auto Trait Implementations§
impl Freeze for ParquetDataCatalog
impl !RefUnwindSafe for ParquetDataCatalog
impl Send for ParquetDataCatalog
impl Sync for ParquetDataCatalog
impl Unpin for ParquetDataCatalog
impl !UnwindSafe for ParquetDataCatalog
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self> otherwise. Read more
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self> otherwise. Read more