Live

The live subpackage groups all engine and client implementations for live trading.

A common event loop is generally passed into each live engine, so that the whole system runs on a single efficient event loop (uvloop by default).
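As a rough illustration of the single shared loop design, the sketch below passes one event loop into two engines and runs both on it. `SketchEngine` and `run_all` are hypothetical names invented for this example and are not part of the library; a stock asyncio loop stands in for uvloop.

```python
import asyncio


class SketchEngine:
    """Hypothetical stand-in for a live engine (not a nautilus_trader class)."""

    def __init__(self, loop: asyncio.AbstractEventLoop, name: str) -> None:
        self.loop = loop  # every engine keeps a reference to the same loop
        self.name = name
        self.ticks = 0

    async def run(self, n: int) -> None:
        for _ in range(n):
            self.ticks += 1
            await asyncio.sleep(0)  # yield so the other engine can make progress


async def run_all(engines: list) -> None:
    # Both engines are driven concurrently by the one loop.
    await asyncio.gather(*(engine.run(3) for engine in engines))


# A stock asyncio loop is used here; an alternative loop implementation such as
# uvloop could be substituted if it is installed.
loop = asyncio.new_event_loop()
try:
    engines = [SketchEngine(loop, "data"), SketchEngine(loop, "exec")]
    loop.run_until_complete(run_all(engines))
finally:
    loop.close()
```

Sharing one loop means the engines interleave cooperatively on a single thread, which avoids locking between them.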

The LiveDataClient class is responsible for interfacing with a particular API which may be presented directly by a venue, or through a broker intermediary.

Clients can also be written for specialized data providers.

class LiveDataClient

Bases: DataClient

The base class for all live data clients.

  • Parameters:
    • loop (asyncio.AbstractEventLoop) – The event loop for the client.
    • client_id (ClientId) – The client ID.
    • venue (Venue or None) – The client venue. Can be None for a multi-venue client.
    • msgbus (MessageBus) – The message bus for the client.
    • cache (Cache) – The cache for the client.
    • clock (LiveClock) – The clock for the client.
    • config (NautilusConfig, optional) – The configuration for the instance.

WARNING

This class should not be used directly, but through a concrete subclass.

async run_after_delay(delay: float, coro: Coroutine) → None

Run the given coroutine after a delay.

  • Parameters:
    • delay (float) – The delay (seconds) before running the coroutine.
    • coro (Coroutine) – The coroutine to run after the initial delay.
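The behavior described above can be approximated with plain asyncio. This is a minimal sketch of the idea (sleep, then await the coroutine), not the library's implementation; `record` and `demo` are hypothetical helpers.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any


async def run_after_delay(delay: float, coro: Coroutine[Any, Any, Any]) -> None:
    # Wait for the delay (in seconds), then run the coroutine to completion.
    await asyncio.sleep(delay)
    await coro


results: list[str] = []


async def record(message: str) -> None:
    results.append(message)


async def demo() -> None:
    # The coroutine object is created now but only executed after the delay.
    await run_after_delay(0.01, record("done"))


asyncio.run(demo())
```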

create_task(coro: Coroutine, log_msg: str | None = None, actions: Callable | None = None, success_msg: str | None = None, success_color: LogColor = LogColor.NORMAL) → Task

Run the given coroutine with error handling and optional callback actions when done.

  • Parameters:
    • coro (Coroutine) – The coroutine to run.
    • log_msg (str, optional) – The log message for the task.
    • actions (Callable, optional) – The actions callback to run when the coroutine is done.
    • success_msg (str, optional) – The log message to write on actions success.
    • success_color (LogColor, default NORMAL) – The log message color for actions success.
  • Return type: asyncio.Task
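One way to get "error handling and optional callback actions when done" is a done callback attached to the task. The sketch below assumes that approach; it is not the library's code, it omits success_color, and the `ok`, `boom`, and `demo` helpers are hypothetical.

```python
import asyncio
import logging
from collections.abc import Callable, Coroutine
from typing import Any, Optional

log = logging.getLogger("sketch")


def create_task(
    coro: Coroutine[Any, Any, Any],
    log_msg: Optional[str] = None,
    actions: Optional[Callable[[], None]] = None,
    success_msg: Optional[str] = None,
) -> asyncio.Task:
    # Must be called from code running on the event loop.
    task = asyncio.get_running_loop().create_task(coro, name=log_msg)

    def _on_done(done: asyncio.Task) -> None:
        if done.cancelled():
            return
        exc = done.exception()  # retrieving it marks the exception as handled
        if exc is not None:
            log.error("Task %s failed: %r", done.get_name(), exc)
            return
        if actions is not None:
            actions()
            if success_msg:
                log.info(success_msg)

    task.add_done_callback(_on_done)
    return task


events: list[str] = []


async def ok() -> int:
    return 1


async def boom() -> None:
    raise ValueError("bad input")


async def demo() -> None:
    t1 = create_task(ok(), log_msg="ok", actions=lambda: events.append("ok-done"))
    t2 = create_task(boom(), log_msg="boom", actions=lambda: events.append("boom-done"))
    await asyncio.wait([t1, t2])
    await asyncio.sleep(0)  # give any remaining callbacks a chance to run


asyncio.run(demo())
```

In this sketch the actions callback only runs when the coroutine finishes without an exception; a failure is logged instead of propagating out of the event loop.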

connect() → None

Connect the client.

disconnect() → None

Disconnect the client.

subscribe(self, DataType data_type, dict params=None) → void

Subscribe to data for the given data type.

  • Parameters: data_type (DataType) – The data type for the subscription.

unsubscribe(self, DataType data_type, dict params=None) → void

Unsubscribe from data for the given data type.

  • Parameters: data_type (DataType) – The data type for the subscription.

request(self, DataType data_type, UUID4 correlation_id, dict params=None) → void

Request data for the given data type.

  • Parameters:
    • data_type (DataType) – The data type for the request.
    • correlation_id (UUID4) – The correlation ID for the response.

degrade(self) → void

Degrade the component.

While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
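The transition rules described for degrade() (and repeated for the other lifecycle methods) can be pictured as a small guarded state machine. The sketch below is illustrative only: the `State` members, the assumption that RUNNING is the valid source state, and the class names are all hypothetical.

```python
import logging
from enum import Enum, auto

log = logging.getLogger("sketch")


class State(Enum):
    RUNNING = auto()
    DEGRADING = auto()
    DEGRADED = auto()
    STOPPED = auto()


class SketchComponent:
    def __init__(self, state: State = State.RUNNING) -> None:
        self.state = state

    def on_degrade(self) -> None:
        """Hook for subclasses; the public degrade() itself is not overridden."""

    def degrade(self) -> None:
        if self.state is not State.RUNNING:  # assumed valid source state
            log.error("Cannot degrade from state %s", self.state.name)
            return  # state is left unchanged
        self.state = State.DEGRADING
        try:
            self.on_degrade()
        except Exception:
            log.exception("Error in on_degrade")
            raise  # the component remains in DEGRADING
        self.state = State.DEGRADED


class FailingComponent(SketchComponent):
    def on_degrade(self) -> None:
        raise RuntimeError("hook failed")


healthy = SketchComponent()
healthy.degrade()

stopped = SketchComponent(State.STOPPED)
stopped.degrade()  # invalid transition: logged, no state change

failing = FailingComponent()
try:
    failing.degrade()
    raised = False
except RuntimeError:
    raised = True
```

Keeping the guard and the transitional state in the base method is what makes the "Do not override" warning matter: subclasses customize behavior through the on_* hook only.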

dispose(self) → void

Dispose of the component.

While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

fault(self) → void

Fault the component.

Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.

While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

classmethod fully_qualified_name(cls) → str

Return the fully qualified name for the component's class.

  • Return type: str

id

The component's ID.

  • Returns: ComponentId

is_connected

Whether the client is connected.

  • Returns: bool

is_degraded

bool

Return whether the current component state is DEGRADED.

  • Return type: bool
  • Type: Component.is_degraded

is_disposed

bool

Return whether the current component state is DISPOSED.

  • Return type: bool
  • Type: Component.is_disposed

is_faulted

bool

Return whether the current component state is FAULTED.

  • Return type: bool
  • Type: Component.is_faulted

is_initialized

bool

Return whether the component has been initialized (component.state >= INITIALIZED).

  • Return type: bool
  • Type: Component.is_initialized
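The comparison `component.state >= INITIALIZED` implies that states are ordered. A minimal sketch of that idea with an `IntEnum` follows; the member names and numeric values are assumptions chosen for illustration and do not reproduce the library's ComponentState.

```python
from enum import IntEnum


class SketchState(IntEnum):
    # Hypothetical ordering: later lifecycle stages have larger values.
    PRE_INITIALIZED = 0
    INITIALIZED = 1
    RUNNING = 2
    STOPPED = 3


def is_initialized(state: SketchState) -> bool:
    # Any state at or beyond INITIALIZED counts as initialized.
    return state >= SketchState.INITIALIZED


checks = {state.name: is_initialized(state) for state in SketchState}
```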

is_running

bool

Return whether the current component state is RUNNING.

  • Return type: bool
  • Type: Component.is_running

is_stopped

bool

Return whether the current component state is STOPPED.

  • Return type: bool
  • Type: Component.is_stopped

reset(self) → void

Reset the component.

All stateful fields are reset to their initial value.

While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

resume(self) → void

Resume the component.

While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

shutdown_system(self, unicode reason=None) → void

Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.

The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.

  • Parameters: reason (str, optional) – The reason for issuing the shutdown command.
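Choosing between a synchronous and an asynchronous stop "depending on the presence of an active event loop" can be done by probing for a running loop. The sketch below shows one such check; `SketchKernel` and `handle_shutdown` are hypothetical and do not mirror NautilusKernel internals.

```python
import asyncio
from typing import Optional


class SketchKernel:
    """Hypothetical kernel that records which stop path was taken."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def stop(self) -> None:
        self.calls.append("stop")

    async def stop_async(self) -> None:
        self.calls.append("stop_async")

    def handle_shutdown(self, reason: Optional[str] = None) -> Optional[asyncio.Task]:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No active event loop in this thread: stop synchronously.
            self.stop()
            return None
        # An event loop is running: schedule the asynchronous stop on it.
        return loop.create_task(self.stop_async())


kernel = SketchKernel()
kernel.handle_shutdown("called outside any event loop")


async def demo() -> None:
    task = kernel.handle_shutdown("called from a coroutine")
    assert task is not None
    await task


asyncio.run(demo())
```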

start(self) → void

Start the component.

While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

state

ComponentState

Return the component's current state.

  • Return type: ComponentState
  • Type: Component.state

stop(self) → void

Stop the component.

While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

subscribed_custom_data(self) → list

Return the custom data types subscribed to.

trader_id

The trader ID associated with the component.

  • Returns: TraderId

type

The component's type.

  • Returns: type

venue

The client's venue ID (if applicable).

  • Returns: Venue or None

class LiveMarketDataClient

Bases: MarketDataClient

The base class for all live market data clients.

  • Parameters:
    • loop (asyncio.AbstractEventLoop) – The event loop for the client.
    • client_id (ClientId) – The client ID.
    • venue (Venue or None) – The client venue. Can be None for a multi-venue client.
    • msgbus (MessageBus) – The message bus for the client.
    • cache (Cache) – The cache for the client.
    • clock (LiveClock) – The clock for the client.
    • instrument_provider (InstrumentProvider) – The instrument provider for the client.
    • config (NautilusConfig, optional) – The configuration for the instance.

WARNING

This class should not be used directly, but through a concrete subclass.

async run_after_delay(delay: float, coro: Coroutine) → None

Run the given coroutine after a delay.

  • Parameters:
    • delay (float) – The delay (seconds) before running the coroutine.
    • coro (Coroutine) – The coroutine to run after the initial delay.

create_task(coro: Coroutine, log_msg: str | None = None, actions: Callable | None = None, success_msg: str | None = None, success_color: LogColor = LogColor.NORMAL) → Task

Run the given coroutine with error handling and optional callback actions when done.

  • Parameters:
    • coro (Coroutine) – The coroutine to run.
    • log_msg (str, optional) – The log message for the task.
    • actions (Callable, optional) – The actions callback to run when the coroutine is done.
    • success_msg (str, optional) – The log message to write on actions success.
    • success_color (LogColor, default NORMAL) – The log message color for actions success.
  • Return type: asyncio.Task

connect() → None

Connect the client.

disconnect() → None

Disconnect the client.

subscribe(self, DataType data_type, dict params=None) → void

Subscribe to data for the given data type.

  • Parameters: data_type (DataType) – The data type for the subscription.

subscribe_instruments(self, dict params=None) → void

Subscribe to all Instrument data.

  • Parameters: params (dict[str, Any], optional) – Additional params for the subscription.

subscribe_instrument(self, InstrumentId instrument_id, dict params=None) → void

Subscribe to the Instrument with the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The instrument ID to subscribe to.
    • params (dict[str, Any], optional) – Additional params for the subscription.

subscribe_order_book_deltas(self, InstrumentId instrument_id, BookType book_type, int depth=0, dict params=None) → void

Subscribe to OrderBookDeltas data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The order book instrument to subscribe to.
    • book_type (BookType {L1_MBP, L2_MBP, L3_MBO}) – The order book type.
    • depth (int, optional, default 0) – The maximum depth for the subscription.
    • params (dict[str, Any], optional) – Additional params for the subscription.

subscribe_order_book_snapshots(self, InstrumentId instrument_id, BookType book_type, int depth=0, dict params=None) → void

Subscribe to OrderBook snapshots data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The order book instrument to subscribe to.
    • book_type (BookType {L1_MBP, L2_MBP, L3_MBO}) – The order book level.
    • depth (int, optional) – The maximum depth for the order book. A depth of 0 means maximum depth.
    • params (dict[str, Any], optional) – Additional params for the subscription.

subscribe_quote_ticks(self, InstrumentId instrument_id, dict params=None) → void

Subscribe to QuoteTick data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The tick instrument to subscribe to.
    • params (dict[str, Any], optional) – Additional params for the subscription.

subscribe_trade_ticks(self, InstrumentId instrument_id, dict params=None) → void

Subscribe to TradeTick data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The tick instrument to subscribe to.
    • params (dict[str, Any], optional) – Additional params for the subscription.

subscribe_bars(self, BarType bar_type, dict params=None) → void

Subscribe to Bar data for the given bar type.

  • Parameters:
    • bar_type (BarType) – The bar type to subscribe to.
    • params (dict[str, Any], optional) – Additional params for the subscription.

subscribe_instrument_status(self, InstrumentId instrument_id, dict params=None) → void

Subscribe to InstrumentStatus data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The instrument to subscribe to.
    • params (dict[str, Any], optional) – Additional params for the subscription.

subscribe_instrument_close(self, InstrumentId instrument_id, dict params=None) → void

Subscribe to InstrumentClose updates for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The instrument to subscribe to.
    • params (dict[str, Any], optional) – Additional params for the subscription.

unsubscribe(self, DataType data_type, dict params=None) → void

Unsubscribe from data for the given data type.

  • Parameters: data_type (DataType) – The data type for the subscription.

unsubscribe_instruments(self, dict params=None) → void

Unsubscribe from all Instrument data.

  • Parameters: params (dict[str, Any], optional) – Additional params for the subscription.

unsubscribe_instrument(self, InstrumentId instrument_id, dict params=None) → void

Unsubscribe from Instrument data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The instrument to unsubscribe from.
    • params (dict[str, Any], optional) – Additional params for the subscription.

unsubscribe_order_book_deltas(self, InstrumentId instrument_id, dict params=None) → void

Unsubscribe from OrderBookDeltas data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The order book instrument to unsubscribe from.
    • params (dict[str, Any], optional) – Additional params for the subscription.

unsubscribe_order_book_snapshots(self, InstrumentId instrument_id, dict params=None) → void

Unsubscribe from OrderBook snapshots data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The order book instrument to unsubscribe from.
    • params (dict[str, Any], optional) – Additional params for the subscription.

unsubscribe_quote_ticks(self, InstrumentId instrument_id, dict params=None) → void

Unsubscribe from QuoteTick data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The tick instrument to unsubscribe from.
    • params (dict[str, Any], optional) – Additional params for the subscription.

unsubscribe_trade_ticks(self, InstrumentId instrument_id, dict params=None) → void

Unsubscribe from TradeTick data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The tick instrument to unsubscribe from.
    • params (dict[str, Any], optional) – Additional params for the subscription.

unsubscribe_bars(self, BarType bar_type, dict params=None) → void

Unsubscribe from Bar data for the given bar type.

  • Parameters:
    • bar_type (BarType) – The bar type to unsubscribe from.
    • params (dict[str, Any], optional) – Additional params for the subscription.

unsubscribe_instrument_status(self, InstrumentId instrument_id, dict params=None) → void

Unsubscribe from InstrumentStatus data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The instrument whose status updates to unsubscribe from.
    • params (dict[str, Any], optional) – Additional params for the subscription.

unsubscribe_instrument_close(self, InstrumentId instrument_id, dict params=None) → void

Unsubscribe from InstrumentClose data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The instrument to unsubscribe from.
    • params (dict[str, Any], optional) – Additional params for the subscription.

request(self, DataType data_type, UUID4 correlation_id, dict params=None) → void

Request data for the given data type.

  • Parameters:
    • data_type (DataType) – The data type for the request.
    • correlation_id (UUID4) – The correlation ID for the response.

request_instrument(self, InstrumentId instrument_id, UUID4 correlation_id, datetime start=None, datetime end=None, dict params=None) → void

Request Instrument data for the given instrument ID.

  • Parameters:
    • instrument_id (InstrumentId) – The instrument ID for the request.
    • correlation_id (UUID4) – The correlation ID for the request.
    • start (datetime, optional) – The start datetime (UTC) of the request time range (inclusive).
    • end (datetime, optional) – The end datetime (UTC) of the request time range. Whether it is inclusive depends on the individual data client implementation.
    • params (dict[str, Any], optional) – Additional params to be sent with the request.

request_instruments(self, Venue venue, UUID4 correlation_id, datetime start=None, datetime end=None, dict params=None) → void

Request all Instrument data for the given venue.

  • Parameters:
    • venue (Venue) – The venue for the request.
    • correlation_id (UUID4) – The correlation ID for the request.
    • start (datetime, optional) – The start datetime (UTC) of the request time range (inclusive).
    • end (datetime, optional) – The end datetime (UTC) of the request time range. Whether it is inclusive depends on the individual data client implementation.
    • params (dict[str, Any], optional) – Additional params to be sent with the request.

request_quote_ticks(self, InstrumentId instrument_id, int limit, UUID4 correlation_id, datetime start=None, datetime end=None, dict params=None) → void

Request historical QuoteTick data.

  • Parameters:
    • instrument_id (InstrumentId) – The tick instrument ID for the request.
    • limit (int) – The limit for the number of returned ticks.
    • correlation_id (UUID4) – The correlation ID for the request.
    • start (datetime, optional) – The start datetime (UTC) of the request time range (inclusive).
    • end (datetime, optional) – The end datetime (UTC) of the request time range. Whether it is inclusive depends on the individual data client implementation.
    • params (dict[str, Any], optional) – Additional params to be sent with the request.

request_trade_ticks(self, InstrumentId instrument_id, int limit, UUID4 correlation_id, datetime start=None, datetime end=None, dict params=None) → void

Request historical TradeTick data.

  • Parameters:
    • instrument_id (InstrumentId) – The tick instrument ID for the request.
    • limit (int) – The limit for the number of returned ticks.
    • correlation_id (UUID4) – The correlation ID for the request.
    • start (datetime, optional) – The start datetime (UTC) of the request time range (inclusive).
    • end (datetime, optional) – The end datetime (UTC) of the request time range. Whether it is inclusive depends on the individual data client implementation.
    • params (dict[str, Any], optional) – Additional params to be sent with the request.

request_bars(self, BarType bar_type, int limit, UUID4 correlation_id, datetime start=None, datetime end=None, dict params=None) → void

Request historical Bar data. To load historical data from a catalog, you can pass a list[DataCatalogConfig] to the TradingNodeConfig or the BacktestEngineConfig.

  • Parameters:
    • bar_type (BarType) – The bar type for the request.
    • limit (int) – The limit for the number of returned bars.
    • correlation_id (UUID4) – The correlation ID for the request.
    • start (datetime, optional) – The start datetime (UTC) of the request time range (inclusive).
    • end (datetime, optional) – The end datetime (UTC) of the request time range. Whether it is inclusive depends on the individual data client implementation.
    • params (dict[str, Any], optional) – Additional params to be sent with the request.

request_order_book_snapshot(self, InstrumentId instrument_id, int limit, UUID4 correlation_id, dict params=None) → void

Request order book snapshot data.

  • Parameters:
    • instrument_id (InstrumentId) – The instrument ID for the order book snapshot request.
    • limit (int) – The limit on the depth of the order book snapshot.
    • correlation_id (UUID4) – The correlation ID for the request.

degrade(self) → void

Degrade the component.

While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

dispose(self) → void

Dispose of the component.

While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

fault(self) → void

Fault the component.

Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.

While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

classmethod fully_qualified_name(cls) → str

Return the fully qualified name for the component's class.

  • Return type: str

id

The component's ID.

  • Returns: ComponentId

is_connected

Whether the client is connected.

  • Returns: bool

is_degraded

bool

Return whether the current component state is DEGRADED.

  • Return type: bool
  • Type: Component.is_degraded

is_disposed

bool

Return whether the current component state is DISPOSED.

  • Return type: bool
  • Type: Component.is_disposed

is_faulted

bool

Return whether the current component state is FAULTED.

  • Return type: bool
  • Type: Component.is_faulted

is_initialized

bool

Return whether the component has been initialized (component.state >= INITIALIZED).

  • Return type: bool
  • Type: Component.is_initialized

is_running

bool

Return whether the current component state is RUNNING.

  • Return type: bool
  • Type: Component.is_running

is_stopped

bool

Return whether the current component state is STOPPED.

  • Return type: bool
  • Type: Component.is_stopped

reset(self) → void

Reset the component.

All stateful fields are reset to their initial value.

While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

resume(self) → void

Resume the component.

While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

shutdown_system(self, unicode reason=None) → void

Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.

The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.

  • Parameters: reason (str, optional) – The reason for issuing the shutdown command.

start(self) → void

Start the component.

While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

state

ComponentState

Return the component's current state.

  • Return type: ComponentState
  • Type: Component.state

stop(self) → void

Stop the component.

While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

subscribed_bars(self) → list

Return the bar types subscribed to.

subscribed_custom_data(self) → list

Return the custom data types subscribed to.

subscribed_instrument_close(self) → list

Return the instrument closes subscribed to.

subscribed_instrument_status(self) → list

Return the status update instruments subscribed to.

subscribed_instruments(self) → list

Return the instruments subscribed to.

subscribed_order_book_deltas(self) → list

Return the order book delta instruments subscribed to.

subscribed_order_book_snapshots(self) → list

Return the order book snapshot instruments subscribed to.

subscribed_quote_ticks(self) → list

Return the quote tick instruments subscribed to.

subscribed_trade_ticks(self) → list

Return the trade tick instruments subscribed to.

trader_id

The trader ID associated with the component.

  • Returns: TraderId

type

The component's type.

  • Returns: type

venue

The client's venue ID (if applicable).

  • Returns: Venue or None

class LiveDataEngine

Bases: DataEngine

Provides a high-performance asynchronous live data engine.

  • Parameters:
    • loop (asyncio.AbstractEventLoop) – The event loop for the engine.
    • msgbus (MessageBus) – The message bus for the engine.
    • cache (Cache) – The cache for the engine.
    • clock (LiveClock) – The clock for the engine.
    • config (LiveDataEngineConfig, optional) – The configuration for the instance.
  • Raises: TypeError – If config is not of type LiveDataEngineConfig.

connect() → None

Connect the engine by calling connect on all registered clients.

disconnect() → None

Disconnect the engine by calling disconnect on all registered clients.

get_cmd_queue_task() → Task | None

Return the internal command queue task for the engine.

  • Return type: asyncio.Task or None

get_req_queue_task() → Task | None

Return the internal request queue task for the engine.

  • Return type: asyncio.Task or None

get_res_queue_task() → Task | None

Return the internal response queue task for the engine.

  • Return type: asyncio.Task or None

get_data_queue_task() → Task | None

Return the internal data queue task for the engine.

  • Return type: asyncio.Task or None

cmd_qsize() → int

Return the number of DataCommand objects buffered on the internal queue.

  • Return type: int

req_qsize() → int

Return the number of DataRequest objects buffered on the internal queue.

  • Return type: int

res_qsize() → int

Return the number of DataResponse objects buffered on the internal queue.

  • Return type: int

data_qsize() → int

Return the number of Data objects buffered on the internal queue.

  • Return type: int

kill() → None

Kill the engine by abruptly canceling the queue tasks and calling stop.
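Cancelling a queue task abruptly means the consumer stops wherever it is, without draining what remains. The following sketch shows that with plain asyncio; `consume` and `demo` are hypothetical and only illustrate task cancellation, not the engine's actual kill sequence.

```python
import asyncio


async def consume(queue: asyncio.Queue, handled: list) -> None:
    # Runs until cancelled, handling one item at a time.
    while True:
        handled.append(await queue.get())


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    handled: list = []
    task = asyncio.get_running_loop().create_task(consume(queue, handled))

    queue.put_nowait("a")
    await asyncio.sleep(0)  # let the consumer handle "a"

    task.cancel()  # abrupt stop: no draining, no graceful shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return handled, task.cancelled()


handled, was_cancelled = asyncio.run(demo())
```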

execute(command: DataCommand) → None

Execute the given data command.

If the internal queue is already full, a warning is logged and the call blocks until the queue size reduces.

  • Parameters: command (DataCommand) – The command to execute.

WARNING

This method is not thread-safe and should only be called from the same thread the event loop is running on. Calling it from a different thread may lead to unexpected behavior.
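A bounded queue with a non-blocking fast path and a waiting fallback is one way to picture the "full queue" behavior. The sketch below assumes that design: `put_nowait` is tried first and, when the queue is full, a warning is logged and a task is scheduled that waits for space. `SketchQueueEngine` and its methods are hypothetical, not the engine's real internals.

```python
import asyncio
import logging

log = logging.getLogger("sketch")


class SketchQueueEngine:
    def __init__(self, loop: asyncio.AbstractEventLoop, qsize: int) -> None:
        self._loop = loop
        self._cmd_queue: asyncio.Queue = asyncio.Queue(maxsize=qsize)

    def execute(self, command: str) -> None:
        # Must be called on the event loop thread (asyncio.Queue is not thread-safe).
        try:
            self._cmd_queue.put_nowait(command)
        except asyncio.QueueFull:
            log.warning("Queue full at %d items, waiting for space", self.cmd_qsize())
            self._loop.create_task(self._cmd_queue.put(command))

    def cmd_qsize(self) -> int:
        return self._cmd_queue.qsize()

    async def next_command(self) -> str:
        return await self._cmd_queue.get()


async def demo() -> tuple:
    engine = SketchQueueEngine(asyncio.get_running_loop(), qsize=2)
    for i in range(3):
        engine.execute(f"cmd-{i}")  # the third call overflows the queue
    size_when_full = engine.cmd_qsize()
    drained = [await engine.next_command() for _ in range(3)]
    return size_when_full, drained


size_when_full, drained = asyncio.run(demo())
```

The overflowing command is not dropped in this sketch; it is delivered once the consumer frees a slot.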

request(request: DataRequest) → None

Handle the given request.

If the internal queue is already full, a warning is logged and the call blocks until the queue size reduces.

  • Parameters: request (DataRequest) – The request to handle.

WARNING

This method is not thread-safe and should only be called from the same thread the event loop is running on. Calling it from a different thread may lead to unexpected behavior.

response(response: DataResponse) → None

Handle the given response.

If the internal queue is already full, a warning is logged and the call blocks until the queue size reduces.

  • Parameters: response (DataResponse) – The response to handle.

WARNING

This method is not thread-safe and should only be called from the same thread the event loop is running on. Calling it from a different thread may lead to unexpected behavior.

process(data: Data) → None

Process the given data.

If the internal queue is already full, a warning is logged and the call blocks until the queue size reduces.

  • Parameters: data (Data) – The data to process.

WARNING

This method is not thread-safe and should only be called from the same thread the event loop is running on. Calling it from a different thread may lead to unexpected behavior.
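When data originates on another thread, a common asyncio pattern is to hand it to the loop thread with `loop.call_soon_threadsafe` rather than calling a non-thread-safe method directly. The sketch below demonstrates the pattern in isolation; `handle` stands in for a method such as process() and is hypothetical.

```python
import asyncio
import threading


async def main() -> list:
    loop = asyncio.get_running_loop()
    loop_thread_id = threading.get_ident()
    done = asyncio.Event()
    seen: list = []

    def handle(item: str) -> None:
        # Executed by the event loop, so it runs on the loop's own thread.
        seen.append((item, threading.get_ident() == loop_thread_id))
        done.set()

    def worker() -> None:
        # Runs on a separate thread: schedule the call instead of invoking it.
        loop.call_soon_threadsafe(handle, "tick")

    thread = threading.Thread(target=worker)
    thread.start()
    await asyncio.wait_for(done.wait(), timeout=5)
    thread.join()
    return seen


seen = asyncio.run(main())
```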

check_connected(self) → bool

Check that all of the engine's clients are connected.

  • Returns: True if all clients are connected, else False.
  • Return type: bool

check_disconnected(self) → bool

Check that all of the engine's clients are disconnected.

  • Returns: True if all clients are disconnected, else False.
  • Return type: bool

command_count

The total count of data commands received by the engine.

  • Returns: int

data_count

The total count of data stream objects received by the engine.

  • Returns: int

debug

Whether debug mode is active (provides extra debug logging).

  • Returns: bool

default_client

ClientId | None

Return the default data client registered with the engine.

  • Return type: ClientId or None
  • Type: DataEngine.default_client

degrade(self) → void

Degrade the component.

While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

deregister_client(self, DataClient client) → void

Deregister the given data client from the data engine.

  • Parameters: client (DataClient) – The data client to deregister.

dispose(self) → void

Dispose of the component.

While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

fault(self) → void

Fault the component.

Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.

While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

classmethod fully_qualified_name(cls) → str

Return the fully qualified name for the component's class.

  • Return type: str

id

The component's ID.

  • Returns: ComponentId

is_degraded

bool

Return whether the current component state is DEGRADED.

  • Return type: bool
  • Type: Component.is_degraded

is_disposed

bool

Return whether the current component state is DISPOSED.

  • Return type: bool
  • Type: Component.is_disposed

is_faulted

bool

Return whether the current component state is FAULTED.

  • Return type: bool
  • Type: Component.is_faulted

is_initialized

bool

Return whether the component has been initialized (component.state >= INITIALIZED).

  • Return type: bool
  • Type: Component.is_initialized

is_running

bool

Return whether the current component state is RUNNING.

  • Return type: bool
  • Type: Component.is_running

is_stopped

bool

Return whether the current component state is STOPPED.

  • Return type: bool
  • Type: Component.is_stopped

register_catalog(self, catalog: ParquetDataCatalog, name: str = 'catalog_0') → None

Register the given data catalog with the engine.

  • Parameters:
    • catalog (ParquetDataCatalog) – The data catalog to register.
    • name (str, default 'catalog_0') – The name of the catalog to register.

register_client(self, DataClient client) → void

Register the given data client with the data engine.

  • Parameters: client (DataClient) – The client to register.
  • Raises: ValueError – If client is already registered.

register_default_client(self, DataClient client) → void

Register the given client as the default routing client (when a specific venue routing cannot be found).

Any existing default routing client will be overwritten.

  • Parameters: client (DataClient) – The client to register.

register_venue_routing(self, DataClient client, Venue venue) → void

Register the given client to route messages to the given venue.

Any existing client in the routing map for the given venue will be overwritten.

  • Parameters:
    • venue (Venue) – The venue to route messages to.
    • client (DataClient) – The client for the venue routing.
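The routing rules described for register_venue_routing and register_default_client can be pictured with a small stand-alone sketch. It does not use the nautilus_trader package: RoutingTable, resolve and the string IDs are hypothetical names chosen for illustration, and clients and venues are reduced to plain strings.

```python
from __future__ import annotations


class RoutingTable:
    """Hypothetical stand-in for an engine's venue routing map."""

    def __init__(self) -> None:
        self._routing_map: dict[str, str] = {}  # venue -> client ID
        self._default_client: str | None = None

    def register_venue_routing(self, client: str, venue: str) -> None:
        # Any existing client registered for the venue is overwritten.
        self._routing_map[venue] = client

    def register_default_client(self, client: str) -> None:
        # Any existing default routing client is overwritten.
        self._default_client = client

    def resolve(self, venue: str) -> str | None:
        # Prefer a venue-specific route, else fall back to the default client.
        return self._routing_map.get(venue, self._default_client)


table = RoutingTable()
table.register_default_client("IB")
table.register_venue_routing("BINANCE-1", "BINANCE")
table.register_venue_routing("BINANCE-2", "BINANCE")  # overwrites BINANCE-1
```

In this sketch a lookup for an unknown venue falls back to the default client, and returns None when no default has been registered.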

registered_clients

list[ClientId]

Return the data clients registered with the engine.

  • Return type: list[ClientId]
  • Type: DataEngine.registered_clients

request_count

The total count of data requests received by the engine.

  • Returns: int

reset(self) → void

Reset the component.

All stateful fields are reset to their initial value.

While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

response_count

The total count of data responses received by the engine.

  • Returns: int

resume(self) → void

Resume the component.

While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

shutdown_system(self, unicode reason=None) → void

Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.

The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.

  • Parameters: reason (str , optional) – The reason for issuing the shutdown command.

start(self) → void

Start the component.

While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
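The lifecycle methods in this reference (start, stop, resume, reset, and so on) share one documented pattern: an invalid call leaves the state unchanged and logs an error, and an exception in the on_*() hook is logged and re-raised while the component stays in the transitional state. The sketch below illustrates that pattern for start() only. It is not the library's implementation: SketchComponent, the errors list standing in for the logger, and the READY state name are assumptions made for illustration.

```python
from enum import Enum


class State(Enum):
    READY = "READY"  # assumed name for the state that start() is valid from
    STARTING = "STARTING"
    RUNNING = "RUNNING"


class SketchComponent:
    """Hypothetical component illustrating the documented start() semantics."""

    def __init__(self) -> None:
        self.state = State.READY
        self.errors: list[str] = []  # stands in for the component logger

    def on_start(self) -> None:
        """Hook for subclasses; start() itself is not overridden."""

    def start(self) -> None:
        if self.state is not State.READY:
            # Invalid state: leave the state unchanged and record an error.
            self.errors.append(f"cannot start from {self.state.name}")
            return
        self.state = State.STARTING
        try:
            self.on_start()
        except Exception as exc:
            # Log and re-raise; the component stays in the transitional state.
            self.errors.append(f"on_start failed: {exc!r}")
            raise
        self.state = State.RUNNING


class FailingComponent(SketchComponent):
    def on_start(self) -> None:
        raise RuntimeError("connection refused")
```

A FailingComponent whose start() is called raises RuntimeError and is left in STARTING, while a second start() on an already running SketchComponent only records an error.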

state

ComponentState

Return the component's current state.

  • Return type: ComponentState
  • Type: Component.state

stop(self) → void

Stop the component.

While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

stop_clients(self) → void

Stop the registered clients.

subscribed_bars(self) → list

Return the bar types subscribed to.

subscribed_custom_data(self) → list

Return the custom data types subscribed to.

subscribed_instrument_close(self) → list

Return the close price instruments subscribed to.

subscribed_instrument_status(self) → list

Return the status update instruments subscribed to.

subscribed_instruments(self) → list

Return the instruments subscribed to.

subscribed_order_book_deltas(self) → list

Return the order book delta instruments subscribed to.

subscribed_order_book_snapshots(self) → list

Return the order book snapshot instruments subscribed to.

subscribed_quote_ticks(self) → list

Return the quote tick instruments subscribed to.

subscribed_synthetic_quotes(self) → list

Return the synthetic instrument quotes subscribed to.

subscribed_synthetic_trades(self) → list

Return the synthetic instrument trades subscribed to.

subscribed_trade_ticks(self) → list

Return the trade tick instruments subscribed to.

trader_id

The trader ID associated with the component.

  • Returns: TraderId

type

The component's type.

  • Returns: type

The LiveExecutionClient class is responsible for interfacing with a particular API which may be presented directly by a venue, or through a broker intermediary.

class LiveExecutionClient

Bases: ExecutionClient

The base class for all live execution clients.

  • Parameters:
    • loop (asyncio.AbstractEventLoop) – The event loop for the client.
    • client_id (ClientId) – The client ID.
    • venue (Venue or None) – The client venue. If multi-venue then can be None.
    • instrument_provider (InstrumentProvider) – The instrument provider for the client.
    • account_type (AccountType) – The account type for the client.
    • base_currency (Currency , optional) – The account base currency for the client. Use None for multi-currency accounts.
    • msgbus (MessageBus) – The message bus for the client.
    • cache (Cache) – The cache for the client.
    • clock (LiveClock) – The clock for the client.
    • config (NautilusConfig , optional) – The configuration for the instance.
  • Raises: ValueError – If oms_type is UNSPECIFIED (must be specified).

WARNING

This class should not be used directly, but through a concrete subclass.

async run_after_delay(delay: float, coro: Coroutine) → None

Run the given coroutine after a delay.

  • Parameters:
    • delay (float) – The delay (seconds) before running the coroutine.
    • coro (Coroutine) – The coroutine to run after the initial delay.

create_task(coro: ~collections.abc.Coroutine, log_msg: str | None = None, actions: ~collections.abc.Callable | None = None, success_msg: str | None = None, success_color: ~nautilus_trader.core.rust.common.LogColor = <LogColor.NORMAL: 0>) → Task

Run the given coroutine with error handling and optional callback actions when done.

  • Parameters:
    • coro (Coroutine) – The coroutine to run.
    • log_msg (str , optional) – The log message for the task.
    • actions (Callable , optional) – The actions callback to run when the coroutine is done.
    • success_msg (str , optional) – The log message to write on actions success.
    • success_color (LogColor, default NORMAL) – The log message color for actions success.
  • Return type: asyncio.Task

connect() → None

Connect the client.

disconnect() → None

Disconnect the client.

submit_order(self, SubmitOrder command) → void

Submit the order contained in the given command for execution.

  • Parameters: command (SubmitOrder) – The command to execute.

submit_order_list(self, SubmitOrderList command) → void

Submit the order list contained in the given command for execution.

modify_order(self, ModifyOrder command) → void

Modify the order with parameters contained in the command.

  • Parameters: command (ModifyOrder) – The command to execute.

cancel_order(self, CancelOrder command) → void

Cancel the order with the client order ID contained in the given command.

  • Parameters: command (CancelOrder) – The command to execute.

cancel_all_orders(self, CancelAllOrders command) → void

Cancel all orders for the instrument ID contained in the given command.

batch_cancel_orders(self, BatchCancelOrders command) → void

Batch cancel orders for the instrument ID contained in the given command.

query_order(self, QueryOrder command) → void

Initiate a reconciliation for the queried order which will generate an OrderStatusReport.

  • Parameters: command (QueryOrder) – The command to execute.

async generate_order_status_report(instrument_id: InstrumentId, client_order_id: ClientOrderId | None = None, venue_order_id: VenueOrderId | None = None) → OrderStatusReport | None

Generate an OrderStatusReport for the given order identifier parameter(s).

If the order is not found, or an error occurs, then logs and returns None.

  • Parameters:
    • instrument_id (InstrumentId) – The instrument ID for the report.
    • client_order_id (ClientOrderId , optional) – The client order ID for the report.
    • venue_order_id (VenueOrderId , optional) – The venue order ID for the report.
  • Return type: OrderStatusReport or None
  • Raises: ValueError – If both the client_order_id and venue_order_id are None.

async generate_order_status_reports(instrument_id: InstrumentId | None = None, start: Timestamp | None = None, end: Timestamp | None = None, open_only: bool = False) → list[OrderStatusReport]

Generate a list of OrderStatusReport objects with optional query filters.

The returned list may be empty if no orders match the given parameters.

  • Parameters:
    • instrument_id (InstrumentId , optional) – The instrument ID query filter.
    • start (pd.Timestamp , optional) – The start datetime (UTC) query filter.
    • end (pd.Timestamp , optional) – The end datetime (UTC) query filter.
    • open_only (bool , default False) – If the query is for open orders only.
  • Return type: list[OrderStatusReport]
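The optional query filters above behave like independent predicates: a filter left at its default is simply not applied. The sketch below shows that idea on an in-memory list. ReportSketch, filter_reports and the ts_last field are hypothetical simplifications, and treating start and end as inclusive bounds is an assumption; a concrete client would normally pass these filters to the venue API.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class ReportSketch:
    """Hypothetical, simplified stand-in for an order status report."""

    instrument_id: str
    ts_last: datetime
    is_open: bool


def filter_reports(
    reports: list[ReportSketch],
    instrument_id: str | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
    open_only: bool = False,
) -> list[ReportSketch]:
    # A filter left as None (or False for open_only) is not applied.
    return [
        r
        for r in reports
        if (instrument_id is None or r.instrument_id == instrument_id)
        and (start is None or r.ts_last >= start)
        and (end is None or r.ts_last <= end)
        and (not open_only or r.is_open)
    ]


reports = [
    ReportSketch("ETHUSDT.BINANCE", datetime(2024, 1, 1, tzinfo=timezone.utc), True),
    ReportSketch("ETHUSDT.BINANCE", datetime(2024, 1, 3, tzinfo=timezone.utc), False),
    ReportSketch("BTCUSDT.BINANCE", datetime(2024, 1, 2, tzinfo=timezone.utc), True),
]
```

Calling filter_reports(reports) with no filters returns every report, and a filter that matches nothing returns an empty list, mirroring the documented behaviour.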

async generate_fill_reports(instrument_id: InstrumentId | None = None, venue_order_id: VenueOrderId | None = None, start: Timestamp | None = None, end: Timestamp | None = None) → list[FillReport]

Generate a list of FillReport objects with optional query filters.

The returned list may be empty if no trades match the given parameters.

  • Parameters:
    • instrument_id (InstrumentId , optional) – The instrument ID query filter.
    • venue_order_id (VenueOrderId , optional) – The venue order ID (assigned by the venue) query filter.
    • start (pd.Timestamp , optional) – The start datetime (UTC) query filter.
    • end (pd.Timestamp , optional) – The end datetime (UTC) query filter.
  • Return type: list[FillReport]

async generate_position_status_reports(instrument_id: InstrumentId | None = None, start: Timestamp | None = None, end: Timestamp | None = None) → list[PositionStatusReport]

Generate a list of PositionStatusReport objects with optional query filters.

The returned list may be empty if no positions match the given parameters.

  • Parameters:
    • instrument_id (InstrumentId , optional) – The instrument ID query filter.
    • start (pd.Timestamp , optional) – The start datetime (UTC) query filter.
    • end (pd.Timestamp , optional) – The end datetime (UTC) query filter.
  • Return type: list[PositionStatusReport]

async generate_mass_status(lookback_mins: int | None = None) → ExecutionMassStatus | None

Generate an ExecutionMassStatus report.

  • Parameters: lookback_mins (int , optional) – The maximum lookback for querying closed orders, trades and positions.
  • Return type: ExecutionMassStatus or None
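A lookback expressed in minutes is typically turned into a start timestamp before querying closed orders, trades and positions. The helper below is a hypothetical sketch of that conversion; lookback_start is not part of the documented API, and treating None as "no lower bound" is an assumption.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def lookback_start(now: datetime, lookback_mins: int | None) -> datetime | None:
    # Assumption: None means no lower bound is applied to the query.
    if lookback_mins is None:
        return None
    return now - timedelta(minutes=lookback_mins)


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
start = lookback_start(now, 90)  # 90 minutes before `now`
```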

account_id

The client's account ID.

  • Returns: AccountId or None

account_type

The client's account type.

  • Returns: AccountType

base_currency

The client's account base currency (None for multi-currency accounts).

  • Returns: Currency or None

degrade(self) → void

Degrade the component.

While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

dispose(self) → void

Dispose of the component.

While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

fault(self) → void

Fault the component.

Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.

While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

classmethod fully_qualified_name(cls) → str

Return the fully qualified name for the component's class.

  • Return type: str

generate_account_state(self, list balances, list margins, bool reported, uint64_t ts_event, dict info=None) → void

Generate an AccountState event and publish on the message bus.

  • Parameters:
    • balances (list [AccountBalance ]) – The account balances.
    • margins (list [MarginBalance ]) – The margin balances.
    • reported (bool) – If the balances are reported directly from the exchange.
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the account state event occurred.
    • info (dict[str, object], optional) – The additional implementation specific account information.

generate_order_accepted(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, uint64_t ts_event) → void

Generate an OrderAccepted event and send it to the ExecutionEngine.

  • Parameters:
    • strategy_id (StrategyId) – The strategy ID associated with the event.
    • instrument_id (InstrumentId) – The instrument ID.
    • client_order_id (ClientOrderId) – The client order ID.
    • venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order accepted event occurred.
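The ts_event arguments of the generate_* methods are UNIX timestamps in nanoseconds. Assuming a venue reports its timestamps in milliseconds or as timezone-aware datetimes, the conversion could look like the sketch below. Both helper names are hypothetical and not part of the documented API.

```python
from datetime import datetime, timezone


def millis_to_nanos(ts_millis: int) -> int:
    # 1 millisecond = 1_000_000 nanoseconds.
    return ts_millis * 1_000_000


def datetime_to_nanos(dt: datetime) -> int:
    # Integer arithmetic avoids the float rounding of dt.timestamp().
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    delta = dt - epoch
    seconds = delta.days * 86_400 + delta.seconds
    return seconds * 1_000_000_000 + delta.microseconds * 1_000
```

Both helpers return plain Python integers; present-day timestamps in nanoseconds fit comfortably within an unsigned 64-bit value.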

generate_order_cancel_rejected(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, unicode reason, uint64_t ts_event) → void

Generate an OrderCancelRejected event and send it to the ExecutionEngine.

  • Parameters:
    • strategy_id (StrategyId) – The strategy ID associated with the event.
    • instrument_id (InstrumentId) – The instrument ID.
    • client_order_id (ClientOrderId) – The client order ID.
    • venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
    • reason (str) – The order cancel rejected reason.
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order cancel rejected event occurred.

generate_order_canceled(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, uint64_t ts_event) → void

Generate an OrderCanceled event and send it to the ExecutionEngine.

  • Parameters:
    • strategy_id (StrategyId) – The strategy ID associated with the event.
    • instrument_id (InstrumentId) – The instrument ID.
    • client_order_id (ClientOrderId) – The client order ID.
    • venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order canceled event occurred.

generate_order_expired(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, uint64_t ts_event) → void

Generate an OrderExpired event and send it to the ExecutionEngine.

  • Parameters:
    • strategy_id (StrategyId) – The strategy ID associated with the event.
    • instrument_id (InstrumentId) – The instrument ID.
    • client_order_id (ClientOrderId) – The client order ID.
    • venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order expired event occurred.

generate_order_filled(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, PositionId venue_position_id: PositionId | None, TradeId trade_id, OrderSide order_side, OrderType order_type, Quantity last_qty, Price last_px, Currency quote_currency, Money commission, LiquiditySide liquidity_side, uint64_t ts_event, dict info=None) → void

Generate an OrderFilled event and send it to the ExecutionEngine.

  • Parameters:
    • strategy_id (StrategyId) – The strategy ID associated with the event.
    • instrument_id (InstrumentId) – The instrument ID.
    • client_order_id (ClientOrderId) – The client order ID.
    • venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
    • trade_id (TradeId) – The trade ID.
    • venue_position_id (PositionId or None) – The venue position ID associated with the order. If the trading venue has assigned a position ID / ticket then pass that here, otherwise pass None and the execution engine OMS will handle position ID resolution.
    • order_side (OrderSide {BUY, SELL}) – The execution order side.
    • order_type (OrderType) – The execution order type.
    • last_qty (Quantity) – The fill quantity for this execution.
    • last_px (Price) – The fill price for this execution (not average price).
    • quote_currency (Currency) – The currency of the price.
    • commission (Money) – The fill commission.
    • liquidity_side (LiquiditySide {NO_LIQUIDITY_SIDE, MAKER, TAKER}) – The execution liquidity side.
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order filled event occurred.
    • info (dict[str, object], optional) – The additional fill information.

generate_order_modify_rejected(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, unicode reason, uint64_t ts_event) → void

Generate an OrderModifyRejected event and send it to the ExecutionEngine.

  • Parameters:
    • strategy_id (StrategyId) – The strategy ID associated with the event.
    • instrument_id (InstrumentId) – The instrument ID.
    • client_order_id (ClientOrderId) – The client order ID.
    • venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
    • reason (str) – The order update rejected reason.
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order update rejection event occurred.

generate_order_rejected(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, unicode reason, uint64_t ts_event) → void

Generate an OrderRejected event and send it to the ExecutionEngine.

  • Parameters:
    • strategy_id (StrategyId) – The strategy ID associated with the event.
    • instrument_id (InstrumentId) – The instrument ID.
    • client_order_id (ClientOrderId) – The client order ID.
    • reason (str) – The order rejected reason.
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order rejected event occurred.

generate_order_submitted(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, uint64_t ts_event) → void

Generate an OrderSubmitted event and send it to the ExecutionEngine.

  • Parameters:
    • strategy_id (StrategyId) – The strategy ID associated with the event.
    • instrument_id (InstrumentId) – The instrument ID.
    • client_order_id (ClientOrderId) – The client order ID.
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order submitted event occurred.

generate_order_triggered(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, uint64_t ts_event) → void

Generate an OrderTriggered event and send it to the ExecutionEngine.

  • Parameters:
    • strategy_id (StrategyId) – The strategy ID associated with the event.
    • instrument_id (InstrumentId) – The instrument ID.
    • client_order_id (ClientOrderId) – The client order ID.
    • venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order triggered event occurred.

generate_order_updated(self, StrategyId strategy_id, InstrumentId instrument_id, ClientOrderId client_order_id, VenueOrderId venue_order_id, Quantity quantity, Price price, Price trigger_price, uint64_t ts_event, bool venue_order_id_modified=False) → void

Generate an OrderUpdated event and send it to the ExecutionEngine.

  • Parameters:
    • strategy_id (StrategyId) – The strategy ID associated with the event.
    • instrument_id (InstrumentId) – The instrument ID.
    • client_order_id (ClientOrderId) – The client order ID.
    • venue_order_id (VenueOrderId) – The venue order ID (assigned by the venue).
    • quantity (Quantity) – The order's current quantity.
    • price (Price) – The order's current price.
    • trigger_price (Price or None) – The order's current trigger price.
    • ts_event (uint64_t) – UNIX timestamp (nanoseconds) when the order update event occurred.
    • venue_order_id_modified (bool) – If the ID was modified for this event.

get_account(self) → Account

Return the account for the client (if registered).

  • Return type: Account or None

id

The component's ID.

  • Returns: ComponentId

is_connected

If the client is connected.

  • Returns: bool

is_degraded

bool

Return whether the current component state is DEGRADED.

  • Return type: bool
  • Type: Component.is_degraded

is_disposed

bool

Return whether the current component state is DISPOSED.

  • Return type: bool
  • Type: Component.is_disposed

is_faulted

bool

Return whether the current component state is FAULTED.

  • Return type: bool
  • Type: Component.is_faulted

is_initialized

bool

Return whether the component has been initialized (component.state >= INITIALIZED).

  • Return type: bool
  • Type: Component.is_initialized

is_running

bool

Return whether the current component state is RUNNING.

  • Return type: bool
  • Type: Component.is_running

is_stopped

bool

Return whether the current component state is STOPPED.

  • Return type: bool
  • Type: Component.is_stopped

oms_type

The venue's order management system type.

  • Returns: OmsType

reset(self) → void

Reset the component.

All stateful fields are reset to their initial value.

While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

resume(self) → void

Resume the component.

While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

shutdown_system(self, unicode reason=None) → void

Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.

The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.

  • Parameters: reason (str , optional) – The reason for issuing the shutdown command.

start(self) → void

Start the component.

While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

state

ComponentState

Return the component's current state.

  • Return type: ComponentState
  • Type: Component.state

stop(self) → void

Stop the component.

While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

trader_id

The trader ID associated with the component.

  • Returns: TraderId

type

The component's type.

  • Returns: type

venue

The client's venue ID (if not a routing client).

  • Returns: Venue or None

class LiveExecutionEngine

Bases: ExecutionEngine

Provides a high-performance asynchronous live execution engine.

  • Parameters:
    • loop (asyncio.AbstractEventLoop) – The event loop for the engine.
    • msgbus (MessageBus) – The message bus for the engine.
    • cache (Cache) – The cache for the engine.
    • clock (LiveClock) – The clock for the engine.
    • config (LiveExecEngineConfig , optional) – The configuration for the instance.
  • Raises: TypeError – If config is not of type LiveExecEngineConfig.

property reconciliation : bool

Return whether the reconciliation process will be run on start.

  • Return type: bool

connect() → None

Connect the engine by calling connect on all registered clients.

disconnect() → None

Disconnect the engine by calling disconnect on all registered clients.

get_cmd_queue_task() → Task | None

Return the internal command queue task for the engine.

  • Return type: asyncio.Task or None

get_evt_queue_task() → Task | None

Return the internal event queue task for the engine.

  • Return type: asyncio.Task or None

get_inflight_check_task() → Task | None

Return the internal in-flight check task for the engine.

  • Return type: asyncio.Task or None

get_open_check_task() → Task | None

Return the open check task for the engine.

  • Return type: asyncio.Task or None

cmd_qsize() → int

Return the number of Command messages buffered on the internal queue.

  • Return type: int

evt_qsize() → int

Return the number of Event messages buffered on the internal queue.

  • Return type: int

kill() → None

Kill the engine by abruptly canceling the queue task and calling stop.

execute(command: TradingCommand) → None

Execute the given command.

If the internal queue is already full, a warning is logged and the call blocks until the queue size reduces.

WARNING

This method is not thread-safe and should only be called from the same thread the event loop is running on. Calling it from a different thread may lead to unexpected behavior.

process(event: OrderEvent) → None

Process the given event.

If the internal queue is already full, a warning is logged and the call blocks until the queue size reduces.

  • Parameters: event (OrderEvent) – The event to process.

WARNING

This method is not thread-safe and should only be called from the same thread the event loop is running on. Calling it from a different thread may lead to unexpected behavior.

async reconcile_state(timeout_secs: float = 10.0) → bool

Reconcile the internal execution state with all execution clients (external state).

  • Parameters: timeout_secs (float, default 10.0) – The timeout (seconds) for reconciliation to complete.
  • Returns: True if states reconcile within timeout, else False.
  • Return type: bool
  • Raises: ValueError – If timeout_secs is not positive (> 0).
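The contract above (positive timeout, boolean result, False on timeout) maps naturally onto asyncio.wait_for. The following is a minimal sketch under that assumption. reconcile_with_timeout and the check callable are hypothetical, and the engine's real reconciliation logic is considerably more involved.

```python
import asyncio


async def reconcile_with_timeout(check, timeout_secs: float = 10.0) -> bool:
    # `check` is a hypothetical async callable that returns True on success.
    if timeout_secs <= 0:
        raise ValueError("timeout_secs must be positive (> 0)")
    try:
        return bool(await asyncio.wait_for(check(), timeout=timeout_secs))
    except asyncio.TimeoutError:
        # Reconciliation did not complete within the timeout.
        return False


async def fast_check() -> bool:
    return True


async def slow_check() -> bool:
    await asyncio.sleep(1)
    return True
```

With these helpers, fast_check reconciles immediately, while slow_check is cancelled and reported as False when given a timeout shorter than its one-second sleep.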

reconcile_report(report: ExecutionReport) → bool

Reconcile the given execution report.

  • Parameters: report (ExecutionReport) – The execution report to check.
  • Returns: True if reconciliation successful, else False.
  • Return type: bool

reconcile_mass_status(report: ExecutionMassStatus) → None

Reconcile the given execution mass status report.

check_connected(self) → bool

Check all of the engine's clients are connected.

  • Returns: True if all clients connected, else False.
  • Return type: bool

check_disconnected(self) → bool

Check all of the engine's clients are disconnected.

  • Returns: True if all clients disconnected, else False.
  • Return type: bool

check_integrity(self) → bool

Check integrity of data within the cache and clients.

  • Returns: True if checks pass, else False.
  • Return type: bool

check_residuals(self) → bool

Check for any residual open state and log warnings if found.

‘Open state’ is considered to be open orders and open positions.

  • Returns: True if residuals exist, else False.
  • Return type: bool

command_count

The total count of commands received by the engine.

  • Returns: int

debug

If debug mode is active (will provide extra debug logging).

  • Returns: bool

default_client

ClientId | None

Return the default execution client registered with the engine.

  • Return type: ClientId or None
  • Type: ExecutionEngine.default_client

degrade(self) → void

Degrade the component.

While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

deregister_client(self, ExecutionClient client) → void

Deregister the given execution client from the execution engine.

  • Parameters: client (ExecutionClient) – The execution client to deregister.
  • Raises: ValueError – If client is not registered with the execution engine.

dispose(self) → void

Dispose of the component.

While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

event_count

The total count of events received by the engine.

  • Returns: int

fault(self) → void

Fault the component.

Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.

While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

flush_db(self) → void

Flush the execution database which permanently removes all persisted data.

WARNING

Permanent data loss.

classmethod fully_qualified_name(cls) → str

Return the fully qualified name for the component's class.

  • Return type: str

get_external_order_claim(self, InstrumentId instrument_id) → StrategyId

Get any external order claim for the given instrument ID.

  • Parameters: instrument_id (InstrumentId) – The instrument ID for the claim.
  • Return type: StrategyId or None

get_external_order_claims_instruments(self) → set

Get all external order claims instrument IDs.

id

The component's ID.

  • Returns: ComponentId

is_degraded

bool

Return whether the current component state is DEGRADED.

  • Return type: bool
  • Type: Component.is_degraded

is_disposed

bool

Return whether the current component state is DISPOSED.

  • Return type: bool
  • Type: Component.is_disposed

is_faulted

bool

Return whether the current component state is FAULTED.

  • Return type: bool
  • Type: Component.is_faulted

is_initialized

bool

Return whether the component has been initialized (component.state >= INITIALIZED).

  • Return type: bool
  • Type: Component.is_initialized

is_running

bool

Return whether the current component state is RUNNING.

  • Return type: bool
  • Type: Component.is_running

is_stopped

bool

Return whether the current component state is STOPPED.

  • Return type: bool
  • Type: Component.is_stopped

load_cache(self) → void

Load the cache up from the execution database.

position_id_count(self, StrategyId strategy_id) → int

The position ID count for the given strategy ID.

  • Parameters: strategy_id (StrategyId) – The strategy ID for the position count.
  • Return type: int

register_client(self, ExecutionClient client) → void

Register the given execution client with the execution engine.

If client.venue is None and no default routing client has previously been registered, the client is registered as the default routing client.

  • Parameters: client (ExecutionClient) – The execution client to register.
  • Raises: ValueError – If client is already registered with the execution engine.
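The registration rules for register_client (reject duplicates, promote a venue-less client to the default route only if no default exists yet) can be sketched without the library. ClientRegistry and the string IDs below are hypothetical and only illustrate the documented behaviour.

```python
from __future__ import annotations


class ClientRegistry:
    """Hypothetical registry mirroring the documented registration rules."""

    def __init__(self) -> None:
        self._clients: dict[str, str | None] = {}  # client ID -> venue
        self.default_client: str | None = None

    def register_client(self, client_id: str, venue: str | None) -> None:
        if client_id in self._clients:
            raise ValueError(f"{client_id} is already registered")
        self._clients[client_id] = venue
        # A client without a venue becomes the default route if none exists yet.
        if venue is None and self.default_client is None:
            self.default_client = client_id

    @property
    def registered_clients(self) -> list[str]:
        return sorted(self._clients)


registry = ClientRegistry()
registry.register_client("ROUTER-A", venue=None)  # becomes the default
registry.register_client("ROUTER-B", venue=None)  # default is already set
registry.register_client("BINANCE", venue="BINANCE")
```

Here ROUTER-A remains the default after ROUTER-B is registered, and registering any ID a second time raises ValueError.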

register_default_client(self, ExecutionClient client) → void

Register the given client as the default routing client (when a specific venue routing cannot be found).

Any existing default routing client will be overwritten.

register_external_order_claims(self, Strategy strategy) → void

Register the given strategy's external order claim instrument IDs (if any).

  • Parameters: strategy (Strategy) – The strategy for the registration.
  • Raises: InvalidConfiguration – If a strategy is already registered to claim external orders for an instrument ID.

register_oms_type(self, Strategy strategy) → void

Register the given trading strategy's OMS (Order Management System) type.

  • Parameters: strategy (Strategy) – The strategy for the registration.

register_venue_routing(self, ExecutionClient client, Venue venue) → void

Register the given client to route orders to the given venue.

Any existing client in the routing map for the given venue will be overwritten.

  • Parameters:
    • venue (Venue) – The venue to route orders to.
    • client (ExecutionClient) – The client for the venue routing.
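The registration rules described for register_client, register_default_client and register_venue_routing can be pictured with a small stand-in. This is a minimal sketch, not the engine's implementation: StubClient, RoutingTable and resolve are hypothetical names, and adding a venue-specific client to the routing map on registration is an assumption.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StubClient:
    """Hypothetical stand-in for an ExecutionClient (ID and venue only)."""

    client_id: str
    venue: Optional[str] = None  # None models a multi-venue client


@dataclass
class RoutingTable:
    """Illustrative model of the documented registration rules."""

    clients: dict = field(default_factory=dict)
    routing: dict = field(default_factory=dict)
    default: Optional[StubClient] = None

    def register_client(self, client: StubClient) -> None:
        if client.client_id in self.clients:
            raise ValueError(f"{client.client_id} is already registered")
        self.clients[client.client_id] = client
        if client.venue is None:
            # Becomes the default only if no default was registered before.
            if self.default is None:
                self.default = client
        else:
            # Assumption: a venue-specific client is routed by its venue.
            self.routing[client.venue] = client

    def register_default_client(self, client: StubClient) -> None:
        self.default = client  # any existing default is overwritten

    def register_venue_routing(self, client: StubClient, venue: str) -> None:
        self.routing[venue] = client  # any existing entry is overwritten

    def resolve(self, venue: str) -> Optional[StubClient]:
        # Hypothetical lookup: specific venue routing first, then the default.
        return self.routing.get(venue, self.default)
```

With this model, an order for a venue without an explicit routing entry falls back to the default client, which matches the "when a specific venue routing cannot be found" wording above.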

registered_clients

list[ClientId]

Return the execution clients registered with the engine.

  • Return type: list[ClientId]
  • Type: ExecutionEngine.registered_clients

report_count

The total count of reports received by the engine.

  • Returns: int
  • Type: report_count

reset(self) → void

Reset the component.

All stateful fields are reset to their initial value.

While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

resume(self) → void

Resume the component.

While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

shutdown_system(self, unicode reason=None) → void

Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.

The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.

  • Parameters: reason (str, optional) – The reason for issuing the shutdown command.

snapshot_orders

If order state snapshots should be persisted.

  • Returns: bool

snapshot_positions

If position state snapshots should be persisted.

  • Returns: bool

snapshot_positions_interval_secs

The interval (seconds) at which additional position state snapshots are persisted.

  • Returns: double

snapshot_positions_timer_name

start(self) → void

Start the component.

While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

state

ComponentState

Return the component's current state.

  • Return type: ComponentState
  • Type: Component.state

stop(self) → void

Stop the component.

While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.
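The lifecycle methods (start, stop, resume, reset and so on) share one pattern: a public method that should not be overridden drives the state transition and calls an overridable on_*() hook. The sketch below illustrates that pattern for start() only. SketchComponent and its reduced State enum are hypothetical, and the set of states from which start() is valid is an assumption.

```python
from enum import Enum, auto


class State(Enum):
    """Reduced, hypothetical subset of component states."""

    READY = auto()
    STARTING = auto()
    RUNNING = auto()
    STOPPED = auto()


class SketchComponent:
    """Hypothetical component illustrating the documented lifecycle rules."""

    def __init__(self) -> None:
        self.state = State.READY
        self.errors = []  # stands in for the component's error log

    def on_start(self) -> None:
        """Hook for subclasses; this is the method to override."""

    def start(self) -> None:
        """Template method; subclasses should not override it."""
        if self.state not in (State.READY, State.STOPPED):
            # Invalid transition: the state is unchanged and an error is logged.
            self.errors.append(f"cannot start from {self.state.name}")
            return
        self.state = State.STARTING
        try:
            self.on_start()
        except Exception as exc:
            # Logged and reraised; the component remains in STARTING.
            self.errors.append(repr(exc))
            raise
        self.state = State.RUNNING
```

A subclass whose on_start() raises therefore leaves the component in the transitional STARTING state, which is why custom behavior belongs in the hook rather than in start() itself.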

stop_clients(self) → void

Stop the registered clients.

trader_id

The trader ID associated with the component.

  • Returns: TraderId

type

The component's type.

  • Returns: type

class LiveRiskEngine

Bases: RiskEngine

Provides a high-performance asynchronous live risk engine.

  • Parameters:
    • loop (asyncio.AbstractEventLoop) – The event loop for the engine.
    • portfolio (PortfolioFacade) – The portfolio for the engine.
    • msgbus (MessageBus) – The message bus for the engine.
    • cache (CacheFacade) – The read-only cache for the engine.
    • clock (LiveClock) – The clock for the engine.
    • config (LiveRiskEngineConfig) – The configuration for the instance.
  • Raises: TypeError – If config is not of type LiveRiskEngineConfig.

get_cmd_queue_task() → Task | None

Return the internal command queue task for the engine.

  • Return type: asyncio.Task or None

get_evt_queue_task() → Task | None

Return the internal event queue task for the engine.

  • Return type: asyncio.Task or None

cmd_qsize() → int

Return the number of Command messages buffered on the internal queue.

  • Return type: int

evt_qsize() → int

Return the number of Event messages buffered on the internal queue.

  • Return type: int

kill() → None

Kill the engine by abruptly canceling the queue task and calling stop.

execute(command: Command) → None

Execute the given command.

If the internal queue is already full, a warning is logged and the call blocks until the queue size reduces.

  • Parameters: command (Command) – The command to execute.

WARNING

This method is not thread-safe and should only be called from the same thread the event loop is running on. Calling it from a different thread may lead to unexpected behavior.
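One way to respect this warning from a worker thread is to hand the call over to the event loop with asyncio's loop.call_soon_threadsafe. The sketch below uses a plain asyncio.Queue and a local execute function as hypothetical stand-ins for the engine and its command queue; it shows the general asyncio pattern, not the engine's internals.

```python
import asyncio
import threading


async def main() -> list:
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue(maxsize=8)  # stand-in for the internal queue

    def execute(command: str) -> None:
        # Stand-in for engine.execute(); must run on the loop's thread.
        queue.put_nowait(command)

    def worker() -> None:
        # Running on a foreign thread: schedule the call on the loop
        # instead of invoking execute() directly.
        loop.call_soon_threadsafe(execute, "cmd-from-thread")

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()

    execute("cmd-from-loop")  # already on the loop's thread, so a direct call

    # The scheduled callback runs once this coroutine yields to the loop.
    received = [await queue.get(), await queue.get()]
    return sorted(received)


result = asyncio.run(main())
```

The same approach applies to process(event) for events.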

process(event: Event) → None

Process the given event.

If the internal queue is already full, a warning is logged and the call blocks until the queue size reduces.

  • Parameters: event (Event) – The event to process.

WARNING

This method is not thread-safe and should only be called from the same thread the event loop is running on. Calling it from a different thread may lead to unexpected behavior.

command_count

The total count of commands received by the engine.

  • Returns: int

debug

If debug mode is active (will provide extra debug logging).

  • Returns: bool

degrade(self) → void

Degrade the component.

While executing on_degrade() any exception will be logged and reraised, then the component will remain in a DEGRADING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

dispose(self) → void

Dispose of the component.

While executing on_dispose() any exception will be logged and reraised, then the component will remain in a DISPOSING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

event_count

The total count of events received by the engine.

  • Returns: int

fault(self) → void

Fault the component.

Calling this method multiple times has the same effect as calling it once (it is idempotent). Once called, it cannot be reversed, and no other methods should be called on this instance.

While executing on_fault() any exception will be logged and reraised, then the component will remain in a FAULTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

classmethod fully_qualified_name(cls) → str

Return the fully qualified name for the component's class.

  • Return type: str

id

The component's ID.

  • Returns: ComponentId

is_bypassed

If the risk engine is completely bypassed.

  • Returns: bool

is_degraded

bool

Return whether the current component state is DEGRADED.

  • Return type: bool
  • Type: Component.is_degraded

is_disposed

bool

Return whether the current component state is DISPOSED.

  • Return type: bool
  • Type: Component.is_disposed

is_faulted

bool

Return whether the current component state is FAULTED.

  • Return type: bool
  • Type: Component.is_faulted

is_initialized

bool

Return whether the component has been initialized (component.state >= INITIALIZED).

  • Return type: bool
  • Type: Component.is_initialized

is_running

bool

Return whether the current component state is RUNNING.

  • Return type: bool
  • Type: Component.is_running

is_stopped

bool

Return whether the current component state is STOPPED.

  • Return type: bool
  • Type: Component.is_stopped

max_notional_per_order(self, InstrumentId instrument_id)

Return the current maximum notional per order for the given instrument ID.

  • Return type: Decimal or None

max_notionals_per_order(self) → dict

Return the current maximum notionals per order settings.

max_order_modify_rate(self) → tuple

Return the current maximum order modify rate limit setting.

  • Returns: The limit per timedelta interval.
  • Return type: (int, timedelta)

max_order_submit_rate(self) → tuple

Return the current maximum order submit rate limit setting.

  • Returns: The limit per timedelta interval.
  • Return type: (int, timedelta)
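A rate limit expressed as an (int, timedelta) pair reads as "at most this many actions per interval". The following is a minimal sketch of one possible interpretation, a sliding window over timestamps. SlidingWindowLimiter is a hypothetical name, and the risk engine's actual throttling algorithm is not described here and may differ.

```python
from collections import deque
from datetime import datetime, timedelta


class SlidingWindowLimiter:
    """Hypothetical limiter: at most `limit` actions per `interval`."""

    def __init__(self, limit: int, interval: timedelta) -> None:
        self.limit = limit
        self.interval = interval
        self._stamps = deque()  # timestamps of the accepted actions

    def allow(self, now: datetime) -> bool:
        # Drop timestamps that have left the window.
        while self._stamps and now - self._stamps[0] >= self.interval:
            self._stamps.popleft()
        if len(self._stamps) >= self.limit:
            return False
        self._stamps.append(now)
        return True


# A pair shaped like the documented return value: (limit, interval).
limit, interval = (2, timedelta(seconds=1))
limiter = SlidingWindowLimiter(limit, interval)
t0 = datetime(2024, 1, 1)
decisions = [
    limiter.allow(t0),
    limiter.allow(t0 + timedelta(milliseconds=100)),
    limiter.allow(t0 + timedelta(milliseconds=200)),  # third in window
    limiter.allow(t0 + timedelta(seconds=1)),  # first stamp has expired
]
```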

reset(self) → void

Reset the component.

All stateful fields are reset to their initial value.

While executing on_reset() any exception will be logged and reraised, then the component will remain in a RESETTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

resume(self) → void

Resume the component.

While executing on_resume() any exception will be logged and reraised, then the component will remain in a RESUMING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

set_max_notional_per_order(self, InstrumentId instrument_id, new_value) → void

Set the maximum notional value per order for the given instrument ID.

Passing a new_value of None will disable the pre-trade risk max notional check.

  • Parameters:
    • instrument_id (InstrumentId) – The instrument ID for the max notional.
    • new_value (integer, float, string or Decimal) – The max notional value to set.
  • Raises:
    • decimal.InvalidOperation – If new_value not a valid input for decimal.Decimal.
    • ValueError – If new_value is not None and not positive.
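The accepted inputs and documented exceptions can be illustrated with a small validation helper. This is a sketch under stated assumptions: normalize_max_notional is a hypothetical function, and converting floats through str() is a choice made here to avoid binary floating point artifacts, not something the source specifies.

```python
from decimal import Decimal, InvalidOperation


def normalize_max_notional(new_value):
    """Hypothetical helper mirroring the documented validation rules."""
    if new_value is None:
        return None  # None disables the max notional check
    if isinstance(new_value, float):
        # Assumption: go through str() so 0.1 becomes Decimal("0.1").
        new_value = str(new_value)
    value = Decimal(new_value)  # raises InvalidOperation on invalid input
    if value <= 0:
        raise ValueError(f"max notional must be positive, was {value}")
    return value


examples = {
    "from_str": normalize_max_notional("1000000"),
    "from_int": normalize_max_notional(250),
    "from_float": normalize_max_notional(0.5),
    "disabled": normalize_max_notional(None),
}

try:
    normalize_max_notional("not-a-number")
except InvalidOperation:
    invalid_input_rejected = True
else:
    invalid_input_rejected = False
```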

set_trading_state(self, TradingState state) → void

Set the trading state for the engine.

  • Parameters: state (TradingState) – The state to set.

shutdown_system(self, unicode reason=None) → void

Initiate a system-wide shutdown by generating and publishing a ShutdownSystem command.

The command is handled by the system’s NautilusKernel, which will invoke either stop (synchronously) or stop_async (asynchronously) depending on the execution context and the presence of an active event loop.

  • Parameters: reason (str, optional) – The reason for issuing the shutdown command.

start(self) → void

Start the component.

While executing on_start() any exception will be logged and reraised, then the component will remain in a STARTING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

state

ComponentState

Return the component's current state.

  • Return type: ComponentState
  • Type: Component.state

stop(self) → void

Stop the component.

While executing on_stop() any exception will be logged and reraised, then the component will remain in a STOPPING state.

WARNING

Do not override.

If the component is not in a valid state from which to execute this method, then the component state will not change, and an error will be logged.

trader_id

The trader ID associated with the component.

  • Returns: TraderId

trading_state

The current trading state for the engine.

  • Returns: TradingState

type

The component's type.

  • Returns: type

class TradingNode

Bases: object

Provides an asynchronous network node for live trading.

  • Parameters:
    • config (TradingNodeConfig, optional) – The configuration for the instance.
    • loop (asyncio.AbstractEventLoop, optional) – The event loop for the node. If None then will get the running event loop internally.

property trader_id : TraderId

Return the node's trader ID.

property machine_id : str

Return the node's machine ID.

  • Return type: str

property instance_id : UUID4

Return the node's instance ID.

property trader : Trader

Return the node's internal trader.

property cache : CacheFacade

Return the node's internal read-only cache.

property portfolio : PortfolioFacade

Return the node's internal read-only portfolio.

property is_running : bool

Return whether the trading node is running.

  • Return type: bool

property is_built : bool

Return whether the trading node clients are built.

  • Return type: bool

get_event_loop() → AbstractEventLoop | None

Return the event loop of the trading node.

  • Return type: asyncio.AbstractEventLoop or None

get_logger() → Logger

Return the logger for the trading node.

add_data_client_factory(name: str, factory: type[LiveDataClientFactory]) → None

Add the given data client factory to the node.

  • Parameters:
    • name (str) – The name of the client factory.
    • factory (type[LiveDataClientFactory]) – The factory class to add.
  • Raises:
    • ValueError – If name is not a valid string.
    • KeyError – If name has already been added.

add_exec_client_factory(name: str, factory: type[LiveExecClientFactory]) → None

Add the given execution client factory to the node.

  • Parameters:
    • name (str) – The name of the client factory.
    • factory (type[LiveExecClientFactory]) – The factory class to add.
  • Raises:
    • ValueError – If name is not a valid string.
    • KeyError – If name has already been added.

build() → None

Build the node's clients.

run(raise_exception=False) → None

Start and run the trading node.

publish_bus_message(bus_msg: BusMessage) → None

Publish the given bus message on the internal message bus.

Note the message will not be published externally.

  • Parameters: bus_msg (nautilus_pyo3.BusMessage) – The bus message to publish.

async run_async() → None

Start and run the trading node asynchronously.

stop() → None

Stop the trading node gracefully.

After a specified delay the internal Trader residual state will be checked.

If save strategy is configured, then strategy states will be saved.

async stop_async() → None

Stop the trading node gracefully, asynchronously.

After a specified delay the internal Trader residual state will be checked.

If save strategy is configured, then strategy states will be saved.

dispose() → None

Dispose of the trading node.

Gracefully shuts down the executor and event loop.
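The methods above suggest a typical call order: register factories, build, run, then dispose. The helper below is a minimal sketch of that order, assuming factories must be registered before build() is called. run_node and RecordingNode are hypothetical; the helper only relies on the method names documented above, and the recording stub exists so the sequence can be observed without a live connection.

```python
def run_node(node, data_factories=None, exec_factories=None) -> None:
    """Hypothetical helper: drive a node through its documented methods."""
    for name, factory in (data_factories or {}).items():
        node.add_data_client_factory(name, factory)
    for name, factory in (exec_factories or {}).items():
        node.add_exec_client_factory(name, factory)
    node.build()
    try:
        node.run()
    finally:
        # Always release the executor and event loop, even if run() raises.
        node.dispose()


class RecordingNode:
    """Hypothetical stand-in that records which methods were called."""

    def __init__(self) -> None:
        self.calls = []

    def add_data_client_factory(self, name, factory) -> None:
        self.calls.append(f"data:{name}")

    def add_exec_client_factory(self, name, factory) -> None:
        self.calls.append(f"exec:{name}")

    def build(self) -> None:
        self.calls.append("build")

    def run(self) -> None:
        self.calls.append("run")

    def dispose(self) -> None:
        self.calls.append("dispose")


node = RecordingNode()
run_node(node, data_factories={"SIM": object}, exec_factories={"SIM": object})
```

Placing dispose() in a finally block is a design choice of this sketch so that resources are released on both normal exit and error.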

class TradingNodeBuilder

Bases: object

Provides building services for a trading node.

  • Parameters:
    • loop (asyncio.AbstractEventLoop) – The event loop for the clients.
    • data_engine (LiveDataEngine) – The data engine for the trading node.
    • exec_engine (LiveExecutionEngine) – The execution engine for the trading node.
    • portfolio (Portfolio) – The portfolio for the trading node.
    • msgbus (MessageBus) – The message bus for the trading node.
    • cache (Cache) – The cache for building clients.
    • clock (LiveClock) – The clock for building clients.
    • logger (Logger) – The logger for building clients.
    • log (Logger) – The trading node's logger.

add_data_client_factory(name: str, factory: type[LiveDataClientFactory]) → None

Add the given data client factory to the builder.

  • Parameters:
    • name (str) – The name of the client.
    • factory (type[LiveDataClientFactory]) – The factory to add.
  • Raises:
    • ValueError – If name is not a valid string.
    • KeyError – If name has already been added.

add_exec_client_factory(name: str, factory: type[LiveExecClientFactory]) → None

Add the given client factory to the builder.

  • Parameters:
    • name (str) – The name of the client.
    • factory (type[LiveExecClientFactory]) – The factory to add.
  • Raises:
    • ValueError – If name is not a valid string.
    • KeyError – If name has already been added.
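The documented error contract for the add_*_client_factory methods (ValueError for an invalid name, KeyError for a duplicate) can be sketched with a small registry. FactoryRegistry is hypothetical, and treating an empty or whitespace-only string as "not a valid string" is an assumption.

```python
class FactoryRegistry:
    """Hypothetical registry mirroring the documented error contract."""

    def __init__(self) -> None:
        self._factories = {}

    def add(self, name: str, factory: type) -> None:
        # Assumption: empty or whitespace-only names count as invalid.
        if not isinstance(name, str) or not name.strip():
            raise ValueError(f"invalid factory name: {name!r}")
        if name in self._factories:
            raise KeyError(f"factory {name!r} has already been added")
        self._factories[name] = factory

    def names(self) -> list:
        return sorted(self._factories)


class StubFactory:
    """Placeholder for a LiveExecClientFactory subclass."""


registry = FactoryRegistry()
registry.add("SIM", StubFactory)

try:
    registry.add("SIM", StubFactory)  # same name twice
except KeyError:
    duplicate_rejected = True
else:
    duplicate_rejected = False
```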

build_data_clients(config: dict[str, LiveDataClientConfig]) → None

Build the data clients with the given configuration.

build_exec_clients(config: dict[str, LiveExecClientConfig]) → None

Build the execution clients with the given configuration.