Custom Data
Due to the modular nature of the Nautilus design, it is possible to set up systems with very flexible data streams, including custom user-defined data types. This guide covers some possible use cases for this functionality.
It's possible to create custom data types within the Nautilus system. First you
will need to define your data by subclassing from Data
.
As Data
holds no state, it is not strictly necessary to call super().__init__()
.
from nautilus_trader.core.data import Data
class MyDataPoint(Data):
"""
This is an example of a user-defined data class, inheriting from the base class `Data`.
The fields `label`, `x`, `y`, and `z` in this class are examples of arbitrary user data.
"""
def __init__(
self,
label: str,
x: int,
y: int,
z: int,
ts_event: int,
ts_init: int,
) -> None:
self.label = label
self.x = x
self.y = y
self.z = z
self._ts_event = ts_event
self._ts_init = ts_init
@property
def ts_event(self) -> int:
"""
UNIX timestamp (nanoseconds) when the data event occurred.
Returns
-------
int
"""
return self._ts_event
@property
def ts_init(self) -> int:
"""
UNIX timestamp (nanoseconds) when the object was initialized.
Returns
-------
int
"""
return self._ts_init
The Data
abstract base class acts as a contract within the system and requires two properties
for all types of data: ts_event
and ts_init
. These represent the UNIX nanosecond timestamps
for when the event occurred and when the object was initialized, respectively.
The recommended approach to satisfy the contract is to assign ts_event
and ts_init
to backing fields, and then implement the @property
for each as shown above
(for completeness, the docstrings are copied from the Data
base class).
These timestamps enable Nautilus to correctly order data streams for backtests
using monotonically increasing ts_init
UNIX nanoseconds.
We can now work with this data type for backtesting and live trading. For instance,
we could now create an adapter which is able to parse and create objects of this
type - and send them back to the DataEngine
for consumption by subscribers.
You can subscribe to these custom data types within your actor/strategy in the following way:
self.subscribe_data(
data_type=DataType(MyDataPoint, metadata={"some_optional_category": 1}),
client_id=ClientId("MY_ADAPTER"),
)
This will result in your actor/strategy passing these received MyDataPoint
objects to your on_data
method. You will need to check the type, as this
method acts as a flexible handler for all custom data.
def on_data(self, data: Data) -> None:
# First check the type of data
if isinstance(data, MyDataPoint):
# Do something with the data
Publishing and receiving signal data
Here is an example of publishing and receiving signal data using the MessageBus
from an actor or strategy.
A signal is an automatically generated custom data identified by a name containing only one value of a basic type
(str, float, int, bool or bytes).
self.publish_signal("signal_name", value, ts_event)
self.subscribe_signal("signal_name")
def on_signal(self, signal):
print("Signal", data)
Option Greeks example
This example demonstrates how to create a custom data type for option Greeks, specifically the delta.
By following these steps, you can create custom data types, subscribe to them, publish them, and store
them in the Cache
or ParquetDataCatalog
for efficient retrieval.
import msgspec
from nautilus_trader.core.data import Data
from nautilus_trader.model.data import DataType
from nautilus_trader.serialization.base import register_serializable_type
from nautilus_trader.serialization.arrow.serializer import register_arrow
import pyarrow as pa
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.core.datetime import dt_to_unix_nanos, unix_nanos_to_dt, format_iso8601
def unix_nanos_to_str(unix_nanos):
return format_iso8601(unix_nanos_to_dt(unix_nanos))
class GreeksData(Data):
def __init__(
self, instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX"),
ts_event: int = 0,
ts_init: int = 0,
delta: float = 0.0,
) -> None:
self.instrument_id = instrument_id
self._ts_event = ts_event
self._ts_init = ts_init
self.delta = delta
def __repr__(self):
return (f"GreeksData(ts_init={unix_nanos_to_str(self._ts_init)}, instrument_id={self.instrument_id}, delta={self.delta:.2f})")
@property
def ts_event(self):
return self._ts_event
@property
def ts_init(self):
return self._ts_init
def to_dict(self):
return {
"instrument_id": self.instrument_id.value,
"ts_event": self._ts_event,
"ts_init": self._ts_init,
"delta": self.delta,
}
@classmethod
def from_dict(cls, data: dict):
return GreeksData(InstrumentId.from_str(data["instrument_id"]), data["ts_event"], data["ts_init"], data["delta"])
def to_bytes(self):
return msgspec.msgpack.encode(self.to_dict())
@classmethod
def from_bytes(cls, data: bytes):
return cls.from_dict(msgspec.msgpack.decode(data))
def to_catalog(self):
return pa.RecordBatch.from_pylist([self.to_dict()], schema=GreeksData.schema())
@classmethod
def from_catalog(cls, table: pa.Table):
return [GreeksData.from_dict(d) for d in table.to_pylist()]
@classmethod
def schema(cls):
return pa.schema(
{
"instrument_id": pa.string(),
"ts_event": pa.int64(),
"ts_init": pa.int64(),
"delta": pa.float64(),
}
)
Publishing and receiving data
Here is an example of publishing and receiving data using the MessageBus
from an actor or strategy:
register_serializable_type(GreeksData, GreeksData.to_dict, GreeksData.from_dict)
def publish_greeks(self, greeks_data: GreeksData):
self.publish_data(DataType(GreeksData), greeks_data)
def subscribe_to_greeks(self):
self.subscribe_data(DataType(GreeksData))
def on_data(self, data):
if isinstance(GreeksData):
print("Data", data)
Writing and reading data using the cache
Here is an example of writing and reading data using the Cache
from an actor or strategy:
def greeks_key(instrument_id: InstrumentId):
return f"{instrument_id}_GREEKS"
def cache_greeks(self, greeks_data: GreeksData):
self.cache.add(greeks_key(greeks_data.instrument_id), greeks_data.to_bytes())
def greeks_from_cache(self, instrument_id: InstrumentId):
return GreeksData.from_bytes(self.cache.get(greeks_key(instrument_id)))
Writing and reading data using a catalog
For streaming custom data to feather files or writing it to parquet files in a catalog
(register_arrow
needs to be used):
register_arrow(GreeksData, GreeksData.schema(), GreeksData.to_catalog, GreeksData.from_catalog)
from nautilus_trader.persistence.catalog import ParquetDataCatalog
catalog = ParquetDataCatalog('.')
catalog.write_data([GreeksData()])
Creating a custom data class automatically
The @customdataclass
decorator enables the creation of a custom data class with default
implementations for all the features described above.
Each method can also be overridden if needed. Here is an example of its usage:
from nautilus_trader.model.custom import customdataclass
@customdataclass
class GreeksTestData(Data):
instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX")
delta: float = 0.0
GreeksTestData(
instrument_id=InstrumentId.from_str("CL.GLBX"),
delta=1000.0,
ts_event=1,
ts_init=2,
)
Custom data type stub
To enhance development convenience and improve code suggestions in your IDE, you can create a .pyi
stub file with the proper constructor signature for your custom data types as well as type hints for attributes.
This is particularly useful when the constructor is dynamically generated at runtime, as it allows the IDE to recognize
and provide suggestions for the class's methods and attributes.
For instance, if you have a custom data class defined in greeks.py
, you can create a corresponding greeks.pyi
file
with the following constructor signature:
from nautilus_trader.core.data import Data
from nautilus_trader.model.identifiers import InstrumentId
class GreeksData(Data):
instrument_id: InstrumentId
delta: float
def __init__(
self,
ts_event: int = 0,
ts_init: int = 0,
instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX"),
delta: float = 0.0,
) -> GreeksData: ...