Persistence¶
The persistence subpackage handles data storage and retrieval, mainly to support backtesting.
- class BaseDataCatalog¶
Bases: ABC
Provides an abstract base class for a queryable data catalog.
- abstractmethod classmethod from_env() BaseDataCatalog¶
- abstractmethod classmethod from_uri(uri: str, storage_options: dict[str, str] | None = None) BaseDataCatalog¶
- instruments(instrument_type: type | None = None, instrument_ids: list[str] | None = None, **kwargs: Any) list[Instrument]¶
- instrument_status(instrument_ids: list[str] | None = None, **kwargs: Any) list[InstrumentStatus]¶
- instrument_closes(instrument_ids: list[str] | None = None, **kwargs: Any) list[InstrumentClose]¶
- order_book_deltas(instrument_ids: list[str] | None = None, batched: bool = False, **kwargs: Any) list[OrderBookDelta] | list[OrderBookDeltas]¶
- order_book_depth10(instrument_ids: list[str] | None = None, **kwargs: Any) list[OrderBookDepth10]¶
- funding_rates(instrument_ids: list[str] | None = None, **kwargs: Any) list[FundingRateUpdate]¶
- bars(bar_types: list[str] | None = None, instrument_ids: list[str] | None = None, **kwargs: Any) list[Bar]¶
- custom_data(cls: type, instrument_ids: list[str] | None = None, as_nautilus: bool = False, metadata: dict | None = None, **kwargs: Any) list[CustomData]¶
- abstractmethod query(data_cls: type, identifiers: list[str] | None = None, **kwargs: Any) list[Data]¶
- abstractmethod query_first_timestamp(data_cls: type, identifier: str | None = None) Timestamp | None¶
- abstractmethod query_last_timestamp(data_cls: type, identifier: str | None = None) Timestamp | None¶
- abstractmethod list_data_types() list[str]¶
- list_generic_data_types() list[str]¶
- abstractmethod list_backtest_runs() list[str]¶
- abstractmethod list_live_runs() list[str]¶
- class FeatherFile¶
Bases: NamedTuple
FeatherFile(path, class_name)
- path: str¶
Alias for field number 0
- class_name: str¶
Alias for field number 1
- count(value, /)¶
Return number of occurrences of value.
- index(value, start=0, stop=9223372036854775807, /)¶
Return first index of value.
Raises ValueError if the value is not present.
- class ParquetDataCatalog¶
Bases: BaseDataCatalog
Provides a queryable data catalog persisted to files in Parquet (Arrow) format.
- Parameters:
path (PathLike[str] | str) – The root path for this data catalog. Must exist and must be an absolute path.
fs_protocol (str, default 'file') – The filesystem protocol used by fsspec to handle file operations. This determines how the data catalog interacts with storage, be it local filesystem, cloud storage, or others. Common protocols include ‘file’ for local storage, ‘s3’ for Amazon S3, and ‘gcs’ for Google Cloud Storage. If not provided, it defaults to ‘file’, meaning the catalog operates on the local filesystem.
fs_storage_options (dict, optional) – The storage options passed to the fsspec filesystem.
fs_rust_storage_options (dict[str, str], optional) – Storage-specific configuration options for the Rust backend. Defaults to the fs_storage_options values if not specified.
max_rows_per_group (int, default 5000) – The maximum number of rows per group. If the value is greater than 0, then the dataset writer may split up large incoming batches into multiple row groups.
show_query_paths (bool, default False) – If globbed query paths should be printed to stdout.
Warning
The data catalog is not threadsafe. Using it in a multithreaded environment can lead to unexpected behavior.
Notes
For further details about fsspec and its filesystem protocols, see https://filesystem-spec.readthedocs.io/en/latest/.
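For example, a catalog on the local filesystem could be created as follows (a minimal sketch; the directory is hypothetical and the import path is assumed to be nautilus_trader.persistence.catalog):

```python
from pathlib import Path

from nautilus_trader.persistence.catalog import ParquetDataCatalog

catalog_path = Path("/tmp/nautilus_catalog").resolve()  # hypothetical location
catalog_path.mkdir(parents=True, exist_ok=True)         # the root path must exist

catalog = ParquetDataCatalog(
    path=str(catalog_path),
    fs_protocol="file",       # default: local filesystem
    max_rows_per_group=5000,  # default row-group size
)
```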
- fs_protocol: str¶
- fs: AbstractFileSystem¶
- classmethod from_env() ParquetDataCatalog¶
Create a data catalog instance by accessing the ‘NAUTILUS_PATH’ environment variable.
- Return type:
ParquetDataCatalog
- Raises:
OSError – If the ‘NAUTILUS_PATH’ environment variable is not set.
- classmethod from_uri(uri: str, fs_storage_options: dict[str, str] | None = None, fs_rust_storage_options: dict[str, str] | None = None) ParquetDataCatalog¶
Create a data catalog instance from the given uri with optional storage options.
- Parameters:
uri (str) – The URI string for the backing path.
fs_storage_options (dict[str, str], optional) – Storage-specific configuration options. For S3: endpoint_url, region, access_key_id, secret_access_key, session_token, etc. For GCS: service_account_path, service_account_key, project_id, etc. For Azure: account_name, account_key, sas_token, etc.
fs_rust_storage_options (dict[str, str], optional) – Storage-specific configuration options for the Rust backend. Defaults to the fs_storage_options values if not specified.
- Return type:
ParquetDataCatalog
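For example (a sketch; the bucket name and credentials are placeholders, and the import path is assumed to be nautilus_trader.persistence.catalog):

```python
from nautilus_trader.persistence.catalog import ParquetDataCatalog

catalog = ParquetDataCatalog.from_uri(
    "s3://my-bucket/nautilus-catalog",  # placeholder bucket and prefix
    fs_storage_options={
        "region": "us-east-1",
        "access_key_id": "PLACEHOLDER_KEY",
        "secret_access_key": "PLACEHOLDER_SECRET",
    },
)

# Or, read the root path from the NAUTILUS_PATH environment variable
# catalog = ParquetDataCatalog.from_env()
```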
- write_data(data: list[Data | Event] | list[OrderBookDelta | OrderBookDepth10 | QuoteTick | TradeTick | Bar], start: int | None = None, end: int | None = None, skip_disjoint_check: bool = False) None¶
Write the given data to the catalog.
The function categorizes the data based on their class name and, when applicable, their associated instrument ID. It then delegates the actual writing process to the write_chunk method.
- Parameters:
data (list[Data | Event] | list[OrderBookDelta | OrderBookDepth10 | QuoteTick | TradeTick | Bar]) – The data to write to the catalog.
start (int, optional) – The start timestamp (nanoseconds) for the data range being written.
end (int, optional) – The end timestamp (nanoseconds) for the data range being written.
skip_disjoint_check (bool, default False) – If True, skips the check that the written file intervals are disjoint.
Warning
Any data which already exists under a filename will be overwritten.
Notes
All data of the same type is expected to be monotonically increasing, or non-decreasing.
The data is sorted and grouped based on its class name and instrument ID (if applicable) before writing.
Instrument-specific data should have either an instrument_id attribute or be an instance of Instrument.
The Bar class is treated as a special case, being grouped based on its bar_type attribute.
The input data list must be non-empty, and all data items must be of the appropriate class type.
- Raises:
ValueError – If data of the same type is not monotonically increasing (or non-decreasing) based on ts_init.
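For example, quote ticks built with the QuoteTickDataWrangler (documented further below) can be written together with their instrument definition (a sketch; the path is hypothetical and the instrument comes from the test kit provider):

```python
from pathlib import Path

import pandas as pd

from nautilus_trader.persistence.catalog import ParquetDataCatalog
from nautilus_trader.persistence.wranglers import QuoteTickDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider

path = Path("/tmp/nautilus_catalog")  # hypothetical location; must exist
path.mkdir(parents=True, exist_ok=True)
catalog = ParquetDataCatalog(str(path))

instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")
df = pd.DataFrame(
    {"bid_price": [1.1000, 1.1001], "ask_price": [1.1001, 1.1002]},
    index=pd.DatetimeIndex(
        ["2024-01-01 00:00:00", "2024-01-01 00:00:01"], tz="UTC", name="timestamp"
    ),
)
quote_ticks = QuoteTickDataWrangler(instrument).process(df)

catalog.write_data([instrument])  # instrument definitions
catalog.write_data(quote_ticks)   # data must be non-decreasing by ts_init
```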
- extend_file_name(data_cls: type[Data], identifier: str | None = None, start: int | None = None, end: int | None = None)¶
Extend the timestamp range of an existing parquet file by renaming it.
This method looks for parquet files that are adjacent to the specified timestamp range and renames them to include the new range. It’s useful for extending existing files without having to rewrite them when a query returns an empty list.
- Parameters:
data_cls (type[Data]) – The data class type to extend files for.
identifier (str, optional) – The instrument ID to filter files by. If None, applies to all instruments.
start (int, optional) – The start timestamp (nanoseconds) of the new range.
end (int, optional) – The end timestamp (nanoseconds) of the new range.
Notes
Both start and end must be provided for the method to take effect.
The method only extends files if they are exactly adjacent to the new range (i.e., if interval[0] == end + 1 or interval[1] == start - 1).
After renaming, the method verifies that the intervals remain disjoint.
- reset_all_file_names() None¶
Reset the filenames of all parquet files in the catalog to match their actual content timestamps.
This method identifies all leaf directories in the catalog that contain parquet files and resets their filenames to accurately reflect the minimum and maximum timestamps of the data they contain. It does this by examining the parquet metadata for each file and renaming the file to follow the pattern ‘{first_timestamp}-{last_timestamp}.parquet’.
This is useful when file names may have become inconsistent with their content, for example after manual file operations or data corruption. It ensures that query operations that rely on filename-based filtering will work correctly.
Notes
This operation scans all parquet files in the catalog and may be resource-intensive for large catalogs.
The method does not modify the content of the files, only their names.
After renaming, the method verifies that the intervals represented by the filenames are disjoint (non-overlapping) to maintain data integrity.
This method is a convenience wrapper that calls _reset_file_names on each leaf directory.
- reset_data_file_names(data_cls: type, identifier: str | None = None) None¶
Reset the filenames of parquet files for a specific data class and instrument ID.
This method resets the filenames of parquet files for the specified data class and identifier to accurately reflect the minimum and maximum timestamps of the data they contain. It examines the parquet metadata for each file and renames the file to follow the pattern ‘{first_timestamp}-{last_timestamp}.parquet’.
- Parameters:
data_cls (type) – The data class type to reset filenames for (e.g., QuoteTick, TradeTick, Bar).
identifier (str, optional) – The specific identifier (instrument ID, etc) to reset filenames for. If None, resets filenames for all instruments of the specified data class.
Notes
This operation is more targeted than reset_all_file_names as it only affects files for a specific data class and identifier.
The method does not modify the content of the files, only their names.
After renaming, the method verifies that the intervals represented by the filenames are disjoint (non-overlapping) to maintain data integrity.
This method is useful for correcting filename inconsistencies for a specific data type without processing the entire catalog.
- consolidate_catalog(start: int | str | float | None = None, end: int | str | float | None = None, ensure_contiguous_files: bool = True, deduplicate: bool = False) None¶
Consolidate all parquet files across the entire catalog within the specified time range.
This method identifies all leaf directories in the catalog that contain parquet files and consolidates them. A leaf directory is one that contains files but no subdirectories. This is a convenience method that effectively calls consolidate_data for all data types and instrument IDs in the catalog.
- Parameters:
start (TimestampLike, optional) – The start timestamp for the consolidation range. Only files with timestamps greater than or equal to this value will be consolidated. If None, all files from the beginning of time will be considered.
end (TimestampLike, optional) – The end timestamp for the consolidation range. Only files with timestamps less than or equal to this value will be consolidated. If None, all files up to the end of time will be considered.
ensure_contiguous_files (bool, default True) – If True, ensures that files have contiguous timestamps before consolidation.
deduplicate (bool, default False) – If True, removes duplicate rows from the consolidated file.
Notes
This operation can be resource-intensive for large catalogs with many data types and instruments.
The consolidation process only combines files with non-overlapping timestamp ranges.
If timestamp ranges overlap between files in any directory, the consolidation for that directory will be aborted for safety.
After consolidation, the original files are removed and replaced with a single file in each leaf directory.
This method is useful for periodic maintenance of the catalog to improve query performance and reduce storage overhead.
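As a sketch of such a maintenance task (catalog is assumed to be an existing ParquetDataCatalog instance, such as the one constructed above):

```python
# `catalog` is assumed to be an existing ParquetDataCatalog instance
catalog.consolidate_catalog(
    ensure_contiguous_files=True,  # require contiguous file timestamps before consolidating
    deduplicate=False,             # set True to also drop duplicate rows
)
```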
- consolidate_data(data_cls: type, identifier: str | None = None, start: int | str | float | None = None, end: int | str | float | None = None, ensure_contiguous_files: bool = True, deduplicate: bool = False) None¶
Consolidate multiple parquet files for a specific data class and instrument ID into a single file.
This method identifies all parquet files within the specified time range for the given data class and instrument ID, then combines them into a single parquet file. This helps improve query performance and reduces storage overhead by eliminating small fragmented files.
- Parameters:
data_cls (type) – The data class type to consolidate (e.g., QuoteTick, TradeTick, Bar).
identifier (str, optional) – The specific instrument ID to consolidate data for. If None, consolidates data for all instruments of the specified data class.
start (TimestampLike, optional) – The start timestamp for the consolidation range. Only files with timestamps greater than or equal to this value will be consolidated. If None, all files from the beginning of time will be considered.
end (TimestampLike, optional) – The end timestamp for the consolidation range. Only files with timestamps less than or equal to this value will be consolidated. If None, all files up to the end of time will be considered.
ensure_contiguous_files (bool, default True) – If True, ensures that files have contiguous timestamps before consolidation.
deduplicate (bool, default False) – If True, removes duplicate rows from the consolidated file.
Notes
The consolidation process only combines files with non-overlapping timestamp ranges.
If timestamp ranges overlap between files, the consolidation will be aborted for safety.
The method uses the _combine_data_files function which sorts files by their first timestamp before combining them.
After consolidation, the original files are removed and replaced with a single file.
- consolidate_catalog_by_period(period: Timedelta = Timedelta('1 days 00:00:00'), start: int | str | float | None = None, end: int | str | float | None = None, ensure_contiguous_files: bool = True) None¶
Consolidate all parquet files across the entire catalog by splitting them into fixed time periods.
This method identifies all leaf directories in the catalog that contain parquet files and consolidates them by period. A leaf directory is one that contains files but no subdirectories. This is a convenience method that effectively calls consolidate_data_by_period for all data types and instrument IDs in the catalog.
- Parameters:
period (pd.Timedelta, default pd.Timedelta(days=1)) – The period duration for consolidation. Default is 1 day. Examples: pd.Timedelta(hours=1), pd.Timedelta(days=7), pd.Timedelta(minutes=30)
start (TimestampLike, optional) – The start timestamp for the consolidation range. Only files with timestamps greater than or equal to this value will be consolidated. If None, all files from the beginning of time will be considered.
end (TimestampLike, optional) – The end timestamp for the consolidation range. Only files with timestamps less than or equal to this value will be consolidated. If None, all files up to the end of time will be considered.
ensure_contiguous_files (bool, default True) – If True, uses period boundaries for file naming. If False, uses actual data timestamps for file naming.
Notes
This operation can be resource-intensive for large catalogs with many data types and instruments.
The consolidation process splits data into fixed time periods rather than combining all files into a single file per directory.
Uses the same period-based consolidation logic as consolidate_data_by_period.
Original files are removed and replaced with period-based consolidated files.
This method is useful for periodic maintenance of the catalog to standardize file organization by time periods.
- consolidate_data_by_period(data_cls: type, identifier: str | None = None, period: Timedelta = Timedelta('1 days 00:00:00'), start: int | str | float | None = None, end: int | str | float | None = None, ensure_contiguous_files: bool = True) None¶
Consolidate data files by splitting them into fixed time periods.
This method queries data by period and writes consolidated files immediately, using the skip_disjoint_check parameter to avoid interval conflicts during the consolidation process. When start/end boundaries intersect existing files, the function automatically splits those files to preserve all data.
- Parameters:
data_cls (type) – The data class type to consolidate.
identifier (str, optional) – The instrument ID to consolidate. If None, consolidates all instruments.
period (pd.Timedelta, default pd.Timedelta(days=1)) – The period duration for consolidation. Default is 1 day. Examples: pd.Timedelta(hours=1), pd.Timedelta(days=7), pd.Timedelta(minutes=30)
start (TimestampLike, optional) – The start timestamp for consolidation range. If None, uses earliest available data. If specified and intersects existing files, those files will be split to preserve data outside the consolidation range.
end (TimestampLike, optional) – The end timestamp for consolidation range. If None, uses latest available data. If specified and intersects existing files, those files will be split to preserve data outside the consolidation range.
ensure_contiguous_files (bool, default True) – If True, uses period boundaries for file naming. If False, uses actual data timestamps for file naming.
Notes
Uses two-phase approach: first determines all queries, then executes them
Groups intervals into contiguous groups to preserve holes between groups
Allows consolidation across multiple files within each contiguous group
Skips queries if target files already exist for efficiency
Original files are removed immediately after querying each period
Uses skip_disjoint_check to avoid interval conflicts during consolidation
When ensure_contiguous_files=False, file timestamps match actual data range
When ensure_contiguous_files=True, file timestamps use period boundaries
Uses modulo arithmetic for efficient period boundary calculation
Preserves holes in data by preventing queries from spanning across gaps
Automatically splits files at start/end boundaries to preserve all data
Split operations are executed before consolidation to ensure data preservation
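For example, quote tick files for a single instrument could be re-partitioned into daily files (a sketch; the instrument ID and date range are placeholders, and catalog is an existing ParquetDataCatalog):

```python
import pandas as pd

from nautilus_trader.model.data import QuoteTick

catalog.consolidate_data_by_period(
    data_cls=QuoteTick,
    identifier="EUR/USD.SIM",     # placeholder instrument ID
    period=pd.Timedelta(days=1),  # one consolidated file per day
    start="2024-01-01",           # TimestampLike bounds; files crossing the
    end="2024-03-31",             # boundaries are split to preserve data
)
```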
- delete_catalog_range(start: int | str | float | None = None, end: int | str | float | None = None) None¶
Delete data within a specified time range across the entire catalog.
This method identifies all leaf directories in the catalog that contain parquet files and deletes data within the specified time range from each directory. A leaf directory is one that contains files but no subdirectories. This is a convenience method that effectively calls delete_data_range for all data types and instrument IDs in the catalog.
- Parameters:
start (TimestampLike, optional) – The start timestamp for the deletion range. If None, deletes from the beginning.
end (TimestampLike, optional) – The end timestamp for the deletion range. If None, deletes to the end.
Notes
This operation permanently removes data and cannot be undone
The deletion process handles file intersections intelligently by splitting files when they partially overlap with the deletion range
Files completely within the deletion range are removed entirely
Files partially overlapping the deletion range are split to preserve data outside the range
This method is useful for bulk data cleanup operations across the entire catalog
Empty directories are not automatically removed after deletion
- delete_data_range(data_cls: type, identifier: str | None = None, start: int | str | float | None = None, end: int | str | float | None = None) None¶
Delete data within a specified time range for a specific data class and instrument.
This method identifies all parquet files that intersect with the specified time range and handles them appropriately:
- Files completely within the range are deleted.
- Files partially overlapping the range are split to preserve data outside the range.
- The original intersecting files are removed after processing.
- Parameters:
data_cls (type) – The data class type to delete data for (e.g., QuoteTick, TradeTick, Bar).
identifier (str, optional) – The instrument identifier to delete data for. If None, deletes data across all instruments for the specified data class.
start (TimestampLike, optional) – The start timestamp for the deletion range. If None, deletes from the beginning.
end (TimestampLike, optional) – The end timestamp for the deletion range. If None, deletes to the end.
Notes
This operation permanently removes data and cannot be undone
Files that partially overlap the deletion range are split to preserve data outside the range
The method ensures data integrity by using atomic operations where possible
Empty directories are not automatically removed after deletion
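For example, to permanently remove one month of trade ticks for a single instrument (a sketch; the instrument ID and dates are placeholders, and the operation cannot be undone):

```python
from nautilus_trader.model.data import TradeTick

catalog.delete_data_range(
    data_cls=TradeTick,
    identifier="EUR/USD.SIM",  # placeholder instrument ID
    start="2024-01-01",        # files partially overlapping the range are split
    end="2024-01-31",          # so data outside the range is preserved
)
```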
- query(data_cls: type, identifiers: list[str] | None = None, start: int | str | float | None = None, end: int | str | float | None = None, where: str | None = None, files: list[str] | None = None, **kwargs: Any) list[Data | CustomData]¶
Query the catalog for data matching the specified criteria.
This method retrieves data from the catalog based on the provided filters. It automatically selects the appropriate query implementation (Rust or PyArrow) based on the data class and filesystem protocol.
- Parameters:
data_cls (type) – The data class type to query for.
identifiers (list[str], optional) – A list of instrument IDs to filter by. If None, all instruments are included.
start (TimestampLike, optional) – The start timestamp for the query range. If None, no lower bound is applied.
end (TimestampLike, optional) – The end timestamp for the query range. If None, no upper bound is applied.
where (str, optional) – An additional SQL WHERE clause to filter the data (used in Rust queries).
files (list[str], optional) – A specific list of files to query from. If provided, these files are used instead of discovering files through the normal process.
**kwargs (Any) – Additional keyword arguments passed to the underlying query implementation.
- Returns:
A list of data objects matching the query criteria.
- Return type:
list[Data | CustomData]
Notes
For Nautilus built-in data types (OrderBookDelta, QuoteTick, etc.) with the ‘file’ protocol, the Rust implementation is used for better performance.
For other data types or protocols, the PyArrow implementation is used.
When files parameter is provided, PyArrow backend is used regardless of data type.
Non-Nautilus data classes are wrapped in CustomData objects with the appropriate DataType.
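A typical query might look as follows (a sketch; the instrument ID, bar type, and date range are placeholders, and catalog is an existing ParquetDataCatalog):

```python
from nautilus_trader.model.data import QuoteTick

quotes = catalog.query(
    data_cls=QuoteTick,
    identifiers=["EUR/USD.SIM"],  # placeholder instrument ID
    start="2024-01-01",
    end="2024-01-02",
)

# Convenience accessors for common types are also available
bars = catalog.bars(bar_types=["EUR/USD.SIM-1-MINUTE-MID-EXTERNAL"])  # placeholder bar type
```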
- backend_session(data_cls: type, identifiers: list[str] | None = None, start: int | str | float | None = None, end: int | str | float | None = None, where: str | None = None, session: DataBackendSession | None = None, files: list[str] | None = None, optimize_file_loading: bool = False, **kwargs: Any) DataBackendSession¶
Create or update a DataBackendSession for querying data using the Rust backend.
This method is used internally by the query_rust method to set up the query session. It identifies the relevant parquet files and adds them to the session with appropriate SQL queries.
- Parameters:
data_cls (type) – The data class type to query for.
identifiers (list[str], optional) – A list of instrument IDs to filter by. If None, all instruments are included.
start (TimestampLike, optional) – The start timestamp for the query range. If None, no lower bound is applied.
end (TimestampLike, optional) – The end timestamp for the query range. If None, no upper bound is applied.
where (str, optional) – An additional SQL WHERE clause to filter the data.
session (DataBackendSession, optional) – An existing session to update. If None, a new session is created.
files (list[str], optional) – A list of known files to use, skipping the file discovery step. This is a performance optimization when the caller already knows which files exist. Note: With optimize_file_loading=True, the entire directory containing these files will be read by DataFusion, not just the specified files.
optimize_file_loading (bool, default False) – If True, registers entire directories with DataFusion, which is more efficient for managing many files. If False, registers each file individually (needed for operations like consolidation where precise file control is required).
**kwargs (Any) – Additional keyword arguments.
- Returns:
The updated or newly created session.
- Return type:
DataBackendSession
Notes
It maps the data class to the appropriate NautilusDataType for the Rust backend.
The method filters files by directory structure and filename patterns before adding them to the session.
Each file is added with a SQL query that includes the specified filters.
Supports various object store backends including local files, AWS S3, Google Cloud Storage, Azure Blob Storage, and HTTP/WebDAV servers.
- Raises:
RuntimeError – If the data class is not supported by the Rust backend.
- filter_files(data_cls: type, file_paths: list[str], identifiers: list[str] | None = None, start: int | str | float | None = None, end: int | str | float | None = None) list[str]¶
Filter a list of file paths based on identifiers and time range.
This function filters the provided file paths by:
1. Matching identifiers (exact match for instruments, prefix match for bars).
2. Intersecting with the specified time range.
- Parameters:
data_cls (type) – The data class type to filter for (e.g., Bar, TradeTick).
file_paths (list[str]) – List of file paths to filter.
identifiers (list[str] | None, optional) – List of identifiers to match against file paths. If None, no identifier filtering is applied.
start (TimestampLike | None, optional) – Start timestamp for filtering. If None, no start time constraint is applied.
end (TimestampLike | None, optional) – End timestamp for filtering. If None, no end time constraint is applied.
- Returns:
Filtered list of file paths that match the criteria.
- Return type:
list[str]
Notes
For Bar data classes, if exact identifier matching fails, the function attempts partial matching by checking if the file’s identifier starts with the provided identifier followed by a dash (to match bar type patterns).
- get_file_list_from_data_cls(data_cls: type) list[str]¶
Retrieve a list of file paths for a given data class.
This function constructs a glob pattern to find all parquet files associated with the specified data class in the catalog’s directory structure.
- Parameters:
data_cls (type) – The data class type to retrieve file paths for (e.g., Bar, TradeTick).
- Returns:
List of file paths matching the data class.
- Return type:
list[str]
- get_missing_intervals_for_request(start: int, end: int, data_cls: type, identifier: str | None = None) list[tuple[int, int]]¶
Find the missing time intervals for a specific data class and instrument ID.
This method identifies the gaps in the data between the specified start and end timestamps. It’s useful for determining what data needs to be fetched or generated to complete a time series.
- Parameters:
start (int) – The start timestamp (nanoseconds) of the request range.
end (int) – The end timestamp (nanoseconds) of the request range.
data_cls (type) – The data class type to check for.
identifier (str, optional) – The instrument ID to check for. If None, checks across all instruments.
- Returns:
A list of (start, end) timestamp tuples representing the missing intervals. Each tuple represents a continuous range of missing data.
- Return type:
list[tuple[int, int]]
Notes
The method uses the filename patterns to determine the available data intervals.
It does not examine the actual content of the files.
The returned intervals are disjoint (non-overlapping) and sorted by start time.
If all data is available (no gaps), an empty list is returned.
If no data is available in the entire range, a single tuple (start, end) is returned.
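For example, to determine which sub-ranges of a request still need to be fetched before backfilling (a sketch; the instrument ID is a placeholder and catalog is an existing ParquetDataCatalog):

```python
import pandas as pd

from nautilus_trader.model.data import QuoteTick

start_ns = pd.Timestamp("2024-01-01", tz="UTC").value  # nanoseconds since epoch
end_ns = pd.Timestamp("2024-01-08", tz="UTC").value

gaps = catalog.get_missing_intervals_for_request(
    start=start_ns,
    end=end_ns,
    data_cls=QuoteTick,
    identifier="EUR/USD.SIM",  # placeholder instrument ID
)
# Each (start, end) tuple is a continuous range of missing data to fetch
for gap_start, gap_end in gaps:
    print(pd.Timestamp(gap_start), pd.Timestamp(gap_end))
```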
- get_intervals(data_cls: type, identifier: str | None = None) list[tuple[int, int]]¶
Get the time intervals covered by parquet files for a specific data class and instrument ID.
This method retrieves the timestamp ranges of all parquet files for the specified data class and instrument ID. Each parquet file in the catalog follows a naming convention of ‘{start_timestamp}-{end_timestamp}.parquet’, which this method parses to determine the available data intervals.
- Parameters:
data_cls (type) – The data class type to get intervals for.
identifier (str, optional) – The instrument ID to get intervals for. If None, gets intervals across all instruments for the specified data class.
- Returns:
A list of (start, end) timestamp tuples representing the available data intervals. Each tuple contains the start and end timestamps (in nanoseconds) of a continuous range of data. The intervals are sorted by start time.
- Return type:
list[tuple[int, int]]
Notes
This method only examines the filenames and does not inspect the actual content of the files.
The returned intervals are sorted by start timestamp.
If no data is available, an empty list is returned.
This method is useful for determining what data is available before making queries.
Used internally by methods like get_missing_intervals_for_request and _query_last_timestamp.
- list_data_types() list[str]¶
List all data types available in the catalog.
- Returns:
list[str]
A list of data type names (as directory stems) in the catalog.
- list_backtest_runs() list[str]¶
List all backtest run IDs available in the catalog.
- Returns:
list[str]
A list of backtest run IDs (as directory stems) in the catalog.
- list_live_runs() list[str]¶
List all live run IDs available in the catalog.
- Returns:
list[str]
A list of live run IDs (as directory stems) in the catalog.
- read_live_run(instance_id: str, **kwargs: Any) list[Data] | dict[str, list[Data]]¶
Read data from a live run.
This method reads all data associated with a specific live run instance from feather files.
- Parameters:
instance_id (str) – The ID of the live run instance.
**kwargs (Any) – Additional keyword arguments passed to the underlying _read_feather method.
- Returns:
A list of data objects from the live run, sorted by timestamp.
- Return type:
list[Data]
- read_backtest(instance_id: str, **kwargs: Any) list[Data] | dict[str, list[Data]]¶
Read data from a backtest run.
This method reads all data associated with a specific backtest run instance from feather files.
- Parameters:
instance_id (str) – The ID of the backtest run instance.
**kwargs (Any) – Additional keyword arguments passed to the underlying _read_feather method.
- Returns:
A list of data objects from the backtest run, sorted by timestamp.
- Return type:
list[Data]
- convert_stream_to_data(instance_id: str, data_cls: type, other_catalog: ParquetDataCatalog | None = None, subdirectory: str = 'backtest', identifiers: list[str] | None = None, use_ts_event_for_ts_init: bool = False) None¶
Convert stream data from feather files to parquet files.
This method reads data from feather files generated during a backtest or live run and writes it to the catalog in parquet format. It’s useful for converting temporary stream data into a more permanent and queryable format.
- Parameters:
instance_id (str) – The ID of the backtest or live run instance.
data_cls (type) – The data class type to convert.
other_catalog (ParquetDataCatalog, optional) – An alternative catalog to write the data to. If None, writes to this catalog.
subdirectory (str, default "backtest") – The subdirectory containing the feather files. Either “backtest” or “live”.
identifiers (list[str], optional) – Filter to only include data containing these identifiers in their instrument_ids or bar_types.
use_ts_event_for_ts_init (bool, default False) – If True, replaces the ts_init column with ts_event column values before deserializing.
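For example, quote ticks streamed to feather files during a backtest could be converted into the catalog's parquet format (a sketch; the instance ID is a placeholder for a completed run):

```python
from nautilus_trader.model.data import QuoteTick

catalog.convert_stream_to_data(
    instance_id="my-backtest-instance-id",  # placeholder run instance ID
    data_cls=QuoteTick,
    subdirectory="backtest",                # or "live" for live-run streams
)
```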
- bars(bar_types: list[str] | None = None, instrument_ids: list[str] | None = None, **kwargs: Any) list[Bar]¶
- custom_data(cls: type, instrument_ids: list[str] | None = None, as_nautilus: bool = False, metadata: dict | None = None, **kwargs: Any) list[CustomData]¶
- funding_rates(instrument_ids: list[str] | None = None, **kwargs: Any) list[FundingRateUpdate]¶
- instrument_closes(instrument_ids: list[str] | None = None, **kwargs: Any) list[InstrumentClose]¶
- instrument_status(instrument_ids: list[str] | None = None, **kwargs: Any) list[InstrumentStatus]¶
- instruments(instrument_type: type | None = None, instrument_ids: list[str] | None = None, **kwargs: Any) list[Instrument]¶
- list_generic_data_types() list[str]¶
- order_book_deltas(instrument_ids: list[str] | None = None, batched: bool = False, **kwargs: Any) list[OrderBookDelta] | list[OrderBookDeltas]¶
- order_book_depth10(instrument_ids: list[str] | None = None, **kwargs: Any) list[OrderBookDepth10]¶
- class BarDataWrangler¶
Bases: object
BarDataWrangler(BarType bar_type, Instrument instrument)
Provides a means of building lists of Nautilus Bar objects.
- Parameters:
bar_type (BarType) – The bar type for the wrangler.
instrument (Instrument) – The instrument for the wrangler.
- bar_type¶
- instrument¶
- process(self, data: pd.DataFrame, double default_volume: float = 1000000.0, int ts_init_delta: int = 0)¶
Process the given bar dataset into Nautilus Bar objects.
Expects columns [‘open’, ‘high’, ‘low’, ‘close’, ‘volume’] with ‘timestamp’ index. Note: The ‘volume’ column is optional; if it does not exist then the default_volume is used.
- Parameters:
data (pd.DataFrame) – The data to process.
default_volume (float) – The default volume for each bar (if not provided).
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
- Return type:
list[Bar]
- Raises:
ValueError – If data is empty.
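For example (a minimal sketch; the bar type string is hypothetical, the wrangler import path is assumed to be nautilus_trader.persistence.wranglers, and the instrument comes from the test kit provider):

```python
import pandas as pd

from nautilus_trader.model.data import BarType
from nautilus_trader.persistence.wranglers import BarDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider

instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")
bar_type = BarType.from_str("EUR/USD.SIM-1-MINUTE-MID-EXTERNAL")  # hypothetical bar type

df = pd.DataFrame(
    {
        "open": [1.1000, 1.1002],
        "high": [1.1003, 1.1004],
        "low": [1.0999, 1.1001],
        "close": [1.1002, 1.1003],
        # 'volume' omitted, so default_volume is applied
    },
    index=pd.DatetimeIndex(
        ["2024-01-01 00:01", "2024-01-01 00:02"], tz="UTC", name="timestamp"
    ),
)

wrangler = BarDataWrangler(bar_type=bar_type, instrument=instrument)
bars = wrangler.process(df, default_volume=1_000_000.0)
```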
- class OrderBookDeltaDataWrangler¶
Bases: object
OrderBookDeltaDataWrangler(Instrument instrument)
Provides a means of building lists of Nautilus OrderBookDelta objects.
- Parameters:
instrument (Instrument) – The instrument for the data wrangler.
- instrument¶
- process(self, data: pd.DataFrame, int ts_init_delta: int = 0, bool is_raw=False)¶
Process the given order book dataset into Nautilus OrderBookDelta objects.
- Parameters:
data (pd.DataFrame) – The data to process.
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
is_raw (bool, default False) – If the data is scaled to Nautilus fixed-point values.
- Raises:
ValueError – If data is empty.
- class QuoteTickDataWrangler¶
Bases: object
QuoteTickDataWrangler(Instrument instrument)
Provides a means of building lists of Nautilus QuoteTick objects.
- Parameters:
instrument (Instrument) – The instrument for the data wrangler.
- instrument¶
- process(self, data: pd.DataFrame, double default_volume: float = 1000000.0, int ts_init_delta: int = 0)¶
Process the given tick dataset into Nautilus QuoteTick objects.
Expects columns [‘bid_price’, ‘ask_price’] with ‘timestamp’ index. Note: The ‘bid_size’ and ‘ask_size’ columns are optional; if not present then the default_volume is used.
- Parameters:
data (pd.DataFrame) – The tick data to process.
default_volume (float) – The default volume for each tick (if not provided).
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system. Cannot be negative.
- Return type:
list[QuoteTick]
- process_bar_data(self, bid_data: pd.DataFrame, ask_data: pd.DataFrame, double default_volume: float = 1000000.0, int ts_init_delta: int = 0, int offset_interval_ms: int = 100, bool timestamp_is_close: bool = True, int random_seed: int | None = None, bool is_raw: bool = False, bool sort_data: bool = True)¶
Process the given bar datasets into Nautilus QuoteTick objects.
Expects columns [‘open’, ‘high’, ‘low’, ‘close’, ‘volume’] with ‘timestamp’ index. Note: The ‘volume’ column is optional; if not present then the default_volume is used.
- Parameters:
bid_data (pd.DataFrame) – The bid bar data.
ask_data (pd.DataFrame) – The ask bar data.
default_volume (float) – The volume per tick if not available from the data.
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
offset_interval_ms (int, default 100) – The number of milliseconds to offset each tick for the bar timestamps. If timestamp_is_close then will use negative offsets, otherwise will use positive offsets (see also timestamp_is_close).
random_seed (int, optional) – The random seed for shuffling the order of high and low ticks from bar data. If random_seed is None then no shuffling is applied.
is_raw (bool, default False) – If the data is scaled to Nautilus fixed-point values.
timestamp_is_close (bool, default True) – If bar timestamps are at the close. If True, then open, high, low timestamps are offset before the close timestamp. If False, then high, low, close timestamps are offset after the open timestamp.
sort_data (bool, default True) – If the data should be sorted by timestamp.
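For example, quote ticks can be synthesized from bid and ask bar frames (a sketch; the instrument comes from the test kit provider and the spread is artificial):

```python
import pandas as pd

from nautilus_trader.persistence.wranglers import QuoteTickDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider

instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")
index = pd.DatetimeIndex(
    ["2024-01-01 00:01", "2024-01-01 00:02"], tz="UTC", name="timestamp"
)
bid_bars = pd.DataFrame(
    {
        "open": [1.1000, 1.1002],
        "high": [1.1003, 1.1004],
        "low": [1.0999, 1.1001],
        "close": [1.1002, 1.1003],
    },
    index=index,
)
ask_bars = bid_bars + 0.0001  # artificial one-pip spread

wrangler = QuoteTickDataWrangler(instrument)
ticks = wrangler.process_bar_data(
    bid_data=bid_bars,
    ask_data=ask_bars,
    default_volume=1_000_000.0,  # used because no 'volume' column is present
    timestamp_is_close=True,     # open/high/low ticks offset before the close timestamp
)
```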
- class TradeTickDataWrangler¶
Bases: object
TradeTickDataWrangler(Instrument instrument)
Provides a means of building lists of Nautilus TradeTick objects.
- Parameters:
instrument (Instrument) – The instrument for the data wrangler.
- instrument¶
- process(self, data: pd.DataFrame, int ts_init_delta: int = 0, bool is_raw=False)¶
Process the given trade tick dataset into Nautilus TradeTick objects.
- Parameters:
data (pd.DataFrame) – The data to process.
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
is_raw (bool, default False) – If the data is scaled to Nautilus fixed-point values.
- Return type:
list[TradeTick]
- Raises:
ValueError – If data is empty.
- process_bar_data(self, data: pd.DataFrame, int ts_init_delta: int = 0, int offset_interval_ms: int = 100, bool timestamp_is_close: bool = True, int random_seed: int | None = None, bool is_raw: bool = False, bool sort_data: bool = True)¶
Process the given bar datasets into Nautilus TradeTick objects.
Expects columns [‘open’, ‘high’, ‘low’, ‘close’, ‘volume’] with ‘timestamp’ index. Note: The ‘volume’ column is optional; if not present then the default_volume is used.
- Parameters:
data (pd.DataFrame) – The trade bar data.
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
offset_interval_ms (int, default 100) – The number of milliseconds to offset each tick for the bar timestamps. If timestamp_is_close then will use negative offsets, otherwise will use positive offsets (see also timestamp_is_close).
random_seed (int, optional) – The random seed for shuffling the order of high and low ticks from bar data. If random_seed is None then no shuffling is applied.
is_raw (bool, default False) – If the data is scaled to Nautilus fixed-point values.
timestamp_is_close (bool, default True) – If bar timestamps are at the close. If True, then open, high, low timestamps are offset before the close timestamp. If False, then high, low, close timestamps are offset after the open timestamp.
sort_data (bool, default True) – If the data should be sorted by timestamp.
- Return type:
list[TradeTick]
- Raises:
ValueError – If data is empty.
- processed_data¶
- align_bid_ask_bar_data(bid_data: pd.DataFrame, ask_data: pd.DataFrame)¶
Merge bid and ask data into a single DataFrame with prefixed column names.
- calculate_bar_price_offsets(num_records, bool timestamp_is_close: bool, int offset_interval_ms: int, random_seed=None)¶
Calculate and potentially randomize the time offsets for bar prices based on the closeness of the timestamp.
- Parameters:
num_records (int) – The number of records for which offsets are to be generated.
timestamp_is_close (bool) – A flag indicating whether the timestamp is close to the trading time.
offset_interval_ms (int) – The offset interval in milliseconds to be applied.
random_seed (Optional[int]) – The seed for random number generation to ensure reproducibility.
- Returns:
A dictionary with arrays of offsets for open, high, low, and close prices. If random_seed is provided, high and low offsets are randomized.
- Return type:
dict
- calculate_volume_quarter(volume: np.ndarray, int precision: int, double size_increment: float)¶
Convert raw volume data to quarter precision.
- Parameters:
volume (np.ndarray) – An array of volume data to be processed.
precision (int) – The decimal precision to which the volume data is rounded.
- Returns:
The volume data adjusted to quarter precision.
- Return type:
np.ndarray
- prepare_event_and_init_timestamps(index: pd.DatetimeIndex, int ts_init_delta: int)¶
- preprocess_bar_data(data: pd.DataFrame, bool is_raw: bool)¶
Preprocess financial bar data to a standardized format.
Ensures the DataFrame index is labeled as “timestamp”, converts the index to UTC, removes time zone awareness, drops rows with NaN values in critical columns, and optionally scales the data.
- Parameters:
data (pd.DataFrame) – The input DataFrame containing financial bar data.
is_raw (bool) – A flag to determine whether the data should be scaled. If True, scales the data back by FIXED_SCALAR.
- Returns:
The preprocessed DataFrame with a cleaned and standardized structure.
- Return type:
pd.DataFrame
- class StreamingFeatherWriter¶
Bases: object
Provides a stream writer of Nautilus objects into feather files with rotation capabilities.
- Parameters:
path (str) – The path to persist the stream to. Must be a directory.
cache (Cache) – The cache for the query info.
clock (Clock) – The clock to use for time-related operations.
fs_protocol (str, default 'file') – The fsspec file system protocol.
flush_interval_ms (int, optional) – The flush interval (milliseconds) for writing chunks.
replace (bool, default False) – If existing files at the given path should be replaced.
include_types (list[type], optional) – A list of Arrow serializable types to write. If this is specified then only the included types will be written.
rotation_mode (RotationMode, default RotationMode.NO_ROTATION) – The mode for file rotation.
max_file_size (int, default 1GB) – The maximum file size in bytes before rotation (for SIZE mode).
rotation_interval (pd.Timedelta, default 1 day) – The time interval for file rotation (for INTERVAL mode and SCHEDULED_DATES mode).
rotation_time (datetime.time, default 00:00) – The time of day for file rotation (for SCHEDULED_DATES mode).
rotation_timezone (str, default 'UTC') – The timezone for rotation calculations (for SCHEDULED_DATES mode).
- fs: AbstractFileSystem¶
- write(obj: object) None¶
Write the object to the stream.
- Parameters:
obj (object) – The object to write.
- Raises:
ValueError – If obj is None.
- check_flush() None¶
Flush all stream writers if the current time is greater than the next flush interval.
- flush() None¶
Flush all stream writers.
- close() None¶
Flush and close all stream writers.
- get_current_file_info() dict[str | tuple[str, str], dict[str, Any]]¶
Get information about the current files being written.
- Returns:
A dictionary containing file information for each table.
- Return type:
dict[str | tuple[str, str], dict[str, Any]]
- get_next_rotation_time(table_name: str | tuple[str, str]) Timestamp | None¶
Get the expected time for the next file rotation.
- Parameters:
table_name (str | tuple[str, str]) – The specific table name to get the next rotation time for.
- Returns:
The next rotation time for the specified table, or None if not set.
- Return type:
pd.Timestamp | None
- property is_closed: bool¶
Return whether all file streams are closed.
- Returns:
True if all streams are closed, False otherwise.
- Return type:
bool