The data subpackage groups components relating to the data stack and data tooling for the platform.
The layered architecture of the data stack somewhat mirrors the execution stack with a central engine, cache layer beneath, database layer beneath, with alternative implementations able to be written on top.
Due to the high-performance, the core components are reusable between both backtest and live implementations - helping to ensure consistent logic for trading operations.
class BarAggregator
Bases: object
BarAggregator(Instrument instrument, BarType bar_type, handler: Callable[[Bar], None], bool await_partial=False) -> None Provides a means of aggregating specified bars and sending to a registered handler.
- Parameters:
- instrument (Instrument) – The instrument for the aggregator.
- bar_type (BarType) – The bar type for the aggregator.
- handler (Callable [ [Bar ] , None ]) – The bar handler for the aggregator.
- await_partial (bool , default False) – If the aggregator should await an initial partial bar prior to aggregating.
- Raises: ValueError – If != bar_type.instrument_id.
The aggregators bar type.
- Returns: BarType
handle_bar(self, Bar bar) → void
Update the aggregator with the given bar.
- Parameters: bar (Bar) – The bar for the update.
handle_quote_tick(self, QuoteTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (QuoteTick) – The tick for the update.
handle_trade_tick(self, TradeTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (TradeTick) – The tick for the update.
- Type: is_running
set_await_partial(self, bool value)
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
start_batch_update(self, handler: Callable[[Bar], None], uint64_t time_ns) → None
stop_batch_update(self) → None
class BarBuilder
Bases: object
BarBuilder(Instrument instrument, BarType bar_type) -> None Provides a generic bar builder for aggregation.
- Parameters:
- instrument (Instrument) – The instrument for the builder.
- bar_type (BarType) – The bar type for the builder.
- Raises: ValueError – If != bar_type.instrument_id.
build(self, uint64_t ts_event, uint64_t ts_init) → Bar
Return the aggregated bar with the given closing timestamp, and reset.
- Parameters:
- ts_event (uint64_t) – UNIX timestamp (nanoseconds) for the bar event.
- ts_init (uint64_t) – UNIX timestamp (nanoseconds) for the bar initialization.
- Return type: Bar
build_now(self) → Bar
Return the aggregated bar and reset.
- Return type: Bar
The builders current update count.
- Returns: int
If the builder is initialized.
- Returns: bool
The price precision for the builders instrument.
- Returns: uint8
reset(self) → void
Reset the bar builder.
All stateful fields are reset to their initial value.
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
The size precision for the builders instrument.
- Returns: uint8
UNIX timestamp (nanoseconds) when the builder last updated.
- Returns: uint64_t
update(self, Price price, Quantity size, uint64_t ts_event) → void
Update the bar builder.
- Parameters:
- price (Price) – The update price.
- size (Decimal) – The update size.
- ts_event (uint64_t) – UNIX timestamp (nanoseconds) of the update.
update_bar(self, Bar bar, Quantity volume, uint64_t ts_init) → void
Update the bar builder.
- Parameters: bar (Bar) – The update Bar.
class TickBarAggregator
Bases: BarAggregator
TickBarAggregator(Instrument instrument, BarType bar_type, handler: Callable[[Bar], None]) -> None Provides a means of building tick bars from ticks.
When received tick count reaches the step threshold of the bar specification, then a bar is created and sent to the handler.
- Parameters:
- instrument (Instrument) – The instrument for the aggregator.
- bar_type (BarType) – The bar type for the aggregator.
- handler (Callable [ [Bar ] , None ]) – The bar handler for the aggregator.
- Raises: ValueError – If != bar_type.instrument_id.
The aggregators bar type.
- Returns: BarType
handle_bar(self, Bar bar) → void
Update the aggregator with the given bar.
- Parameters: bar (Bar) – The bar for the update.
handle_quote_tick(self, QuoteTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (QuoteTick) – The tick for the update.
handle_trade_tick(self, TradeTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (TradeTick) – The tick for the update.
- Type: is_running
set_await_partial(self, bool value)
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
start_batch_update(self, handler: Callable[[Bar], None], uint64_t time_ns) → None
stop_batch_update(self) → None
class TimeBarAggregator
Bases: BarAggregator
TimeBarAggregator(Instrument instrument, BarType bar_type, handler: Callable[[Bar], None], Clock clock, str interval_type=’left-open’, bool timestamp_on_close=True, bool skip_first_non_full_bar=False, bool build_with_no_updates=True, time_bars_origin: pd.Timedelta | pd.DateOffset = None, int composite_bar_build_delay=15) -> None Provides a means of building time bars from ticks with an internal timer.
When the time reaches the next time interval of the bar specification, then a bar is created and sent to the handler.
- Parameters:
- instrument (Instrument) – The instrument for the aggregator.
- bar_type (BarType) – The bar type for the aggregator.
- handler (Callable [ [Bar ] , None ]) – The bar handler for the aggregator.
- clock (Clock) – The clock for the aggregator.
- interval_type (str , default 'left-open') – Determines the type of interval used for time aggregation.
- ‘left-open’: start time is excluded and end time is included (default).
- ‘right-open’: start time is included and end time is excluded.
- timestamp_on_close (bool , default True) – If True, then timestamp will be the bar close time. If False, then timestamp will be the bar open time.
- skip_first_non_full_bar (bool , default False) – If will skip emitting a bar if the aggregation starts mid-interval.
- build_with_no_updates (bool , default True) – If build and emit bars with no new market updates.
- time_bars_origin (pd.Timedelta or pd.DateOffset , optional) – The origin time offset.
- composite_bar_build_delay (int , default 15) – The time delay (microseconds) before building and emitting a composite bar type.
- Raises: ValueError – If != bar_type.instrument_id.
The aggregators bar type.
- Returns: BarType
get_start_time(self, datetime now: datetime) → datetime
Return the start time for the aggregators next bar.
- Returns: The timestamp (UTC).
- Return type: datetime
handle_bar(self, Bar bar) → void
Update the aggregator with the given bar.
- Parameters: bar (Bar) – The bar for the update.
handle_quote_tick(self, QuoteTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (QuoteTick) – The tick for the update.
handle_trade_tick(self, TradeTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (TradeTick) – The tick for the update.
The aggregators time interval.
- Returns: timedelta
The aggregators time interval.
- Returns: uint64_t
- Type: is_running
The aggregators next closing time.
- Returns: uint64_t
set_await_partial(self, bool value)
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
start_batch_update(self, handler: Callable[[Bar], None], uint64_t time_ns) → None
stop(self) → void
Stop the bar aggregator.
stop_batch_update(self) → None
class ValueBarAggregator
Bases: BarAggregator
ValueBarAggregator(Instrument instrument, BarType bar_type, handler: Callable[[Bar], None]) -> None Provides a means of building value bars from ticks.
When received value reaches the step threshold of the bar specification, then a bar is created and sent to the handler.
- Parameters:
- instrument (Instrument) – The instrument for the aggregator.
- bar_type (BarType) – The bar type for the aggregator.
- handler (Callable [ [Bar ] , None ]) – The bar handler for the aggregator.
- Raises: ValueError – If != bar_type.instrument_id.
The aggregators bar type.
- Returns: BarType
Return the current cumulative value of the aggregator.
- Return type: Decimal
handle_bar(self, Bar bar) → void
Update the aggregator with the given bar.
- Parameters: bar (Bar) – The bar for the update.
handle_quote_tick(self, QuoteTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (QuoteTick) – The tick for the update.
handle_trade_tick(self, TradeTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (TradeTick) – The tick for the update.
- Type: is_running
set_await_partial(self, bool value)
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
start_batch_update(self, handler: Callable[[Bar], None], uint64_t time_ns) → None
stop_batch_update(self) → None
class VolumeBarAggregator
Bases: BarAggregator
VolumeBarAggregator(Instrument instrument, BarType bar_type, handler: Callable[[Bar], None]) -> None Provides a means of building volume bars from ticks.
When received volume reaches the step threshold of the bar specification, then a bar is created and sent to the handler.
- Parameters:
- instrument (Instrument) – The instrument for the aggregator.
- bar_type (BarType) – The bar type for the aggregator.
- handler (Callable [ [Bar ] , None ]) – The bar handler for the aggregator.
- Raises: ValueError – If != bar_type.instrument_id.
The aggregators bar type.
- Returns: BarType
handle_bar(self, Bar bar) → void
Update the aggregator with the given bar.
- Parameters: bar (Bar) – The bar for the update.
handle_quote_tick(self, QuoteTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (QuoteTick) – The tick for the update.
handle_trade_tick(self, TradeTick tick) → void
Update the aggregator with the given tick.
- Parameters: tick (TradeTick) – The tick for the update.
- Type: is_running
set_await_partial(self, bool value)
set_partial(self, Bar partial_bar) → void
Set the initial values for a partially completed bar.
This method can only be called once per instance.
- Parameters: partial_bar (Bar) – The partial bar with values to set.
start_batch_update(self, handler: Callable[[Bar], None], uint64_t time_ns) → None
stop_batch_update(self) → None
class DataClient
Bases: Component
DataClient(ClientId client_id, MessageBus msgbus, Cache cache, Clock clock, Venue venue: Venue | None = None, config: NautilusConfig | None = None) The base class for all data clients.
- Parameters:
- client_id (ClientId) – The data client ID.
- msgbus (MessageBus) – The message bus for the client.
- clock (Clock) – The clock for the client.
- venue (Venue , optional) – The client venue. If multi-venue then can be
. - config (NautilusConfig , optional) – The configuration for the instance.
This class should not be used directly, but through a concrete subclass.
degrade(self) → void
Degrade the component.
While executing on_degrade() any exception will be logged and reraised, then the component
will remain in a DEGRADING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
dispose(self) → void
Dispose of the component.
While executing on_dispose() any exception will be logged and reraised, then the component
will remain in a DISPOSING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
fault(self) → void
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault() any exception will be logged and reraised, then the component
will remain in a FAULTING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
classmethod fully_qualified_name(cls) → str
Return the fully qualified name for the components class.
- Return type: str
The components ID.
- Returns: ComponentId
If the client is connected.
- Returns: bool
Return whether the current component state is DEGRADED
- Return type: bool
- Type: Component.is_degraded
Return whether the current component state is DISPOSED
- Return type: bool
- Type: Component.is_disposed
Return whether the current component state is FAULTED
- Return type: bool
- Type: Component.is_faulted
Return whether the component has been initialized (component.state >= INITIALIZED
- Return type: bool
- Type: Component.is_initialized
Return whether the current component state is RUNNING
- Return type: bool
- Type: Component.is_running
Return whether the current component state is STOPPED
- Return type: bool
- Type: Component.is_stopped
request(self, RequestData request) → void
Request data for the given data type.
- Parameters: request (RequestData) – The message for the data request.
reset(self) → void
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset() any exception will be logged and reraised, then the component
will remain in a RESETTING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
resume(self) → void
Resume the component.
While executing on_resume() any exception will be logged and reraised, then the component
will remain in a RESUMING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
shutdown_system(self, str reason=None) → void
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters: reason (str , optional) – The reason for issuing the shutdown command.
start(self) → void
Start the component.
While executing on_start() any exception will be logged and reraised, then the component
will remain in a STARTING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
ComponentState Return the components current state.
- Return type: ComponentState
- Type: Component.state
stop(self) → void
Stop the component.
While executing on_stop() any exception will be logged and reraised, then the component
will remain in a STOPPING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
subscribe(self, SubscribeData command) → void
Subscribe to data for the given data type.
- Parameters: data_type (DataType) – The data type for the subscription.
subscribed_custom_data(self) → list
Return the custom data types subscribed to.
- Return type: list[DataType]
The trader ID associated with the component.
- Returns: TraderId
The components type.
- Returns: type
unsubscribe(self, UnsubscribeData command) → void
Unsubscribe from data for the given data type.
- Parameters: data_type (DataType) – The data type for the subscription.
The clients venue ID (if applicable).
- Returns:
Venue or
class MarketDataClient
Bases: DataClient
MarketDataClient(ClientId client_id, MessageBus msgbus, Cache cache, Clock clock, Venue venue: Venue | None = None, config: NautilusConfig | None = None) The base class for all market data clients.
- Parameters:
- client_id (ClientId) – The data client ID.
- msgbus (MessageBus) – The message bus for the client.
- cache (Cache) – The cache for the client.
- clock (Clock) – The clock for the client.
- venue (Venue , optional) – The client venue. If multi-venue then can be
. - config (NautilusConfig , optional) – The configuration for the instance.
This class should not be used directly, but through a concrete subclass.
degrade(self) → void
Degrade the component.
While executing on_degrade() any exception will be logged and reraised, then the component
will remain in a DEGRADING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
dispose(self) → void
Dispose of the component.
While executing on_dispose() any exception will be logged and reraised, then the component
will remain in a DISPOSING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
fault(self) → void
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault() any exception will be logged and reraised, then the component
will remain in a FAULTING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
classmethod fully_qualified_name(cls) → str
Return the fully qualified name for the components class.
- Return type: str
The components ID.
- Returns: ComponentId
If the client is connected.
- Returns: bool
Return whether the current component state is DEGRADED
- Return type: bool
- Type: Component.is_degraded
Return whether the current component state is DISPOSED
- Return type: bool
- Type: Component.is_disposed
Return whether the current component state is FAULTED
- Return type: bool
- Type: Component.is_faulted
Return whether the component has been initialized (component.state >= INITIALIZED
- Return type: bool
- Type: Component.is_initialized
Return whether the current component state is RUNNING
- Return type: bool
- Type: Component.is_running
Return whether the current component state is STOPPED
- Return type: bool
- Type: Component.is_stopped
request(self, RequestData request) → void
Request data for the given data type.
- Parameters: request (RequestData) – The message for the data request.
request_bars(self, RequestBars request) → void
Request historical Bar data. To load historical data from a catalog, you can pass a list[DataCatalogConfig] to the TradingNodeConfig or the BacktestEngineConfig.
- Parameters: request (RequestBars) – The message for the data request.
request_instrument(self, RequestInstrument request) → void
Request Instrument data for the given instrument ID.
- Parameters: request (RequestInstrument) – The message for the data request.
request_instruments(self, RequestInstruments request) → void
Request all Instrument data for the given venue.
- Parameters: request (RequestInstruments) – The message for the data request.
request_order_book_snapshot(self, RequestOrderBookSnapshot request) → void
Request order book snapshot data.
- Parameters: request (RequestOrderBookSnapshot) – The message for the data request.
request_quote_ticks(self, RequestQuoteTicks request) → void
Request historical QuoteTick data.
- Parameters: request (RequestQuoteTicks) – The message for the data request.
request_trade_ticks(self, RequestTradeTicks request) → void
Request historical TradeTick data.
- Parameters: request (RequestTradeTicks) – The message for the data request.
reset(self) → void
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset() any exception will be logged and reraised, then the component
will remain in a RESETTING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
resume(self) → void
Resume the component.
While executing on_resume() any exception will be logged and reraised, then the component
will remain in a RESUMING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
shutdown_system(self, str reason=None) → void
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters: reason (str , optional) – The reason for issuing the shutdown command.
start(self) → void
Start the component.
While executing on_start() any exception will be logged and reraised, then the component
will remain in a STARTING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
ComponentState Return the components current state.
- Return type: ComponentState
- Type: Component.state
stop(self) → void
Stop the component.
While executing on_stop() any exception will be logged and reraised, then the component
will remain in a STOPPING
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
subscribe(self, SubscribeData command) → void
Subscribe to data for the given data type.
- Parameters:
- data_type (DataType) – The data type for the subscription.
- params (dict *[*str , Any ] , optional) – Additional params for the subscription.
subscribe_bars(self, SubscribeBars command) → void
Subscribe to Bar data for the given bar type.
- Parameters:
- bar_type (BarType) – The bar type to subscribe to.
- params (dict *[*str , Any ] , optional) – Additional params for the subscription.
subscribe_instrument(self, SubscribeInstrument command) → void
Subscribe to the Instrument with the given instrument ID.
- Parameters: params (dict *[*str , Any ] , optional) – Additional params for the subscription.
subscribe_instrument_close(self, SubscribeInstrumentClose command) → void
Subscribe to InstrumentClose updates for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to subscribe to.
- params (dict *[*str , Any ] , optional) – Additional params for the subscription.
subscribe_instrument_status(self, SubscribeInstrumentStatus command) → void
Subscribe to InstrumentStatus data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to subscribe to.
- params (dict *[*str , Any ] , optional) – Additional params for the subscription.
subscribe_instruments(self, SubscribeInstruments command) → void
Subscribe to all Instrument data.
- Parameters: params (dict *[*str , Any ] , optional) – Additional params for the subscription.
subscribe_order_book_deltas(self, SubscribeOrderBook command) → void
Subscribe to OrderBookDeltas data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The order book instrument to subscribe to.
- book_type (BookType {
}) – The order book type. - depth (int , optional , default None) – The maximum depth for the subscription.
- params (dict *[*str , Any ] , optional) – Additional params for the subscription.
subscribe_order_book_snapshots(self, SubscribeOrderBook command) → void
Subscribe to OrderBook snapshots data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The order book instrument to subscribe to.
- book_type (BookType {
}) – The order book level. - depth (int , optional) – The maximum depth for the order book. A depth of 0 is maximum depth.
- params (dict *[*str , Any ] , optional) – Additional params for the subscription.
subscribe_quote_ticks(self, SubscribeQuoteTicks command) → void
Subscribe to QuoteTick data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to subscribe to.
- params (dict *[*str , Any ] , optional) – Additional params for the subscription.
subscribe_trade_ticks(self, SubscribeTradeTicks command) → void
Subscribe to TradeTick data for the given instrument ID.
- Parameters:
- instrument_id (InstrumentId) – The tick instrument to subscribe to.
- params (dict *[*str , Any ] , optional) – Additional params for the subscription.
subscribed_bars(self) → list
Return the bar types subscribed to.
- Return type: list[BarType]
subscribed_custom_data(self) → list
Return the custom data types subscribed to.
- Return type: list[DataType]
subscribed_instrument_close(self) → list
Return the instrument closes subscribed to.
- Return type: list[InstrumentId]
subscribed_instrument_status(self) → list
Return the status update instruments subscribed to.
- Return type: list[InstrumentId]