Persistence
The persistence subpackage handles data storage and retrieval, mainly to support backtesting.
class BaseDataCatalog
Bases: ABC
Provides an abstract base class for a queryable data catalog.
abstract classmethod from_env() → BaseDataCatalog
abstract classmethod from_uri(uri: str) → BaseDataCatalog
abstract query(data_cls: type, instrument_ids: list[str] | None = None, bar_types: list[str] | None = None, **kwargs: Any) → list[Data]
instruments(instrument_type: type | None = None, instrument_ids: list[str] | None = None, **kwargs: Any) → list[Instrument]
instrument_status(instrument_ids: list[str] | None = None, **kwargs: Any) → list[InstrumentStatus]
instrument_closes(instrument_ids: list[str] | None = None, **kwargs: Any) → list[InstrumentClose]
order_book_deltas(instrument_ids: list[str] | None = None, batched: bool = False, **kwargs: Any) → list[OrderBookDelta] | list[OrderBookDeltas]
order_book_depth10(instrument_ids: list[str] | None = None, **kwargs: Any) → list[OrderBookDepth10]
quote_ticks(instrument_ids: list[str] | None = None, **kwargs: Any) → list[QuoteTick]
trade_ticks(instrument_ids: list[str] | None = None, **kwargs: Any) → list[TradeTick]
bars(bar_types: list[str] | None = None, **kwargs: Any) → list[Bar]
custom_data(cls: type, as_nautilus: bool = False, metadata: dict | None = None, **kwargs: Any) → list[CustomData]
abstract list_data_types() → list[str]
list_generic_data_types() → list[str]
abstract list_backtest_runs() → list[str]
abstract list_live_runs() → list[str]
abstract read_live_run(instance_id: str, **kwargs: Any) → list[str]
abstract read_backtest(instance_id: str, **kwargs: Any) → list[str]
class FeatherFile
Bases: NamedTuple
FeatherFile(path, class_name)
path : str
Alias for field number 0
class_name : str
Alias for field number 1
count(value, /)
Return number of occurrences of value.
index(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
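Because FeatherFile is a NamedTuple, its fields can be read by name or by position, and count() and index() are the ordinary tuple methods. The sketch below uses a local stand-in class, FeatherFileSketch (hypothetical, not imported from the library), with the same two fields and a made-up path.

```python
from typing import NamedTuple


class FeatherFileSketch(NamedTuple):
    """Local stand-in mirroring the documented FeatherFile fields."""

    path: str        # field number 0
    class_name: str  # field number 1


ff = FeatherFileSketch(path="/tmp/stream/quote_tick.feather", class_name="QuoteTick")

# Named and positional access refer to the same fields.
assert ff.path == ff[0]
assert ff.class_name == ff[1]

# count() and index() are inherited tuple methods.
print(ff.count("QuoteTick"))  # 1
print(ff.index("QuoteTick"))  # 1
```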
class ParquetDataCatalog
Bases: BaseDataCatalog
Provides a queryable data catalog persisted to files in Parquet (Arrow) format.
- Parameters:
- path (PathLike[str] | str) – The root path for this data catalog. Must exist and must be an absolute path.
- fs_protocol (str , default 'file') – The fsspec filesystem protocol used for file operations. It determines whether the catalog reads and writes the local filesystem, cloud storage, or another backend. Common protocols include ‘file’ for local storage (the default), ‘s3’ for Amazon S3, and ‘gcs’ for Google Cloud Storage.
- fs_storage_options (dict , optional) – Storage options passed to the fsspec filesystem (for example, credentials for cloud storage).
- min_rows_per_group (int , default 0) – The minimum number of rows per group. When the value is greater than 0, the dataset writer will batch incoming data and only write the row groups to the disk when sufficient rows have accumulated.
- max_rows_per_group (int , default 5000) – The maximum number of rows per group. If the value is greater than 0, the dataset writer may split large incoming batches into multiple row groups. If this value is set, min_rows_per_group should also be set; otherwise the writer may produce very small row groups.
- show_query_paths (bool , default False) – If globbed query paths should be printed to stdout.
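The interaction between min_rows_per_group and max_rows_per_group can be modeled as a buffer that flushes once enough rows accumulate and splits oversized flushes. This is a simplified sketch of that behavior with a hypothetical helper, plan_row_groups; it is not pyarrow's actual writer algorithm. It shows why leaving min_rows_per_group at 0 can yield many tiny row groups.

```python
def plan_row_groups(batch_sizes, min_rows_per_group=0, max_rows_per_group=5000):
    """Return the row-group sizes a buffered writer would emit (simplified model)."""
    groups = []
    buffered = 0
    for size in batch_sizes:
        buffered += size
        # With min_rows_per_group=0 every incoming batch is written immediately.
        if buffered >= min_rows_per_group:
            while buffered > 0:
                # Split large flushes into groups of at most max_rows_per_group.
                if max_rows_per_group > 0:
                    chunk = min(buffered, max_rows_per_group)
                else:
                    chunk = buffered
                groups.append(chunk)
                buffered -= chunk
    if buffered:
        groups.append(buffered)  # remaining rows are written when the writer closes
    return groups


print(plan_row_groups([12_000]))                                 # [5000, 5000, 2000]
print(plan_row_groups([100, 100, 100]))                          # [100, 100, 100]
print(plan_row_groups([100, 100, 100], min_rows_per_group=250))  # [300]
```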
WARNING
The data catalog is not thread-safe. Using it from multiple threads can lead to unexpected behavior.
classmethod from_env() → ParquetDataCatalog
Create a data catalog instance by accessing the ‘NAUTILUS_PATH’ environment variable.
- Return type: ParquetDataCatalog
- Raises: OSError – If the ‘NAUTILUS_PATH’ environment variable is not set.
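The environment lookup that from_env() is documented to perform can be sketched with the standard library. The helper name resolve_catalog_root is hypothetical, and the example value /data/nautilus is made up; how the library turns the variable's value into the final catalog path is not shown here.

```python
import os


def resolve_catalog_root(var: str = "NAUTILUS_PATH") -> str:
    """Read the catalog root from the environment, as from_env() is documented to do."""
    value = os.environ.get(var)
    if value is None:
        # from_env() is documented to raise OSError when the variable is unset.
        raise OSError(f"'{var}' environment variable is not set")
    return value


os.environ["NAUTILUS_PATH"] = "/data/nautilus"  # example value for illustration
print(resolve_catalog_root())  # /data/nautilus
```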
classmethod from_uri(uri: str) → ParquetDataCatalog
Create a data catalog instance from the given uri.
- Parameters: uri (str) – The URI string for the backing path.
- Return type: ParquetDataCatalog
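A URI passed to from_uri() carries both the filesystem protocol and the path. The sketch below, using a hypothetical helper split_catalog_uri, assumes that a bare path means the local ‘file’ protocol and that the URI host (such as an S3 bucket) is part of the path. It is only an illustration of the idea; fsspec has its own parsing rules.

```python
from urllib.parse import urlparse


def split_catalog_uri(uri: str) -> tuple:
    """Split a URI into (protocol, path); a bare path is treated as 'file'."""
    parsed = urlparse(uri)
    if not parsed.scheme:
        return "file", uri
    # netloc holds the bucket or host for URIs such as s3://bucket/key.
    return parsed.scheme, parsed.netloc + parsed.path


print(split_catalog_uri("s3://my-bucket/catalog"))  # ('s3', 'my-bucket/catalog')
print(split_catalog_uri("/data/catalog"))           # ('file', '/data/catalog')
```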
write_chunk(data: list[Data], data_cls: type[Data], instrument_id: str | None = None, basename_template: str = 'part-{i}', mode: str = 'overwrite', **kwargs: Any) → None
write_data(data: list[Data | Event] | list[OrderBookDelta | OrderBookDepth10 | QuoteTick | TradeTick | Bar], basename_template: str = 'part-{i}', mode: str = 'overwrite', **kwargs: Any) → None
Write the given data to the catalog.
The method groups the data by class name and, where applicable, by instrument ID. It then delegates the actual writing to the write_chunk method.
- Parameters:
- data (list[Data | Event]) – The data or event objects to be written to the catalog.
- basename_template (str , default 'part-{i}') – A template string used to generate the basenames of written data files. The token ‘{i}’ is replaced with an automatically incremented integer as files are partitioned. If not specified, it defaults to ‘part-{i}’ with the default ‘.parquet’ extension appended.
- mode (str , default 'overwrite') – The mode to use when writing data when the “partitioning” option is not used.
Can be one of the following:
- “append”: Appends the data to the existing data.
- “prepend”: Prepends the data to the existing data.
- “overwrite”: Overwrites the existing data. This is the default.
- kwargs (Any) – Additional keyword arguments to be passed to the write_chunk method.
WARNING
Any data that already exists under a filename will be overwritten. If a basename_template is not provided, it is very likely that existing data for the data type and instrument ID will be overwritten. To prevent data loss, ensure that the basename_template (or the default naming scheme) generates unique filenames for different data sets.
- Raises: ValueError – If data of the same type is not monotonically increasing (or non-decreasing) based on ts_init.
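The grouping step and the ts_init ordering check described for write_data can be sketched as follows. Record, QuoteTickRecord, TradeTickRecord, and group_for_write are hypothetical stand-ins, not library types, and the sketch assumes the non-decreasing check applies within each (class name, instrument ID) group.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Record:
    """Hypothetical stand-in for a Nautilus data object."""

    instrument_id: str | None
    ts_init: int


class QuoteTickRecord(Record):
    pass


class TradeTickRecord(Record):
    pass


def group_for_write(data: list[Record]) -> dict:
    groups = defaultdict(list)
    for obj in data:
        # Group by class name and, when present, instrument ID.
        groups[(type(obj).__name__, obj.instrument_id)].append(obj)
    for key, items in groups.items():
        ts = [item.ts_init for item in items]
        # Reject any group whose ts_init values ever decrease.
        if any(later < earlier for earlier, later in zip(ts, ts[1:])):
            raise ValueError(f"{key}: data is not non-decreasing by ts_init")
    return dict(groups)


batch = [
    QuoteTickRecord("EUR/USD.SIM", 1),
    QuoteTickRecord("EUR/USD.SIM", 2),
    TradeTickRecord("EUR/USD.SIM", 1),
]
print(sorted(group_for_write(batch)))
# [('QuoteTickRecord', 'EUR/USD.SIM'), ('TradeTickRecord', 'EUR/USD.SIM')]
```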
query(data_cls: type, instrument_ids: list[str] | None = None, bar_types: list[str] | None = None, start: int | str | float | None = None, end: int | str | float | None = None, where: str | None = None, **kwargs: Any) → list[Data | CustomData]
backend_session(data_cls: type, instrument_ids: list[str] | None = None, bar_types: list[str] | None = None, start: int | str | float | None = None, end: int | str | float | None = None, where: str | None = None, session: DataBackendSession | None = None, **kwargs: Any) → DataBackendSession
query_rust(data_cls: type, instrument_ids: list[str] | None = None, bar_types: list[str] | None = None, start: int | str | float | None = None, end: int | str | float | None = None, where: str | None = None, **kwargs: Any) → list[Data]
query_pyarrow(data_cls: type, instrument_ids: list[str] | None = None, bar_types: list[str] | None = None, start: int | str | float | None = None, end: int | str | float | None = None, filter_expr: str | None = None, **kwargs: Any) → list[Data]
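The query methods accept start and end as int, str, or float. The sketch below normalizes int (taken as UNIX nanoseconds) and ISO 8601 strings to nanoseconds and filters ts_init values. The helper names are hypothetical, inclusive bounds and UTC for naive strings are assumptions, and floats are left out because their unit is not stated here.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_nanos(value: int | str) -> int:
    """Normalize an int (nanoseconds) or ISO 8601 string to UNIX nanoseconds."""
    if isinstance(value, int):
        return value
    dt = datetime.fromisoformat(value)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive strings are UTC
    return (dt - EPOCH) // timedelta(microseconds=1) * 1_000


def filter_by_ts_init(ts_values: list[int], start=None, end=None) -> list[int]:
    lo = None if start is None else to_unix_nanos(start)
    hi = None if end is None else to_unix_nanos(end)
    # Inclusive bounds are an assumption of this sketch.
    return [ts for ts in ts_values if (lo is None or ts >= lo) and (hi is None or ts <= hi)]


ts = [0, 1_000_000_000, 2_000_000_000]
print(filter_by_ts_init(ts, start="1970-01-01T00:00:01"))  # [1000000000, 2000000000]
print(filter_by_ts_init(ts, end=1_000_000_000))            # [0, 1000000000]
```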
instruments(instrument_type: type | None = None, instrument_ids: list[str] | None = None, **kwargs: Any) → list[Instrument]
list_data_types() → list[str]
list_backtest_runs() → list[str]
list_live_runs() → list[str]
read_live_run(instance_id: str, **kwargs: Any) → list[Data]
read_backtest(instance_id: str, **kwargs: Any) → list[Data]
bars(bar_types: list[str] | None = None, **kwargs: Any) → list[Bar]
convert_stream_to_data(instance_id: UUID4, data_cls: type, other_catalog: ParquetDataCatalog | None = None, **kwargs: Any) → None
custom_data(cls: type, as_nautilus: bool = False, metadata: dict | None = None, **kwargs: Any) → list[CustomData]
instrument_closes(instrument_ids: list[str] | None = None, **kwargs: Any) → list[InstrumentClose]
instrument_status(instrument_ids: list[str] | None = None, **kwargs: Any) → list[InstrumentStatus]
list_generic_data_types() → list[str]
order_book_deltas(instrument_ids: list[str] | None = None, batched: bool = False, **kwargs: Any) → list[OrderBookDelta] | list[OrderBookDeltas]
order_book_depth10(instrument_ids: list[str] | None = None, **kwargs: Any) → list[OrderBookDepth10]
quote_ticks(instrument_ids: list[str] | None = None, **kwargs: Any) → list[QuoteTick]
trade_ticks(instrument_ids: list[str] | None = None, **kwargs: Any) → list[TradeTick]
class BarDataWrangler
Bases: object
BarDataWrangler(BarType bar_type, Instrument instrument)
Provides a means of building lists of Nautilus Bar objects.
- Parameters:
- bar_type (BarType) – The bar type for the wrangler.
- instrument (Instrument) – The instrument for the wrangler.
bar_type
instrument
process(self, data: pd.DataFrame, double default_volume: float = 1000000.0, int ts_init_delta: int = 0)
Process the given bar dataset into Nautilus Bar objects.
Expects columns [‘open’, ‘high’, ‘low’, ‘close’, ‘volume’] with a ‘timestamp’ index. The ‘volume’ column is optional; if it is missing, default_volume is used.
- Parameters:
- data (pd.DataFrame) – The data to process.
- default_volume (float) – The default volume for each bar (if not provided).
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
- Return type: list[Bar]
- Raises: ValueError – If data is empty.
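The roles of default_volume and ts_init_delta can be illustrated without pandas: ts_event comes from the row timestamp, and ts_init is ts_event plus ts_init_delta nanoseconds. BarRow and process_rows are hypothetical stand-ins that return plain dictionaries rather than Nautilus Bar objects.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


@dataclass
class BarRow:
    timestamp: datetime  # timezone-aware
    open: float
    high: float
    low: float
    close: float
    volume: float | None = None  # optional, as in the documented input


def to_ns(ts: datetime) -> int:
    return (ts - EPOCH) // timedelta(microseconds=1) * 1_000


def process_rows(rows, default_volume=1_000_000.0, ts_init_delta=0):
    if not rows:
        raise ValueError("data is empty")
    out = []
    for row in rows:
        ts_event = to_ns(row.timestamp)
        out.append({
            "open": row.open, "high": row.high, "low": row.low, "close": row.close,
            # Fall back to default_volume when the row has no volume.
            "volume": row.volume if row.volume is not None else default_volume,
            "ts_event": ts_event,
            "ts_init": ts_event + ts_init_delta,  # simulated latency in nanoseconds
        })
    return out


bars = process_rows([BarRow(EPOCH + timedelta(seconds=1), 1.0, 1.2, 0.9, 1.1)], ts_init_delta=500)
print(bars[0]["volume"], bars[0]["ts_event"], bars[0]["ts_init"])  # 1000000.0 1000000000 1000000500
```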
class OrderBookDeltaDataWrangler
Bases: object
OrderBookDeltaDataWrangler(Instrument instrument)
Provides a means of building lists of Nautilus OrderBookDelta objects.
- Parameters: instrument (Instrument) – The instrument for the data wrangler.
instrument
process(self, data: pd.DataFrame, int ts_init_delta: int = 0, bool is_raw=False)
Process the given order book dataset into Nautilus OrderBookDelta objects.
- Parameters:
- data (pd.DataFrame) – The data to process.
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
- is_raw (bool , default False) – If the data is scaled to Nautilus fixed-point values.
- Raises: ValueError – If data is empty.
class QuoteTickDataWrangler
Bases: object
QuoteTickDataWrangler(Instrument instrument)
Provides a means of building lists of Nautilus QuoteTick objects.
- Parameters: instrument (Instrument) – The instrument for the data wrangler.
instrument
process(self, data: pd.DataFrame, double default_volume: float = 1000000.0, int ts_init_delta: int = 0)
Process the given tick dataset into Nautilus QuoteTick objects.
Expects columns [‘bid_price’, ‘ask_price’] with a ‘timestamp’ index. The ‘bid_size’ and ‘ask_size’ columns are optional; if they are missing, default_volume is used.
- Parameters:
- data (pd.DataFrame) – The tick data to process.
- default_volume (float) – The default volume for each tick (if not provided).
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system. Cannot be negative.
- Return type: list[QuoteTick]
process_bar_data(self, bid_data: pd.DataFrame, ask_data: pd.DataFrame, double default_volume: float = 1000000.0, int ts_init_delta: int = 0, int offset_interval_ms: int = 100, bool timestamp_is_close: bool = True, random_seed: int | None = None, bool is_raw: bool = False, bool sort_data: bool = True)
Process the given bar datasets into Nautilus QuoteTick objects.
Expects columns [‘open’, ‘high’, ‘low’, ‘close’, ‘volume’] with a ‘timestamp’ index. The ‘volume’ column is optional; if it is missing, default_volume is used.
- Parameters:
- bid_data (pd.DataFrame) – The bid bar data.
- ask_data (pd.DataFrame) – The ask bar data.
- default_volume (float) – The volume per tick if not available from the data.
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
- offset_interval_ms (int , default 100) – The number of milliseconds to offset each tick for the bar timestamps. If timestamp_is_close then will use negative offsets, otherwise will use positive offsets (see also timestamp_is_close).
- random_seed (int , optional) – The random seed for shuffling the order of the high and low ticks from bar data. If random_seed is None, the ticks are not shuffled.
- is_raw (bool , default False) – If the data is scaled to Nautilus fixed-point values.
- timestamp_is_close (bool , default True) – If bar timestamps are at the close. If True, then open, high, low timestamps are offset before the close timestamp. If False, then high, low, close timestamps are offset after the open timestamp.
- sort_data (bool , default True) – If the data should be sorted by timestamp.
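Following the timestamp_is_close description above, each bid/ask bar pair can be expanded into four quote ticks (open, high, low, close) spaced offset_interval_ms apart. This sketch, with the hypothetical helper bar_pair_to_quotes, works in milliseconds with plain tuples and omits the random high/low shuffle.

```python
def bar_pair_to_quotes(ts_ms, bid, ask, offset_interval_ms=100, timestamp_is_close=True):
    """bid and ask are (open, high, low, close) tuples; returns (ts_ms, bid, ask) per tick."""
    # Close-stamped bars place O, H, L before the bar timestamp;
    # open-stamped bars place H, L, C after it.
    steps = (-3, -2, -1, 0) if timestamp_is_close else (0, 1, 2, 3)
    return [
        (ts_ms + step * offset_interval_ms, b, a)
        for step, b, a in zip(steps, bid, ask)
    ]


ticks = bar_pair_to_quotes(1_000, (1.0, 1.2, 0.9, 1.1), (1.01, 1.21, 0.91, 1.11))
print(ticks)
# [(700, 1.0, 1.01), (800, 1.2, 1.21), (900, 0.9, 0.91), (1000, 1.1, 1.11)]
```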
class TradeTickDataWrangler
Bases: object
TradeTickDataWrangler(Instrument instrument)
Provides a means of building lists of Nautilus TradeTick objects.
- Parameters: instrument (Instrument) – The instrument for the data wrangler.
instrument
process(self, data: pd.DataFrame, int ts_init_delta: int = 0, bool is_raw=False)
Process the given trade tick dataset into Nautilus TradeTick objects.
- Parameters:
- data (pd.DataFrame) – The data to process.
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
- is_raw (bool , default False) – If the data is scaled to Nautilus fixed-point values.
- Raises: ValueError – If data is empty.
process_bar_data(self, data: pd.DataFrame, int ts_init_delta: int = 0, int offset_interval_ms: int = 100, bool timestamp_is_close: bool = True, random_seed: int | None = None, bool is_raw: bool = False, bool sort_data: bool = True)
Process the given bar datasets into Nautilus TradeTick objects.
Expects columns [‘open’, ‘high’, ‘low’, ‘close’, ‘volume’] with a ‘timestamp’ index. The ‘volume’ column is optional; if it is missing, the default_volume is used.
- Parameters:
- data (pd.DataFrame) – The trade bar data.
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
- offset_interval_ms (int , default 100) – The number of milliseconds to offset each tick for the bar timestamps. If timestamp_is_close then will use negative offsets, otherwise will use positive offsets (see also timestamp_is_close).
- random_seed (int , optional) – The random seed for shuffling the order of the high and low ticks from bar data. If random_seed is None, the ticks are not shuffled.
- is_raw (bool , default False) – If the data is scaled to Nautilus fixed-point values.
- timestamp_is_close (bool , default True) – If bar timestamps are at the close. If True, then open, high, low timestamps are offset before the close timestamp. If False, then high, low, close timestamps are offset after the open timestamp.
- sort_data (bool , default True) – If the data should be sorted by timestamp.
processed_data
align_bid_ask_bar_data(bid_data: pd.DataFrame, ask_data: pd.DataFrame)
Merge bid and ask data into a single DataFrame with prefixed column names.
- Parameters:
- bid_data (pd.DataFrame) – The DataFrame containing bid data.
- ask_data (pd.DataFrame) – The DataFrame containing ask data.
- Returns: pd.DataFrame – A merged DataFrame with columns prefixed by ‘bid_’ for bid data and ‘ask_’ for ask data, joined on their indexes.
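The prefix-and-join operation can be sketched with dictionaries standing in for DataFrames (index key mapped to a row of column values). The helper align_bid_ask is hypothetical, and keeping only index keys present in both inputs (an inner join) is an assumption of this sketch.

```python
def align_bid_ask(bid, ask):
    """bid and ask map index -> {column: value}; inner join on shared index keys."""
    merged = {}
    for key in sorted(bid.keys() & ask.keys()):
        row = {f"bid_{col}": val for col, val in bid[key].items()}
        row.update({f"ask_{col}": val for col, val in ask[key].items()})
        merged[key] = row
    return merged


bid = {1: {"close": 1.10}, 2: {"close": 1.20}}
ask = {2: {"close": 1.21}, 3: {"close": 1.31}}
print(align_bid_ask(bid, ask))  # {2: {'bid_close': 1.2, 'ask_close': 1.21}}
```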
calculate_bar_price_offsets(num_records, timestamp_is_close: bool, int offset_interval_ms: int, random_seed=None)
Calculate the time offsets for bar prices, depending on whether the bar timestamp is at the close, and optionally randomize the high and low offsets.
- Parameters:
- num_records (int) – The number of records for which offsets are to be generated.
- timestamp_is_close (bool) – If the bar timestamps are at the close (otherwise they are at the open).
- offset_interval_ms (int) – The offset interval in milliseconds to be applied.
- random_seed (Optional[int]) – The seed for random number generation to ensure reproducibility.
- Returns: A dictionary with arrays of offsets for the open, high, low, and close prices. If random_seed is provided, the high and low offsets are randomized.
- Return type: dict
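A sketch of the offset calculation, assuming the layout described for timestamp_is_close (open, high, low before the close, or high, low, close after the open) and a per-record coin flip that swaps the high and low offsets when a seed is given. It uses plain lists of integer milliseconds instead of arrays; bar_price_offsets is a hypothetical name.

```python
import random


def bar_price_offsets(num_records, timestamp_is_close, offset_interval_ms, random_seed=None):
    if timestamp_is_close:
        steps = {"open": -3, "high": -2, "low": -1, "close": 0}
    else:
        steps = {"open": 0, "high": 1, "low": 2, "close": 3}
    offsets = {name: [step * offset_interval_ms] * num_records for name, step in steps.items()}
    if random_seed is not None:
        rng = random.Random(random_seed)  # seeded, so results are reproducible
        for i in range(num_records):
            if rng.random() < 0.5:  # swap high and low for roughly half the records
                offsets["high"][i], offsets["low"][i] = offsets["low"][i], offsets["high"][i]
    return offsets


print(bar_price_offsets(2, True, 100))
# {'open': [-300, -300], 'high': [-200, -200], 'low': [-100, -100], 'close': [0, 0]}
```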
calculate_volume_quarter(volume: np.ndarray, int precision: int)
Convert raw volume data to quarter precision.
- Parameters:
- volume (np.ndarray) – An array of volume data to be processed.
- precision (int) – The decimal precision to which the volume data is rounded, adjusted by subtracting 9.
- Returns: The volume data adjusted to quarter precision.
- Return type: np.ndarray
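One reading of "adjusted by subtracting 9" is that volumes are fixed-point values scaled by 1e9, so rounding the raw value to precision - 9 decimal places is the same as rounding the real volume to precision places. The sketch below, with the hypothetical helper volume_quarter, assumes that reading and splits each volume into quarters (one per open, high, low, close tick) using plain floats.

```python
def volume_quarter(raw_volumes, precision):
    """raw_volumes are assumed to be fixed-point values scaled by 1e9."""
    # Rounding raw units to (precision - 9) decimals equals rounding
    # real units to `precision` decimals.
    return [round(v / 4, precision - 9) for v in raw_volumes]


print(volume_quarter([4e9, 5e9], 0))  # [1000000000.0, 1000000000.0]
print(volume_quarter([5e9], 2))       # [1250000000.0]
```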
prepare_event_and_init_timestamps(index: pd.DatetimeIndex, int ts_init_delta: int)
preprocess_bar_data(data: pd.DataFrame, is_raw: bool)
Preprocess financial bar data to a standardized format.
Ensures the DataFrame index is labeled as “timestamp”, converts the index to UTC, removes time zone awareness, drops rows with NaN values in critical columns, and optionally scales the data.
- Parameters:
- data (pd.DataFrame) – The input DataFrame containing financial bar data.
- is_raw (bool) – A flag to determine whether the data should be scaled. If False, scales the data by 1e9.
- Returns: The preprocessed DataFrame with a cleaned and standardized structure.
- Return type: pd.DataFrame
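The preprocessing steps can be sketched on (timestamp, row) pairs instead of a DataFrame: drop rows with NaN in the price columns, convert timestamps to UTC and strip the timezone, and multiply values by 1e9 when is_raw is False. The helper preprocess_rows and the choice of open/high/low/close as the critical columns are assumptions, and timestamps are expected to be timezone-aware.

```python
import math
from datetime import datetime, timedelta, timezone


def preprocess_rows(rows, is_raw, critical=("open", "high", "low", "close")):
    out = []
    for ts, values in rows:
        if any(math.isnan(values[c]) for c in critical):
            continue  # drop rows with NaN in critical columns
        ts = ts.astimezone(timezone.utc).replace(tzinfo=None)  # UTC, then timezone-naive
        # Scale to fixed-point only when the input is not already raw.
        scaled = values if is_raw else {k: v * 1e9 for k, v in values.items()}
        out.append((ts, scaled))
    return out


rows = [
    (datetime(2024, 1, 1, 2, 0, tzinfo=timezone(timedelta(hours=2))),
     {"open": 1.5, "high": 1.5, "low": 1.5, "close": 1.5}),
    (datetime(2024, 1, 1, 3, 0, tzinfo=timezone.utc),
     {"open": float("nan"), "high": 1.0, "low": 1.0, "close": 1.0}),
]
print(preprocess_rows(rows, is_raw=False))
# [(datetime.datetime(2024, 1, 1, 0, 0), {'open': 1500000000.0, 'high': 1500000000.0, 'low': 1500000000.0, 'close': 1500000000.0})]
```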
class RotationMode
Bases: Enum
SIZE = 0
INTERVAL = 1
SCHEDULED_DATES = 2
NO_ROTATION = 3
class StreamingFeatherWriter
Bases: object
Provides a stream writer of Nautilus objects into feather files with rotation capabilities.
- Parameters:
- path (str) – The path to persist the stream to. Must be a directory.
- cache (Cache) – The cache for the query info.
- clock (Clock) – The clock to use for time-related operations.
- fs_protocol (str , default 'file') – The fsspec file system protocol.
- flush_interval_ms (int , optional) – The flush interval (milliseconds) for writing chunks.
- replace (bool , default False) – If existing files at the given path should be replaced.
- include_types (list[type] , optional) – A list of Arrow serializable types to write. If this is specified then only the included types will be written.
- rotation_mode (RotationMode, default RotationMode.NO_ROTATION) – The mode for file rotation.
- max_file_size (int , default 1GB) – The maximum file size in bytes before rotation (for SIZE mode).
- rotation_interval (pd.Timedelta , default 1 day) – The time interval for file rotation (for INTERVAL mode and SCHEDULED_DATES mode).
- rotation_time (datetime.time , default 00:00) – The time of day for file rotation (for SCHEDULED_DATES mode).
- rotation_timezone (str , default 'UTC') – The timezone for rotation calculations (for SCHEDULED_DATES mode).
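The SIZE and INTERVAL rotation modes can be illustrated with a simple decision function. RotationModeSketch mirrors the documented RotationMode values, and should_rotate is hypothetical; taking 1 GB as 1024**3 bytes and rotating on "greater than or equal" are assumptions, and SCHEDULED_DATES is not modeled here.

```python
from datetime import datetime, timedelta
from enum import Enum


class RotationModeSketch(Enum):
    # Mirrors the documented RotationMode values.
    SIZE = 0
    INTERVAL = 1
    SCHEDULED_DATES = 2
    NO_ROTATION = 3


def should_rotate(mode, *, file_size=0, max_file_size=1024**3,
                  now=None, last_rotation=None, rotation_interval=timedelta(days=1)):
    if mode is RotationModeSketch.SIZE:
        return file_size >= max_file_size
    if mode is RotationModeSketch.INTERVAL:
        return now - last_rotation >= rotation_interval
    return False  # NO_ROTATION; time-of-day scheduling is not modeled here


start = datetime(2024, 1, 1)
print(should_rotate(RotationModeSketch.SIZE, file_size=2 * 1024**3))  # True
print(should_rotate(RotationModeSketch.INTERVAL, now=start + timedelta(hours=25),
                    last_rotation=start))                              # True
```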
write(obj: object) → None
Write the object to the stream.
- Parameters: obj (object) – The object to write.
- Raises: ValueError – If obj is None.
check_flush() → None
Flush all stream writers if the current time is greater than the next flush interval.
flush() → None
Flush all stream writers.
close() → None
Flush and close all stream writers.
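The check_flush() behavior described above amounts to comparing the clock against a scheduled flush time. FlushScheduler is a hypothetical helper that only counts flushes; the assumption that the next deadline is rescheduled from the current time after each flush is labeled in the code.

```python
class FlushScheduler:
    """Hypothetical helper tracking when the next periodic flush is due."""

    def __init__(self, flush_interval_ms: int, now_ns: int) -> None:
        self.interval_ns = flush_interval_ms * 1_000_000
        self.next_flush_ns = now_ns + self.interval_ns
        self.flush_count = 0

    def check_flush(self, now_ns: int) -> None:
        # Flush only once the current time is past the scheduled flush time.
        if now_ns > self.next_flush_ns:
            self.flush()
            # Assumption: the next deadline is measured from the current time.
            self.next_flush_ns = now_ns + self.interval_ns

    def flush(self) -> None:
        self.flush_count += 1  # a real writer would flush its buffers here


s = FlushScheduler(flush_interval_ms=1_000, now_ns=0)
s.check_flush(500_000_000)    # too early, nothing happens
s.check_flush(1_000_000_001)  # past the deadline, flushes
print(s.flush_count, s.next_flush_ns)  # 1 2000000001
```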
get_current_file_info() → dict[str | tuple[str, str], dict[str, Any]]
Get information about the current files being written.
- Returns: A dictionary containing file information for each table.
- Return type: dict[str | tuple[str, str], dict[str, Any]]
get_next_rotation_time(table_name: str | tuple[str, str]) → Timestamp | None
Get the expected time for the next file rotation.
- Parameters: table_name (str | tuple[str, str]) – The specific table name to get the next rotation time for.
- Returns: The next rotation time for the specified table, or None if not set.
- Return type: pd.Timestamp | None
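For SCHEDULED_DATES mode, the next rotation time can be sketched as the next occurrence of rotation_time after "now", stepping by rotation_interval. The helper next_scheduled_rotation is hypothetical, uses fixed-offset datetime timezones rather than the writer's rotation_timezone string, and ignores daylight-saving transitions.

```python
from datetime import datetime, time, timedelta, timezone


def next_scheduled_rotation(now, rotation_time=time(0, 0), rotation_interval=timedelta(days=1)):
    """now must be timezone-aware; the result is in the same timezone."""
    if rotation_interval <= timedelta(0):
        raise ValueError("rotation_interval must be positive")
    candidate = now.replace(hour=rotation_time.hour, minute=rotation_time.minute,
                            second=rotation_time.second, microsecond=0)
    # Step forward until the candidate lies strictly in the future.
    while candidate <= now:
        candidate += rotation_interval
    return candidate


now = datetime(2024, 1, 1, 15, 30, tzinfo=timezone.utc)
print(next_scheduled_rotation(now))               # 2024-01-02 00:00:00+00:00
print(next_scheduled_rotation(now, time(18, 0)))  # 2024-01-01 18:00:00+00:00
```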
property is_closed : bool
Return whether all file streams are closed.
- Returns: True if all streams are closed, False otherwise.
- Return type: bool