Live¶
The live subpackage groups all engine and client implementations for live trading.
A common event loop is generally passed into each live engine, so that the whole system runs on a single efficient event loop (uvloop by default).
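The shared-loop design can be sketched as follows. This is a minimal illustration, not the library's own wiring: `new_shared_loop` and `Engine` are hypothetical names, and the fallback to the standard asyncio loop when uvloop is not installed is an assumption of this sketch.

```python
import asyncio


def new_shared_loop() -> asyncio.AbstractEventLoop:
    """Create one event loop, preferring uvloop when it is installed."""
    try:
        import uvloop  # optional third-party dependency

        return uvloop.new_event_loop()
    except ImportError:
        # Assumption of this sketch: fall back to the standard library loop
        return asyncio.new_event_loop()


class Engine:
    """Hypothetical stand-in for a live engine that stores the shared loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self.loop = loop


loop = new_shared_loop()
data_engine = Engine(loop)
exec_engine = Engine(loop)  # both engines schedule work on the same loop
loop.close()
```

Passing the loop in explicitly keeps every engine on the same scheduler, rather than each component creating or discovering its own.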
The LiveDataClient class is responsible for interfacing with a particular API which may be presented directly by a venue, or through a broker intermediary.
Clients can also be written for specialized data providers.
- class LiveDataClient¶
Bases: DataClient
The base class for all live data clients.
- Parameters:
loop (asyncio.AbstractEventLoop) – The event loop for the client.
client_id (ClientId) – The client ID.
venue (Venue or None) – The client venue. If multi-venue then can be None.
msgbus (MessageBus) – The message bus for the client.
cache (Cache) – The cache for the client.
clock (LiveClock) – The clock for the client.
config (NautilusConfig, optional) – The configuration for the instance.
Warning
This class should not be used directly, but through a concrete subclass.
- async run_after_delay(delay: float, coro: Coroutine) None¶
Run the given coroutine after a delay.
- Parameters:
delay (float) – The delay (seconds) before running the coroutine.
coro (Coroutine) – The coroutine to run after the initial delay.
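The delayed-run behaviour described here can be approximated with plain asyncio. The free function below is a sketch under the assumption that the method simply sleeps and then awaits the coroutine; the real method lives on the client and may differ in detail. `demo` and `record` are hypothetical helpers.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any


async def run_after_delay(delay: float, coro: Coroutine[Any, Any, Any]) -> None:
    # Wait for the delay first, then run the coroutine to completion
    await asyncio.sleep(delay)
    await coro


async def demo() -> list[str]:
    events: list[str] = []

    async def record() -> None:
        events.append("ran")

    await run_after_delay(0.01, record())
    return events
```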
- create_task(coro: Coroutine, log_msg: str | None = None, actions: Callable | None = None, success_msg: str | None = None, success_color: LogColor = <LogColor.NORMAL: 0>) Task¶
Run the given coroutine with error handling and optional callback actions when done.
- Parameters:
coro (Coroutine) – The coroutine to run.
log_msg (str, optional) – The log message for the task.
actions (Callable, optional) – The actions callback to run when the coroutine is done.
success_msg (str, optional) – The log message to write on actions success.
success_color (LogColor, default NORMAL) – The log message color for actions success.
- Return type:
asyncio.Task
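One way to picture "error handling and optional callback actions when done" is a task with a done callback. The sketch below is an assumption about the general shape, not the library's implementation: it is a free function taking the loop explicitly, it logs through the standard `logging` module, and it omits `success_color`.

```python
import asyncio
import logging
from collections.abc import Callable, Coroutine
from typing import Any, Optional

log = logging.getLogger("sketch")


def create_task(
    loop: asyncio.AbstractEventLoop,
    coro: Coroutine[Any, Any, Any],
    log_msg: Optional[str] = None,
    actions: Optional[Callable[[], None]] = None,
    success_msg: Optional[str] = None,
) -> asyncio.Task:
    task = loop.create_task(coro, name=log_msg)

    def on_done(t: asyncio.Task) -> None:
        if t.cancelled():
            return
        exc = t.exception()
        if exc is not None:
            # Error handling: log the failure instead of running actions
            log.error("Error on '%s': %r", t.get_name(), exc)
            return
        if actions is not None:
            actions()
            if success_msg:
                log.info(success_msg)

    task.add_done_callback(on_done)
    return task


async def demo() -> list[str]:
    done: list[str] = []

    async def work() -> None:
        await asyncio.sleep(0)

    task = create_task(
        asyncio.get_running_loop(),
        work(),
        log_msg="work",
        actions=lambda: done.append("ok"),
    )
    await task
    await asyncio.sleep(0)  # give the done callback a chance to run
    return done
```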
- connect() None¶
Connect the client.
- disconnect() None¶
Disconnect the client.
- subscribe(self, SubscribeData command) void¶
Subscribe to data for the given data type.
- Parameters:
data_type (DataType) – The data type for the subscription.
- unsubscribe(self, UnsubscribeData command) void¶
Unsubscribe from data for the given data type.
- Parameters:
data_type (DataType) – The data type for the subscription.
- request(self, RequestData request) void¶
Request data for the given data type.
- Parameters:
request (RequestData) – The message for the data request.
- async cancel_pending_tasks(timeout_secs: float = 5.0) None¶
Cancel all pending tasks and await their cancellation.
- Parameters:
timeout_secs (float, default 5.0) – The timeout in seconds to wait for tasks to cancel.
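Cancelling and then awaiting tasks with a timeout can be sketched with `asyncio.wait`. The function below is illustrative only: it takes the task set as an argument and returns the number of tasks still pending, both of which are assumptions of this sketch rather than the documented signature.

```python
import asyncio


async def cancel_pending_tasks(
    tasks: set[asyncio.Task],
    timeout_secs: float = 5.0,
) -> int:
    """Cancel unfinished tasks, wait up to timeout_secs, return how many remain."""
    pending = [t for t in tasks if not t.done()]
    for task in pending:
        task.cancel()
    if not pending:
        return 0
    # asyncio.wait does not raise on timeout; it reports the stragglers
    _, still_pending = await asyncio.wait(pending, timeout=timeout_secs)
    return len(still_pending)


async def demo() -> tuple[int, bool]:
    task = asyncio.create_task(asyncio.sleep(60))
    await asyncio.sleep(0)  # let the task start
    remaining = await cancel_pending_tasks({task}, timeout_secs=1.0)
    return remaining, task.cancelled()
```

Awaiting after `cancel()` matters because cancellation is only a request; the task finishes on a later loop iteration.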
- degrade(self) void¶
Degrade the component.
While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
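The transition rules described for degrade (and repeated for the other lifecycle methods) can be sketched as a small state machine. `State`, `Component`, and `Flaky` are hypothetical, and treating RUNNING as the only valid source state is an assumption made to keep the example short.

```python
import logging
from enum import Enum

log = logging.getLogger("sketch")


class State(Enum):
    RUNNING = "RUNNING"
    DEGRADING = "DEGRADING"
    DEGRADED = "DEGRADED"


class Component:
    """Hypothetical component with a single degrade transition."""

    def __init__(self) -> None:
        self.state = State.RUNNING

    def on_degrade(self) -> None:
        """Hook for subclasses; degrade() itself is not overridden."""

    def degrade(self) -> None:
        if self.state is not State.RUNNING:
            # Invalid source state: log an error and leave the state unchanged
            log.error("Cannot degrade from %s", self.state.name)
            return
        self.state = State.DEGRADING
        try:
            self.on_degrade()
        except Exception:
            # Log and reraise; the state stays at DEGRADING
            log.exception("Error in on_degrade")
            raise
        self.state = State.DEGRADED


class Flaky(Component):
    def on_degrade(self) -> None:
        raise RuntimeError("hook failed")
```

Subclasses customize behaviour through the `on_*` hook, which is why the transition method itself carries the "Do not override" warning.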
- dispose(self) void¶
Dispose of the component.
While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- fault(self) void¶
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- classmethod fully_qualified_name(cls) str¶
Return the fully qualified name for the component's class.
- Return type:
str
- id¶
The component's ID.
- Returns:
ComponentId
- is_connected¶
If the client is connected.
- Returns:
bool
- is_degraded¶
Return whether the current component state is DEGRADED.
- Return type:
bool
- is_disposed¶
Return whether the current component state is DISPOSED.
- Return type:
bool
- is_faulted¶
Return whether the current component state is FAULTED.
- Return type:
bool
- is_initialized¶
Return whether the component has been initialized (component.state >= INITIALIZED).
- Return type:
bool
- is_running¶
Return whether the current component state is RUNNING.
- Return type:
bool
- is_stopped¶
Return whether the current component state is STOPPED.
- Return type:
bool
- reset(self) void¶
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- resume(self) void¶
Resume the component.
While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- shutdown_system(self, str reason=None) void¶
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters:
reason (str, optional) – The reason for issuing the shutdown command.
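The choice between a synchronous and an asynchronous stop can be illustrated by checking for a running event loop. This is only a sketch of that decision: `Kernel` and `on_shutdown` are hypothetical, and the actual kernel may take more of the execution context into account.

```python
import asyncio
from typing import Optional


class Kernel:
    """Hypothetical kernel; only the dispatch decision is modelled."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def stop(self) -> None:
        self.calls.append("stop")

    async def stop_async(self) -> None:
        self.calls.append("stop_async")

    def on_shutdown(self, reason: Optional[str] = None) -> Optional[asyncio.Task]:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread: stop synchronously
            self.stop()
            return None
        # A loop is running: schedule the asynchronous stop on it
        return loop.create_task(self.stop_async())
```

Returning the task lets the caller keep a reference to it, so it is not garbage collected before it completes.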
- start(self) void¶
Start the component.
While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- state¶
Return the component's current state.
- Return type:
ComponentState
- stop(self) void¶
Stop the component.
While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- subscribed_custom_data(self) list¶
Return the custom data types subscribed to.
- Return type:
list[DataType]
- trader_id¶
The trader ID associated with the component.
- Returns:
TraderId
- type¶
The component's type.
- Returns:
type
- venue¶
The client's venue ID (if applicable).
- Returns:
Venue or None
- class LiveMarketDataClient¶
Bases: MarketDataClient
The base class for all live market data clients.
- Parameters:
loop (asyncio.AbstractEventLoop) – The event loop for the client.
client_id (ClientId) – The client ID.
venue (Venue or None) – The client venue. If multi-venue then can be None.
msgbus (MessageBus) – The message bus for the client.
cache (Cache) – The cache for the client.
clock (LiveClock) – The clock for the client.
instrument_provider (InstrumentProvider) – The instrument provider for the client.
config (NautilusConfig, optional) – The configuration for the instance.
Warning
This class should not be used directly, but through a concrete subclass.
- async run_after_delay(delay: float, coro: Coroutine) None¶
Run the given coroutine after a delay.
- Parameters:
delay (float) – The delay (seconds) before running the coroutine.
coro (Coroutine) – The coroutine to run after the initial delay.
- create_task(coro: Coroutine, log_msg: str | None = None, actions: Callable | None = None, success_msg: str | None = None, success_color: LogColor = <LogColor.NORMAL: 0>) Task | None¶
Run the given coroutine with error handling and optional callback actions when done.
- Parameters:
coro (Coroutine) – The coroutine to run.
log_msg (str, optional) – The log message for the task.
actions (Callable, optional) – The actions callback to run when the coroutine is done.
success_msg (str, optional) – The log message to write on actions success.
success_color (LogColor, default NORMAL) – The log message color for actions success.
- Return type:
asyncio.Task or None
- connect() None¶
Connect the client.
- disconnect() None¶
Disconnect the client.
- subscribe(self, SubscribeData command) void¶
Subscribe to data for the given data type.
- Parameters:
data_type (DataType) – The data type for the subscription.
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_instruments(self, SubscribeInstruments command) void¶
Subscribe to all Instrument data.
- Parameters:
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_instrument(self, SubscribeInstrument command) void¶
Subscribe to the Instrument with the given instrument ID.
- Parameters:
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_order_book_deltas(self, SubscribeOrderBook command) void¶
Subscribe to OrderBookDeltas data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The order book instrument to subscribe to.
book_type (BookType {L1_MBP, L2_MBP, L3_MBO}) – The order book type.
depth (int, optional, default None) – The maximum depth for the subscription.
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_order_book_depth(self, SubscribeOrderBook command) void¶
Subscribe to OrderBookDepth10 data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The order book instrument to subscribe to.
depth (int, optional) – The maximum depth for the order book (defaults to 10).
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_quote_ticks(self, SubscribeQuoteTicks command) void¶
Subscribe to QuoteTick data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The tick instrument to subscribe to.
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_trade_ticks(self, SubscribeTradeTicks command) void¶
Subscribe to TradeTick data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The tick instrument to subscribe to.
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_mark_prices(self, SubscribeMarkPrices command) void¶
Subscribe to MarkPriceUpdate data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument to subscribe to.
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_index_prices(self, SubscribeIndexPrices command) void¶
Subscribe to IndexPriceUpdate data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument to subscribe to.
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_funding_rates(self, SubscribeFundingRates command) void¶
Subscribe to FundingRateUpdate data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument to subscribe to.
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_bars(self, SubscribeBars command) void¶
Subscribe to Bar data for the given bar type.
- Parameters:
bar_type (BarType) – The bar type to subscribe to.
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_instrument_status(self, SubscribeInstrumentStatus command) void¶
Subscribe to InstrumentStatus data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument to subscribe to.
params (dict[str, Any], optional) – Additional params for the subscription.
- subscribe_instrument_close(self, SubscribeInstrumentClose command) void¶
Subscribe to InstrumentClose updates for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument to subscribe to.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe(self, UnsubscribeData command) void¶
Unsubscribe from data for the given data type.
- Parameters:
data_type (DataType) – The data type for the subscription.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_instruments(self, UnsubscribeInstruments command) void¶
Unsubscribe from all Instrument data.
- Parameters:
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_instrument(self, UnsubscribeInstrument command) void¶
Unsubscribe from Instrument data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_order_book_deltas(self, UnsubscribeOrderBook command) void¶
Unsubscribe from OrderBookDeltas data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The order book instrument to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_order_book_depth(self, UnsubscribeOrderBook command) void¶
Unsubscribe from OrderBookDepth10 data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The order book instrument to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_quote_ticks(self, UnsubscribeQuoteTicks command) void¶
Unsubscribe from QuoteTick data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The tick instrument to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_trade_ticks(self, UnsubscribeTradeTicks command) void¶
Unsubscribe from TradeTick data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The tick instrument to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_mark_prices(self, UnsubscribeMarkPrices command) void¶
Unsubscribe from MarkPriceUpdate data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_index_prices(self, UnsubscribeIndexPrices command) void¶
Unsubscribe from IndexPriceUpdate data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_funding_rates(self, UnsubscribeFundingRates command) void¶
Unsubscribe from FundingRateUpdate data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_bars(self, UnsubscribeBars command) void¶
Unsubscribe from Bar data for the given bar type.
- Parameters:
bar_type (BarType) – The bar type to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_instrument_status(self, UnsubscribeInstrumentStatus command) void¶
Unsubscribe from InstrumentStatus data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument status updates to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- unsubscribe_instrument_close(self, UnsubscribeInstrumentClose command) void¶
Unsubscribe from InstrumentClose data for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument to unsubscribe from.
params (dict[str, Any], optional) – Additional params for the subscription.
- request(self, RequestData request) void¶
Request data for the given data type.
- Parameters:
request (RequestData) – The message for the data request.
- request_instrument(self, RequestInstrument request) void¶
Request Instrument data for the given instrument ID.
- Parameters:
request (RequestInstrument) – The message for the data request.
- request_instruments(self, RequestInstruments request) void¶
Request all Instrument data for the given venue.
- Parameters:
request (RequestInstruments) – The message for the data request.
- request_order_book_deltas(self, RequestOrderBookDeltas request) void¶
Request historical OrderBookDeltas data.
- Parameters:
request (RequestOrderBookDeltas) – The message for the data request.
- request_order_book_depth(request: RequestOrderBookDepth) None¶
- request_order_book_snapshot(self, RequestOrderBookSnapshot request) void¶
Request order book snapshot data.
- Parameters:
request (RequestOrderBookSnapshot) – The message for the data request.
- request_quote_ticks(self, RequestQuoteTicks request) void¶
Request historical QuoteTick data.
- Parameters:
request (RequestQuoteTicks) – The message for the data request.
- request_trade_ticks(self, RequestTradeTicks request) void¶
Request historical TradeTick data.
- Parameters:
request (RequestTradeTicks) – The message for the data request.
- request_funding_rates(self, RequestFundingRates request) void¶
Request historical FundingRateUpdate data.
- Parameters:
request (RequestFundingRates) – The message for the data request.
- request_bars(self, RequestBars request) void¶
Request historical Bar data. To load historical data from a catalog, you can pass a list[DataCatalogConfig] to the TradingNodeConfig or the BacktestEngineConfig.
- Parameters:
request (RequestBars) – The message for the data request.
- async cancel_pending_tasks(timeout_secs: float = 5.0) None¶
Cancel all pending tasks and await their cancellation.
- Parameters:
timeout_secs (float, default 5.0) – The timeout in seconds to wait for tasks to cancel.
- degrade(self) void¶
Degrade the component.
While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- dispose(self) void¶
Dispose of the component.
While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- fault(self) void¶
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- classmethod fully_qualified_name(cls) str¶
Return the fully qualified name for the component's class.
- Return type:
str
- id¶
The component's ID.
- Returns:
ComponentId
- is_connected¶
If the client is connected.
- Returns:
bool
- is_degraded¶
Return whether the current component state is DEGRADED.
- Return type:
bool
- is_disposed¶
Return whether the current component state is DISPOSED.
- Return type:
bool
- is_faulted¶
Return whether the current component state is FAULTED.
- Return type:
bool
- is_initialized¶
Return whether the component has been initialized (component.state >= INITIALIZED).
- Return type:
bool
- is_running¶
Return whether the current component state is RUNNING.
- Return type:
bool
- is_stopped¶
Return whether the current component state is STOPPED.
- Return type:
bool
- reset(self) void¶
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- resume(self) void¶
Resume the component.
While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- shutdown_system(self, str reason=None) void¶
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters:
reason (str, optional) – The reason for issuing the shutdown command.
- start(self) void¶
Start the component.
While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- state¶
Return the component's current state.
- Return type:
ComponentState
- stop(self) void¶
Stop the component.
While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- subscribed_custom_data(self) list¶
Return the custom data types subscribed to.
- Return type:
list[DataType]
- subscribed_funding_rates(self) list¶
Return the funding rate update instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_index_prices(self) list¶
Return the index price update instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_instrument_close(self) list¶
Return the instrument closes subscribed to.
- Return type:
list[InstrumentId]
- subscribed_instrument_status(self) list¶
Return the status update instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_instruments(self) list¶
Return the instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_mark_prices(self) list¶
Return the mark price update instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_order_book_deltas(self) list¶
Return the order book delta instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_order_book_depth(self) list¶
Return the order book depth instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_quote_ticks(self) list¶
Return the quote tick instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_trade_ticks(self) list¶
Return the trade tick instruments subscribed to.
- Return type:
list[InstrumentId]
- trader_id¶
The trader ID associated with the component.
- Returns:
TraderId
- type¶
The component's type.
- Returns:
type
- venue¶
The client's venue ID (if applicable).
- Returns:
Venue or None
- class LiveDataEngine¶
Bases: DataEngine
Provides a high-performance asynchronous live data engine.
- Parameters:
loop (asyncio.AbstractEventLoop) – The event loop for the engine.
msgbus (MessageBus) – The message bus for the engine.
cache (Cache) – The cache for the engine.
clock (LiveClock) – The clock for the engine.
config (LiveDataEngineConfig, optional) – The configuration for the instance.
- Raises:
TypeError – If config is not of type LiveDataEngineConfig.
- connect() None¶
Connect the engine by calling connect on all registered clients.
- disconnect() None¶
Disconnect the engine by calling disconnect on all registered clients.
- get_cmd_queue_task() Task | None¶
Return the internal command queue task for the engine.
- Return type:
asyncio.Task or None
- get_req_queue_task() Task | None¶
Return the internal request queue task for the engine.
- Return type:
asyncio.Task or None
- get_res_queue_task() Task | None¶
Return the internal response queue task for the engine.
- Return type:
asyncio.Task or None
- get_data_queue_task() Task | None¶
Return the internal data queue task for the engine.
- Return type:
asyncio.Task or None
- cmd_qsize() int¶
Return the number of DataCommand objects buffered on the internal queue.
- Return type:
int
- req_qsize() int¶
Return the number of RequestData objects buffered on the internal queue.
- Return type:
int
- res_qsize() int¶
Return the number of DataResponse objects buffered on the internal queue.
- Return type:
int
- data_qsize() int¶
Return the number of Data objects buffered on the internal queue.
- Return type:
int
- kill() None¶
Kill the engine by abruptly canceling the queue tasks and calling stop.
- execute(command: DataCommand) None¶
Execute the given data command.
If the internal queue is at or near capacity, it logs a warning (throttled) and schedules an asynchronous put() operation. This ensures all messages are eventually enqueued and processed without blocking the caller when the queue is full.
- Parameters:
command (DataCommand) – The command to execute.
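The non-blocking enqueue described above can be sketched with `asyncio.Queue`. This simplified version only reacts once the queue is completely full and does not throttle its warning, both of which differ from the description; `enqueue` and the `pending` set are hypothetical names.

```python
import asyncio
import logging
from typing import Any

log = logging.getLogger("sketch")


def enqueue(
    loop: asyncio.AbstractEventLoop,
    queue: asyncio.Queue,
    msg: Any,
    pending: set,
) -> None:
    try:
        # Fast path: enqueue without yielding to the event loop
        queue.put_nowait(msg)
    except asyncio.QueueFull:
        log.warning("Queue full (%d items); scheduling async put", queue.qsize())
        # Slow path: the put completes later, once a consumer frees a slot
        task = loop.create_task(queue.put(msg))
        pending.add(task)  # keep a reference until the put finishes
        task.add_done_callback(pending.discard)


async def demo() -> list[str]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    pending: set = set()
    enqueue(loop, queue, "a", pending)
    enqueue(loop, queue, "b", pending)  # queue is full, so this put is deferred
    first = await queue.get()
    second = await queue.get()  # the deferred put lands once space is free
    return [first, second]
```

The caller never awaits, so a full queue delays delivery instead of blocking the code that produced the message.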
- request(request: RequestData) None¶
Handle the given request.
If the internal queue is at or near capacity, it logs a warning (throttled) and schedules an asynchronous put() operation. This ensures all messages are eventually enqueued and processed without blocking the caller when the queue is full.
- Parameters:
request (RequestData) – The request to handle.
- response(response: DataResponse) None¶
Handle the given response.
If the internal queue is at or near capacity, it logs a warning (throttled) and schedules an asynchronous put() operation. This ensures all messages are eventually enqueued and processed without blocking the caller when the queue is full.
- Parameters:
response (DataResponse) – The response to handle.
- process(data: Data) None¶
Process the given data message.
If the internal queue is at or near capacity, it logs a warning (throttled) and schedules an asynchronous put() operation. This ensures all messages are eventually enqueued and processed without blocking the caller when the queue is full.
- Parameters:
data (Data) – The data to process.
Warning
This method is not thread-safe and should only be called from the same thread the event loop is running on. Calling it from a different thread may lead to unexpected behavior.
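When data originates on another thread, a common asyncio pattern is to hand it to the loop thread with `loop.call_soon_threadsafe`. The sketch below uses a local `process` function as a stand-in for the engine method; whether the library offers its own helper for this is not stated here.

```python
import asyncio
import threading


async def demo() -> list[int]:
    loop = asyncio.get_running_loop()
    received: list[int] = []
    done = asyncio.Event()

    def process(data: int) -> None:
        # Stand-in for the engine's process method; runs on the loop thread
        received.append(data)
        if len(received) == 3:
            done.set()

    def producer() -> None:
        # Runs on a worker thread: never call process() directly from here
        for i in range(3):
            loop.call_soon_threadsafe(process, i)

    thread = threading.Thread(target=producer)
    thread.start()
    await asyncio.wait_for(done.wait(), timeout=5.0)
    thread.join()
    return received
```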
- check_connected(self) bool¶
Check that all of the engine's clients are connected.
- Returns:
True if all clients connected, else False.
- Return type:
bool
- check_disconnected(self) bool¶
Check that all of the engine's clients are disconnected.
- Returns:
True if all clients disconnected, else False.
- Return type:
bool
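Both checks amount to an "all clients" test, which can be sketched as below. `Client` is a hypothetical stand-in exposing only `is_connected`. Note that in this sketch both functions return True when no clients are registered; the engine's behaviour in that case is not stated here.

```python
from dataclasses import dataclass


@dataclass
class Client:
    """Hypothetical client exposing only the connection flag."""

    is_connected: bool


def check_connected(clients: list[Client]) -> bool:
    # True only if every client reports connected
    return all(c.is_connected for c in clients)


def check_disconnected(clients: list[Client]) -> bool:
    # True only if every client reports disconnected
    return all(not c.is_connected for c in clients)
```

With a mix of connected and disconnected clients both checks return False, so the two are not simple negations of each other.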
- command_count¶
The total count of data commands received by the engine.
- Returns:
int
- data_count¶
The total count of data stream objects received by the engine.
- Returns:
int
- debug¶
If debug mode is active (will provide extra debug logging).
- Returns:
bool
- default_client¶
Return the default data client registered with the engine.
- Return type:
ClientId or None
- degrade(self) void¶
Degrade the component.
While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- deregister_client(self, DataClient client) void¶
Deregister the given data client from the data engine.
- Parameters:
client (DataClient) – The data client to deregister.
- dispose(self) void¶
Dispose of the component.
While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- fault(self) void¶
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- classmethod fully_qualified_name(cls) str¶
Return the fully qualified name for the component's class.
- Return type:
str
- get_external_client_ids(self) set¶
Return the configured external client IDs.
- Return type:
set[ClientId]
- id¶
The component's ID.
- Returns:
ComponentId
- is_degraded¶
Return whether the current component state is DEGRADED.
- Return type:
bool
- is_disposed¶
Return whether the current component state is DISPOSED.
- Return type:
bool
- is_faulted¶
Return whether the current component state is FAULTED.
- Return type:
bool
- is_initialized¶
Return whether the component has been initialized (component.state >= INITIALIZED).
- Return type:
bool
- is_running¶
Return whether the current component state is RUNNING.
- Return type:
bool
- is_stopped¶
Return whether the current component state is STOPPED.
- Return type:
bool
- process_historical(self, Data data) void¶
Process historical data.
- Parameters:
data (Data) – The historical data to process.
- register_catalog(self, catalog: ParquetDataCatalog, name: str = 'catalog_0') None¶
Register the given data catalog with the engine.
- Parameters:
catalog (ParquetDataCatalog) – The data catalog to register.
name (str, default 'catalog_0') – The name of the catalog to register.
- register_client(self, DataClient client) void¶
Register the given data client with the data engine.
- Parameters:
client (DataClient) – The client to register.
- Raises:
ValueError – If client is already registered.
- register_default_client(self, DataClient client) void¶
Register the given client as the default routing client (when a specific venue routing cannot be found).
Any existing default routing client will be overwritten.
- Parameters:
client (DataClient) – The client to register.
- register_venue_routing(self, DataClient client, Venue venue) void¶
Register the given client to route messages to the given venue.
Any existing client in the routing map for the given venue will be overwritten.
- Parameters:
venue (Venue) – The venue to route messages to.
client (DataClient) – The client for the venue routing.
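The routing rules above (a per-venue map with a default fallback, both overwritable) can be sketched with a dictionary. `Router` and `resolve` are hypothetical, and plain strings stand in for the client and venue types.

```python
from typing import Optional


class Router:
    """Hypothetical routing table using strings for clients and venues."""

    def __init__(self) -> None:
        self.routing_map: dict[str, str] = {}
        self.default_client: Optional[str] = None

    def register_venue_routing(self, client: str, venue: str) -> None:
        # Any existing client registered for this venue is overwritten
        self.routing_map[venue] = client

    def register_default_client(self, client: str) -> None:
        # Any existing default client is overwritten
        self.default_client = client

    def resolve(self, venue: str) -> Optional[str]:
        # Prefer the venue-specific client, otherwise fall back to the default
        return self.routing_map.get(venue, self.default_client)
```

A lookup for a venue without a specific routing falls through to the default client, or to None if no default has been registered.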
- registered_clients¶
Return the data clients registered with the engine.
- Return type:
list[ClientId]
- request_count¶
The total count of data requests received by the engine.
- Returns:
int
- reset(self) void¶
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- response_count¶
The total count of data responses received by the engine.
- Returns:
int
- resume(self) void¶
Resume the component.
While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- routing_map¶
Return the venue routing map registered with the engine.
- Return type:
dict[Venue, DataClient]
- shutdown_system(self, str reason=None) void¶
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters:
reason (str, optional) – The reason for issuing the shutdown command.
- start(self) void¶
Start the component.
While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
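The lifecycle contract described above (a transitional state that persists when the hook raises, and no state change when called from an invalid state) can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's component class: State, SketchComponent, and the choice of READY as the only valid starting state are hypothetical.

```python
import logging
from enum import Enum, auto

log = logging.getLogger("component")


class State(Enum):
    READY = auto()
    STARTING = auto()
    RUNNING = auto()


class SketchComponent:
    """Hypothetical component; only the start() transition is modeled."""

    def __init__(self) -> None:
        self.state = State.READY

    def on_start(self) -> None:
        # Subclasses would override this hook, not start()
        pass

    def start(self) -> None:
        if self.state is not State.READY:
            # Invalid state: log an error and leave the state unchanged
            log.error("Cannot start from state %s", self.state.name)
            return
        self.state = State.STARTING
        try:
            self.on_start()
        except Exception:
            # Log and reraise; the component stays in STARTING
            log.exception("Error in on_start")
            raise
        self.state = State.RUNNING


class FailingComponent(SketchComponent):
    def on_start(self) -> None:
        raise RuntimeError("venue unreachable")
```

A FailingComponent that has had start() called is left in STARTING, which is why user code belongs in the on_start() hook rather than in an overridden start().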
- state¶
Return the component's current state.
- Return type:
ComponentState
- stop(self) void¶
Stop the component.
While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- stop_clients(self) void¶
Stop the registered clients.
- subscribed_custom_data(self) list¶
Return the custom data types subscribed to.
- Return type:
list[DataType]
- subscribed_funding_rates(self) list¶
Return the funding rate update instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_index_prices(self) list¶
Return the index price update instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_instrument_close(self) list¶
Return the close price instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_instrument_status(self) list¶
Return the status update instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_instruments(self) list¶
Return the instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_mark_prices(self) list¶
Return the mark price update instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_order_book_deltas(self) list¶
Return the order book delta instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_order_book_depth(self) list¶
Return the order book depth instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_quote_ticks(self) list¶
Return the quote tick instruments subscribed to.
- Return type:
list[InstrumentId]
- subscribed_synthetic_quotes(self) list¶
Return the synthetic instrument quotes subscribed to.
- Return type:
list[InstrumentId]
- subscribed_synthetic_trades(self) list¶
Return the synthetic instrument trades subscribed to.
- Return type:
list[InstrumentId]
- subscribed_trade_ticks(self) list¶
Return the trade tick instruments subscribed to.
- Return type:
list[InstrumentId]
- trader_id¶
The trader ID associated with the component.
- Returns:
TraderId
- type¶
The component's type.
- Returns:
type
The LiveExecutionClient class is responsible for interfacing with a particular API which may be presented directly by a venue, or through a broker intermediary.
- class LiveExecutionClient¶
Bases: ExecutionClient
The base class for all live execution clients.
- Parameters:
loop (asyncio.AbstractEventLoop) – The event loop for the client.
client_id (ClientId) – The client ID.
venue (Venue or None) – The client venue. If multi-venue then can be None.
instrument_provider (InstrumentProvider) – The instrument provider for the client.
account_type (AccountType) – The account type for the client.
base_currency (Currency, optional) – The account base currency for the client. Use None for multi-currency accounts.
msgbus (MessageBus) – The message bus for the client.
cache (Cache) – The cache for the client.
clock (LiveClock) – The clock for the client.
config (NautilusConfig, optional) – The configuration for the instance.
- Raises:
ValueError – If oms_type is UNSPECIFIED (must be specified).
Warning
This class should not be used directly, but through a concrete subclass.
- async run_after_delay(delay: float, coro: Coroutine) None¶
Run the given coroutine after a delay.
- Parameters:
delay (float) – The delay (seconds) before running the coroutine.
coro (Coroutine) – The coroutine to run after the initial delay.
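As a rough picture of what "run the given coroutine after a delay" means, the sketch below sleeps and then awaits the coroutine. It is an assumption about the general technique, written as a free function with a hypothetical resubscribe coroutine, and is not taken from the client's source.

```python
from __future__ import annotations

import asyncio
from collections.abc import Coroutine
from typing import Any


async def run_after_delay(delay: float, coro: Coroutine[Any, Any, Any]) -> None:
    # Wait for the delay (seconds), then run the coroutine to completion
    await asyncio.sleep(delay)
    await coro


async def main() -> list[str]:
    calls: list[str] = []

    async def resubscribe() -> None:  # hypothetical follow-up action
        calls.append("resubscribed")

    await run_after_delay(0.01, resubscribe())
    return calls


result = asyncio.run(main())
```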
- create_task(coro: Coroutine, log_msg: str | None = None, actions: Callable | None = None, success_msg: str | None = None, success_color: LogColor = <LogColor.NORMAL: 0>) Task¶
Run the given coroutine with error handling and optional callback actions when done.
- Parameters:
coro (Coroutine) – The coroutine to run.
log_msg (str, optional) – The log message for the task.
actions (Callable, optional) – The actions callback to run when the coroutine is done.
success_msg (str, optional) – The log message to write on actions success.
success_color (LogColor, default NORMAL) – The log message color for actions success.
- Return type:
asyncio.Task
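The pattern of "error handling and optional callback actions when done" is commonly built on a task done-callback. The sketch below shows that general technique using only asyncio and logging; it omits success_color, and its callback logic is an assumption for illustration rather than the client's actual behavior.

```python
from __future__ import annotations

import asyncio
import logging
from collections.abc import Callable, Coroutine
from typing import Any

log = logging.getLogger("client")


def create_task(
    coro: Coroutine[Any, Any, Any],
    log_msg: str | None = None,
    actions: Callable[[], None] | None = None,
    success_msg: str | None = None,
) -> asyncio.Task:
    # Must be called while an event loop is running
    task = asyncio.get_running_loop().create_task(coro, name=log_msg)

    def on_done(done: asyncio.Task) -> None:
        if done.cancelled():
            return
        exc = done.exception()
        if exc is not None:
            # Errors are logged here instead of being silently dropped
            log.error("Error on '%s': %r", done.get_name(), exc)
            return
        if actions is not None:
            actions()
            if success_msg:
                log.info(success_msg)

    task.add_done_callback(on_done)
    return task


async def main() -> list[str]:
    events: list[str] = []

    async def connect() -> None:
        await asyncio.sleep(0)

    async def broken() -> None:
        raise ConnectionError("refused")

    ok = create_task(connect(), log_msg="connect", actions=lambda: events.append("connected"))
    bad = create_task(broken(), log_msg="broken", actions=lambda: events.append("never"))
    await asyncio.wait([ok, bad])
    await asyncio.sleep(0)  # give done-callbacks a chance to run
    return events


events = asyncio.run(main())
```

In this sketch the actions callback runs only for the task that finished without an exception; the failing task is logged and its callback is skipped.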
- connect() None¶
Connect the client.
- disconnect() None¶
Disconnect the client.
- async cancel_pending_tasks(timeout_secs: float = 5.0) None¶
Cancel all pending tasks and await their cancellation.
- Parameters:
timeout_secs (float, default 5.0) – The timeout in seconds to wait for tasks to cancel.
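Cancelling tasks and awaiting their cancellation within a timeout can be sketched with asyncio.wait. The version below is a stand-alone illustration: unlike the method above it takes the task set as an explicit (hypothetical) argument and returns the number of tasks still pending after the timeout.

```python
from __future__ import annotations

import asyncio


async def cancel_pending_tasks(tasks: set[asyncio.Task], timeout_secs: float = 5.0) -> int:
    """Cancel unfinished tasks, wait up to the timeout, return how many are still pending."""
    pending = [t for t in tasks if not t.done()]
    for task in pending:
        task.cancel()
    if not pending:
        return 0
    # asyncio.wait does not raise on timeout; it returns the (done, pending) sets
    _, still_pending = await asyncio.wait(pending, timeout=timeout_secs)
    return len(still_pending)


async def main() -> tuple[int, bool]:
    worker = asyncio.create_task(asyncio.sleep(60))
    await asyncio.sleep(0)  # let the worker start
    remaining = await cancel_pending_tasks({worker}, timeout_secs=1.0)
    return remaining, worker.cancelled()


remaining, was_cancelled = asyncio.run(main())
```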
- submit_order(self, SubmitOrder command) void¶
Submit the order contained in the given command for execution.
- Parameters:
command (SubmitOrder) – The command to execute.
- submit_order_list(self, SubmitOrderList command) void¶
Submit the order list contained in the given command for execution.
- Parameters:
command (SubmitOrderList) – The command to execute.
- modify_order(self, ModifyOrder command) void¶
Modify the order with parameters contained in the command.
- Parameters:
command (ModifyOrder) – The command to execute.
- cancel_order(self, CancelOrder command) void¶
Cancel the order with the client order ID contained in the given command.
- Parameters:
command (CancelOrder) – The command to execute.
- cancel_all_orders(self, CancelAllOrders command) void¶
Cancel all orders for the instrument ID contained in the given command.
- Parameters:
command (CancelAllOrders) – The command to execute.
- batch_cancel_orders(self, BatchCancelOrders command) void¶
Batch cancel orders for the instrument ID contained in the given command.
- Parameters:
command (BatchCancelOrders) – The command to execute.
- query_account(self, QueryAccount command) void¶
Query the account specified by the command which will generate an AccountState event.
- Parameters:
command (QueryAccount) – The command to execute.
- query_order(self, QueryOrder command) void¶
Initiate a reconciliation for the queried order which will generate an OrderStatusReport.
- Parameters:
command (QueryOrder) – The command to execute.
- async generate_order_status_report(command: GenerateOrderStatusReport) OrderStatusReport | None¶
Generate an OrderStatusReport for the given order identifier parameter(s).
If the order is not found, or an error occurs, then logs and returns None.
- Parameters:
command (GenerateOrderStatusReport) – The command to generate the report.
- Return type:
OrderStatusReport or None
- Raises:
ValueError – If both the client_order_id and venue_order_id are None.
- async generate_order_status_reports(command: GenerateOrderStatusReports) list[OrderStatusReport]¶
Generate a list of `OrderStatusReport`s with optional query filters.
The returned list may be empty if no orders match the given parameters.
- Parameters:
command (GenerateOrderStatusReports) – The command for generating the reports.
- Return type:
list[OrderStatusReport]
- async generate_fill_reports(command: GenerateFillReports) list[FillReport]¶
Generate a list of `FillReport`s with optional query filters.
The returned list may be empty if no trades match the given parameters.
- Parameters:
command (GenerateFillReports) – The command for generating the reports.
- Return type:
list[FillReport]
- async generate_position_status_reports(command: GeneratePositionStatusReports) list[PositionStatusReport]¶
Generate a list of `PositionStatusReport`s with optional query filters.
The returned list may be empty if no positions match the given parameters.
- Parameters:
command (GeneratePositionStatusReports) – The command for generating the position status reports.
- Return type:
list[PositionStatusReport]
- async generate_mass_status(lookback_mins: int | None = None) ExecutionMassStatus | None¶
Generate an ExecutionMassStatus report.
- Parameters:
lookback_mins (int, optional) – The maximum lookback for querying closed orders, trades and positions.
- Return type:
ExecutionMassStatus or None
- account_id¶
The client's account ID.
- Returns:
AccountId or None
- account_type¶
The client's account type.
- Returns:
AccountType
- base_currency¶
The client's account base currency (None for multi-currency accounts).
- Returns:
Currency or None
- degrade(self) void¶
Degrade the component.
While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- dispose(self) void¶
Dispose of the component.
While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- fault(self) void¶
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
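The idempotency of fault() described above can be illustrated with a toy component. This is a sketch under assumptions: the two-state State enum, SketchComponent, and the silent early return on repeat calls are hypothetical and say nothing about how the real component handles or logs a repeated call.

```python
from enum import Enum, auto


class State(Enum):
    RUNNING = auto()
    FAULTED = auto()


class SketchComponent:
    """Hypothetical component modeling only the fault() transition."""

    def __init__(self) -> None:
        self.state = State.RUNNING
        self.fault_hook_calls = 0

    def on_fault(self) -> None:
        # Cleanup hook; counted here so the idempotency is observable
        self.fault_hook_calls += 1

    def fault(self) -> None:
        if self.state is State.FAULTED:
            # Repeat calls are no-ops: same effect as calling once
            return
        self.on_fault()
        self.state = State.FAULTED


component = SketchComponent()
component.fault()
component.fault()
```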
- classmethod fully_qualified_name(cls) str¶
Return the fully qualified name for the component's class.
- Return type:
str
- generate_account_state(self, list balances, list margins, bool reported, uint64_t ts_event, dict info=None) void¶
Generate an AccountState event and publish on the message bus.
- Parameters:
balances (list[AccountBalance]) – The account balances.
margins (list[MarginBalance]) – The margin balances.
reported (bool) – If the balances are reported directly from the exchange.
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the account state event occurred.
info (dict[str, object], optional) – The additional implementation specific account information.
- generate_order_accepted(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, uint64_t ts_event) void¶
Generate an OrderAccepted event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order accepted event occurred.
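The ts_event arguments on the generate_order_* methods are UNIX timestamps in nanoseconds, whereas many venue APIs report milliseconds or datetimes. The helpers below are hypothetical (they are not part of the library) and show one way to convert using integer arithmetic only.

```python
from datetime import datetime, timezone

NANOS_PER_MILLI = 1_000_000
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_nanos(ts_millis: int) -> int:
    # Integer arithmetic avoids float rounding at nanosecond precision
    return ts_millis * NANOS_PER_MILLI


def datetime_to_nanos(dt: datetime) -> int:
    # dt must be timezone-aware; seconds and microseconds are combined as integers
    delta = dt - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000_000 + delta.microseconds * 1_000


ts_event = millis_to_nanos(1_700_000_000_123)
```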
- generate_order_cancel_rejected(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, str reason, uint64_t ts_event) void¶
Generate an OrderCancelRejected event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
reason (str) – The order cancel rejected reason.
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order cancel rejected event occurred.
- generate_order_canceled(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, uint64_t ts_event) void¶
Generate an OrderCanceled event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order canceled event occurred.
- generate_order_denied(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, str reason, uint64_t ts_event) void¶
Generate an OrderDenied event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
reason (str) – The order denied reason.
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order denied event occurred.
- generate_order_expired(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, uint64_t ts_event) void¶
Generate an OrderExpired event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order expired event occurred.
- generate_order_filled(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, PositionId venue_position_id: PositionId | None, TradeId trade_id, OrderSide order_side, OrderType order_type, Quantity last_qty, Price last_px, Currency quote_currency, Money commission, LiquiditySide liquidity_side, uint64_t ts_event, dict info=None) void¶
Generate an OrderFilled event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
trade_id (TradeId) – The trade ID.
venue_position_id (PositionId or None) – The venue position ID associated with the order. If the trading venue has assigned a position ID / ticket then pass that here, otherwise pass None and the execution engine OMS will handle position ID resolution.
order_side (OrderSide {BUY, SELL}) – The execution order side.
order_type (OrderType) – The execution order type.
last_qty (Quantity) – The fill quantity for this execution.
last_px (Price) – The fill price for this execution (not average price).
quote_currency (Currency) – The currency of the price.
commission (Money) – The fill commission.
liquidity_side (LiquiditySide {NO_LIQUIDITY_SIDE, MAKER, TAKER}) – The execution liquidity side.
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order filled event occurred.
info (dict[str, object], optional) – The additional fill information.
- generate_order_modify_rejected(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, str reason, uint64_t ts_event) void¶
Generate an OrderModifyRejected event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
reason (str) – The order update rejected reason.
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order update rejection event occurred.
- generate_order_rejected(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, str reason, uint64_t ts_event, bool due_post_only=False) void¶
Generate an OrderRejected event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
reason (str) – The order rejected reason.
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order rejected event occurred.
due_post_only (bool, default False) – If the order was rejected because it was post-only and would execute immediately as a taker.
- generate_order_submitted(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, uint64_t ts_event) void¶
Generate an OrderSubmitted event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order submitted event occurred.
- generate_order_triggered(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, uint64_t ts_event) void¶
Generate an OrderTriggered event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order triggered event occurred.
- generate_order_updated(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, Quantity quantity, Price price, Price trigger_price, uint64_t ts_event, bool venue_order_id_modified=False) void¶
Generate an OrderUpdated event and send it to the ExecutionEngine.
- Parameters:
strategy_id (StrategyId) – The strategy ID associated with the event.
instrument_id (InstrumentId) – The instrument ID.
client_order_id (ClientOrderId) – The client order ID.
venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
quantity (Quantity) – The order's current quantity.
price (Price) – The order's current price.
trigger_price (Price or None) – The order's current trigger price.
ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order update event occurred.
venue_order_id_modified (bool) – If the ID was modified for this event.
- get_account(self) Account¶
Return the account for the client (if registered).
- Return type:
Account or None
- id¶
The component's ID.
- Returns:
ComponentId
- is_connected¶
If the client is connected.
- Returns:
bool
- is_degraded¶
Return whether the current component state is DEGRADED.
- Return type:
bool
- is_disposed¶
Return whether the current component state is DISPOSED.
- Return type:
bool
- is_faulted¶
Return whether the current component state is FAULTED.
- Return type:
bool
- is_initialized¶
Return whether the component has been initialized (component.state >= INITIALIZED).
- Return type:
bool
- is_running¶
Return whether the current component state is RUNNING.
- Return type:
bool
- is_stopped¶
Return whether the current component state is STOPPED.
- Return type:
bool
- oms_type¶
The venue's order management system type.
- Returns:
OmsType
- reset(self) void¶
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- resume(self) void¶
Resume the component.
While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- shutdown_system(self, str reason=None) void¶
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters:
reason (str, optional) – The reason for issuing the shutdown command.
- start(self) void¶
Start the component.
While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- state¶
Return the component's current state.
- Return type:
ComponentState
- stop(self) void¶
Stop the component.
While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- trader_id¶
The trader ID associated with the component.
- Returns:
TraderId
- type¶
The component's type.
- Returns:
type
- venue¶
The client's venue ID (if not a routing client).
- Returns:
Venue or None
- class LiveExecutionEngine¶
Bases: ExecutionEngine
Provides a high-performance asynchronous live execution engine.
- Parameters:
loop (asyncio.AbstractEventLoop) – The event loop for the engine.
msgbus (MessageBus) – The message bus for the engine.
cache (Cache) – The cache for the engine.
clock (LiveClock) – The clock for the engine.
config (LiveExecEngineConfig, optional) – The configuration for the instance.
- Raises:
TypeError – If config is not of type LiveExecEngineConfig.
- property reconciliation: bool¶
Return whether the reconciliation process will be run on start.
- Return type:
bool
- connect() None¶
Connect the engine by calling connect on all registered clients.
- disconnect() None¶
Disconnect the engine by calling disconnect on all registered clients.
- get_cmd_queue_task() Task | None¶
Return the internal command queue task for the engine.
- Return type:
asyncio.Task or None
- get_evt_queue_task() Task | None¶
Return the internal event queue task for the engine.
- Return type:
asyncio.Task or None
- get_own_books_audit_task() Task | None¶
Return the own books audit task for the engine.
- Return type:
asyncio.Task or None
- get_reconciliation_task() Task | None¶
Return the continuous reconciliation task for the engine.
- Return type:
asyncio.Task or None
- cmd_qsize() int¶
Return the number of Command messages buffered on the internal queue.
- Return type:
int
- evt_qsize() int¶
Return the number of Event messages buffered on the internal queue.
- Return type:
int
- kill() None¶
Kill the engine by abruptly canceling the queue task and calling stop.
- execute(command: Command) None¶
Execute the given command.
If the internal queue is already full, then a warning is logged and the call blocks until the queue size reduces.
- Parameters:
command (Command) – The command to execute.
- process(event: OrderEvent) None¶
Process the given event message.
If the internal queue is at or near capacity, it logs a warning (throttled) and schedules an asynchronous put() operation. This ensures all messages are eventually enqueued and processed without blocking the caller when the queue is full.
- Parameters:
event (OrderEvent) – The event to process.
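The behavior described for process (try to enqueue immediately, otherwise schedule an asynchronous put() so the caller is not blocked) can be sketched with a bounded asyncio.Queue. The enqueue_nowait helper and the string messages are hypothetical, and the warning throttling mentioned above is left out for brevity.

```python
from __future__ import annotations

import asyncio
import logging

log = logging.getLogger("engine")


def enqueue_nowait(queue: asyncio.Queue, msg: object) -> asyncio.Task | None:
    """Enqueue without blocking the caller; returns the deferred put task, if any."""
    try:
        queue.put_nowait(msg)
        return None
    except asyncio.QueueFull:
        log.warning("Queue full at %d items, scheduling async put", queue.qsize())
        # The put() completes later, once a consumer frees a slot
        return asyncio.get_running_loop().create_task(queue.put(msg))


async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    enqueue_nowait(queue, "evt-1")             # fits immediately
    deferred = enqueue_nowait(queue, "evt-2")  # queue full: scheduled instead
    assert deferred is not None
    received = [await queue.get()]             # consumer frees a slot
    await deferred                             # deferred put can now complete
    received.append(await queue.get())
    return received


received = asyncio.run(main())
```

Both messages are eventually delivered in order here, even though the second one arrived while the queue was at capacity.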
- generate_execution_mass_status(command: GenerateExecutionMassStatus) None¶
Handle request to generate execution mass status, triggering startup reconciliation.
- async reconcile_execution_state(timeout_secs: float = 10.0) bool¶
Reconcile execution state as main entry point for startup reconciliation, coordinating reconciliation across all execution clients.
- reconcile_execution_report(report: ExecutionReport) bool¶
Reconcile a single execution report received at runtime, routing to appropriate reconciliation method based on report type.
- reconcile_execution_mass_status(report: ExecutionMassStatus) None¶
Entry point for mass status reconciliation.
- allow_overfills¶
If order fills exceeding order quantity are allowed (logs warning instead of raising).
- Returns:
bool
- check_connected(self) bool¶
Check all of the engine's clients are connected.
- Returns:
True if all clients connected, else False.
- Return type:
bool
- check_disconnected(self) bool¶
Check all of the engine's clients are disconnected.
- Returns:
True if all clients disconnected, else False.
- Return type:
bool
- check_integrity(self) bool¶
Check integrity of data within the cache and clients.
- Returns:
True if checks pass, else False.
- Return type:
bool
- check_residuals(self) bool¶
Check for any residual open state and log warnings if found.
‘Open state’ is considered to be open orders and open positions.
- Returns:
True if residuals exist, else False.
- Return type:
bool
- command_count¶
The total count of commands received by the engine.
- Returns:
int
- convert_quote_qty_to_base¶
If quote-denominated order quantities should be converted to base units before submission.
- Returns:
bool
- debug¶
If debug mode is active (will provide extra debug logging).
- Returns:
bool
- default_client¶
Return the default execution client registered with the engine.
- Return type:
ClientId or None
- degrade(self) void¶
Degrade the component.
While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- deregister_client(self, ExecutionClient client) void¶
Deregister the given execution client from the execution engine.
- Parameters:
client (ExecutionClient) – The execution client to deregister.
- Raises:
ValueError – If client is not registered with the execution engine.
- dispose(self) void¶
Dispose of the component.
While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- event_count¶
The total count of events received by the engine.
- Returns:
int
- fault(self) void¶
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- flush_db(self) void¶
Flush the execution database which permanently removes all persisted data.
Warning
Permanent data loss.
- classmethod fully_qualified_name(cls) str¶
Return the fully qualified name for the component's class.
- Return type:
str
- get_clients_for_orders(self, list orders) set¶
Get all execution clients corresponding to the given orders.
- Parameters:
orders (list[Order]) – The orders to locate associated execution clients for.
- Return type:
set[ExecutionClient]
- get_external_client_ids(self) set¶
Return the configured external client IDs.
- Return type:
set[ClientId]
- get_external_order_claim(self, InstrumentId instrument_id) StrategyId¶
Get any external order claim for the given instrument ID.
- Parameters:
instrument_id (InstrumentId) – The instrument ID for the claim.
- Return type:
StrategyId or None
- get_external_order_claims_instruments(self) set¶
Get all instrument IDs registered for external order claims.
- Return type:
set[InstrumentId]
- id¶
The component's ID.
- Returns:
ComponentId
- is_degraded¶
Return whether the current component state is DEGRADED.
- Return type:
bool
- is_disposed¶
Return whether the current component state is DISPOSED.
- Return type:
bool
- is_faulted¶
Return whether the current component state is FAULTED.
- Return type:
bool
- is_initialized¶
Return whether the component has been initialized (component.state >= INITIALIZED).
- Return type:
bool
- is_running¶
Return whether the current component state is RUNNING.
- Return type:
bool
- is_stopped¶
Return whether the current component state is STOPPED.
- Return type:
bool
- load_cache(self) void¶
Load the cache up from the execution database.
- manage_own_order_books¶
If the execution engine should maintain own order books based on commands and events.
- Returns:
bool
- position_id_count(self, StrategyId strategy_id) int¶
The position ID count for the given strategy ID.
- Parameters:
strategy_id (StrategyId) – The strategy ID for the position count.
- Return type:
int
- register_client(self, ExecutionClient client) void¶
Register the given execution client with the execution engine.
If the client.venue is None and a default routing client has not been previously registered, then the client will be registered as such.
- Parameters:
client (ExecutionClient) – The execution client to register.
- Raises:
ValueError – If client is already registered with the execution engine.
- register_default_client(self, ExecutionClient client) void¶
Register the given client as the default routing client (when a specific venue routing cannot be found).
Any existing default routing client will be overwritten.
- Parameters:
client (ExecutionClient) – The client to register.
- register_external_order_claims(self, Strategy strategy) void¶
Register the given strategy's external order claim instrument IDs (if any).
- Parameters:
strategy (Strategy) – The strategy for the registration.
- Raises:
InvalidConfiguration – If a strategy is already registered to claim external orders for an instrument ID.
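The rule that only one strategy may claim external orders for a given instrument ID can be pictured as a small registry. Everything in this sketch is hypothetical (ClaimRegistry, the local InvalidConfiguration class, and the string IDs); it illustrates the uniqueness check, not the engine's internals.

```python
from __future__ import annotations


class InvalidConfiguration(Exception):
    """Hypothetical stand-in for the library's configuration error."""


class ClaimRegistry:
    """Hypothetical registry mapping instrument IDs to the claiming strategy ID."""

    def __init__(self) -> None:
        self._claims: dict[str, str] = {}

    def register(self, strategy_id: str, instrument_ids: list[str]) -> None:
        # Validate every claim first so a conflict leaves the registry unchanged
        for instrument_id in instrument_ids:
            owner = self._claims.get(instrument_id)
            if owner is not None:
                raise InvalidConfiguration(f"{instrument_id} already claimed by {owner}")
        for instrument_id in instrument_ids:
            self._claims[instrument_id] = strategy_id

    def get_claim(self, instrument_id: str) -> str | None:
        # Returns the claiming strategy ID, or None if unclaimed
        return self._claims.get(instrument_id)


registry = ClaimRegistry()
registry.register("EMACross-001", ["ETHUSDT.BINANCE"])
```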
- register_oms_type(self, Strategy strategy) void¶
Register the given trading strategy's OMS (Order Management System) type.
- Parameters:
strategy (Strategy) – The strategy for the registration.
- register_venue_routing(self, ExecutionClient client, Venue venue) void¶
Register the given client to route orders to the given venue.
Any existing client in the routing map for the given venue will be overwritten.
- Parameters:
venue (Venue) – The venue to route orders to.
client (ExecutionClient) – The client for the venue routing.
- registered_clients¶
Return the execution clients registered with the engine.
- Return type:
list[ClientId]
- report_count¶
The total count of reports received by the engine.
- Returns:
int
- reset(self) void¶
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- resume(self) void¶
Resume the component.
While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- set_convert_quote_qty_to_base(self, bool value) void¶
Set the convert_quote_qty_to_base flag with the given value.
- Parameters:
value (bool) – The value to set.
- set_manage_own_order_books(self, bool value) void¶
Set the manage_own_order_books setting with the given value.
- Parameters:
value (bool) – The value to set.
- shutdown_system(self, str reason=None) void¶
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters:
reason (str, optional) – The reason for issuing the shutdown command.
- snapshot_orders¶
If order state snapshots should be persisted.
- Returns:
bool
- snapshot_positions¶
If position state snapshots should be persisted.
- Returns:
bool
- snapshot_positions_interval_secs¶
The interval (seconds) at which additional position state snapshots are persisted.
- Returns:
double
- snapshot_positions_timer_name¶
- start(self) void¶
Start the component.
While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
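The lifecycle contract described for start() (and likewise for stop(), reset() and the other transitions) can be illustrated with a toy state machine. This is a simplified sketch: `ToyComponent`, its three states and the `errors` list standing in for the logger are all hypothetical and do not reproduce the real ComponentState values.

```python
from enum import Enum, auto


class State(Enum):
    READY = auto()
    STARTING = auto()
    RUNNING = auto()


class ToyComponent:
    """Hypothetical component with a guarded start() transition."""

    def __init__(self) -> None:
        self.state = State.READY
        self.errors: list[str] = []  # stands in for the logger

    def on_start(self) -> None:
        """Hook for subclasses; the default does nothing."""

    def start(self) -> None:
        if self.state is not State.READY:
            # Invalid source state: record an error and leave the state untouched.
            self.errors.append(f"cannot start from {self.state.name}")
            return
        self.state = State.STARTING
        try:
            self.on_start()
        except Exception as exc:
            # Log and re-raise; the state stays at the transitional STARTING.
            self.errors.append(f"on_start failed: {exc!r}")
            raise
        self.state = State.RUNNING


class FailingComponent(ToyComponent):
    def on_start(self) -> None:
        raise RuntimeError("boom")
```

Keeping start() fixed and putting custom behaviour in on_start() is what makes the "Do not override" warning workable: the guard and the error handling stay in one place.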
- state¶
Return the component's current state.
- Return type:
ComponentState
- stop(self) void¶
Stop the component.
While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- stop_clients(self) void¶
Stop the registered clients.
- trader_id¶
The trader ID associated with the component.
- Returns:
TraderId
- type¶
The component's type.
- Returns:
type
- class LiveRiskEngine¶
Bases: RiskEngine
Provides a high-performance asynchronous live risk engine.
- Parameters:
loop (asyncio.AbstractEventLoop) – The event loop for the engine.
portfolio (PortfolioFacade) – The portfolio for the engine.
msgbus (MessageBus) – The message bus for the engine.
cache (CacheFacade) – The read-only cache for the engine.
clock (LiveClock) – The clock for the engine.
config (LiveRiskEngineConfig) – The configuration for the instance.
- Raises:
TypeError – If config is not of type LiveRiskEngineConfig.
- get_cmd_queue_task() Task | None¶
Return the internal command queue task for the engine.
- Return type:
asyncio.Task or None
- get_evt_queue_task() Task | None¶
Return the internal event queue task for the engine.
- Return type:
asyncio.Task or None
- cmd_qsize() int¶
Return the number of Command messages buffered on the internal queue.
- Return type:
int
- evt_qsize() int¶
Return the number of Event messages buffered on the internal queue.
- Return type:
int
- kill() None¶
Kill the engine by abruptly canceling the queue task and calling stop.
- execute(command: Command) None¶
Execute the given command.
If the internal queue is at or near capacity, it logs a warning (throttled) and schedules an asynchronous put() operation. This ensures all messages are eventually enqueued and processed without blocking the caller when the queue is full.
- Parameters:
command (Command) – The command to execute.
Warning
This method is not thread-safe and should only be called from the same thread the event loop is running on. Calling it from a different thread may lead to unexpected behavior.
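When work originates on another thread, the standard asyncio approach is to hand the call over to the loop thread with `loop.call_soon_threadsafe`. The sketch below shows that pattern; the local `execute` function and the string command are hypothetical stand-ins for the engine method and a real Command.

```python
import asyncio
import threading


async def demo() -> list:
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    received: list = []
    done = asyncio.Event()

    def execute(command: str) -> None:
        # Stand-in for a non-thread-safe method; records whether it ran on the loop thread.
        received.append((command, threading.get_ident() == loop_thread))
        done.set()

    def worker() -> None:
        # Do not call execute() directly from this thread; schedule it on the loop.
        loop.call_soon_threadsafe(execute, "cancel-all")

    thread = threading.Thread(target=worker)
    thread.start()
    await asyncio.wait_for(done.wait(), timeout=1.0)
    thread.join()
    return received
```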
- process(event: Event) None¶
Process the given event.
If the internal queue is at or near capacity, it logs a warning (throttled) and schedules an asynchronous put() operation. This ensures all messages are eventually enqueued and processed without blocking the caller when the queue is full.
- Parameters:
event (Event) – The event to process.
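The non-blocking enqueue behaviour described for execute() and process() can be sketched with a bounded `asyncio.Queue`: try `put_nowait` first and, if the queue is full, schedule a task that awaits `put()`. `BufferedEngine` is a hypothetical stand-in; it warns only when the queue is completely full and does not model the throttling of warnings or the "near capacity" threshold.

```python
import asyncio


class BufferedEngine:
    """Hypothetical engine that never blocks the caller on a full queue."""

    def __init__(self, maxsize: int) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._pending: set = set()  # keeps references to in-flight put() tasks
        self.warnings = 0

    def process(self, message: str) -> None:
        try:
            self._queue.put_nowait(message)
        except asyncio.QueueFull:
            # Queue is full: count a warning, then let a task wait for free space.
            self.warnings += 1
            task = asyncio.get_running_loop().create_task(self._queue.put(message))
            self._pending.add(task)
            task.add_done_callback(self._pending.discard)

    async def drain(self, count: int) -> list:
        # Consume `count` messages, yielding to the loop whenever the queue is empty.
        return [await self._queue.get() for _ in range(count)]


async def demo() -> tuple:
    engine = BufferedEngine(maxsize=2)
    for i in range(4):
        engine.process(f"evt-{i}")
    drained = await engine.drain(4)
    return drained, engine.warnings
```

Holding the tasks in a set prevents them from being garbage collected before they complete, which is a general asyncio precaution rather than something this reference documents.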
- command_count¶
The total count of commands received by the engine.
- Returns:
int
- debug¶
If debug mode is active (will provide extra debug logging).
- Returns:
bool
- degrade(self) void¶
Degrade the component.
While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- dispose(self) void¶
Dispose of the component.
While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- event_count¶
The total count of events received by the engine.
- Returns:
int
- fault(self) void¶
Fault the component.
Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.
While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- classmethod fully_qualified_name(cls) str¶
Return the fully qualified name for the component's class.
- Return type:
str
- id¶
The component's ID.
- Returns:
ComponentId
- is_bypassed¶
If the risk engine is completely bypassed.
- Returns:
bool
- is_degraded¶
Return whether the current component state is DEGRADED.
- Return type:
bool
- is_disposed¶
Return whether the current component state is DISPOSED.
- Return type:
bool
- is_faulted¶
Return whether the current component state is FAULTED.
- Return type:
bool
- is_initialized¶
Return whether the component has been initialized (component.state >= INITIALIZED).
- Return type:
bool
- is_running¶
Return whether the current component state is RUNNING.
- Return type:
bool
- is_stopped¶
Return whether the current component state is STOPPED.
- Return type:
bool
- max_notional_per_order(self, InstrumentId instrument_id)¶
Return the current maximum notional per order for the given instrument ID.
- Return type:
Decimal or None
- max_notionals_per_order(self) dict¶
Return the current maximum notionals per order settings.
- Return type:
dict[InstrumentId, Decimal]
- max_order_modify_rate(self) tuple¶
Return the current maximum order modify rate limit setting.
- Returns:
The limit per timedelta interval.
- Return type:
(int, timedelta)
- max_order_submit_rate(self) tuple¶
Return the current maximum order submit rate limit setting.
- Returns:
The limit per timedelta interval.
- Return type:
(int, timedelta)
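A `(int, timedelta)` setting reads as "at most N operations per interval". The sketch below enforces such a setting with a sliding window. `RateLimiter` is a hypothetical illustration; this reference does not state which algorithm the risk engine uses, so the sliding window is an assumption.

```python
from collections import deque
from datetime import datetime, timedelta


class RateLimiter:
    """Hypothetical sliding-window limiter for a (limit, interval) setting."""

    def __init__(self, limit: int, interval: timedelta) -> None:
        self.limit = limit
        self.interval = interval
        self._stamps: deque = deque()

    def allow(self, now: datetime) -> bool:
        # Forget timestamps that have fallen out of the window.
        while self._stamps and now - self._stamps[0] >= self.interval:
            self._stamps.popleft()
        if len(self._stamps) >= self.limit:
            return False
        self._stamps.append(now)
        return True
```

Passing `now` explicitly keeps the limiter deterministic, which makes it straightforward to exercise with fixed timestamps.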
- reset(self) void¶
Reset the component.
All stateful fields are reset to their initial value.
While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- resume(self) void¶
Resume the component.
While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- set_max_notional_per_order(self, InstrumentId instrument_id, new_value) void¶
Set the maximum notional value per order for the given instrument ID.
Passing a new_value of None will disable the pre-trade risk max notional check.
- Parameters:
instrument_id (InstrumentId) – The instrument ID for the max notional.
new_value (integer, float, string or Decimal) – The max notional value to set.
- Raises:
decimal.InvalidOperation – If new_value is not a valid input for decimal.Decimal.
ValueError – If new_value is not None and not positive.
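The two error conditions above map naturally onto `decimal.Decimal` behaviour. The function below is a minimal sketch of such validation; `normalize_max_notional` is a hypothetical helper, and converting through `str()` is an assumption of the sketch, not a documented detail of the engine.

```python
from decimal import Decimal, InvalidOperation
from typing import Optional, Union


def normalize_max_notional(
    new_value: Union[int, float, str, Decimal, None],
) -> Optional[Decimal]:
    """Return a positive Decimal, or None when the check is disabled."""
    if new_value is None:
        return None
    # Going through str() avoids binary float artefacts, since
    # Decimal(0.1) != Decimal("0.1"); malformed strings raise InvalidOperation.
    value = Decimal(str(new_value))
    if not value > 0:
        raise ValueError(f"max notional must be positive, was {value}")
    return value
```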
- set_trading_state(self, TradingState state) void¶
Set the trading state for the engine.
- Parameters:
state (TradingState) – The state to set.
- shutdown_system(self, str reason=None) void¶
Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.
The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.
- Parameters:
reason (str, optional) – The reason for issuing the shutdown command.
- start(self) void¶
Start the component.
While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- state¶
Return the component's current state.
- Return type:
ComponentState
- stop(self) void¶
Stop the component.
While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.
Warning
Do not override.
If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
- trader_id¶
The trader ID associated with the component.
- Returns:
TraderId
- trading_state¶
The current trading state for the engine.
- Returns:
TradingState
- type¶
The component's type.
- Returns:
type
- class TradingNode¶
Bases: object
Provides an asynchronous network node for live trading.
- Parameters:
config (TradingNodeConfig, optional) – The configuration for the instance.
loop (asyncio.AbstractEventLoop, optional) – The event loop for the node. If None then will get the running event loop internally.
- property machine_id: str¶
Return the node's machine ID.
- Return type:
str
- property cache: CacheFacade¶
Return the node's internal read-only cache.
- Return type:
CacheFacade
- property portfolio: PortfolioFacade¶
Return the node's internal read-only portfolio.
- Return type:
PortfolioFacade
- is_running() bool¶
Return whether the trading node is running.
- Return type:
bool
- is_built() bool¶
Return whether the trading node clients are built.
- Return type:
bool
- get_event_loop() AbstractEventLoop | None¶
Return the event loop of the trading node.
- Return type:
asyncio.AbstractEventLoop or None
- add_stream_processor(callback: Callable) None¶
Add the given stream processor callback.
- Parameters:
callback (Callable) – The callback to add.
- add_data_client_factory(name: str, factory: type[LiveDataClientFactory]) None¶
Add the given data client factory to the node.
- Parameters:
name (str) – The name of the client factory.
factory (type[LiveDataClientFactory]) – The factory class to add.
- Raises:
ValueError – If name is not a valid string.
KeyError – If name has already been added.
- add_exec_client_factory(name: str, factory: type[LiveExecClientFactory]) None¶
Add the given execution client factory to the node.
- Parameters:
name (str) – The name of the client factory.
factory (type[LiveExecClientFactory]) – The factory class to add.
- Raises:
ValueError – If name is not a valid string.
KeyError – If name has already been added.
- build() None¶
Build the node's clients.
- run(raise_exception: bool = False) None¶
Start and run the trading node.
- Parameters:
raise_exception (bool, default False) – If runtime exceptions should be re-raised as well as being logged.
- publish_bus_message(bus_msg: BusMessage) None¶
Publish the given bus message on the internal message bus.
Note the message will not be published externally.
- Parameters:
bus_msg (nautilus_pyo3.BusMessage) – The bus message to publish.
- async run_async() None¶
Start and run the trading node asynchronously.
- stop() None¶
Stop the trading node gracefully.
After a specified delay the internal Trader residual state will be checked.
If save strategy is configured, then strategy states will be saved.
- async stop_async() None¶
Stop the trading node gracefully, asynchronously.
After a specified delay the internal Trader residual state will be checked.
If save strategy is configured, then strategy states will be saved.
- dispose() None¶
Dispose of the trading node.
Gracefully shuts down the executor and event loop.
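The methods above suggest a build, run, dispose sequence. The helper below sketches that order against a recording stand-in so the sequence can be checked without a live connection. `RecordingNode` and `run_node` are hypothetical names; only the method names build(), run() and dispose() come from this reference, and whether run() itself stops the node on interruption is not stated here.

```python
class RecordingNode:
    """Hypothetical stand-in exposing the documented build/run/dispose methods."""

    def __init__(self, fail_on_run: bool = False) -> None:
        self.calls: list[str] = []
        self._fail_on_run = fail_on_run

    def build(self) -> None:
        self.calls.append("build")

    def run(self) -> None:
        self.calls.append("run")
        if self._fail_on_run:
            raise RuntimeError("simulated runtime failure")

    def dispose(self) -> None:
        self.calls.append("dispose")


def run_node(node) -> None:
    """Build and run the node, always disposing of it afterwards."""
    node.build()
    try:
        node.run()
    finally:
        # Release the executor and event loop even if run() raised.
        node.dispose()
```

Client factories would be added before build() in this sequence, since build() is what constructs the clients.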
- class TradingNodeBuilder¶
Bases: object
Provides building services for a trading node.
- Parameters:
loop (asyncio.AbstractEventLoop) – The event loop for the clients.
data_engine (LiveDataEngine) – The data engine for the trading node.
exec_engine (LiveExecutionEngine) – The execution engine for the trading node.
portfolio (Portfolio) – The portfolio for the trading node.
msgbus (MessageBus) – The message bus for the trading node.
cache (Cache) – The cache for building clients.
clock (LiveClock) – The clock for building clients.
logger (Logger) – The logger for building clients.
log (Logger) – The trading node's logger.
- add_data_client_factory(name: str, factory: type[LiveDataClientFactory]) None¶
Add the given data client factory to the builder.
- Parameters:
name (str) – The name of the client.
factory (type[LiveDataClientFactory]) – The factory to add.
- Raises:
ValueError – If name is not a valid string.
KeyError – If name has already been added.
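The two error conditions for factory registration can be sketched as a guarded dictionary. `FactoryRegistry` is a hypothetical illustration; in particular, treating "a valid string" as "non-empty and not only whitespace" is an assumption of the sketch.

```python
class FactoryRegistry:
    """Hypothetical stand-in for a name-to-factory-class mapping."""

    def __init__(self) -> None:
        self._factories: dict[str, type] = {}

    def add(self, name: str, factory: type) -> None:
        if not isinstance(name, str) or not name.strip():
            raise ValueError("name must be a non-empty string")
        if name in self._factories:
            raise KeyError(f"factory {name!r} has already been added")
        # The class itself is stored, matching the type[...] annotation above.
        self._factories[name] = factory

    def get(self, name: str) -> type:
        # Raises KeyError if no factory was added under this name.
        return self._factories[name]
```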
- add_exec_client_factory(name: str, factory: type[LiveExecClientFactory]) None¶
Add the given execution client factory to the builder.
- Parameters:
name (str) – The name of the client.
factory (type[LiveExecClientFactory]) – The factory to add.
- Raises:
ValueError – If name is not a valid string.
KeyError – If name has already been added.
- build_data_clients(config: dict[str, LiveDataClientConfig]) None¶
Build the data clients with the given configuration.
- Parameters:
config (dict[str, ImportableConfig | LiveDataClientConfig]) – The data clients configuration.
- build_exec_clients(config: dict[str, LiveExecClientConfig]) None¶
Build the execution clients with the given configuration.
- Parameters:
config (dict[str, ImportableConfig | LiveExecClientConfig]) – The execution clients configuration.