Data
The data subpackage groups components relating to the data stack and data tooling for the platform.
The layered architecture of the data stack roughly mirrors the execution stack: a central engine sits above a cache layer, which sits above a database layer, and alternative implementations can be written on top.
Because the core components are high-performance, they are reusable between backtest and live implementations, which helps ensure consistent logic for trading operations.
Aggregation
class BarAggregator
Bases: object
BarAggregator(Instrument instrument, BarType bar_type, handler: Callable[[Bar], None], bool await_partial=False)
Provides a means of aggregating specified bars and sending them to a registered handler.
- Parameters:
- instrument (Instrument) – The instrument for the aggregator.
- bar_type (BarType) – The bar type for the aggregator.
- handler (Callable[[Bar], None]) – The bar handler for the aggregator.
- await_partial (bool, default False) – If the aggregator should await an initial partial bar prior to aggregating.
- Raises: ValueError – If instrument.id != bar_type.instrument_id.
bar_type
The aggregator's bar type.
- Returns: BarType
handle_bar(self, Bar bar) → void
Update the aggregator with the given bar.
- Parameters: bar (Bar) – The bar for the update.
handle_quote_tick(self, QuoteTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (QuoteTick) – The tick for the update.
handle_trade_tick(self, TradeTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (TradeTick) – The tick for the update.
set_await_partial(self, bool value)
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
start_batch_update(self, handler: Callable[[Bar], None], uint64_t time_ns) → None
stop_batch_update(self) → None
class BarBuilder
Bases: object
BarBuilder(Instrument instrument, BarType bar_type)
Provides a generic bar builder for aggregation.
- Parameters:
- instrument (Instrument) – The instrument for the builder.
- bar_type (BarType) – The bar type for the builder.
- Raises: ValueError – If instrument.id != bar_type.instrument_id.
build(self, uint64_t ts_event, uint64_t ts_init) → Bar
Return the aggregated bar with the given closing timestamp, and reset.
- Parameters:
- ts_event (uint64_t) – UNIX timestamp (nanoseconds) for the bar event.
- ts_init (uint64_t) – UNIX timestamp (nanoseconds) for the bar initialization.
- Return type: Bar
build_now(self) → Bar
Return the aggregated bar and reset.
- Return type: Bar
count
The builder's current update count.
- Returns: int
initialized
If the builder is initialized.
- Returns: bool
price_precision
The price precision for the builder's instrument.
- Returns: uint8
reset(self) → void
Reset the bar builder.
All stateful fields are reset to their initial value.
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
size_precision
The size precision for the builder's instrument.
- Returns: uint8
ts_last
UNIX timestamp (nanoseconds) when the builder last updated.
- Returns: uint64_t
update(self, Price price, Quantity size, uint64_t ts_event) → void
Update the bar builder.
- Parameters:
- price (Price) – The update price.
- size (Quantity) – The update size.
- ts_event (uint64_t) – UNIX timestamp (nanoseconds) of the update.
update_bar(self, Bar bar, Quantity volume, uint64_t ts_init) → void
Update the bar builder.
- Parameters: bar (Bar) – The update Bar.
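The update, build and reset cycle described for the builder can be pictured with a small pure-Python sketch. `SketchOHLCV` and `SketchBarBuilder` are hypothetical names introduced only for illustration; they are not the library's types, and the sketch assumes a bar carries open, high, low, close and volume.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class SketchOHLCV:
    """Hypothetical stand-in for a bar (not the library's Bar type)."""
    open: Decimal
    high: Decimal
    low: Decimal
    close: Decimal
    volume: Decimal
    ts_event: int


class SketchBarBuilder:
    """Hypothetical accumulator illustrating update/build/reset."""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        # All stateful fields return to their initial values.
        self.initialized = False
        self.count = 0
        self.ts_last = 0
        self._open = self._high = self._low = self._close = None
        self._volume = Decimal(0)

    def update(self, price: Decimal, size: Decimal, ts_event: int) -> None:
        if not self.initialized:
            self._open = self._high = self._low = price
            self.initialized = True
        else:
            self._high = max(self._high, price)
            self._low = min(self._low, price)
        self._close = price
        self._volume += size
        self.count += 1
        self.ts_last = ts_event

    def build(self, ts_event: int) -> SketchOHLCV:
        # Sketch-only guard (an assumption): refuse to build without updates.
        if not self.initialized:
            raise RuntimeError("no updates received")
        bar = SketchOHLCV(
            self._open, self._high, self._low, self._close, self._volume, ts_event
        )
        self.reset()  # building also resets the builder
        return bar


builder = SketchBarBuilder()
for px, qty, ts in [("1.10", "5", 1), ("1.30", "2", 2), ("1.20", "3", 3)]:
    builder.update(Decimal(px), Decimal(qty), ts)
bar = builder.build(ts_event=3)
```

After `build`, the sketch builder reports `count == 0` and `initialized == False`, matching the "return the aggregated bar and reset" wording above.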
class TickBarAggregator
Bases: BarAggregator
TickBarAggregator(Instrument instrument, BarType bar_type, handler: Callable[[Bar], None])
Provides a means of building tick bars from ticks.
When the received tick count reaches the step threshold of the bar specification, a bar is created and sent to the handler.
- Parameters:
- instrument (Instrument) – The instrument for the aggregator.
- bar_type (BarType) – The bar type for the aggregator.
- handler (Callable[[Bar], None]) – The bar handler for the aggregator.
- Raises: ValueError – If instrument.id != bar_type.instrument_id.
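To make the tick-count threshold concrete, here is a minimal pure-Python sketch. `SketchTickAggregator` and the tuple layout are hypothetical and chosen for illustration; the sketch assumes each tick contributes a single price and that a bar closes as soon as `step` ticks have arrived.

```python
from typing import Callable, List, Tuple

# Hypothetical bar layout: (open, high, low, close, tick_count).
TickBarTuple = Tuple[float, float, float, float, int]


class SketchTickAggregator:
    """Hypothetical tick-count aggregator (not the library class)."""

    def __init__(self, step: int, handler: Callable[[TickBarTuple], None]) -> None:
        self._step = step
        self._handler = handler
        self._prices: List[float] = []

    def handle_price(self, price: float) -> None:
        self._prices.append(price)
        if len(self._prices) >= self._step:
            p = self._prices
            # Threshold reached: build the bar and send it to the handler.
            self._handler((p[0], max(p), min(p), p[-1], len(p)))
            self._prices = []


tick_bars: List[TickBarTuple] = []
tick_agg = SketchTickAggregator(step=3, handler=tick_bars.append)
for price in [10.0, 12.0, 11.0, 11.5, 9.0, 9.5, 10.0]:
    tick_agg.handle_price(price)
```

Seven prices with a step of three yield two completed bars; the seventh price stays pending until two more ticks arrive.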
bar_type
The aggregator's bar type.
- Returns: BarType
handle_bar(self, Bar bar) → void
Update the aggregator with the given bar.
- Parameters: bar (Bar) – The bar for the update.
handle_quote_tick(self, QuoteTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (QuoteTick) – The tick for the update.
handle_trade_tick(self, TradeTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (TradeTick) – The tick for the update.
set_await_partial(self, bool value)
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
start_batch_update(self, handler: Callable[[Bar], None], uint64_t time_ns) → None
stop_batch_update(self) → None
class TimeBarAggregator
Bases: BarAggregator
TimeBarAggregator(Instrument instrument, BarType bar_type, handler: Callable[[Bar], None], Clock clock, bool build_with_no_updates=True, bool timestamp_on_close=True, unicode interval_type=u'left-open', time_bars_origin=None, int composite_bar_build_delay=15)
Provides a means of building time bars from ticks with an internal timer.
When the time reaches the next time interval of the bar specification, a bar is created and sent to the handler.
- Parameters:
- instrument (Instrument) – The instrument for the aggregator.
- bar_type (BarType) – The bar type for the aggregator.
- handler (Callable[[Bar], None]) – The bar handler for the aggregator.
- clock (Clock) – The clock for the aggregator.
- build_with_no_updates (bool, default True) – If bars should be built and emitted even when there are no new market updates.
- timestamp_on_close (bool, default True) – If True, then the timestamp will be the bar's close time. If False, then the timestamp will be the bar's open time.
- interval_type (str, default 'left-open') – Determines the type of interval used for time aggregation.
- 'left-open': start time is excluded and end time is included (default).
- 'right-open': start time is included and end time is excluded.
- Raises: ValueError – If instrument.id != bar_type.instrument_id.
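The difference between the two `interval_type` values only shows up for a timestamp that lands exactly on a bar boundary. The sketch below uses a hypothetical helper, `bar_close_ns`, and assumes bars are aligned to whole multiples of the interval since the UNIX epoch; the library's own alignment logic may differ.

```python
MINUTE_NS = 60 * 1_000_000_000


def bar_close_ns(ts_ns: int, interval_ns: int, interval_type: str = "left-open") -> int:
    """Return the close time of the bar a timestamp falls into (hypothetical helper)."""
    if interval_type == "left-open":
        # (start, end]: a timestamp on a boundary belongs to the bar ending there.
        return -(-ts_ns // interval_ns) * interval_ns  # ceiling division
    if interval_type == "right-open":
        # [start, end): a timestamp on a boundary belongs to the bar starting there.
        return (ts_ns // interval_ns + 1) * interval_ns
    raise ValueError(f"unknown interval_type: {interval_type!r}")


on_boundary = MINUTE_NS  # exactly one minute after the epoch
left = bar_close_ns(on_boundary, MINUTE_NS, "left-open")
right = bar_close_ns(on_boundary, MINUTE_NS, "right-open")
```

Under these assumptions, `left` equals the boundary itself while `right` is one full interval later; any timestamp strictly inside an interval maps to the same close time under both settings.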
bar_type
The aggregator's bar type.
- Returns: BarType
get_start_time(self, datetime now: datetime, enable_delay: bool = True) → datetime
Return the start time for the aggregator's next bar.
- Returns: The timestamp (UTC).
- Return type: datetime
handle_bar(self, Bar bar) → void
Update the aggregator with the given bar.
- Parameters: bar (Bar) – The bar for the update.
handle_quote_tick(self, QuoteTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (QuoteTick) – The tick for the update.
handle_trade_tick(self, TradeTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (TradeTick) – The tick for the update.
interval
The aggregator's time interval.
- Returns: timedelta
interval_ns
The aggregator's time interval (nanoseconds).
- Returns: uint64_t
next_close_ns
The aggregator's next closing time (nanoseconds).
- Returns: uint64_t
set_await_partial(self, bool value)
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
start_batch_update(self, handler: Callable[[Bar], None], uint64_t time_ns) → None
stop(self) → void
Stop the bar aggregator.
stop_batch_update(self) → None
class ValueBarAggregator
Bases: BarAggregator
ValueBarAggregator(Instrument instrument, BarType bar_type, handler: Callable[[Bar], None])
Provides a means of building value bars from ticks.
When the received value reaches the step threshold of the bar specification, a bar is created and sent to the handler.
- Parameters:
- instrument (Instrument) – The instrument for the aggregator.
- bar_type (BarType) – The bar type for the aggregator.
- handler (Callable[[Bar], None]) – The bar handler for the aggregator.
- Raises: ValueError – If instrument.id != bar_type.instrument_id.
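A value bar closes on traded notional rather than on tick count. The following sketch uses a hypothetical `SketchValueAggregator` and assumes value is computed as price multiplied by size; as a simplification it emits the whole accumulated value at once instead of splitting a trade across two bars.

```python
from decimal import Decimal
from typing import Callable, List


class SketchValueAggregator:
    """Hypothetical value-threshold aggregator (not the library class)."""

    def __init__(self, step: Decimal, handler: Callable[[Decimal], None]) -> None:
        self._step = step
        self._handler = handler
        self._cum_value = Decimal(0)

    def get_cumulative_value(self) -> Decimal:
        return self._cum_value

    def handle_trade(self, price: Decimal, size: Decimal) -> None:
        self._cum_value += price * size
        if self._cum_value >= self._step:
            # Simplification: emit everything accumulated, then start over.
            self._handler(self._cum_value)
            self._cum_value = Decimal(0)


value_bars: List[Decimal] = []
value_agg = SketchValueAggregator(step=Decimal("1000"), handler=value_bars.append)
for price, size in [("100", "4"), ("101", "5"), ("99", "2")]:
    value_agg.handle_trade(Decimal(price), Decimal(size))
```

The three trades carry values of 400, 505 and 198, so the threshold of 1000 is crossed only on the third trade.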
bar_type
The aggregator's bar type.
- Returns: BarType
get_cumulative_value(self)
Return the current cumulative value of the aggregator.
- Return type: Decimal
handle_bar(self, Bar bar) → void
Update the aggregator with the given bar.
- Parameters: bar (Bar) – The bar for the update.
handle_quote_tick(self, QuoteTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (QuoteTick) – The tick for the update.
handle_trade_tick(self, TradeTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (TradeTick) – The tick for the update.
set_await_partial(self, bool value)
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
start_batch_update(self, handler: Callable[[Bar], None], uint64_t time_ns) → None
stop_batch_update(self) → None
class VolumeBarAggregator
Bases: BarAggregator
VolumeBarAggregator(Instrument instrument, BarType bar_type, handler: Callable[[Bar], None])
Provides a means of building volume bars from ticks.
When the received volume reaches the step threshold of the bar specification, a bar is created and sent to the handler.
- Parameters:
- instrument (Instrument) – The instrument for the aggregator.
- bar_type (BarType) – The bar type for the aggregator.
- handler (Callable[[Bar], None]) – The bar handler for the aggregator.
- Raises: ValueError – If instrument.id != bar_type.instrument_id.
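For volume bars, one question is what happens when a single trade is larger than the volume still needed to close the current bar. The sketch below, built around a hypothetical `SketchVolumeAggregator`, assumes the excess carries over into the following bar or bars; that carry-over policy is an assumption of the sketch, not something stated above.

```python
from typing import Callable, List


class SketchVolumeAggregator:
    """Hypothetical volume-threshold aggregator (not the library class)."""

    def __init__(self, step: int, handler: Callable[[int], None]) -> None:
        self._step = step
        self._handler = handler
        self._volume = 0

    def handle_trade(self, size: int) -> None:
        remaining = size
        while remaining > 0:
            # Take only as much as fits into the bar currently being built.
            take = min(remaining, self._step - self._volume)
            self._volume += take
            remaining -= take
            if self._volume == self._step:
                self._handler(self._volume)
                self._volume = 0


volume_bars: List[int] = []
volume_agg = SketchVolumeAggregator(step=100, handler=volume_bars.append)
for size in [60, 70, 180]:
    volume_agg.handle_trade(size)
```

A total of 310 units produces three full bars of 100, with 10 units left in the bar under construction.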
bar_type
The aggregator's bar type.
- Returns: BarType
handle_bar(self, Bar bar) → void
Update the aggregator with the given bar.
- Parameters: bar (Bar) – The bar for the update.
handle_quote_tick(self, QuoteTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (QuoteTick) – The tick for the update.
handle_trade_tick(self, TradeTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (TradeTick) – The tick for the update.
set_await_partial(self, bool value)
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
start_batch_update(self, handler: Callable[[Bar], None], uint64_t time_ns) → None
stop_batch_update(self) → None
Client
class DataClient
Bases: Component
DataClient(ClientId client_id, MessageBus msgbus, Cache cache, Clock clock, Venue venue: Venue | None = None, config: NautilusConfig | None = None)
The base class for all data clients.
- Parameters:
- client_id (ClientId) – The data client ID.
- msgbus (MessageBus) – The message bus for the client.
- cache (Cache) – The cache for the client.
- clock (Clock) – The clock for the client.
- venue (Venue, optional) – The client venue. If multi-venue then can be None.
- config (NautilusConfig, optional) – The configuration for the instance.
WARNING
This class should not be used directly, but through a concrete subclass.
degrade(self) → void
Degrade the component.
While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
dispose(self) → void
Dispose of the component.
While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
fault(self) → void
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
classmethod fully_qualified_name(cls) → str
Return the fully qualified name for the component's class.
- Return type: str
id
The component's ID.
- Returns: ComponentId
is_connected
If the client is connected.
- Returns: bool
is_degraded
bool
Return whether the current component state is DEGRADED.
- Return type: bool
- Type: Component.is_degraded
is_disposed
bool
Return whether the current component state is DISPOSED.
- Return type: bool
- Type: Component.is_disposed
is_faulted
bool
Return whether the current component state is FAULTED.
- Return type: bool
- Type: Component.is_faulted
is_initialized
bool
Return whether the component has been initialized (component.state >= INITIALIZED).
- Return type: bool
- Type: Component.is_initialized
is_running
bool
Return whether the current component state is RUNNING.
- Return type: bool
- Type: Component.is_running
is_stopped
bool
Return whether the current component state is STOPPED.
- Return type: bool
- Type: Component.is_stopped
request(self, DataType data_type, UUID4 correlation_id, dict params=None) → void
Request data for the given data type.
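- Parameters:
- data_type (DataType) – The data type for the request.
- correlation_id (UUID4) – The correlation ID for the request.
- params (dict[str, Any], optional) – Additional params to be sent with the request.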
reset(self) → void
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
resume(self) → void
Resume the component.
While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
shutdown_system(self, unicode reason=None) → void
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters: reason (str , optional) – The reason for issuing the shutdown command.
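The choice between `stop` and `stop_async` hinges on whether an event loop is already running. One common way to detect this in Python is `asyncio.get_running_loop()`, which raises `RuntimeError` outside a running loop. The sketch below assumes that approach; `SketchKernel` and `handle_shutdown` are hypothetical and say nothing about how NautilusKernel is actually implemented.

```python
import asyncio
from typing import List, Optional


def has_running_loop() -> bool:
    """Return True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


class SketchKernel:
    """Hypothetical kernel stand-in (not NautilusKernel)."""

    def __init__(self) -> None:
        self.calls: List[str] = []
        self._task: Optional[asyncio.Task] = None

    def stop(self) -> None:
        self.calls.append("stop")

    async def stop_async(self) -> None:
        self.calls.append("stop_async")

    def handle_shutdown(self, reason: Optional[str] = None) -> None:
        if has_running_loop():
            # Schedule the coroutine on the loop that is already running,
            # keeping a reference so the task is not garbage collected.
            self._task = asyncio.get_running_loop().create_task(self.stop_async())
        else:
            self.stop()


sync_kernel = SketchKernel()
sync_kernel.handle_shutdown("manual shutdown")  # no loop running here
```

Called from plain synchronous code, the sketch takes the `stop` path; called from within a coroutine, it schedules `stop_async` instead.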
start(self) → void
Start the component.
While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
state
ComponentState
Return the component's current state.
- Return type: ComponentState
- Type: Component.state
stop(self) → void
Stop the component.
While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
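The lifecycle methods above share one pattern: check that the transition is valid, enter a transitional state, run the matching `on_*` hook, then settle in the final state. The sketch below illustrates that pattern with a hypothetical `SketchComponent` and a deliberately reduced set of states; the transitions the library actually allows are not listed in this section and may differ.

```python
import logging
from enum import Enum, auto

log = logging.getLogger("sketch")


class State(Enum):
    READY = auto()
    STARTING = auto()
    RUNNING = auto()
    STOPPING = auto()
    STOPPED = auto()


class SketchComponent:
    """Hypothetical component with a reduced lifecycle (not the library class)."""

    def __init__(self) -> None:
        self.state = State.READY

    def on_start(self) -> None:  # hook for subclasses
        pass

    def on_stop(self) -> None:  # hook for subclasses
        pass

    def _transition(self, allowed, transitional, final, hook) -> None:
        if self.state not in allowed:
            # Invalid trigger: log an error and leave the state unchanged.
            log.error("cannot enter %s from %s", transitional.name, self.state.name)
            return
        self.state = transitional
        try:
            hook()
        except Exception:
            # Log and re-raise; the component stays in the transitional state.
            log.exception("error in %s", hook.__name__)
            raise
        self.state = final

    def start(self) -> None:
        self._transition({State.READY}, State.STARTING, State.RUNNING, self.on_start)

    def stop(self) -> None:
        self._transition({State.RUNNING}, State.STOPPING, State.STOPPED, self.on_stop)


class BrokenComponent(SketchComponent):
    def on_start(self) -> None:
        raise RuntimeError("boom")
```

Keeping the hooks separate from the transition logic is what makes a "Do not override" rule workable: subclasses customize `on_start` and `on_stop`, while the state bookkeeping stays in one place.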
subscribe(self, DataType data_type, dict params=None) → void
Subscribe to data for the given data type.
- Parameters: data_type (DataType) – The data type for the subscription.
subscribed_custom_data(self) → list
Return the custom data types subscribed to.
- Return type: list[DataType]
trader_id
The trader ID associated with the component.
- Returns: TraderId
type
The component's type.
- Returns: type
unsubscribe(self, DataType data_type, dict params=None) → void
Unsubscribe from data for the given data type.
- Parameters: data_type (DataType) – The data type for the subscription.
venue
The client's venue ID (if applicable).
- Returns: Venue or None
class MarketDataClient
Bases: DataClient
MarketDataClient(ClientId client_id, MessageBus msgbus, Cache cache, Clock clock, Venue venue: Venue | None = None, config: NautilusConfig | None = None)
The base class for all market data clients.
- Parameters:
- client_id (ClientId) – The data client ID.
- msgbus (MessageBus) – The message bus for the client.
- cache (Cache) – The cache for the client.
- clock (Clock) – The clock for the client.
- venue (Venue, optional) – The client venue. If multi-venue then can be None.
- config (NautilusConfig, optional) – The configuration for the instance.
WARNING
This class should not be used directly, but through a concrete subclass.
degrade(self) → void
Degrade the component.
While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
dispose(self) → void
Dispose of the component.
While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
fault(self) → void
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
classmethod fully_qualified_name(cls) → str
Return the fully qualified name for the component's class.
- Return type: str
id
The component's ID.
- Returns: ComponentId
is_connected
If the client is connected.
- Returns: bool
is_degraded
bool
Return whether the current component state is DEGRADED.
- Return type: bool
- Type: Component.is_degraded
is_disposed
bool
Return whether the current component state is DISPOSED.
- Return type: bool
- Type: Component.is_disposed
is_faulted
bool
Return whether the current component state is FAULTED.
- Return type: bool
- Type: Component.is_faulted
is_initialized
bool
Return whether the component has been initialized (component.state >= INITIALIZED).
- Return type: bool
- Type: Component.is_initialized
is_running
bool
Return whether the current component state is RUNNING.
- Return type: bool
- Type: Component.is_running
is_stopped
bool
Return whether the current component state is STOPPED.
- Return type: bool
- Type: Component.is_stopped
request(self, DataType data_type, UUID4 correlation_id, dict params=None) → void
Request data for the given data type.
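- Parameters:
- data_type (DataType) – The data type for the request.
- correlation_id (UUID4) – The correlation ID for the request.
- params (dict[str, Any], optional) – Additional params to be sent with the request.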
request_bars(self, BarType bar_type, int limit, UUID4 correlation_id, datetime start=None, datetime end=None, dict params=None) → void
Request historical Bar data. To load historical data from a catalog, you can pass a list[DataCatalogConfig] to the TradingNodeConfig or the BacktestEngineConfig.
- Parameters:
- bar_type (BarType) – The bar type for the request.
- limit (int) – The limit for the number of returned bars.
- correlation_id (UUID4) – The correlation ID for the request.
- start (datetime, optional) – The start datetime (UTC) of the request time range (inclusive).
- end (datetime, optional) – The end datetime (UTC) of the request time range. Inclusiveness depends on the individual data client implementation.
- params (dict[str, Any], optional) – Additional params to be sent with the request.
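Each request method takes a `correlation_id` so that a response arriving later can be matched to the request that caused it. The sketch below shows that general pattern with Python's standard `uuid` module; `SketchRequester` and its methods are hypothetical and are not part of the client API documented here.

```python
import uuid
from typing import Any, Callable, Dict


class SketchRequester:
    """Hypothetical request tracker (not part of the library)."""

    def __init__(self) -> None:
        self._pending: Dict[uuid.UUID, Callable[[Any], None]] = {}

    def request(self, callback: Callable[[Any], None]) -> uuid.UUID:
        # Remember who asked, keyed by a fresh correlation ID.
        correlation_id = uuid.uuid4()
        self._pending[correlation_id] = callback
        return correlation_id

    def on_response(self, correlation_id: uuid.UUID, data: Any) -> bool:
        # Route the payload to whoever issued the matching request.
        callback = self._pending.pop(correlation_id, None)
        if callback is None:
            return False  # unknown or already answered request
        callback(data)
        return True


received = []
requester = SketchRequester()
cid = requester.request(received.append)
first = requester.on_response(cid, ["bar-1", "bar-2"])
second = requester.on_response(cid, ["late duplicate"])
```

Because the pending entry is removed on the first match, a duplicate response with the same ID is ignored in this sketch.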
request_instrument(self, InstrumentId instrument_id, UUID4 correlation_id, datetime start=None, datetime end=None, dict params=None) → void
Request Instrument data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The instrument ID for the request.
- correlation_id (UUID4) – The correlation ID for the request.
- start (datetime, optional) – The start datetime (UTC) of the request time range (inclusive).
- end (datetime, optional) – The end datetime (UTC) of the request time range. Inclusiveness depends on the individual data client implementation.
- params (dict[str, Any], optional) – Additional params to be sent with the request.
request_instruments(self, Venue venue, UUID4 correlation_id, datetime start=None, datetime end=None, dict params=None) → void
Request all Instrument data for the given venue.
- Parameters:
- venue (Venue) – The venue for the request.
- correlation_id (UUID4) – The correlation ID for the request.
- start (datetime, optional) – The start datetime (UTC) of the request time range (inclusive).
- end (datetime, optional) – The end datetime (UTC) of the request time range. Inclusiveness depends on the individual data client implementation.
- params (dict[str, Any], optional) – Additional params to be sent with the request.
request_order_book_snapshot(self, InstrumentId instrument_id, int limit, UUID4 correlation_id, dict params=None) → void
Request order book snapshot data.
- Parameters:
- instrument_id (InstrumentId) – The instrument ID for the order book snapshot request.
- limit (int) – The limit on the depth of the order book snapshot.
- correlation_id (UUID4) – The correlation ID for the request.
request_quote_ticks(self, InstrumentId instrument_id, int limit, UUID4 correlation_id, datetime start=None, datetime end=None, dict params=None) → void
Request historical QuoteTick data.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument ID for the request.
- limit (int) – The limit for the number of returned ticks.
- correlation_id (UUID4) – The correlation ID for the request.
- start (datetime, optional) – The start datetime (UTC) of the request time range (inclusive).
- end (datetime, optional) – The end datetime (UTC) of the request time range. Inclusiveness depends on the individual data client implementation.
- params (dict[str, Any], optional) – Additional params to be sent with the request.
request_trade_ticks(self, InstrumentId instrument_id, int limit, UUID4 correlation_id, datetime start=None, datetime end=None, dict params=None) → void
Request historical TradeTick data.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument ID for the request.
- limit (int) – The limit for the number of returned ticks.
- correlation_id (UUID4) – The correlation ID for the request.
- start (datetime, optional) – The start datetime (UTC) of the request time range (inclusive).
- end (datetime, optional) – The end datetime (UTC) of the request time range. Inclusiveness depends on the individual data client implementation.
- params (dict[str, Any], optional) – Additional params to be sent with the request.
reset(self) → void
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
resume(self) → void
Resume the component.
While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
shutdown_system(self, unicode reason=None) → void
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters: reason (str , optional) – The reason for issuing the shutdown command.
start(self) → void
Start the component.
While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
state
ComponentState
Return the component's current state.
- Return type: ComponentState
- Type: Component.state
stop(self) → void
Stop the component.
While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
subscribe(self, DataType data_type, dict params=None) → void
Subscribe to data for the given data type.
- Parameters: data_type (DataType) – The data type for the subscription.
subscribe_bars(self, BarType bar_type, dict params=None) → void
Subscribe to Bar data for the given bar type.
- Parameters:
- bar_type (BarType) – The bar type to subscribe to.
- params (dict[str, Any], optional) – Additional params for the subscription.
subscribe_instrument(self, InstrumentId instrument_id, dict params=None) → void
Subscribe to the Instrument with the given instrument ID.
- Parameters: params (dict[str, Any], optional) – Additional params for the subscription.
subscribe_instrument_close(self, InstrumentId instrument_id, dict params=None) → void
Subscribe to InstrumentClose updates for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to subscribe to.
- params (dict[str, Any], optional) – Additional params for the subscription.
subscribe_instrument_status(self, InstrumentId instrument_id, dict params=None) → void
Subscribe to InstrumentStatus data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to subscribe to.
- params (dict[str, Any], optional) – Additional params for the subscription.
subscribe_instruments(self, dict params=None) → void
Subscribe to all Instrument data.
- Parameters: params (dict[str, Any], optional) – Additional params for the subscription.
subscribe_order_book_deltas(self, InstrumentId instrument_id, BookType book_type, int depth=0, dict params=None) → void
Subscribe to OrderBookDeltas data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The order book instrument to subscribe to.
- book_type (BookType {L1_MBP, L2_MBP, L3_MBO}) – The order book type.
- depth (int, optional, default 0) – The maximum depth for the subscription.
- params (dict[str, Any], optional) – Additional params for the subscription.
subscribe_order_book_snapshots(self, InstrumentId instrument_id, BookType book_type, int depth=0, dict params=None) → void
Subscribe to OrderBook snapshots data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The order book instrument to subscribe to.
- book_type (BookType {L1_MBP, L2_MBP, L3_MBO}) – The order book level.
- depth (int, optional) – The maximum depth for the order book. A depth of 0 is maximum depth.
- params (dict[str, Any], optional) – Additional params for the subscription.
subscribe_quote_ticks(self, InstrumentId instrument_id, dict params=None) → void
Subscribe to QuoteTick data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to subscribe to.
- params (dict[str, Any], optional) – Additional params for the subscription.
subscribe_trade_ticks(self, InstrumentId instrument_id, dict params=None) → void
Subscribe to TradeTick data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to subscribe to.
- params (dict[str, Any], optional) – Additional params for the subscription.
subscribed_bars(self) → list
Return the bar types subscribed to.
- Return type: list[BarType]
subscribed_custom_data(self) → list
Return the custom data types subscribed to.
- Return type: list[DataType]
subscribed_instrument_close(self) → list
Return the instrument closes subscribed to.
- Return type: list[InstrumentId]
subscribed_instrument_status(self) → list
Return the status update instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_instruments(self) → list
Return the instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_order_book_deltas(self) → list
Return the order book delta instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_order_book_snapshots(self) → list
Return the order book snapshot instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_quote_ticks(self) → list
Return the quote tick instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_trade_ticks(self) → list
Return the trade tick instruments subscribed to.
- Return type: list[InstrumentId]
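The `subscribe_*`, `unsubscribe_*` and `subscribed_*` methods form a simple registry: subscribing records an identifier, unsubscribing removes it, and the `subscribed_*` query lists what is currently active. A minimal sketch of that bookkeeping for one data kind, using a hypothetical `SketchSubscriptions` class and plain strings in place of InstrumentId, might look like this:

```python
from typing import List, Set


class SketchSubscriptions:
    """Hypothetical subscription registry (not the library class)."""

    def __init__(self) -> None:
        self._quote_ticks: Set[str] = set()

    def subscribe_quote_ticks(self, instrument_id: str) -> None:
        # A set makes repeated subscriptions harmless.
        self._quote_ticks.add(instrument_id)

    def unsubscribe_quote_ticks(self, instrument_id: str) -> None:
        # discard() does not raise if the ID was never subscribed.
        self._quote_ticks.discard(instrument_id)

    def subscribed_quote_ticks(self) -> List[str]:
        return sorted(self._quote_ticks)


subs = SketchSubscriptions()
subs.subscribe_quote_ticks("EUR/USD.SIM")
subs.subscribe_quote_ticks("GBP/USD.SIM")
subs.subscribe_quote_ticks("EUR/USD.SIM")  # duplicate, no effect
subs.unsubscribe_quote_ticks("GBP/USD.SIM")
active = subs.subscribed_quote_ticks()
```

Whether the library tolerates duplicate subscriptions in the same way is not stated above; the set-based behavior is a choice made for the sketch.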
trader_id
The trader ID associated with the component.
- Returns: TraderId
type
The component's type.
- Returns: type
unsubscribe(self, DataType data_type, dict params=None) → void
Unsubscribe from data for the given data type.
- Parameters: data_type (DataType) – The data type for the subscription.
unsubscribe_bars(self, BarType bar_type, dict params=None) → void
Unsubscribe from Bar data for the given bar type.
- Parameters:
- bar_type (BarType) – The bar type to unsubscribe from.
- params (dict[str, Any], optional) – Additional params for the subscription.
unsubscribe_instrument(self, InstrumentId instrument_id, dict params=None) → void
Unsubscribe from Instrument data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The instrument to unsubscribe from.
- params (dict[str, Any], optional) – Additional params for the subscription.
unsubscribe_instrument_close(self, InstrumentId instrument_id, dict params=None) → void
Unsubscribe from InstrumentClose data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to unsubscribe from.
- params (dict[str, Any], optional) – Additional params for the subscription.
unsubscribe_instrument_status(self, InstrumentId instrument_id, dict params=None) → void
Unsubscribe from InstrumentStatus data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The instrument status updates to unsubscribe from.
- params (dict[str, Any], optional) – Additional params for the subscription.
unsubscribe_instruments(self, dict params=None) → void
Unsubscribe from all Instrument data.
- Parameters: params (dict[str, Any], optional) – Additional params for the subscription.
unsubscribe_order_book_deltas(self, InstrumentId instrument_id, dict params=None) → void
Unsubscribe from OrderBookDeltas data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The order book instrument to unsubscribe from.
- params (dict[str, Any], optional) – Additional params for the subscription.
unsubscribe_order_book_snapshots(self, InstrumentId instrument_id, dict params=None) → void
Unsubscribe from OrderBook snapshots data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The order book instrument to unsubscribe from.
- params (dict[str, Any], optional) – Additional params for the subscription.
unsubscribe_quote_ticks(self, InstrumentId instrument_id, dict params=None) → void
Unsubscribe from QuoteTick data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to unsubscribe from.
- params (dict[str, Any], optional) – Additional params for the subscription.
unsubscribe_trade_ticks(self, InstrumentId instrument_id, dict params=None) → void
Unsubscribe from TradeTick data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to unsubscribe from.
- params (dict[str, Any], optional) – Additional params for the subscription.
venue
The client's venue ID (if applicable).
- Returns: Venue or None
Engine
The DataEngine is the central component of the entire data stack.
The data engine's primary responsibility is to orchestrate interactions between the DataClient instances and the rest of the platform. This includes sending requests to, and receiving responses from, data endpoints via its registered data clients.
The engine employs a simple fan-in fan-out messaging pattern to execute DataCommand type messages, and process DataResponse messages or market data objects.
Alternative implementations can be written on top of the generic engine; they only need to override the execute, process, send and receive methods.
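The fan-in fan-out pattern described above can be illustrated with a small standalone sketch. It is not the DataEngine implementation: `MiniEngine`, `add_handler` and the dict-shaped command are hypothetical names chosen for illustration, and only the `execute`/`process` method names and the two counters echo this reference.

```python
from collections import defaultdict
from typing import Any, Callable


class MiniEngine:
    """Toy fan-in/fan-out hub (hypothetical; not the DataEngine implementation)."""

    def __init__(self) -> None:
        self.command_count = 0
        self.data_count = 0
        self._clients: dict[str, Callable[[dict], None]] = {}
        self._handlers: dict[type, list[Callable[[Any], None]]] = defaultdict(list)

    def register_client(self, client_id: str, send: Callable[[dict], None]) -> None:
        if client_id in self._clients:
            raise ValueError(f"client {client_id!r} is already registered")
        self._clients[client_id] = send

    def add_handler(self, data_cls: type, handler: Callable[[Any], None]) -> None:
        self._handlers[data_cls].append(handler)

    def execute(self, command: dict) -> None:
        # Commands enter through one method and are routed to the owning client.
        self.command_count += 1
        self._clients[command["client_id"]](command)

    def process(self, data: Any) -> None:
        # Data from any client fans in here, then fans out to every handler
        # registered for its type.
        self.data_count += 1
        for handler in self._handlers[type(data)]:
            handler(data)


sent: list[dict] = []
received: list[str] = []

engine = MiniEngine()
engine.register_client("SIM", sent.append)
engine.add_handler(str, received.append)
engine.add_handler(str, lambda item: received.append(item.upper()))

engine.execute({"client_id": "SIM", "action": "subscribe"})
engine.process("tick")
```

In this sketch a single `process` call reaches both handlers, which is the fan-out half of the pattern; any number of clients could call `process`, which is the fan-in half.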
class DataEngine
Bases: Component
DataEngine(MessageBus msgbus, Cache cache, Clock clock, config: DataEngineConfig | None = None) -> None
Provides a high-performance data engine for managing many DataClient instances, for the asynchronous ingest of data.
- Parameters:
- msgbus (MessageBus) – The message bus for the engine.
- cache (Cache) – The cache for the engine.
- clock (Clock) – The clock for the engine.
- config (DataEngineConfig , optional) – The configuration for the instance.
check_connected(self) → bool
Check that all of the engine's clients are connected.
- Returns: True if all clients connected, else False.
- Return type: bool
check_disconnected(self) → bool
Check that all of the engine's clients are disconnected.
- Returns: True if all clients disconnected, else False.
- Return type: bool
command_count
The total count of data commands received by the engine.
- Returns: int
connect(self) → None
Connect the engine by calling connect on all registered clients.
data_count
The total count of data stream objects received by the engine.
- Returns: int
debug
If debug mode is active (will provide extra debug logging).
- Returns: bool
default_client
ClientId | None
Return the default data client registered with the engine.
- Return type: ClientId or None
degrade(self) → void
Degrade the component.
While executing on_degrade(), any exception will be logged and re-raised, and the component will remain in a DEGRADING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
deregister_client(self, DataClient client) → void
Deregister the given data client from the data engine.
- Parameters: client (DataClient) – The data client to deregister.
disconnect(self) → None
Disconnect the engine by calling disconnect on all registered clients.
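As a rough illustration of how connect, disconnect and the two check methods relate, the sketch below delegates to every registered client and aggregates their status with `all`. `StubClient` and `ClientGroup` are hypothetical stand-ins, not classes from this package, and the behaviour with zero registered clients is deliberately not shown.

```python
class StubClient:
    """Hypothetical client that only tracks a connected flag."""

    def __init__(self) -> None:
        self.is_connected = False

    def connect(self) -> None:
        self.is_connected = True

    def disconnect(self) -> None:
        self.is_connected = False


class ClientGroup:
    """Hypothetical holder that mirrors the engine's connect/check methods."""

    def __init__(self, clients: list[StubClient]) -> None:
        self._clients = clients

    def connect(self) -> None:
        for client in self._clients:
            client.connect()

    def disconnect(self) -> None:
        for client in self._clients:
            client.disconnect()

    def check_connected(self) -> bool:
        return all(c.is_connected for c in self._clients)

    def check_disconnected(self) -> bool:
        return all(not c.is_connected for c in self._clients)


a, b = StubClient(), StubClient()
group = ClientGroup([a, b])
group.connect()
all_up = group.check_connected()

# With one client down, neither check passes.
b.disconnect()
mixed = (group.check_connected(), group.check_disconnected())
```

Note that the two checks are not complements: with a mix of connected and disconnected clients, both return False.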
dispose(self) → void
Dispose of the component.
While executing on_dispose(), any exception will be logged and re-raised, and the component will remain in a DISPOSING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
execute(self, DataCommand command) → void
Execute the given data command.
- Parameters: command (DataCommand) – The command to execute.
fault(self) → void
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault(), any exception will be logged and re-raised, and the component will remain in a FAULTING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
classmethod fully_qualified_name(cls) → str
Return the fully qualified name for the component's class.
- Return type: str
id
The component's ID.
- Returns: ComponentId
is_degraded
bool
Return whether the current component state is DEGRADED.
- Return type: bool
is_disposed
bool
Return whether the current component state is DISPOSED.
- Return type: bool
is_faulted
bool
Return whether the current component state is FAULTED.
- Return type: bool
is_initialized
bool
Return whether the component has been initialized (component.state >= INITIALIZED).
- Return type: bool
is_running
bool
Return whether the current component state is RUNNING.
- Return type: bool
is_stopped
bool
Return whether the current component state is STOPPED.
- Return type: bool
process(self, Data data) → void
Process the given data.
- Parameters: data (Data) – The data to process.
register_catalog(self, catalog: ParquetDataCatalog, unicode name: str = u'catalog_0') → None
Register the given data catalog with the engine.
- Parameters:
- catalog (ParquetDataCatalog) – The data catalog to register.
- name (str , default 'catalog_0') – The name of the catalog to register.
register_client(self, DataClient client) → void
Register the given data client with the data engine.
- Parameters: client (DataClient) – The client to register.
- Raises: ValueError – If client is already registered.
register_default_client(self, DataClient client) → void
Register the given client as the default routing client (when a specific venue routing cannot be found).
Any existing default routing client will be overwritten.
- Parameters: client (DataClient) – The client to register.
register_venue_routing(self, DataClient client, Venue venue) → void
Register the given client to route messages to the given venue.
Any existing client in the routing map for the given venue will be overwritten.
- Parameters:
- venue (Venue) – The venue to route messages to.
- client (DataClient) – The client for the venue routing.
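The routing rules described for register_venue_routing and register_default_client can be sketched as a dictionary lookup with a fallback. This is a minimal illustration, not the engine's code: `RoutingTable` and `resolve` are hypothetical names, and plain strings stand in for DataClient and Venue objects.

```python
from typing import Optional


class RoutingTable:
    """Hypothetical venue-to-client map with a default fallback."""

    def __init__(self) -> None:
        self._by_venue: dict[str, str] = {}
        self._default: Optional[str] = None

    def register_venue_routing(self, client: str, venue: str) -> None:
        # Overwrites any client already mapped to this venue.
        self._by_venue[venue] = client

    def register_default_client(self, client: str) -> None:
        # Overwrites any existing default routing client.
        self._default = client

    def resolve(self, venue: str) -> Optional[str]:
        # Prefer a venue-specific route, else fall back to the default (if any).
        return self._by_venue.get(venue, self._default)


table = RoutingTable()
table.register_default_client("FALLBACK")
table.register_venue_routing("CLIENT-A", "BINANCE")
table.register_venue_routing("CLIENT-B", "BINANCE")  # replaces CLIENT-A
```

Here `table.resolve("BINANCE")` yields the most recently registered client for that venue, while an unmapped venue resolves to the default client.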
registered_clients
list[ClientId]
Return the data clients registered with the engine.
- Return type: list[ClientId]
request(self, DataRequest request) → void
Handle the given request.
- Parameters: request (DataRequest) – The request to handle.
request_count
The total count of data requests received by the engine.
- Returns: int
reset(self) → void
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset(), any exception will be logged and re-raised, and the component will remain in a RESETTING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
response(self, DataResponse response) → void
Handle the given response.
- Parameters: response (DataResponse) – The response to handle.
response_count
The total count of data responses received by the engine.
- Returns: int
resume(self) → void
Resume the component.
While executing on_resume(), any exception will be logged and re-raised, and the component will remain in a RESUMING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
shutdown_system(self, unicode reason=None) → void
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters: reason (str , optional) – The reason for issuing the shutdown command.
start(self) → void
Start the component.
While executing on_start(), any exception will be logged and re-raised, and the component will remain in a STARTING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
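The lifecycle contract shared by start and the other transition methods (a transitional state, an overridable on_* hook, and a logged no-op from an invalid state) can be sketched as follows. This is an assumption-laden illustration rather than the Component implementation: `MiniComponent`, the three-member `State` enum and the READY initial state are hypothetical.

```python
import logging
from enum import Enum, auto

log = logging.getLogger("component")


class State(Enum):
    READY = auto()  # hypothetical initial state for this sketch
    STARTING = auto()
    RUNNING = auto()


class MiniComponent:
    def __init__(self) -> None:
        self.state = State.READY

    def on_start(self) -> None:
        """Hook for subclasses; start() itself is not meant to be overridden."""

    def start(self) -> None:
        if self.state is not State.READY:
            # Invalid source state: log an error and leave the state unchanged.
            log.error("cannot start from %s", self.state.name)
            return
        self.state = State.STARTING
        try:
            self.on_start()
        except Exception:
            # Logged and re-raised; the state stays at STARTING.
            log.exception("error in on_start")
            raise
        self.state = State.RUNNING


class Broken(MiniComponent):
    def on_start(self) -> None:
        raise RuntimeError("boom")


ok = MiniComponent()
ok.start()
ok.start()  # rejected with a logged error; state is still RUNNING

bad = Broken()
try:
    bad.start()
except RuntimeError:
    pass  # bad.state is left at STARTING
```

Keeping the transition logic in `start` and the custom behaviour in `on_start` is what makes the "Do not override" warning workable: subclasses change the hook, never the state handling.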
state
ComponentState
Return the component's current state.
- Return type: ComponentState
stop(self) → void
Stop the component.
While executing on_stop(), any exception will be logged and re-raised, and the component will remain in a STOPPING state.
WARNING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
stop_clients(self) → void
Stop the registered clients.
subscribed_bars(self) → list
Return the bar types subscribed to.
- Return type: list[BarType]
subscribed_custom_data(self) → list
Return the custom data types subscribed to.
- Return type: list[DataType]
subscribed_instrument_close(self) → list
Return the close price instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_instrument_status(self) → list
Return the status update instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_instruments(self) → list
Return the instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_order_book_deltas(self) → list
Return the order book delta instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_order_book_snapshots(self) → list
Return the order book snapshot instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_quote_ticks(self) → list
Return the quote tick instruments subscribed to.
- Return type: list[InstrumentId]
subscribed_synthetic_quotes(self) → list
Return the synthetic instrument quotes subscribed to.
- Return type: list[InstrumentId]
subscribed_synthetic_trades(self) → list
Return the synthetic instrument trades subscribed to.
- Return type: list[InstrumentId]
subscribed_trade_ticks(self) → list
Return the trade tick instruments subscribed to.
- Return type: list[InstrumentId]
trader_id
The trader ID associated with the component.
- Returns: TraderId
type
The component's type.
- Returns: type
class SnapshotInfo
Bases: object
Messages
class DataCommand
Bases: Command
DataCommand(ClientId client_id: ClientId | None, Venue venue: Venue | None, DataType data_type, UUID4 command_id, uint64_t ts_init, dict params: dict | None = None)
The base class for all data commands.
- Parameters:
- client_id (ClientId or None) – The data client ID for the command.
- venue (Venue or None) – The venue for the command.
- data_type (type) – The data type for the command.
- command_id (UUID4) – The command ID.
- ts_init (uint64_t) – UNIX timestamp (nanoseconds) when the object was initialized.
- params (dict[str, object], optional) – Additional parameters for the command.
- Raises: ValueError – If both client_id and venue are None (not enough routing info).
WARNING
This class should not be used directly, but through a concrete subclass.
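The routing precondition above (at least one of client_id and venue must be given) can be expressed as a constructor check. The sketch below is a simplified stand-in: `MiniCommand` is hypothetical, uses plain strings and the standard library `uuid` module instead of the package's ClientId, Venue and UUID4 types, and omits ts_init.

```python
from dataclasses import dataclass, field
from typing import Optional
from uuid import UUID, uuid4


@dataclass(frozen=True)
class MiniCommand:
    """Hypothetical command carrying the routing fields only."""

    client_id: Optional[str]
    venue: Optional[str]
    data_type: type
    command_id: UUID = field(default_factory=uuid4)
    params: Optional[dict] = None

    def __post_init__(self) -> None:
        # At least one of client_id / venue is needed to route the command.
        if self.client_id is None and self.venue is None:
            raise ValueError(
                "both client_id and venue were None (not enough routing info)"
            )


# A venue alone is enough routing information.
routed = MiniCommand(client_id=None, venue="SIM", data_type=dict)

try:
    MiniCommand(client_id=None, venue=None, data_type=dict)
    rejected = False
except ValueError:
    rejected = True
```

Validating at construction time means an unroutable command is rejected before it ever reaches the engine.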
client_id
The data client ID for the command.
- Returns: ClientId or None
data_type
The command data type.
- Returns: type
id
The command message ID.
- Returns: UUID4
params
Additional specific parameters for the command.
- Returns: dict[str, object] or None
ts_init
UNIX timestamp (nanoseconds) when the object was initialized.
- Returns: uint64_t
venue
The venue for the command.
- Returns: Venue or None
class DataRequest
Bases: Request
DataRequest(ClientId client_id: ClientId | None, Venue venue: Venue | None, DataType data_type, callback: Callable[[Any], None], UUID4 request_id, uint64_t ts_init, dict params: dict | None = None)
Represents a request for data.
- Parameters:
- client_id (ClientId or None) – The data client ID for the request.
- venue (Venue or None) – The venue for the request.
- data_type (type) – The data type for the request.
- callback (Callable[[Any], None]) – The delegate to call with the data.
- request_id (UUID4) – The request ID.
- ts_init (uint64_t) – UNIX timestamp (nanoseconds) when the object was initialized.
- params (dict[str, object], optional) – Additional parameters for the request.
- Raises: ValueError – If both client_id and venue are None (not enough routing info).
callback
The callback for the response.
- Returns: Callable
client_id
The data client ID for the request.
- Returns: ClientId or None
data_type
The request data type.
- Returns: type
id
The request message ID.
- Returns: UUID4
params
Additional specific parameters for the request.
- Returns: dict[str, object] or None
ts_init
UNIX timestamp (nanoseconds) when the object was initialized.
- Returns: uint64_t
venue
The venue for the request.
- Returns: Venue or None
class DataResponse
Bases: Response
DataResponse(ClientId client_id: ClientId | None, Venue venue: Venue | None, DataType data_type, data, UUID4 correlation_id, UUID4 response_id, uint64_t ts_init, dict params: dict | None = None)
Represents a response with data.
- Parameters:
- client_id (ClientId or None) – The data client ID of the response.
- venue (Venue or None) – The venue for the response.
- data_type (type) – The data type of the response.
- data (object) – The data of the response.
- correlation_id (UUID4) – The correlation ID.
- response_id (UUID4) – The response ID.
- ts_init (uint64_t) – UNIX timestamp (nanoseconds) when the object was initialized.
- params (dict[str, object], optional) – Additional parameters for the response.
- Raises: ValueError – If both client_id and venue are None (not enough routing info).
client_id
The data client ID for the response.
- Returns: ClientId or None
correlation_id
The response correlation ID.
- Returns: UUID4
data
The response data.
- Returns: object
data_type
The response data type.
- Returns: type
id
The response message ID.
- Returns: UUID4
params
Additional specific parameters for the response.
- Returns: dict[str, object] or None
ts_init
UNIX timestamp (nanoseconds) when the object was initialized.
- Returns: uint64_t
venue
The venue for the response.
- Returns: Venue or None
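One plausible way a request's callback and a response's correlation_id fit together is a table of pending callbacks keyed by request ID. The sketch below assumes that a response's correlation_id equals the ID of the request it answers; that link, along with the `PendingRequests` class and its method bodies, is an assumption for illustration and is not taken from this reference.

```python
from typing import Any, Callable
from uuid import UUID, uuid4


class PendingRequests:
    """Hypothetical registry that matches responses to waiting callbacks."""

    def __init__(self) -> None:
        self._callbacks: dict[UUID, Callable[[Any], None]] = {}

    def request(self, callback: Callable[[Any], None]) -> UUID:
        # Remember the callback under a fresh request ID.
        request_id = uuid4()
        self._callbacks[request_id] = callback
        return request_id

    def response(self, correlation_id: UUID, data: Any) -> bool:
        # Pop so each request is answered at most once.
        callback = self._callbacks.pop(correlation_id, None)
        if callback is None:
            return False  # no request is waiting on this ID
        callback(data)
        return True


results: list[Any] = []
pending = PendingRequests()
rid = pending.request(results.append)

matched = pending.response(rid, ["bar-1", "bar-2"])
unmatched = pending.response(uuid4(), ["ignored"])
```

Only the response carrying the matching ID reaches the callback; a response with an unknown ID is reported as unmatched and its data is dropped.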
class Subscribe
Bases: DataCommand
Subscribe(ClientId client_id: ClientId | None, Venue venue: Venue | None, DataType data_type, UUID4 command_id, uint64_t ts_init, dict params: dict | None = None)
Represents a command to subscribe to data.
- Parameters:
- client_id (ClientId or None) – The data client ID for the command.
- venue (Venue or None) – The venue for the command.
- data_type (type) – The data type for the subscription.
- command_id (UUID4) – The command ID.
- ts_init (uint64_t) – UNIX timestamp (nanoseconds) when the object was initialized.
- params (dict[str, object], optional) – Additional parameters for the subscription.
- Raises: ValueError – If both client_id and venue are None (not enough routing info).
client_id
The data client ID for the command.
- Returns: ClientId or None
data_type
The command data type.
- Returns: type
id
The command message ID.
- Returns: UUID4
params
Additional specific parameters for the command.
- Returns: dict[str, object] or None
ts_init
UNIX timestamp (nanoseconds) when the object was initialized.
- Returns: uint64_t
venue
The venue for the command.
- Returns: Venue or None
class Unsubscribe
Bases: DataCommand
Unsubscribe(ClientId client_id: ClientId | None, Venue venue: Venue | None, DataType data_type, UUID4 command_id, uint64_t ts_init, dict params: dict | None = None)
Represents a command to unsubscribe from data.
- Parameters:
- client_id (ClientId or None) – The data client ID for the command.
- venue (Venue or None) – The venue for the command.
- data_type (type) – The data type to unsubscribe from.
- command_id (UUID4) – The command ID.
- ts_init (uint64_t) – UNIX timestamp (nanoseconds) when the object was initialized.
- params (dict[str, object], optional) – Additional parameters for the subscription.
- Raises: ValueError – If both client_id and venue are None (not enough routing info).
client_id
The data client ID for the command.
- Returns: ClientId or None
data_type
The command data type.
- Returns: type
id
The command message ID.
- Returns: UUID4
params
Additional specific parameters for the command.
- Returns: dict[str, object] or None
ts_init
UNIX timestamp (nanoseconds) when the object was initialized.
- Returns: uint64_t
venue
The venue for the command.
- Returns: Venue or None