Adapters
Introduction
This developer guide provides specifications for how to build an integration adapter for the NautilusTrader platform.
Adapters connect to trading venues and data providers, translating their native APIs into the platform’s unified interface and normalized domain model.
Structure of an adapter
NautilusTrader adapters follow a layered architecture pattern with:
- Rust core for networking clients and performance-sensitive operations.
- Python layer for integrating Rust clients into the platform's data and execution engines.
Rust core (crates/adapters/your_adapter/)
The Rust layer handles:
- HTTP client: Raw API communication, request signing, rate limiting.
- WebSocket client: Low-latency streaming connections, message parsing.
- Parsing: Fast conversion of venue data to Nautilus domain models.
- Python bindings: PyO3 exports to make Rust functionality available to Python.
Typical Rust structure:
crates/adapters/your_adapter/
├── src/
│ ├── common/ # Shared types and utilities
│ │ ├── consts.rs # Venue constants / broker IDs
│ │ ├── credential.rs # API key storage and signing helpers
│ │ ├── enums.rs # Venue enums mirrored in REST/WS payloads
│ │ ├── error.rs # Adapter-level error aggregation (when applicable)
│ │ ├── models.rs # Shared model types
│ │ ├── parse.rs # Shared parsing helpers
│ │ ├── retry.rs # Retry classification (when applicable)
│ │ ├── urls.rs # Environment & product aware base-url resolvers
│ │ └── testing.rs # Fixtures reused across unit tests
│ ├── http/ # HTTP client implementation
│ │ ├── client.rs # HTTP client with authentication
│ │ ├── error.rs # HTTP-specific error types
│ │ ├── models.rs # Structs for REST payloads
│ │ ├── parse.rs # Response parsing functions
│ │ └── query.rs # Request and query builders
│ ├── websocket/ # WebSocket implementation
│ │ ├── client.rs # WebSocket client
│ │ ├── dispatch.rs # Execution event dispatch and order routing
│ │ ├── enums.rs # WebSocket-specific enums
│ │ ├── error.rs # WebSocket-specific error types
│ │ ├── handler.rs # Feed handler (I/O boundary)
│ │ ├── messages.rs # Frame and message enums
│ │ ├── parse.rs # Message parsing functions
│ │ └── subscription.rs # Subscription topic helpers (optional)
│ ├── python/ # PyO3 Python bindings
│ │ ├── enums.rs # Python-exposed enums
│ │ ├── http.rs # Python HTTP client bindings
│ │ ├── urls.rs # Python URL helpers
│ │ ├── websocket.rs # Python WebSocket client bindings
│ │ └── mod.rs # Module exports
│ ├── config.rs # Configuration structures
│ ├── data.rs # Data client implementation
│ ├── execution.rs # Execution client implementation
│ ├── factories.rs # Factory functions
│ └── lib.rs # Library entry point
├── tests/ # Integration tests with mock servers
│ ├── data_client.rs # Data client integration tests
│ ├── exec_client.rs # Execution client integration tests
│ ├── http.rs # HTTP client integration tests
│ └── websocket.rs # WebSocket client integration tests
└── test_data/ # Canonical venue payloads
Python layer (nautilus_trader/adapters/your_adapter)
The Python layer provides the integration interface through these components:
- Instrument Provider: Supplies instrument definitions via InstrumentProvider.
- Data Client: Handles market data feeds and historical data requests via LiveDataClient and LiveMarketDataClient.
- Execution Client: Manages order execution via LiveExecutionClient.
- Factories: Converts venue-specific data to Nautilus domain models.
- Configuration: User-facing configuration classes for client settings.
Typical Python structure:
nautilus_trader/adapters/your_adapter/
├── config.py # Configuration classes
├── constants.py # Adapter constants
├── data.py # LiveDataClient/LiveMarketDataClient
├── execution.py # LiveExecutionClient
├── factories.py # Instrument factories
├── providers.py # InstrumentProvider
└── __init__.py # Package initialization
Adapter implementation sequence
Follow this dependency-driven order when building an adapter. Each phase builds on the previous one. Implement the Rust core before any Python layer.
Phase 1: Rust core infrastructure
Build the low-level networking and parsing foundation.
| Step | Component | Description |
|---|---|---|
| 1.1 | HTTP error types | Define HTTP‑specific error enum with retryable/non‑retryable variants (http/error.rs). |
| 1.2 | HTTP client | Implement credentials, request signing, rate limiting, and retry logic. |
| 1.3 | HTTP API models | Define request/response structs for REST endpoints (http/models.rs, http/query.rs). |
| 1.4 | HTTP parsing | Convert venue responses to Nautilus domain models (http/parse.rs, common/parse.rs). |
| 1.5 | WebSocket error types | Define WebSocket‑specific error enum (websocket/error.rs). |
| 1.6 | WebSocket client | Implement connection lifecycle, authentication, heartbeat, and reconnection. |
| 1.7 | WebSocket messages | Define streaming payload types (websocket/messages.rs). |
| 1.8 | WebSocket parsing | Convert stream messages to Nautilus domain models (websocket/parse.rs). |
| 1.9 | Python bindings | Expose Rust functionality via PyO3 (python/mod.rs). |
Milestone: Rust crate compiles, unit tests pass, HTTP/WebSocket clients can authenticate and stream/request raw data.
Phase 2: Instrument definitions
Instruments are the foundation: both data and execution clients depend on them.
| Step | Component | Description |
|---|---|---|
| 2.1 | Instrument parsing | Parse venue instrument definitions into Nautilus types (spot, perpetual, future, option). |
| 2.2 | Instrument provider | Implement InstrumentProvider to load, filter, and cache instruments. |
| 2.3 | Symbol mapping | Handle venue‑specific symbol formats and Nautilus InstrumentId conversion. |
Milestone: InstrumentProvider.load_all_async() returns valid Nautilus instruments.
Phase 3: Market data
Build data subscriptions and historical data requests.
| Step | Component | Description |
|---|---|---|
| 3.1 | Public WebSocket streams | Subscribe to order books, trades, tickers, and other public channels. |
| 3.2 | Historical data requests | Fetch historical bars, trades, and order book snapshots via HTTP. |
| 3.3 | Data client (Python) | Implement LiveDataClient or LiveMarketDataClient wiring Rust clients to the data engine. |
Milestone: Data client connects, subscribes to instruments, and emits market data to the platform.
Phase 4: Order execution
Build order management and account state.
| Step | Component | Description |
|---|---|---|
| 4.1 | Private WebSocket streams | Subscribe to order updates, fills, positions, and account balance changes. |
| 4.2 | Basic order submission | Implement market and limit orders via HTTP or WebSocket. |
| 4.3 | Order modification/cancel | Implement order amendment and cancellation. |
| 4.4 | Execution client (Python) | Implement LiveExecutionClient wiring Rust clients to the execution engine. |
| 4.5 | Execution reconciliation | Generate order, fill, and position status reports for startup reconciliation. |
Milestone: Execution client submits orders, receives fills, and reconciles state on connect.
Phase 5: Advanced features
Extend coverage based on venue capabilities.
| Step | Component | Description |
|---|---|---|
| 5.1 | Advanced order types | Conditional orders, stop‑loss, take‑profit, trailing stops, iceberg, etc. |
| 5.2 | Batch operations | Batch order submission, batch cancellation, mass cancel. |
| 5.3 | Venue‑specific features | Options chains, funding rates, liquidations, or other venue‑specific data. |
Phase 6: Configuration and factories
Wire everything together for production usage.
| Step | Component | Description |
|---|---|---|
| 6.1 | Configuration classes | Create LiveDataClientConfig and LiveExecClientConfig subclasses. |
| 6.2 | Factory functions | Implement factory functions to instantiate clients from configuration. |
| 6.3 | Environment variables | Support credential resolution from environment variables. |
Phase 7: Testing and documentation
Validate the integration and document usage.
| Step | Component | Description |
|---|---|---|
| 7.1 | Rust unit tests | Test parsers, signing helpers, and business logic in #[cfg(test)] blocks. |
| 7.2 | Rust integration tests | Test HTTP/WebSocket clients against mock Axum servers in tests/. |
| 7.3 | Python integration tests | Test data/execution clients in tests/integration_tests/adapters/<adapter>/. |
| 7.4 | Example scripts | Provide runnable examples demonstrating data subscription and order execution. |
See the Testing section for detailed test organization guidelines.
Rust adapter patterns
Common code (common/)
Group venue constants, credential helpers, enums, and reusable parsers under src/common.
Adapters such as OKX keep submodules like consts, credential, enums, and urls alongside a testing module
for fixtures, providing a single place for cross-cutting pieces.
When an adapter has multiple environments or product categories, add a dedicated common::urls helper so
REST/WebSocket base URLs stay in sync with the Python layer.
Symbol normalization (common/symbol.rs)
When a venue uses a different symbol format than Nautilus InstrumentId, place bidirectional
conversion helpers in common/symbol.rs. Two functions form the standard interface:
- format_instrument_id(venue_symbol, product_type) converts a venue symbol string to a Nautilus InstrumentId, appending or transforming product-type suffixes as needed (e.g., "BTCUSDT" + Linear becomes "BTCUSDT-LINEAR.BYBIT").
- format_venue_symbol(instrument_id) strips Nautilus suffixes to recover the venue-native symbol for API calls.
Common patterns across adapters:
- Suffix-based product types: Bybit appends -SPOT, -LINEAR, -INVERSE, -OPTION. A BybitSymbol wrapper validates the suffix and normalizes to uppercase on construction.
- Implicit product mapping: Binance USD-M futures append -PERP at the Nautilus layer while COIN-M keeps the venue's existing _PERP suffix.
- Case normalization: Convert to uppercase on input when venues are case-insensitive.
- Ustr interning: Store normalized symbols as Ustr for zero-cost comparison.
For venues where the raw symbol maps 1:1 to an InstrumentId (no suffix gymnastics), inline
helpers in common/parse.rs are sufficient and a dedicated symbol.rs is not needed.
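The two-function interface above can be sketched as follows. This is a simplified, self-contained illustration: the ProductType variants, the "VENUE" suffix, and the plain String types are stand-ins (real adapters use InstrumentId and Ustr).

```rust
/// Illustrative product type (real adapters mirror the venue's categories).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ProductType {
    Spot,
    Linear,
}

/// Converts a venue symbol to a Nautilus-style instrument ID string,
/// normalizing case and appending the product-type suffix.
pub fn format_instrument_id(venue_symbol: &str, product_type: ProductType) -> String {
    let suffix = match product_type {
        ProductType::Spot => "-SPOT",
        ProductType::Linear => "-LINEAR",
    };
    format!("{}{}.VENUE", venue_symbol.to_uppercase(), suffix)
}

/// Strips the venue and product-type suffix to recover the venue-native symbol.
pub fn format_venue_symbol(instrument_id: &str) -> String {
    let symbol = instrument_id.split('.').next().unwrap_or(instrument_id);
    match symbol.rsplit_once('-') {
        Some((base, _suffix)) => base.to_string(),
        None => symbol.to_string(),
    }
}
```

The key property is that the two functions round-trip: any venue symbol passed through format_instrument_id and back through format_venue_symbol recovers the original (uppercased) symbol.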
URL resolution
Define URL constants and resolution functions in common/urls.rs:
```rust
const VENUE_WS_URL: &str = "wss://stream.venue.com/ws";
const VENUE_TESTNET_WS_URL: &str = "wss://testnet-stream.venue.com/ws";

pub const fn get_ws_base_url(testnet: bool) -> &'static str {
    if testnet { VENUE_TESTNET_WS_URL } else { VENUE_WS_URL }
}
```

Config structs should provide override fields (base_url_http, base_url_ws, etc.) that fall back
to these defaults when unset.
Configurations (config.rs)
Expose typed config structs in src/config.rs so Python callers toggle venue-specific behaviour
(see how OKX wires demo URLs, retries, and channel flags).
Keep defaults minimal and delegate URL selection to helpers in common::urls.
All config structs (data and execution) must implement Default. This enables the
..Default::default() pattern in examples and tests, keeping only the fields that differ
from defaults visible:
```rust
let exec_config = VenueExecClientConfig {
    trader_id,
    account_id,
    environment: VenueEnvironment::Testnet,
    ..Default::default()
};
```

Default values should use sensible production defaults: credentials as None (resolved
from environment at runtime), mainnet URLs, standard timeouts. For trader_id and
account_id, use placeholder values like TraderId::from("TRADER-001") and
AccountId::from("VENUE-001").
Error taxonomy (common/error.rs)
For adapters with multiple client types, define an adapter-level error enum in common/error.rs that
aggregates component errors:
```rust
#[derive(Debug, thiserror::Error)]
pub enum VenueError {
    #[error("HTTP error: {0}")]
    Http(#[from] VenueHttpError),
    #[error("WebSocket error: {0}")]
    WebSocket(#[from] VenueWsError),
    #[error("Build error: {0}")]
    Build(#[from] VenueBuildError),
}
```

This enables unified error handling at the adapter boundary while preserving component-specific error details for debugging.
Retry classification (common/retry.rs)
When an adapter needs sophisticated retry logic, define a retry classification module in common/retry.rs
that distinguishes between retryable, non-retryable, and fatal errors:
```rust
#[derive(Debug, thiserror::Error)]
pub enum VenueError {
    #[error("Retryable error: {source}")]
    Retryable {
        #[source]
        source: VenueRetryableError,
        retry_after: Option<Duration>,
    },
    #[error("Non-retryable error: {source}")]
    NonRetryable {
        #[source]
        source: VenueNonRetryableError,
    },
    #[error("Fatal error: {source}")]
    Fatal {
        #[source]
        source: VenueFatalError,
    },
}
```

Include helper methods like from_http_status(), from_rate_limit_headers(), is_retryable(),
is_fatal(), and retry_after() to enable consistent error classification across the adapter.
See BitMEX and Bybit adapters for reference implementations.
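The classification logic behind a from_http_status() helper can be sketched with std types only. The status-code groupings here are assumptions for illustration; real adapters also consult venue-specific error codes and rate-limit headers.

```rust
use std::time::Duration;

/// Simplified classification result; the real enum wraps component errors.
#[derive(Debug, PartialEq)]
pub enum RetryClass {
    Retryable { retry_after: Option<Duration> },
    NonRetryable,
    Fatal,
}

/// Classifies an HTTP status into retryable / non-retryable / fatal.
pub fn from_http_status(status: u16, retry_after_secs: Option<u64>) -> RetryClass {
    match status {
        // Authentication failures will never succeed on retry.
        401 | 403 => RetryClass::Fatal,
        // Rate limits and transient server errors are worth retrying,
        // honoring any server-provided backoff hint.
        429 | 500..=504 => RetryClass::Retryable {
            retry_after: retry_after_secs.map(Duration::from_secs),
        },
        // Everything else (e.g. validation errors) is non-retryable.
        _ => RetryClass::NonRetryable,
    }
}
```

Keeping this mapping in one function is what lets the rest of the adapter treat retry decisions uniformly.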
Python exports (python/mod.rs)
Mirror the Rust surface area through PyO3 modules by re-exporting clients, enums, and helper functions.
When new functionality lands in Rust, add it to python/mod.rs so the Python layer stays in sync
(the OKX adapter is a good reference).
Python bindings (python/)
Expose Rust functionality to Python through PyO3.
Mark venue-specific structs that need Python access with #[pyclass] and implement #[pymethods] blocks with
#[getter] attributes for field access.
For async methods in the HTTP client, use pyo3_async_runtimes::tokio::future_into_py to convert Rust futures
into Python awaitables.
When returning lists of custom types, map each item with Py::new(py, item) before constructing the Python list.
Register all exported classes and enums in python/mod.rs using m.add_class::<YourType>() so they're available
to Python code.
Follow the pattern established in other adapters: prefix Python-facing methods with py_* in Rust and use
#[pyo3(name = "method_name")] to expose them without the prefix.
When delivering instruments from WebSocket to Python, use instrument_any_to_pyobject() which returns PyO3 types
for caching.
For the reverse direction (Python→Rust), use pyobject_to_instrument_any() in cache_instrument() methods.
Never call .into_py_any() directly on InstrumentAny as it doesn't implement the required trait.
Type qualification
Adapter-specific types (enums, structs) and Nautilus domain types should not be fully qualified.
Import them at the module level and use short names (e.g., OKXContractType instead of
crate::common::enums::OKXContractType, InstrumentId instead of nautilus_model::identifiers::InstrumentId).
This keeps code concise and readable.
Only fully qualify types from anyhow and tokio to avoid ambiguity with similarly-named types from other crates.
String interning
Use ustr::Ustr for any non-unique strings the platform stores repeatedly (venues, symbols, instrument IDs) to
minimise allocations and comparisons.
Instrument cache standardization
All clients that cache instruments must implement three methods with standardized names: cache_instruments()
(plural, bulk replace), cache_instrument() (singular, upsert), and get_instrument() (retrieve by symbol).
WebSocket clients store instruments in Arc<DashMap<Ustr, InstrumentAny>> on the outer client for
thread-safe access across clones.
Testing helpers (common/testing.rs)
Store shared fixtures and payload loaders in src/common/testing.rs for use across HTTP and WebSocket unit tests.
This keeps #[cfg(test)] helpers out of production modules and encourages reuse.
Instrument status diffing (common/status.rs)
When a data client polls instrument status via REST, place the diff logic in common/status.rs
rather than inlining it in the data client. The standard function signature is:
```rust
pub fn diff_and_emit_statuses(
    new_statuses: &AHashMap<InstrumentId, MarketStatusAction>,
    cached_statuses: &mut AHashMap<InstrumentId, MarketStatusAction>,
    subscriptions: Option<&DashSet<InstrumentId>>,
    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
    ts_event: UnixNanos,
    ts_init: UnixNanos,
)
```

The function compares each entry in new_statuses against cached_statuses, emitting an
InstrumentStatus event for any instrument whose MarketStatusAction changed. Instruments
present in the cache but absent from the new snapshot are treated as removed and emit
NotAvailableForTrading. The cache always reflects the full API state.
Pass subscriptions as Some(&set) to restrict emissions to subscribed instruments, or
None to emit all changes unconditionally. The data client stores the cache in an
Arc<RwLock<AHashMap<InstrumentId, MarketStatusAction>>> and calls this function on each
poll cycle.
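The diff semantics can be sketched with plain std types. HashMap, String, and the returned Vec stand in for AHashMap, InstrumentId, and the event channel, and the subscription filter is omitted; this is an illustration of the algorithm, not the real signature.

```rust
use std::collections::HashMap;

/// Simplified stand-in for MarketStatusAction.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StatusAction {
    Trading,
    Halt,
    NotAvailableForTrading,
}

/// Returns the (instrument, action) pairs that would be emitted as status
/// events, and replaces the cache with the full new snapshot.
pub fn diff_statuses(
    new_statuses: &HashMap<String, StatusAction>,
    cached_statuses: &mut HashMap<String, StatusAction>,
) -> Vec<(String, StatusAction)> {
    let mut emitted = Vec::new();
    // Emit any instrument whose action changed (or is newly seen).
    for (id, action) in new_statuses {
        if cached_statuses.get(id) != Some(action) {
            emitted.push((id.clone(), *action));
        }
    }
    // Instruments absent from the new snapshot are treated as removed.
    for id in cached_statuses.keys() {
        if !new_statuses.contains_key(id) {
            emitted.push((id.clone(), StatusAction::NotAvailableForTrading));
        }
    }
    // The cache always reflects the full API state.
    *cached_statuses = new_statuses.clone();
    emitted
}
```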
Factory module (factories.rs)
Complex adapters may define a factories.rs module for converting venue data to Nautilus types.
This centralizes transformation logic that would otherwise be scattered across HTTP and WebSocket
parsers:
```rust
// factories.rs
pub fn create_instrument(
    venue_instrument: &VenueInstrument,
    ts_init: UnixNanos,
) -> anyhow::Result<InstrumentAny> {
    match venue_instrument.instrument_type {
        InstrumentType::Perpetual => parse_perpetual(venue_instrument, ts_init),
        InstrumentType::Future => parse_future(venue_instrument, ts_init),
        InstrumentType::Option => parse_option(venue_instrument, ts_init),
    }
}
```

Use this pattern when the same venue data structures are parsed in multiple places (HTTP responses, WebSocket updates, historical data).
Connection lifecycle (connect)
Both data and execution clients follow a strict initialization order during connect() to prevent
race conditions with reconciliation and strategy startup. The platform waits for all clients to
signal connected before running reconciliation or starting strategies, so all initialization must
complete within connect().
Data event emission
Data clients emit events to the platform through an unbounded channel obtained at construction:
```rust
let data_sender = get_data_event_sender();
```

The DataEvent enum carries all data types the client produces:
| Variant | Usage |
|---|---|
| DataEvent::Instrument | Instrument definitions during bootstrap and updates. |
| DataEvent::InstrumentStatus | Market status changes from polling or WS streams. |
| DataEvent::Data | Market data (trades, quotes, book deltas, bars). |
| DataEvent::Response | Responses to historical data requests. |
| DataEvent::FundingRate | Funding rate updates for derivatives. |
Send events with self.data_sender.send(DataEvent::Instrument(instrument)). Log warnings
on send failure but do not propagate the error since a closed receiver means the system
is shutting down. Clone the sender for spawned tasks that emit data from async work.
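This send-and-log-only failure handling can be sketched with a std channel standing in for the tokio unbounded sender (the DataEvent payload here is a placeholder):

```rust
use std::sync::mpsc;

/// Placeholder event type for illustration.
#[derive(Debug, PartialEq)]
pub enum DataEvent {
    Instrument(String),
}

/// Sends an event; logs on failure instead of propagating, since a closed
/// receiver means the system is shutting down.
pub fn emit(sender: &mpsc::Sender<DataEvent>, event: DataEvent) {
    if let Err(e) = sender.send(event) {
        eprintln!("WARN: data event receiver closed: {e}");
    }
}
```

Spawned tasks that emit data clone the sender before moving it into the task, so every task holds its own handle.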
Data client
1. Fetch instruments via REST: call bootstrap_instruments() or equivalent.
2. Cache locally: populate the client's internal instrument map and HTTP client cache.
3. Emit to data engine: send each instrument as DataEvent::Instrument via data_sender. These events are queued during startup and processed before reconciliation runs.
4. Cache to WebSocket: call ws.cache_instruments() so the handler can parse messages.
5. Connect WebSocket: establish the streaming connection.
```rust
async fn connect(&mut self) -> anyhow::Result<()> {
    let instruments = self.bootstrap_instruments().await?;
    ws.cache_instruments(instruments);
    ws.connect().await?;
    ws.wait_until_active(10.0).await?;
    // ...
}
```

Execution client
1. Initialize instruments: call ensure_instruments_initialized_async(), which checks self.core.instruments_initialized() and returns early if instruments are already cached. Otherwise it fetches instruments via REST and caches them to the HTTP client, WebSocket client, and any broadcaster clients.
2. Connect WebSocket: establish the private streaming connection.
3. Subscribe to channels: orders, executions, positions, wallet/margin.
4. Start WebSocket stream handler: begin processing incoming messages.
5. Fetch account state: call refresh_account_state(), which requests balances and margins via REST, builds an AccountState, and emits it through the ExecutionEventEmitter.
6. Await account registered: call await_account_registered(timeout_secs), which polls self.core.cache().account(&account_id) at 10ms intervals until the account appears or the timeout expires. This step blocks connect so the portfolio can process orders during reconciliation.
7. Signal connected: call self.core.set_connected().
```rust
async fn connect(&mut self) -> anyhow::Result<()> {
    self.ensure_instruments_initialized_async().await?;
    self.ws_client.connect().await?;
    self.ws_client.wait_until_active(10.0).await?;
    // ... subscribe channels, start stream ...
    self.refresh_account_state().await?;
    self.await_account_registered(30.0).await?;
    self.core.set_connected();
    Ok(())
}
```

Account state emission
The ExecutionEventEmitter provides two methods for emitting account state:
- emit_account_state(balances, margins, reported, ts_event) builds an AccountState from raw parameters using the internal OrderEventFactory, then dispatches it. Use this when the adapter has individual balance and margin values to combine.
- send_account_state(state) dispatches a pre-built AccountState. Use this when the adapter already has a fully constructed state from parsing an HTTP or WebSocket payload.
HTTP client patterns
Adapters use a two-layer HTTP client architecture: a raw client for low-level API operations and a domain client for high-level logic. The split also enables efficient cloning for Python bindings.
Client structure
The architecture consists of two complementary clients:
- Raw client (MyRawHttpClient): Low-level API methods matching venue endpoints.
- Domain client (MyHttpClient): High-level methods using Nautilus domain types.
```rust
use std::sync::Arc;

use nautilus_network::http::HttpClient;

// Raw HTTP client - low-level API methods matching venue endpoints
pub struct MyRawHttpClient {
    base_url: String,
    client: HttpClient, // Use nautilus_network::http::HttpClient, not reqwest directly
    credential: Option<Credential>,
    retry_manager: RetryManager<MyHttpError>,
    cancellation_token: CancellationToken,
}

// Domain HTTP client - wraps raw client with Arc, provides high-level API
pub struct MyHttpClient {
    pub(crate) inner: Arc<MyRawHttpClient>,
    // Additional domain-specific state (e.g., instrument cache)
    instruments: DashMap<InstrumentId, InstrumentAny>,
}
```

Key points:
- Raw client (MyRawHttpClient) contains low-level HTTP methods named to match venue endpoints (e.g., get_instruments, get_balance, place_order). These methods take venue-specific query objects and return venue-specific response types.
- Domain client (MyHttpClient) wraps the raw client in an Arc for efficient cloning (required for Python bindings). It provides high-level methods that accept Nautilus domain types (e.g., InstrumentId, ClientOrderId) and return domain objects. It may also cache instruments or other venue metadata.
- Use nautilus_network::http::HttpClient instead of reqwest::Client directly for rate limiting, retry logic, and consistent error handling.
- Both clients are exposed to Python, but the domain client is the primary interface.
Parser functions
Parser functions convert venue-specific data structures into Nautilus domain objects. Place them in
common/parse.rs for cross-cutting conversions (instruments, trades, bars) or http/parse.rs for
REST-specific transformations. Each parser takes venue data plus context (account IDs, timestamps,
instrument references) and returns a Nautilus domain type wrapped in Result.
Standard patterns:
- Handle string-to-numeric conversions with proper error context using .parse::<f64>() and anyhow::Context.
- Check for empty strings before parsing optional fields; venues often return "" instead of omitting fields.
- Map venue enums to Nautilus enums explicitly with match statements rather than implementing automatic conversions that could hide mapping errors.
- Accept instrument references when precision or other metadata is required for constructing Nautilus types (quantities, prices).
- Use descriptive function names: parse_position_status_report, parse_order_status_report, parse_trade_tick.
Place parsing helpers (parse_price_with_precision, parse_timestamp) in the same module as private functions when they're reused across multiple parsers.
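The empty-string guard for optional fields can be sketched as follows (the function name and plain String error type are illustrative; real parsers attach context via anyhow::Context):

```rust
/// Parses an optional numeric field, treating "" as absent.
/// Venues often return empty strings instead of omitting fields.
pub fn parse_optional_f64(raw: &str, field: &str) -> Result<Option<f64>, String> {
    if raw.is_empty() {
        return Ok(None);
    }
    raw.parse::<f64>()
        .map(Some)
        .map_err(|e| format!("failed to parse `{field}`='{raw}': {e}"))
}
```

Naming the offending field and raw value in the error message makes malformed venue payloads much easier to diagnose from logs.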
Method naming and organization
The raw client mirrors venue endpoints with venue-specific parameter and response types. The domain client wraps it and exposes high-level methods that accept Nautilus domain types.
Naming conventions:
- Raw client methods: Named to match venue endpoints as closely as possible (e.g., get_instruments, get_balance, place_order). These methods are internal to the raw client and take venue-specific types (builders, JSON values).
- Domain client methods: Named based on operation semantics (e.g., request_instruments, submit_order, cancel_order). These are the methods exposed to Python and take Nautilus domain objects (InstrumentId, ClientOrderId, OrderSide, etc.).
Domain method flow:
Domain methods follow a three-step pattern: build venue-specific parameters from Nautilus types, call the corresponding raw client method, then parse the response. For endpoints returning domain objects (positions, orders, trades), call parser functions from common/parse. For endpoints returning raw venue data (fee rates, balances), extract the result directly from the response envelope. Methods prefixed with request_* indicate they return domain data, while methods like submit_*, cancel_*, or modify_* perform actions and return acknowledgments.
The domain client wraps the raw client in an Arc for efficient cloning required by Python bindings.
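The three-step flow (build venue parameters, call the raw method, parse the response) can be sketched with illustrative stand-in types. None of these names come from a real adapter, and the raw client here returns canned synchronous data instead of performing HTTP calls.

```rust
/// Venue-side trade payload (stand-in).
pub struct VenueTrade {
    pub price: String,
    pub size: String,
}

/// Nautilus-side domain object (stand-in).
#[derive(Debug, PartialEq)]
pub struct TradeTick {
    pub price: f64,
    pub size: f64,
}

pub struct MyRawHttpClient;

impl MyRawHttpClient {
    /// Raw method named after the venue endpoint, returning venue types.
    pub fn get_trades(&self, _venue_symbol: &str) -> Vec<VenueTrade> {
        vec![VenueTrade { price: "100.5".into(), size: "2".into() }]
    }
}

pub struct MyHttpClient {
    pub inner: MyRawHttpClient, // the real client wraps this in an Arc
}

impl MyHttpClient {
    /// Domain method: Nautilus-style identifier in, parsed domain objects out.
    pub fn request_trades(&self, instrument_id: &str) -> Result<Vec<TradeTick>, String> {
        // 1. Build venue-specific parameters from the Nautilus identifier.
        let venue_symbol = instrument_id.split('.').next().unwrap_or(instrument_id);
        // 2. Call the corresponding raw client method.
        let raw = self.inner.get_trades(venue_symbol);
        // 3. Parse venue payloads into domain objects.
        raw.into_iter()
            .map(|t| {
                Ok(TradeTick {
                    price: t.price.parse().map_err(|e| format!("price: {e}"))?,
                    size: t.size.parse().map_err(|e| format!("size: {e}"))?,
                })
            })
            .collect()
    }
}
```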
Query parameter builders
Use the derive_builder crate with proper defaults and ergonomic Option handling:
```rust
use derive_builder::Builder;

#[derive(Clone, Debug, Deserialize, Serialize, Builder)]
#[serde(rename_all = "camelCase")]
#[builder(setter(into, strip_option), default)]
pub struct InstrumentsInfoParams {
    pub category: ProductType,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub symbol: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: Option<u32>,
}

impl Default for InstrumentsInfoParams {
    fn default() -> Self {
        Self {
            category: ProductType::Linear,
            symbol: None,
            limit: None,
        }
    }
}
```

Key attributes:
- #[builder(setter(into, strip_option), default)] enables a clean API: .symbol("BTCUSDT") instead of .symbol(Some("BTCUSDT".to_string())).
- #[serde(skip_serializing_if = "Option::is_none")] omits optional fields from query strings.
- Always implement Default for builder parameters.
Request signing and authentication
Keep signing logic in a Credential struct under common/credential.rs:
- Store API keys using Ustr for efficient comparison, secrets in Box<[u8]> with #[zeroize].
- Implement sign() and sign_bytes() methods that compute HMAC-SHA256 signatures.
- Pass the credential to the raw HTTP client; the domain client delegates signing through the inner client.
For WebSocket authentication, the handler constructs login messages using the same Credential::sign() method with a WebSocket-specific timestamp format.
Credential module structure
Each adapter's common/credential.rs must provide two things:
- credential_env_vars() free function: returns environment variable names as a tuple.
- Credential::resolve() method: resolves credentials from config values or environment variables using resolve_env_var_pair from nautilus_core::env.
Config structs are DTOs and must not contain credential resolution logic. All resolution
belongs in credential.rs.
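The semantics of resolve_env_var_pair can be sketched with std::env. This is an assumed, simplified reading of the helper (the real function lives in nautilus_core::env): explicit config values win, environment variables are the fallback, and a missing half yields None.

```rust
use std::env;

/// Simplified sketch: prefer explicit config values, fall back to the
/// environment, and return None unless both halves resolve.
pub fn resolve_pair(
    api_key: Option<String>,
    api_secret: Option<String>,
    key_var: &str,
    secret_var: &str,
) -> Option<(String, String)> {
    let key = api_key.or_else(|| env::var(key_var).ok())?;
    let secret = api_secret.or_else(|| env::var(secret_var).ok())?;
    Some((key, secret))
}
```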
Standard layout:
```rust
use nautilus_core::env::resolve_env_var_pair;

/// Returns the environment variable names for API credentials.
pub fn credential_env_vars(is_testnet: bool) -> (&'static str, &'static str) {
    if is_testnet {
        ("{VENUE}_TESTNET_API_KEY", "{VENUE}_TESTNET_API_SECRET")
    } else {
        ("{VENUE}_API_KEY", "{VENUE}_API_SECRET")
    }
}

impl Credential {
    /// Resolves credentials from provided values or environment variables.
    pub fn resolve(
        api_key: Option<String>,
        api_secret: Option<String>,
        is_testnet: bool,
    ) -> Option<Self> {
        let (key_var, secret_var) = credential_env_vars(is_testnet);
        let (k, s) = resolve_env_var_pair(api_key, api_secret, key_var, secret_var)?;
        Some(Self::new(k, s))
    }
}
```

Environment variable conventions
Adapters load API credentials from environment variables when not provided directly, avoiding hardcoded secrets.
Naming conventions:
| Environment | API Key Variable | API Secret Variable |
|---|---|---|
| Mainnet/Live | {VENUE}_API_KEY | {VENUE}_API_SECRET |
| Testnet | {VENUE}_TESTNET_API_KEY | {VENUE}_TESTNET_API_SECRET |
| Demo | {VENUE}_DEMO_API_KEY | {VENUE}_DEMO_API_SECRET |
Some venues require additional credentials:
- OKX: OKX_API_PASSPHRASE
Key principles:
- Environment variable names must be centralized in credential_env_vars(), never duplicated as string literals across files.
- Environment variable resolution should happen in core Rust code, not Python bindings.
- Use get_or_env_var_opt for optional credentials (returns None if missing).
- Use get_or_env_var when credentials are required (returns error if missing).
- Invalid credentials (e.g. malformed keys) must fail fast with an error, never silently degrade to unauthenticated mode.
Error handling and retry logic
Use the RetryManager from nautilus_network for consistent retry behavior.
Rate limiting
Configure rate limiting through HttpClient using LazyLock<Quota> static variables.
Naming conventions:
- REST quotas: {VENUE}_REST_QUOTA (e.g., OKX_REST_QUOTA, BYBIT_REST_QUOTA)
- WebSocket quotas: {VENUE}_WS_{OPERATION}_QUOTA (e.g., OKX_WS_CONNECTION_QUOTA, OKX_WS_ORDER_QUOTA)
- Rate limit keys: {VENUE}_RATE_LIMIT_KEY_{OPERATION} (e.g., OKX_RATE_LIMIT_KEY_SUBSCRIPTION, OKX_RATE_LIMIT_KEY_ORDER)
Standard rate limit keys for WebSocket:
| Key | Operations |
|---|---|
| *_RATE_LIMIT_KEY_SUBSCRIPTION | Subscribe, unsubscribe, login. |
| *_RATE_LIMIT_KEY_ORDER | Place orders (regular and algo). |
| *_RATE_LIMIT_KEY_CANCEL | Cancel orders, mass cancel. |
| *_RATE_LIMIT_KEY_AMEND | Amend/modify orders. |
Example:
```rust
pub static OKX_REST_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));

pub static OKX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_hour(NonZeroU32::new(480).unwrap()));

pub const OKX_RATE_LIMIT_KEY_ORDER: &str = "order";
```

Pass rate limit keys when sending WebSocket messages to enforce per-operation quotas:

```rust
self.send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()])).await
```

WebSocket client patterns
WebSocket clients handle real-time streaming data. They manage connection state, authentication, subscriptions, and reconnection logic.
Client structure
WebSocket adapters use a two-layer architecture to separate Python-accessible state from high-performance async I/O:
Connection state tracking
Track connection state using Arc<ArcSwap<AtomicU8>> to provide lock-free, race-free visibility across all clones:
```rust
use arc_swap::ArcSwap;

pub struct MyWebSocketClient {
    connection_mode: Arc<ArcSwap<AtomicU8>>, // Shared connection mode (lock-free)
    signal: Arc<AtomicBool>,                 // Cancellation signal for graceful shutdown
    // ...
}
```

Pattern breakdown:
- Outer Arc: Shared across all clones (Python bindings clone clients before async operations).
- ArcSwap: Enables atomic pointer replacement via .store() without replacing the outer Arc.
- Inner Arc<AtomicU8>: The actual connection state from WebSocketClient::connection_mode_atomic().
Initialize with a placeholder atomic (ConnectionMode::Closed), then in connect() call
.store(client.connection_mode_atomic()) to atomically swap to the real client's state.
All clones see updates instantly through lock-free .load() calls in is_active().
The underlying WebSocketClient sends a RECONNECTED sentinel message when reconnection completes, triggering resubscription logic in the handler.
Outer client ({Venue}WebSocketClient):
- Orchestrates connection lifecycle, authentication, subscriptions.
- Maintains state for Python access using Arc<DashMap<K, V>>.
- Tracks subscription state for reconnection logic.
- Stores instruments cache for replay on reconnect.
- Sends commands to handler via cmd_tx channel.
- Receives venue events via out_rx channel.
Inner handler ({Venue}WsFeedHandler):
- Runs in dedicated Tokio task as stateless I/O boundary.
- Owns
WebSocketClientexclusively (noRwLockneeded). - Processes commands from
cmd_rx→ serializes to JSON → sends via WebSocket. - Receives raw WebSocket messages → deserializes into
{Venue}WsFrame→ converts to{Venue}WsMessage→ emits viaout_tx. - Owns pending request state using
AHashMap<K, V>(single-threaded, no locking). - Uses
VecDeque<{Venue}WsMessage>to buffer multi-message yields from a single frame parse.
Some venues expose separate WebSocket endpoints for market data and order management
(different URLs, authentication flows, or message protocols). In this case, split into
two client+handler pairs under websocket/data/ and websocket/orders/ subdirectories,
each following the same two-layer pattern. Name them {Venue}MdWebSocketClient /
{Venue}MdWsFeedHandler and {Venue}OrdersWebSocketClient / {Venue}OrdersWsFeedHandler.
Communication pattern:
Key principles:

- No shared locks on hot path: handler owns `WebSocketClient`; client sends commands via a lock-free mpsc channel.
- Command pattern for all sends: subscriptions, orders, and cancellations all route through the `HandlerCommand` enum.
- Event pattern for state: handler emits `{Venue}WsMessage` events (including `Authenticated`); client maintains state from events.
- Pending state ownership: handler owns an `AHashMap` for matching responses (no `Arc<DashMap>` between layers).
- Message buffering: handler uses `VecDeque<{Venue}WsMessage>` for frames that produce multiple output messages. The `next()` method drains the queue before polling channels.
- Python constraint: client uses `Arc<DashMap>` only for state Python might query; handler uses `AHashMap` for internal matching.
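The drain-before-poll behavior of `next()` can be sketched with std types (the struct and method names here are illustrative, and `String` stands in for `{Venue}WsMessage`; a real handler would await `cmd_rx`/`raw_rx` once the buffer is empty):

```rust
use std::collections::VecDeque;

// Illustrative stand-in for {Venue}WsMessage.
type WsMessage = String;

struct FeedHandlerSketch {
    pending_messages: VecDeque<WsMessage>,
}

impl FeedHandlerSketch {
    // Buffer all messages yielded by parsing a single frame.
    fn on_frame_parsed(&mut self, msgs: Vec<WsMessage>) {
        self.pending_messages.extend(msgs);
    }

    // Drain the buffer first; only poll the channels once it is empty.
    fn next(&mut self) -> Option<WsMessage> {
        if let Some(msg) = self.pending_messages.pop_front() {
            return Some(msg);
        }
        None // a real handler would await cmd_rx / raw_rx here
    }
}
```

This guarantees that a frame yielding several messages delivers all of them, in order, before any new I/O is polled.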
Authentication
Authentication state is managed through events:
- Handler processes `Login` response → returns `{Venue}WsMessage::Authenticated` immediately.
- Client receives event → updates local auth state → proceeds with subscriptions.
- `AuthTracker` (from `nautilus_network::websocket::auth`) tracks auth state across threads.
The AuthTracker struct from nautilus_network provides thread-safe authentication state:
```rust
pub struct AuthTracker {
    tx: Arc<Mutex<Option<AuthResultSender>>>,
    authenticated: Arc<AtomicBool>,
}
```

`AuthTracker` is internally `Arc`-based, so cloning shares state. Both client and handler store `auth_tracker: AuthTracker` and receive a `.clone()` of the same instance. The tracker exposes a four-method lifecycle: `begin()` starts an attempt and returns a one-shot receiver, `succeed()` sets the authenticated flag and notifies the receiver, `fail(message)` clears the flag with an error, and `invalidate()` clears the flag on disconnect. Downstream consumers query `is_authenticated()` for lock-free reads via the internal `AtomicBool`.
Note: The `Authenticated` message is consumed in the client's spawn loop to coordinate the reconnection flow and is not forwarded to downstream consumers (data/execution clients). Downstream consumers can query authentication state via `AuthTracker` if needed; the execution client's `Authenticated` handler only logs at debug level, with no important logic depending on the event.
Subscription management
Shared SubscriptionState pattern
The SubscriptionState struct from nautilus_network::websocket is shared between client and handler using Arc<DashMap<>> internally for thread-safe access:
- `SubscriptionState` is shared via `Arc`: both client and handler receive a `.clone()` of the same instance (shallow clone of `Arc` pointers).
- Responsibility split: client tracks user intent (`mark_subscribe`, `mark_unsubscribe`); handler tracks server confirmations (`confirm_subscribe`, `confirm_unsubscribe`, `mark_failure`).
- Why both need it: single source of truth with lock-free concurrent access, no synchronization overhead.
Subscription lifecycle
A subscription represents any topic in one of two states:
| State | Description |
|---|---|
| Pending | Subscription request sent to venue, awaiting acknowledgment. |
| Confirmed | Venue acknowledged subscription and is actively streaming data. |
State transitions follow this lifecycle:
| Trigger | Method Called | From State | To State | Notes |
|---|---|---|---|---|
| User subscribes | mark_subscribe() | — | Pending | Topic added to pending set. |
| Venue confirms | confirm() | Pending | Confirmed | Moved from pending to confirmed. |
| Venue rejects | mark_failure() | Pending | Pending | Stays pending for retry on reconnect. |
| User unsubscribes | mark_unsubscribe() | Confirmed | Pending | Temporarily pending until ack. |
| Unsubscribe ack | clear_pending() | Pending | Removed | Topic fully removed. |
Key principles:
- `subscription_count()` reports only confirmed subscriptions, not pending ones.
- Failed subscriptions remain pending and are automatically retried on reconnect.
- Both confirmed and pending subscriptions are restored after reconnection.
- Unsubscribe operations must check the `op` field in acknowledgments to avoid re-confirming topics.
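The lifecycle table above can be sketched as a minimal single-threaded model (std `HashSet` stands in for the concurrent `SubscriptionState`; method names mirror the table):

```rust
use std::collections::HashSet;

// Minimal sketch of the pending/confirmed lifecycle (single-threaded;
// the real SubscriptionState uses lock-free concurrent collections).
#[derive(Default)]
struct SubState {
    pending: HashSet<String>,
    confirmed: HashSet<String>,
}

impl SubState {
    fn mark_subscribe(&mut self, topic: &str) {
        self.pending.insert(topic.to_string());
    }
    fn confirm(&mut self, topic: &str) {
        if self.pending.remove(topic) {
            self.confirmed.insert(topic.to_string());
        }
    }
    fn mark_unsubscribe(&mut self, topic: &str) {
        if self.confirmed.remove(topic) {
            self.pending.insert(topic.to_string());
        }
    }
    fn clear_pending(&mut self, topic: &str) {
        self.pending.remove(topic);
    }
    // Only confirmed subscriptions count, matching the key principle above.
    fn subscription_count(&self) -> usize {
        self.confirmed.len()
    }
}
```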
Topic format patterns
Adapters use venue-specific delimiters to structure subscription topics:
| Adapter | Delimiter | Example | Pattern |
|---|---|---|---|
| BitMEX | : | trade:XBTUSD | {channel}:{symbol} |
| OKX | : | trades:BTC-USDT-SWAP | {channel}:{symbol} |
| Bybit | . | orderbook.50.BTCUSDT | {channel}.{depth}.{symbol} |
Parse topics using split_once() with the appropriate delimiter to extract channel and symbol components.
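For example, a generic helper (hypothetical name) covering the single-delimiter cases above:

```rust
/// Split a topic into (channel, remainder) at the first delimiter.
/// For Bybit-style `orderbook.50.BTCUSDT` the remainder still contains
/// the depth segment and needs a second split.
fn parse_topic(topic: &str, delimiter: char) -> Option<(&str, &str)> {
    topic.split_once(delimiter)
}
```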
Reconnection logic
On reconnection, restore authentication and subscriptions:
- Track subscriptions: preserve original subscription arguments in collections (e.g., `Arc<DashMap>`) to avoid parsing topics back into arguments.
- Reconnection flow:
  1. Receive `{Venue}WsMessage::Reconnected` from the handler.
  2. If authenticated: re-authenticate and wait for confirmation.
  3. Restore all tracked subscriptions via handler commands.
Preserving subscription arguments:
Store original subscription arguments in a separate collection to enable deterministic reconnection replay without parsing topics back into arguments:
```rust
pub struct MyWebSocketClient {
    subscription_state: Arc<SubscriptionState>,
    subscription_args: Arc<DashMap<String, SubscriptionArgs>>, // topic → original args
    // ...
}

impl MyWebSocketClient {
    async fn subscribe(&self, args: SubscriptionArgs) -> Result<(), Error> {
        let topic = args.to_topic();
        self.subscription_state.mark_subscribe(&topic);
        self.subscription_args.insert(topic.clone(), args.clone());
        self.send_cmd(HandlerCommand::Subscribe(args)).await
    }

    async fn unsubscribe(&self, topic: &str) -> Result<(), Error> {
        self.subscription_state.mark_unsubscribe(topic);
        self.subscription_args.remove(topic);
        self.send_cmd(HandlerCommand::Unsubscribe(topic.to_string())).await
    }

    async fn restore_subscriptions(&self) {
        for entry in self.subscription_args.iter() {
            let _ = self.send_cmd(HandlerCommand::Subscribe(entry.value().clone())).await;
        }
    }
}
```

This avoids complex topic parsing and ensures subscriptions are replayed exactly as originally requested.
Ping/Pong handling
Support both WebSocket control frame pings and application-level text pings:
- Control frame pings: handled automatically by `WebSocketClient` via the `PingHandler` callback.
- Text pings: some venues (e.g., OKX) use `"ping"`/`"pong"` text messages. Configure `heartbeat_msg: Some(TEXT_PING.to_string())` in `WebSocketConfig` and respond to incoming `TEXT_PING` with `TEXT_PONG` in the handler.
The handler should check for ping messages early in the message processing loop and respond immediately to maintain connection health.
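A sketch of that early check (constant values assume an OKX-style venue that uses plain `"ping"`/`"pong"` text frames):

```rust
const TEXT_PING: &str = "ping";
const TEXT_PONG: &str = "pong";

/// Returns the response to send immediately, if the frame is a text ping.
/// Called before any JSON deserialization in the message loop.
fn text_ping_response(raw: &str) -> Option<&'static str> {
    if raw == TEXT_PING {
        Some(TEXT_PONG)
    } else {
        None
    }
}
```

Checking before deserialization keeps the heartbeat path cheap and avoids spurious parse errors on non-JSON frames.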
Disconnection lifecycle (close)
The `close()` method follows a three-step shutdown sequence: command, signal, await.
```rust
impl MyWebSocketClient {
    pub async fn close(&mut self) -> Result<(), MyWsError> {
        tracing::debug!("Starting close process");

        // 1. Send disconnect command so handler can clean up gracefully
        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
            tracing::warn!("Failed to send disconnect command to handler: {e}");
        }

        // 2. Set stop signal so handler loop exits after processing disconnect
        self.signal.store(true, Ordering::Release);

        // 3. Await task handle with timeout, abort if stuck
        if let Some(task_handle) = self.task_handle.take() {
            match Arc::try_unwrap(task_handle) {
                Ok(handle) => {
                    let abort_handle = handle.abort_handle();
                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
                        Ok(Ok(())) => tracing::debug!("Handler task completed"),
                        Ok(Err(e)) => tracing::error!("Handler task error: {e:?}"),
                        Err(_) => {
                            tracing::warn!("Timeout waiting for handler task, aborting");
                            abort_handle.abort();
                        }
                    }
                }
                Err(arc_handle) => {
                    tracing::debug!("Cannot unwrap task handle, aborting");
                    arc_handle.abort();
                }
            }
        }

        Ok(())
    }
}
```

Key points:
- Send `Disconnect` before setting the stop signal so the handler processes it before exiting.
- Return `Result<(), {Venue}WsError>` so callers can handle failures.
- Use `Ordering::Release` on the signal store so the handler sees the write.
- Extract `abort_handle` before awaiting so it remains available after timeout.
- When `Arc::try_unwrap` fails (other clones exist), abort directly.
Stream consumption (stream)
The outer client exposes a stream() method that hands ownership of out_rx to the
caller as an async stream. Data and execution clients call this once to drive their
message processing loop:
```rust
impl MyWebSocketClient {
    pub fn stream(&mut self) -> impl Stream<Item = MyWsMessage> + 'static {
        let rx = self
            .out_rx
            .take()
            .expect("Stream receiver already taken or not connected");
        let mut rx = Arc::try_unwrap(rx)
            .expect("Cannot take ownership - other references exist");
        async_stream::stream! {
            while let Some(msg) = rx.recv().await {
                yield msg;
            }
        }
    }
}
```

The data/execution client consumes the stream in a `tokio::select!` loop with a cancellation token or stop signal, matching on `{Venue}WsMessage` variants and calling parse functions to produce Nautilus domain types.
Subscription topic helpers (subscription.rs)
When a venue's subscription topics have complex structure (multiple parameter types,
instrument type / family / ID variants, candle width encoding), extract topic building
and parsing into websocket/subscription.rs. This keeps client.rs focused on
connection lifecycle and handler.rs focused on I/O.
For venues with simple {channel}:{symbol} topics, inline helpers in the client are
sufficient and a separate module is not needed.
Handler configuration constants
Define handler-specific tuning constants for consistent behavior:
| Constant | Purpose | Typical value |
|---|---|---|
| `DEFAULT_HEARTBEAT_SECS` | Interval for sending keep-alive messages. | 15-30 |
| `WEBSOCKET_AUTH_WINDOW_MS` | Maximum age for authentication timestamps. | 5000-30000 |
| `BATCH_PROCESSING_LIMIT` | Maximum messages processed per event loop cycle. | 100-1000 |
Place these in websocket/handler.rs or common/consts.rs depending on scope.
Message routing
The handler uses two message enums to separate wire deserialization from emitted events. The data and execution client layers convert emitted events into Nautilus domain types.
Define two enums:
- `{Venue}WsFrame`: serde-deserialized wire frames. Contains every JSON shape the venue can send (login responses, subscription acks, channel data, order responses, errors, pings). Typically `pub(super)` since only the handler uses it.
- `{Venue}WsMessage`: handler output events emitted on `out_tx`. Contains the subset of wire data the client needs plus synthetic control variants (`Reconnected`, `Authenticated`, `SendFailed`) that have no wire representation. This is the `pub` type consumers match on.
The handler deserializes raw text into {Venue}WsFrame, handles control frames internally
(subscription acks, login, pings), and converts relevant frames into {Venue}WsMessage events
sent via out_tx. The client receives from out_rx and routes to data/execution callbacks,
which convert venue types to Nautilus domain types using parse functions.
Message type naming convention
Types prefixed with the venue name (e.g., OKX, Bitmex) contain raw exchange-specific types.
Types prefixed with Nautilus contain normalized domain types ready for the trading system.
Wire frame enum (serde-deserialized, handler-internal):
```rust
pub(super) enum MyWsFrame {
    Login { event, code, msg, conn_id },
    Subscription { event, arg, conn_id, code, msg },
    OrderResponse { id, op, code, msg, data },
    BookData { arg, action, data: Vec<MyBookMsg> },
    Data { arg, data: Value },
    Error { code, msg },
    Ping,
    Reconnected,
}
```

Handler output enum (emitted to client):
```rust
pub enum MyWsMessage {
    BookData { arg, action, data: Vec<MyBookMsg> },
    ChannelData { channel, inst_id, data: Value },
    Orders(Vec<MyOrderMsg>),
    OrderResponse { id, op, code, msg, data },
    SendFailed { request_id, client_order_id, op, error },
    Instruments(Vec<MyInstrument>),
    Error(MyWebSocketError),
    Reconnected,
    Authenticated,
}
```

The frame enum includes every wire shape (login acks, subscription acks, pings) for deserialization. The output enum drops shapes the handler consumes internally and adds synthetic variants (`Authenticated`, `SendFailed`) that originate in handler logic, not on the wire.
Include OrderResponse for venue acknowledgements (place, cancel, amend) and SendFailed for
WebSocket send failures after retries are exhausted. The execution client dispatch layer converts
these into Nautilus rejection events (OrderRejected, OrderCancelRejected, etc.).
Conversion in data/exec client:
The data client's message loop matches on {Venue}WsMessage variants and calls parse functions
to produce Nautilus domain types (Data, OrderBookDeltas, etc.). The execution client's
dispatch layer handles OrderResponse, SendFailed, and Orders variants. This keeps
the handler focused on I/O and deserialization while the client layers own domain conversion.
The execution dispatch converts order and fill messages using a two-tier routing contract:
- The handler emits venue-specific order types (e.g., `Orders(Vec<MyOrderMsg>)`).
- The client dispatch layer tracks which orders were submitted through this client.
- Tracked order: convert venue types to order events (`OrderAccepted`, `OrderCanceled`, `OrderFilled`, etc.) and synthesize any missing lifecycle events (e.g., `OrderAccepted` before a fast fill).
- External/unknown order: convert to reports (`OrderStatusReport` or `FillReport`) for downstream reconciliation.
WsDispatchState
Execution dispatch state lives in a WsDispatchState struct defined in websocket/dispatch.rs.
It tracks which lifecycle events have already been emitted to prevent duplicates across
reconnections and fast-fill races:
```rust
#[derive(Debug, Default)]
pub struct WsDispatchState {
    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
    pub emitted_accepted: DashSet<ClientOrderId>,
    pub triggered_orders: DashSet<ClientOrderId>,
    pub filled_orders: DashSet<ClientOrderId>,
    clearing: AtomicBool,
}
```

| Field | Purpose |
|---|---|
| `order_identities` | Maps client order ID to identity metadata set at submission. |
| `emitted_accepted` | Prevents duplicate `OrderAccepted` events. |
| `triggered_orders` | Tracks conditional orders that have triggered. |
| `filled_orders` | Prevents duplicate `OrderFilled` events on reconnect replay. |
| `clearing` | Guards concurrent eviction when sets reach capacity. |
Each DashSet is bounded by a DEDUP_CAPACITY constant (typically 10,000). When a set
reaches capacity, evict_if_full() clears it atomically using a compare-exchange on the
clearing flag to prevent concurrent clears.
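The compare-exchange guard can be sketched with std types (a `Mutex<HashSet>` stands in for the lock-free `DashSet`; the struct and capacity here are illustrative):

```rust
use std::collections::HashSet;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

const DEDUP_CAPACITY: usize = 10_000;

// Sketch of one dedup set plus its eviction guard.
#[derive(Default)]
struct DedupSet {
    set: Mutex<HashSet<u64>>,
    clearing: AtomicBool,
}

impl DedupSet {
    /// Clear the set once it reaches capacity; the compare-exchange ensures
    /// only one caller performs the clear even under concurrent invocation.
    fn evict_if_full(&self) {
        if self.set.lock().unwrap().len() >= DEDUP_CAPACITY
            && self
                .clearing
                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
        {
            self.set.lock().unwrap().clear();
            self.clearing.store(false, Ordering::Release);
        }
    }
}
```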
The dispatch_ws_message() free function in the same module routes {Venue}WsMessage
variants to the appropriate order event builders, using WsDispatchState for dedup
and OrderIdentity for tracked-vs-external classification.
Cross-source fill deduplication
WsDispatchState prevents duplicate lifecycle events within a single stream. When an
adapter receives fills from multiple sources (WebSocket user data and HTTP reconciliation),
a separate trade-ID-level dedup is needed to prevent the same fill from being emitted twice.
The BoundedDedup<T> pattern addresses this with a fixed-capacity set backed by a
VecDeque for insertion order and an AHashSet for O(1) lookup. When the set reaches
capacity, the oldest entry is evicted (FIFO). The insert() method returns true if the
value was already present, signaling a duplicate:
```rust
struct BoundedDedup<T> {
    order: VecDeque<T>,
    set: AHashSet<T>,
    capacity: usize,
}
```

Use this in the execution client to track trade IDs (typically as `(Ustr, i64)` tuples of symbol and trade ID). A capacity of 10,000 provides sufficient coverage for most venues without unbounded memory growth.
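A sketch of the `insert()` semantics with std collections (`HashSet` in place of `AHashSet`):

```rust
use std::collections::{HashSet, VecDeque};
use std::hash::Hash;

struct BoundedDedup<T> {
    order: VecDeque<T>,
    set: HashSet<T>,
    capacity: usize,
}

impl<T: Eq + Hash + Clone> BoundedDedup<T> {
    fn new(capacity: usize) -> Self {
        Self {
            order: VecDeque::with_capacity(capacity),
            set: HashSet::with_capacity(capacity),
            capacity,
        }
    }

    /// Insert a value; returns true if it was already present (duplicate).
    /// At capacity, the oldest entry is evicted first (FIFO).
    fn insert(&mut self, value: T) -> bool {
        if self.set.contains(&value) {
            return true;
        }
        if self.order.len() == self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.set.remove(&oldest);
            }
        }
        self.order.push_back(value.clone());
        self.set.insert(value);
        false
    }
}
```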
Error handling
Client-side error propagation
Channel send failures (client → handler) should propagate loudly as Result<(), Error>:
```rust
impl MyWebSocketClient {
    async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), Error> {
        self.cmd_tx
            .read()
            .await
            .send(cmd)
            .map_err(|e| Error::ClientError(format!("Handler not available: {e}")))
    }

    pub async fn submit_order(...) -> Result<(), Error> {
        let cmd = HandlerCommand::PlaceOrder { ... };
        self.send_cmd(cmd).await // Propagates channel failures
    }
}
```

Handler-side retry logic
WebSocket send failures (handler → network) should be retried by the handler using RetryManager:
```rust
pub struct MyWsFeedHandler {
    inner: Option<WebSocketClient>,
    retry_manager: RetryManager<MyWsError>,
    // ...
}

impl MyWsFeedHandler {
    async fn send_with_retry(
        &self,
        payload: String,
        rate_limit_keys: Option<Vec<String>>,
    ) -> Result<(), MyWsError> {
        if let Some(client) = &self.inner {
            self.retry_manager
                .execute_with_retry(
                    "websocket_send",
                    || async {
                        client
                            .send_text(payload.clone(), rate_limit_keys.clone())
                            .await
                            .map_err(|e| MyWsError::ClientError(format!("Send failed: {e}")))
                    },
                    should_retry_error,
                    create_timeout_error,
                )
                .await
        } else {
            Err(MyWsError::ClientError("No active WebSocket client".to_string()))
        }
    }

    async fn handle_place_order(...) -> anyhow::Result<()> {
        let payload = serde_json::to_string(&request)?;
        match self.send_with_retry(payload, Some(vec![RATE_LIMIT_KEY])).await {
            Ok(()) => Ok(()),
            Err(e) => {
                // Emit SendFailed so the exec client dispatch can produce OrderRejected
                let _ = self.out_tx.send(MyWsMessage::SendFailed {
                    request_id: request_id.clone(),
                    client_order_id: Some(client_order_id),
                    op: Some(MyWsOperation::Order),
                    error: e.to_string(),
                });
                Err(anyhow::anyhow!("Failed to send order: {e}"))
            }
        }
    }
}

fn should_retry_error(error: &MyWsError) -> bool {
    match error {
        MyWsError::NetworkError(_) | MyWsError::Timeout(_) => true,
        MyWsError::AuthenticationError(_) | MyWsError::ParseError(_) => false,
    }
}
```

Key principles:
- Client propagates channel failures immediately (handler unavailable).
- Handler retries transient WebSocket failures (network issues, timeouts).
- Handler emits `SendFailed` when retries are exhausted; the exec client dispatch converts these into Nautilus rejection events (`OrderRejected`, `OrderCancelRejected`).
- Use `RetryManager` from `nautilus_network::retry` for consistent backoff.
Naming conventions
Adapters follow standardized naming conventions for consistency across all venue integrations.
Channel naming: raw → out
WebSocket message channels follow a two-stage transformation pipeline within the handler:
| Stage | Type | Description | Example |
|---|---|---|---|
| `raw` | Raw WebSocket frames | Bytes/text from the network layer. | `raw_rx: UnboundedReceiver<Message>` |
| `out` | Venue-specific messages | Parsed venue message types. | `out_tx: UnboundedSender<MyWsMessage>` |
The handler deserializes raw frames into venue-specific types and emits them on out_tx.
The data and execution client layers then convert venue types into Nautilus domain types.
Example flow:
```rust
// Client creates output channel for venue messages
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel(); // Venue messages (MyWsMessage)

// Handler receives raw frames, outputs venue messages
let handler = MyWsFeedHandler::new(
    cmd_rx,
    raw_rx, // Input: Message (raw WebSocket frames)
    out_tx, // Output: MyWsMessage
    // ...
);
```

Channel names reflect the data transformation stage, not the destination. Use `raw_*` for raw WebSocket frames (`Message`) and `out_*` for venue-specific message types.
Backpressure strategy
WebSocket channels on latency-sensitive paths are intentionally unbounded. The platform prioritizes latency and prefers an explicit crash (OOM) over delaying or dropping data.
Do not add bounded channels, buffering limits, or backpressure unless the latency requirement changes.
Field naming: inner and command channels
Structs holding references to lower-level components follow these conventions:
| Field | Type | Description |
|---|---|---|
| `inner` | `Option<WebSocketClient>` | Network-level WebSocket client (handler only, exclusively owned). |
| `cmd_tx` | `Arc<tokio::sync::RwLock<UnboundedSender<...>>>` | Command channel to handler (client side). |
| `cmd_rx` | `UnboundedReceiver<HandlerCommand>` | Command channel from client (handler side). |
| `out_tx` | `UnboundedSender<{Venue}WsMessage>` | Output channel to client (handler side). |
| `out_rx` | `Option<Arc<UnboundedReceiver<{Venue}WsMessage>>>` | Output channel from handler (client side). |
| `task_handle` | `Option<Arc<JoinHandle<()>>>` | Handler task handle. |
Example:
```rust
// Client struct
pub struct MyWebSocketClient {
    cmd_tx: Arc<tokio::sync::RwLock<UnboundedSender<HandlerCommand>>>,
    out_rx: Option<Arc<UnboundedReceiver<MyWsMessage>>>,
    task_handle: Option<Arc<JoinHandle<()>>>,
    connection_mode: Arc<ArcSwap<AtomicU8>>, // Lock-free connection state
    // ...
}

impl MyWebSocketClient {
    async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), Error> {
        self.cmd_tx
            .read()
            .await
            .send(cmd)
            .map_err(|e| Error::ClientError(format!("Handler not available: {e}")))
    }
}

// Handler struct
pub(super) struct MyWsFeedHandler {
    inner: Option<WebSocketClient>, // Exclusively owned - no RwLock
    cmd_rx: UnboundedReceiver<HandlerCommand>,
    raw_rx: UnboundedReceiver<Message>,
    out_tx: UnboundedSender<MyWsMessage>,
    pending_requests: AHashMap<String, RequestData>, // Single-threaded - no locks
    pending_messages: VecDeque<MyWsMessage>, // Multi-message buffer
    // ...
}
```

The handler exclusively owns `WebSocketClient` without locks. The client sends commands via `cmd_tx` (wrapped in `RwLock` to allow reconnection channel replacement) and receives events via `out_rx`. Use a `send_cmd()` helper to standardize command sending.
Type naming: {Venue}Ws{TypeSuffix}
All WebSocket-related types follow a standardized naming pattern: {Venue}Ws{TypeSuffix}
- `{Venue}`: Capitalized venue name (e.g., `OKX`, `Bybit`, `Bitmex`, `Hyperliquid`).
- `Ws`: Abbreviated "WebSocket" (not fully spelled out).
- `{TypeSuffix}`: Full type descriptor (e.g., `Message`, `Error`, `Request`, `Response`).

Examples:

```rust
// Correct - abbreviated Ws, full type suffix
pub enum OKXWsMessage { ... }
pub enum BybitWsError { ... }
pub struct HyperliquidWsRequest { ... }
```

Standard type suffixes:

- `Message`: WebSocket message enums.
- `Error`: WebSocket error types.
- `Request`: Request message types.
- `Response`: Response message types.
Tokio channel qualification:
Always fully qualify tokio channel types as tokio::sync::mpsc:: to avoid ambiguity with
similarly-named types from other crates. Never import mpsc directly at module level.
```rust
// Correct
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MyMessage>();
```

Split WebSocket architectures
Some venues expose multiple WebSocket endpoints with distinct protocols or encodings.
When a venue requires separate connections for market data and order management, split
the websocket/ module into submodules that mirror the connection boundaries:
```
src/
├── websocket/
│   ├── mod.rs           # Re-exports from submodules
│   ├── streams/         # Market data pub/sub connection
│   │   ├── client.rs    # Streams client
│   │   ├── handler.rs   # Streams feed handler
│   │   ├── messages.rs  # Streams message types
│   │   └── mod.rs
│   └── trading/         # Order management + user data (authenticated WS API)
│       ├── client.rs    # Trading client
│       ├── handler.rs   # Trading handler
│       ├── messages.rs  # Trading message types
│       ├── user_data.rs # User data stream venue types (execution reports, etc.)
│       ├── parse.rs     # Parse functions for user data -> Nautilus types
│       ├── error.rs     # Trading error types
│       └── mod.rs
```

Each submodule follows the same two-layer client/handler pattern described above. The parent `websocket/mod.rs` re-exports the public client types.
The trading/ module handles both order operations (place, cancel, modify) and the
user data stream (execution reports, account updates). When the venue's authenticated
WebSocket API supports session.logon and inline user data subscriptions, both
concerns share a single authenticated connection. This avoids a separate execution/
module and the deprecated REST listenKey lifecycle.
For venues where user data events arrive on a separate stream connection (e.g.,
futures APIs that return a listenKey for a dedicated stream URL), the streams/
handler dispatches both market data and user data events from the combined connection.
Naming conventions for split architectures
Type names include the submodule qualifier to avoid ambiguity:
| Submodule | Command type | Message type |
|---|---|---|
| `streams/` | `{Venue}WsStreamsCommand` | `{Venue}WsMessage` (venue types) |
| `trading/` | `{Venue}WsTradingCommand` | `{Venue}WsTradingMessage` |
The {Venue}Ws prefix follows the standard type naming convention. The qualifier
(Streams, Trading) distinguishes types that would otherwise collide across
submodules.
When to split
Split the WebSocket module when the venue has:
- Different endpoints with different protocols (e.g., SBE binary for market data, JSON for trading).
- A dedicated order management WebSocket API (`ws-api` style) alongside pub/sub streams.
- User data delivered inline on the authenticated trading connection rather than via a separate listenKey stream.
Do not split when a single connection handles all message types through channel-based multiplexing (the common pattern for OKX, Bybit, and similar venues).
Multi-product WebSocket management
Some venues use the same WebSocket protocol for all product types but serve them on separate endpoints (e.g., Bybit provides distinct URLs for Linear, Spot, and Inverse). In this case the data client creates one WebSocket client per product type and manages them in a map:
```rust
pub struct MyDataClient {
    ws_clients: AHashMap<MyProductType, MyWebSocketClient>,
}
```

Each client follows the same two-layer client/handler pattern. Subscription routing inspects the instrument's product type to select the correct client. On connect, the data client iterates the map to connect all clients; on disconnect, it closes them all.
This differs from the split architecture (streams/ vs trading/) which separates by
protocol or purpose. Multi-product management separates by product type while sharing
the same protocol.
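A sketch of the per-product routing (all types, names, and URLs here are hypothetical; std `HashMap` stands in for `AHashMap`, and a stub replaces the real WebSocket client):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum ProductType {
    Linear,
    Spot,
    Inverse,
}

// Stand-in for the per-product WebSocket client.
#[derive(Debug, PartialEq)]
struct WsClientStub {
    url: &'static str,
}

fn build_clients() -> HashMap<ProductType, WsClientStub> {
    // Placeholder URLs; real adapters resolve these from common/urls.rs.
    HashMap::from([
        (ProductType::Linear, WsClientStub { url: "wss://example/linear" }),
        (ProductType::Spot, WsClientStub { url: "wss://example/spot" }),
        (ProductType::Inverse, WsClientStub { url: "wss://example/inverse" }),
    ])
}

/// Select the client for an instrument's product type.
fn client_for(
    clients: &HashMap<ProductType, WsClientStub>,
    product: ProductType,
) -> Option<&WsClientStub> {
    clients.get(&product)
}
```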
Modeling venue payloads
Use the following conventions when mirroring upstream schemas in Rust.
REST models (http::models and http::query)
- Put request and response representations in `src/http/models.rs` and derive `serde::Deserialize` (add `serde::Serialize` when the adapter sends data back).
- Mirror upstream payload names with blanket casing attributes such as `#[serde(rename_all = "camelCase")]` or `#[serde(rename_all = "snake_case")]`; only add per-field renames when the upstream key would be an invalid Rust identifier or collide with a keyword (for example `#[serde(rename = "type")] pub order_type: String`).
- Keep helper structs for query parameters in `src/http/query.rs`, deriving `serde::Serialize` to remain type-safe and reusing constants from `common::consts` instead of duplicating literals.
WebSocket messages (websocket::messages)
- Define streaming payload types in `src/websocket/messages.rs`, giving each venue topic a struct or enum that mirrors the upstream JSON.
- Apply the same naming guidance as REST models: rely on blanket casing renames and keep field names aligned with the venue unless syntax forces a change; consider serde helpers such as `#[serde(tag = "op")]` or `#[serde(flatten)]` and document the choice.
- Note any intentional deviations from the upstream schema in code comments and module docs so other contributors can follow the mapping quickly.
Task management
Spawning async tasks (spawn_task)
Data and execution clients spawn background tasks for WebSocket stream processing,
periodic polling, and order submission. Wrap all spawned work with a spawn_task()
method that provides error logging and handle tracking:
```rust
fn spawn_task<F>(&self, description: &'static str, fut: F)
where
    F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let runtime = get_runtime();
    let handle = runtime.spawn(async move {
        if let Err(e) = fut.await {
            log::warn!("{description} failed: {e:?}");
        }
    });
    let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
    tasks.retain(|handle| !handle.is_finished());
    tasks.push(handle);
}
```

Store task handles in `pending_tasks: Mutex<Vec<JoinHandle<()>>>`. Each call to `spawn_task` prunes finished handles before pushing the new one, preventing unbounded growth. On disconnect, abort all remaining handles.
Graceful shutdown with CancellationToken
Use tokio_util::sync::CancellationToken to coordinate shutdown across multiple spawned
tasks. The client creates a token at construction and passes clones to each spawned task.
Tasks select on the token alongside their primary work:
```rust
tokio::select! {
    msg = stream.next() => { /* process */ }
    _ = cancellation_token.cancelled() => { break; }
}
```

On disconnect, the client cancels the token, which signals all tasks to exit their loops.
This complements the handler-level `signal: Arc<AtomicBool>` pattern: the `AtomicBool` gates the handler's I/O loop, while the `CancellationToken` coordinates shutdown of tasks the client spawned outside the handler (polling loops, reconciliation tasks, stream consumers). Reset the token on reconnect by replacing it with a fresh `CancellationToken::new()` so subsequent tasks are not born cancelled.
Testing
Adapters should ship two layers of coverage: the Rust crate that talks to the venue and the Python glue that exposes it to the wider platform. Keep the suites deterministic and colocated with the production code they protect.
Key principle: The tests/ directory is reserved for integration tests that require external infrastructure (mock Axum servers, simulated network conditions).
Unit tests for parsing, serialization, and business logic belong in #[cfg(test)] blocks within source modules.
Rust testing
Layout
```
crates/adapters/your_adapter/
├── src/
│   ├── http/
│   │   ├── client.rs    # HTTP client + unit tests
│   │   └── parse.rs     # REST payload parsers + unit tests
│   └── websocket/
│       ├── client.rs    # WebSocket client + unit tests
│       └── parse.rs     # Streaming parsers + unit tests
├── tests/               # Integration tests (mock servers)
│   ├── data_client.rs   # Data client integration tests
│   ├── exec_client.rs   # Execution client integration tests
│   ├── http.rs          # HTTP client integration tests
│   └── websocket.rs     # WebSocket client integration tests
└── test_data/           # Canonical venue payloads used by the suites
    ├── http_{method}_{endpoint}.json  # Full venue responses with retCode/result/time
    └── ws_{message_type}.json         # WebSocket message samples
```

Test file organization
| File | Purpose |
|---|---|
| `tests/data_client.rs` | Integration tests for the data client. Validates data subscriptions, historical data requests, and market data parsing. |
| `tests/exec_client.rs` | Integration tests for the execution client. Validates order submission, modification, cancellation, and execution reports. |
| `tests/http.rs` | Low-level HTTP client tests. Validates request signing, error handling, and response parsing against mock Axum servers. |
| `tests/websocket.rs` | WebSocket client tests. Validates connection lifecycle, authentication, subscriptions, and message routing. |
Guidelines:
- Place unit tests next to the module they exercise (`#[cfg(test)]` blocks). Use `src/common/testing.rs` (or an equivalent helper module) for shared fixtures so production files stay tidy.
- Keep Axum-based integration suites under `crates/adapters/<adapter>/tests/`, mirroring the public APIs (HTTP client, WebSocket client, data client, execution client).
- Data and execution client tests (`data_client.rs`, `exec_client.rs`) should focus on higher-level behavior: subscription workflows, order lifecycle, and domain model transformations. HTTP and WebSocket tests (`http.rs`, `websocket.rs`) focus on transport-level concerns.
- Store upstream payload samples (snapshots, REST replies) under `test_data/` and reference them from both unit and integration tests. Name test data files consistently: `http_get_{endpoint_name}.json` for REST responses, `ws_{message_type}.json` for WebSocket messages. Include complete venue response envelopes (status codes, timestamps, result wrappers) rather than just the data payload. Provide multiple realistic examples in each file; for instance, position data should include long, short, and flat positions to exercise all parser branches.
- Test data sourcing: test data must be obtained either from official API documentation examples or directly from the live API via network calls. Never fabricate or generate test data manually, as this risks missing edge cases (e.g., negative precision values, scientific notation, unexpected field types) that only appear in real venue responses.
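As an illustration, a fixture-completeness check along these lines can guard against samples that were trimmed down to the bare data payload. This is a hypothetical Python sketch; `retCode`/`result`/`time` follow the envelope convention named above, so substitute your venue's fields:

```python
import json


def assert_complete_envelope(raw: str) -> dict:
    """Check that a stored fixture keeps the full venue response envelope.

    Field names are illustrative (retCode/result/time style); adjust for
    the venue the adapter targets.
    """
    payload = json.loads(raw)
    missing = [k for k in ("retCode", "result", "time") if k not in payload]
    if missing:
        raise AssertionError(f"fixture missing envelope fields: {missing}")
    return payload


# A complete fixture passes; one storing only the inner payload would fail
fixture = '{"retCode": 0, "result": {"list": []}, "time": 1700000000000}'
envelope = assert_complete_envelope(fixture)
```

Running such a check over every file in `test_data/` keeps fixtures honest as endpoints are added.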
Unit tests
Unit tests belong in #[cfg(test)] blocks within source modules, not in the tests/ directory.
What to test (in source modules):
- Deserialization of venue JSON payloads into Rust structs.
- Parsing functions that convert venue types to Nautilus domain models.
- Request signing and authentication helpers.
- Enum conversions and mapping logic.
- Price, quantity, and precision calculations.
What NOT to test:
- Standard library behavior (Vec operations, HashMap lookups, string parsing).
- Third-party crate functionality (chrono date arithmetic, serde attributes).
- Test helper code itself (fixture loaders, mock builders).
Tests should exercise production code paths. If a test only verifies that Vec::extend() works or that chrono can parse a date string, it provides no value.
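For contrast, a test worth writing targets production parsing logic against real venue edge cases such as scientific notation and instrument precision. The helper below is hypothetical, not part of any adapter:

```python
from decimal import Decimal


def parse_venue_price(raw: str, precision: int) -> Decimal:
    """Convert a venue price string (possibly in scientific notation) to a
    Decimal quantized to the instrument's price precision."""
    return Decimal(raw).quantize(Decimal(1).scaleb(-precision))


# Exercises a production code path with edge cases seen in live payloads
assert parse_venue_price("1.5e-4", 6) == Decimal("0.000150")
assert parse_venue_price("42000", 2) == Decimal("42000.00")
```

The assertions here cover behavior the adapter owns (precision handling), not behavior `Decimal` already guarantees.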
WebSocket unit test coverage
WebSocket unit tests exercise three areas: message deserialization, parse dispatch, and handler
logic. Each area lives in a #[cfg(test)] block within the module it tests.
Message types (messages.rs):
- Deserialize every message variant from fixture JSON files in `test_data/`.
- Round-trip tests: serialize a constructed struct, deserialize the output, and assert equality. Round-trip tests catch field renames, missing `skip_serializing_if` attributes, and precision loss that deserialization-only tests miss.
- Cover edge cases in venue payloads: null optional fields, empty arrays, zero quantities.
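A minimal round-trip check looks like this sketch (the struct and field names are illustrative; Rust adapters do the same with serde):

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class WsTradeMsg:
    # Illustrative message struct; real adapters mirror the venue's schema
    symbol: str
    price: str
    qty: str
    side: str


def roundtrip(msg: WsTradeMsg) -> WsTradeMsg:
    """Serialize then deserialize, surfacing dropped or renamed fields."""
    return WsTradeMsg(**json.loads(json.dumps(asdict(msg))))


original = WsTradeMsg(symbol="BTCUSDT", price="42000.5", qty="0.001", side="Buy")
assert roundtrip(original) == original
```

If a field were renamed on serialization but not deserialization, the equality assertion fails immediately.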
Parse functions (parse.rs):
- Exercise the fast-path byte scanner for each type tag or discriminant value.
- Exercise the slow-path fallback (fields not at expected byte positions).
- Verify unknown type tags produce a descriptive error, not a panic.
Handler logic (handler.rs):
- Verify the handler filters internal messages (heartbeats, subscription acks, pong frames) and does not forward them to consumers.
- Verify reconnect signals trigger re-authentication and emit the `Reconnected` variant.
- Verify multi-message buffering: when a single raw frame produces multiple output messages, all messages appear in the correct order from `next()`.
- Verify pending-order cleanup on error and success responses.
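The filtering and multi-message behaviour can be sketched as follows (message shapes and type tags are illustrative, not any venue's actual protocol):

```python
# Internal messages the handler consumes without forwarding (illustrative tags)
INTERNAL = {"ping", "pong", "heartbeat", "subscribe_ack"}


def route_frame(frame: list[dict]) -> list[dict]:
    """Drop internal messages from one raw frame and preserve the order of
    the remaining data messages (one frame may yield several outputs)."""
    return [msg for msg in frame if msg.get("type") not in INTERNAL]


frame = [
    {"type": "heartbeat"},
    {"type": "trade", "seq": 1},
    {"type": "trade", "seq": 2},
    {"type": "pong"},
]
out = route_frame(frame)
assert [m["seq"] for m in out] == [1, 2]  # internals dropped, order kept
```

Tests then assert on both what is forwarded and what is silently consumed.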
Integration tests
Integration tests belong in the tests/ directory and exercise the public API against mock infrastructure.
What to test (in tests/ directory):
- HTTP client requests against mock Axum servers.
- WebSocket connection lifecycle, authentication, and message routing.
- Data client subscription workflows and historical data requests.
- Execution client order submission, modification, and cancellation flows.
- Error handling and retry behavior with simulated failures.
At a minimum, review existing adapter test suites for reference patterns and verify every adapter proves the same core behaviours.
HTTP client integration coverage
- Happy paths – fetch a representative public resource (e.g., instruments or mark price) and verify the response is converted into Nautilus domain models.
- Credential guard – call a private endpoint without credentials and assert a structured error; repeat with credentials to prove success.
- Rate limiting / retry mapping – surface venue-specific rate-limit responses and assert the adapter produces the correct `OkxError`/`BitmexHttpError` variant so the retry policy can react.
- Query builders – exercise builders for paginated/time-bounded endpoints (historical trades, candles) and assert the emitted query string matches the venue specification (`after`, `before`, `limit`, etc.).
- Error translation – verify non-2xx upstream responses map to adapter error enums with the original code/message attached.
WebSocket client integration coverage
- Login handshake – confirm a successful login flips the internal auth state and test failure cases where the server returns a non-zero code; the client should surface an error and avoid marking itself as authenticated.
- Ping/Pong – prove both text-based and control-frame pings trigger immediate pong responses.
- Subscription lifecycle – assert subscription requests/acks are emitted for public and private channels, and that unsubscribe calls remove entries from the cached subscription sets.
- Reconnect behaviour – simulate a disconnect and verify the client re-authenticates, restores public channels, and skips private channels that were explicitly unsubscribed pre-disconnect.
- Message routing – feed representative data/ack/error payloads through the socket and assert they arrive on the public stream as the correct `{Venue}WsMessage` variant.
- Quota tagging – (optional but recommended) validate that order/cancel/amend operations are tagged with the appropriate quota label so rate limiting can be enforced independently of subscription traffic.
CI robustness:
- Never use bare `tokio::time::sleep()` with arbitrary durations. Tests become flaky under CI load and slower than necessary.
- Use the `wait_until_async` test helper to poll for conditions with a timeout. Tests return immediately when the condition is met and fail deterministically on timeout rather than relying on arbitrary sleep durations.
- Prefer event-driven assertions with shared state (for example, collect `subscription_events`, track pending/confirmed topics, wait for `connection_count` transitions).
- Use adapter-specific helpers to gate on explicit signals such as "auth confirmed" or "reconnection finished" so suites remain deterministic under load.
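A minimal equivalent of the `wait_until_async` pattern looks like this (a sketch in Python terms, not the platform's actual helper):

```python
import asyncio
import time
from typing import Callable


async def wait_until(
    condition: Callable[[], bool],
    timeout: float = 2.0,
    interval: float = 0.01,
) -> None:
    """Poll `condition` until it holds or the deadline passes; fail fast
    and deterministically instead of sleeping an arbitrary duration."""
    deadline = time.monotonic() + timeout
    while not condition():
        if time.monotonic() >= deadline:
            raise TimeoutError("condition not met within timeout")
        await asyncio.sleep(interval)


async def main() -> bool:
    events: list[str] = []

    async def produce() -> None:
        await asyncio.sleep(0.05)
        events.append("subscribed")

    task = asyncio.create_task(produce())
    await wait_until(lambda: "subscribed" in events)  # returns as soon as the event lands
    await task
    return "subscribed" in events


assert asyncio.run(main())
```

Under CI load the poll simply takes a few more iterations; the test never fails because a fixed sleep was too short.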
Data and execution client integration testing
Data (tests/data_client.rs) and execution (tests/exec_client.rs) client integration tests verify the full message flow from WebSocket through parsing to event emission.
Test infrastructure:
| Component | Purpose |
|---|---|
| Mock Axum server | Serves HTTP endpoints (instruments, fee rates, positions) and WebSocket channels. |
| `TestServerState` | Tracks connections, subscriptions, and authentication state for assertions. |
| Thread-local event channels | `set_data_event_sender()` / `set_exec_event_sender()` for capturing emitted events. |
| `wait_until_async` | Polls conditions with timeout for deterministic async assertions. |
Data client coverage:
| Test scenario | Validates |
|---|---|
| Connect/disconnect | Connection lifecycle, WebSocket establishment, clean shutdown. |
| Subscribe trades | Trade tick events emitted to data channel. |
| Subscribe quotes | Quote events from ticker (LINEAR) or orderbook (SPOT). |
| Subscribe book deltas | OrderBookDeltas events from orderbook snapshots/updates. |
| Subscribe mark/index prices | Filtered by subscription state (only emit when subscribed). |
| Reset state | Subscription tracking cleared, connection terminated. |
| Instruments on connect | Instrument events emitted during connection setup. |
Execution client coverage:
| Test scenario | Validates |
|---|---|
| Connect/disconnect | Auth handshake, private + trade WS connections, subscriptions. |
| Demo mode | Only private WS connects (trade WS skipped for HTTP fallback). |
| Order submission | Order accepted/rejected events, venue ID correlation. |
| Order modification/cancel | Update and cancel acknowledgment events. |
| Position/wallet updates | PositionStatusReport and AccountState events. |
Key patterns:
- Each `#[tokio::test]` runs on a fresh thread, ensuring thread-local channel isolation.
- Use `wait_until_async` for subscription/connection state instead of arbitrary sleeps.
- Drain instrument events before subscription tests to isolate assertions.
- Verify subscription state in `TestServerState` before asserting on emitted events.
Python testing
Layout
tests/integration_tests/adapters/your_adapter/
├── conftest.py # Shared fixtures (mock clients, test instruments)
├── test_data.py # Data client integration tests
├── test_execution.py # Execution client integration tests
├── test_providers.py # Instrument provider tests
├── test_factories.py # Factory and configuration tests
└── __init__.py # Package initialization

Test file organization
| File | Purpose |
|---|---|
| `test_data.py` | Tests for `LiveDataClient` and `LiveMarketDataClient`. Validates subscriptions, data parsing, and message handling. |
| `test_execution.py` | Tests for `LiveExecutionClient`. Validates order submission, modification, cancellation, and execution reports. |
| `test_providers.py` | Tests for `InstrumentProvider`. Validates instrument loading, filtering, and caching behavior. |
| `test_factories.py` | Tests for factory functions. Validates client instantiation and configuration wiring. |
Guidelines:
- Exercise the adapter's Python surface (instrument providers, data/execution clients, factories) inside `tests/integration_tests/adapters/<adapter>/`.
- Mock the PyO3 boundary (`nautilus_pyo3` shims, stubbed Rust clients) so tests stay fast while verifying that configuration, factory wiring, and error handling match the exported Rust API.
- Mirror the Rust integration coverage: when the Rust suite adds a new behaviour (e.g., reconnection replay, error propagation), assert the Python layer performs the same sequence (connect/disconnect, submit/amend/cancel translations, venue ID hand-off, failure handling). BitMEX's Python tests provide the target level of detail.
Documentation
All adapter documentation (module-level docs, doc comments, and inline comments) should follow the Documentation Style Guide.
Rust documentation requirements
Every Rust module, struct, and public method must have documentation comments. Use third-person declarative voice (e.g., "Returns the account ID" not "Return the account ID").
- Modules: Use `//!` doc comments at the top of each file (after the license header) to describe the module's purpose.
- Structs: Use `///` doc comments above struct definitions. Keep descriptions concise; one sentence is often sufficient.
- Public methods: Every `pub fn` and `pub async fn` must have a `///` doc comment describing what the method does. Do not document individual parameters in a separate `# Arguments` section; the type signatures and names should be self-explanatory. Parameters may be mentioned in the description when behavior is complex or non-obvious.
What NOT to document:
- Private methods and fields (unless complex logic warrants it).
- Individual parameters/arguments (use descriptive names instead).
- Implementation details that are obvious from the code.
- Files in the `python/` module (PyO3 bindings). Documentation conventions are TBD (may use the numpydoc specification).
Python adapter layer
Step-by-step guide to building the Python layer of an adapter using the provided template.
Method ordering convention
When implementing adapter classes, group methods by category in this order:
1. Connection handlers: `_connect`, `_disconnect`
2. Subscribe handlers: `_subscribe`, `_subscribe_*`
3. Unsubscribe handlers: `_unsubscribe`, `_unsubscribe_*`
4. Request handlers: `_request`, `_request_*`
This keeps related functionality together rather than interleaving subscribe/unsubscribe pairs.
InstrumentProvider
The InstrumentProvider loads instrument definitions from the venue: all instruments, specific
instruments by ID, or a filtered subset.
```python
from nautilus_trader.common.providers import InstrumentProvider
from nautilus_trader.model import InstrumentId


class TemplateInstrumentProvider(InstrumentProvider):
    """Example `InstrumentProvider` showing the minimal overrides required for a complete integration."""

    async def load_all_async(self, filters: dict | None = None) -> None:
        raise NotImplementedError("implement `load_all_async` in your adapter subclass")

    async def load_ids_async(self, instrument_ids: list[InstrumentId], filters: dict | None = None) -> None:
        raise NotImplementedError("implement `load_ids_async` in your adapter subclass")

    async def load_async(self, instrument_id: InstrumentId, filters: dict | None = None) -> None:
        raise NotImplementedError("implement `load_async` in your adapter subclass")
```

| Method | Description |
|---|---|
| `load_all_async` | Loads all instruments asynchronously, optionally with filters. |
| `load_ids_async` | Loads specific instruments by their IDs. |
| `load_async` | Loads a single instrument by its ID. |
DataClient
The LiveDataClient handles data feeds that are not market data: news feeds, custom data streams,
or other non-market sources.
```python
from nautilus_trader.data.messages import RequestData
from nautilus_trader.data.messages import SubscribeData
from nautilus_trader.data.messages import UnsubscribeData
from nautilus_trader.live.data_client import LiveDataClient
from nautilus_trader.model import DataType


class TemplateLiveDataClient(LiveDataClient):
    """Example `LiveDataClient` showing the overridable abstract methods."""

    async def _connect(self) -> None:
        raise NotImplementedError("implement `_connect` in your adapter subclass")

    async def _disconnect(self) -> None:
        raise NotImplementedError("implement `_disconnect` in your adapter subclass")

    async def _subscribe(self, command: SubscribeData) -> None:
        raise NotImplementedError("implement `_subscribe` in your adapter subclass")

    async def _unsubscribe(self, command: UnsubscribeData) -> None:
        raise NotImplementedError("implement `_unsubscribe` in your adapter subclass")

    async def _request(self, request: RequestData) -> None:
        raise NotImplementedError("implement `_request` in your adapter subclass")
```

| Method | Description |
|---|---|
| `_connect` | Establishes a connection to the data provider. |
| `_disconnect` | Closes the connection to the data provider. |
| `_subscribe` | Subscribes to a specific data type. |
| `_unsubscribe` | Unsubscribes from a specific data type. |
| `_request` | Requests data from the provider. |
MarketDataClient
The MarketDataClient handles market-specific data: order books, top-of-book quotes and trades,
instrument status updates, and historical data requests.
```python
from nautilus_trader.data.messages import RequestBars
from nautilus_trader.data.messages import RequestData
from nautilus_trader.data.messages import RequestInstrument
from nautilus_trader.data.messages import RequestInstruments
from nautilus_trader.data.messages import RequestOrderBookDeltas
from nautilus_trader.data.messages import RequestOrderBookDepth
from nautilus_trader.data.messages import RequestOrderBookSnapshot
from nautilus_trader.data.messages import RequestQuoteTicks
from nautilus_trader.data.messages import RequestTradeTicks
from nautilus_trader.data.messages import SubscribeBars
from nautilus_trader.data.messages import SubscribeData
from nautilus_trader.data.messages import SubscribeFundingRates
from nautilus_trader.data.messages import SubscribeIndexPrices
from nautilus_trader.data.messages import SubscribeInstrument
from nautilus_trader.data.messages import SubscribeInstrumentClose
from nautilus_trader.data.messages import SubscribeInstruments
from nautilus_trader.data.messages import SubscribeInstrumentStatus
from nautilus_trader.data.messages import SubscribeMarkPrices
from nautilus_trader.data.messages import SubscribeOptionGreeks
from nautilus_trader.data.messages import SubscribeOrderBook
from nautilus_trader.data.messages import SubscribeQuoteTicks
from nautilus_trader.data.messages import SubscribeTradeTicks
from nautilus_trader.data.messages import UnsubscribeBars
from nautilus_trader.data.messages import UnsubscribeData
from nautilus_trader.data.messages import UnsubscribeFundingRates
from nautilus_trader.data.messages import UnsubscribeIndexPrices
from nautilus_trader.data.messages import UnsubscribeInstrument
from nautilus_trader.data.messages import UnsubscribeInstrumentClose
from nautilus_trader.data.messages import UnsubscribeInstruments
from nautilus_trader.data.messages import UnsubscribeInstrumentStatus
from nautilus_trader.data.messages import UnsubscribeMarkPrices
from nautilus_trader.data.messages import UnsubscribeOptionGreeks
from nautilus_trader.data.messages import UnsubscribeOrderBook
from nautilus_trader.data.messages import UnsubscribeQuoteTicks
from nautilus_trader.data.messages import UnsubscribeTradeTicks
from nautilus_trader.live.data_client import LiveMarketDataClient


class TemplateLiveMarketDataClient(LiveMarketDataClient):
    """Example `LiveMarketDataClient` showing the overridable abstract methods."""

    async def _connect(self) -> None:
        raise NotImplementedError("implement `_connect` in your adapter subclass")

    async def _disconnect(self) -> None:
        raise NotImplementedError("implement `_disconnect` in your adapter subclass")

    async def _subscribe(self, command: SubscribeData) -> None:
        raise NotImplementedError("implement `_subscribe` in your adapter subclass")

    async def _subscribe_instruments(self, command: SubscribeInstruments) -> None:
        raise NotImplementedError("implement `_subscribe_instruments` in your adapter subclass")

    async def _subscribe_instrument(self, command: SubscribeInstrument) -> None:
        raise NotImplementedError("implement `_subscribe_instrument` in your adapter subclass")

    async def _subscribe_order_book_deltas(self, command: SubscribeOrderBook) -> None:
        raise NotImplementedError("implement `_subscribe_order_book_deltas` in your adapter subclass")

    async def _subscribe_order_book_depth(self, command: SubscribeOrderBook) -> None:
        raise NotImplementedError("implement `_subscribe_order_book_depth` in your adapter subclass")

    async def _subscribe_quote_ticks(self, command: SubscribeQuoteTicks) -> None:
        raise NotImplementedError("implement `_subscribe_quote_ticks` in your adapter subclass")

    async def _subscribe_trade_ticks(self, command: SubscribeTradeTicks) -> None:
        raise NotImplementedError("implement `_subscribe_trade_ticks` in your adapter subclass")

    async def _subscribe_mark_prices(self, command: SubscribeMarkPrices) -> None:
        raise NotImplementedError("implement `_subscribe_mark_prices` in your adapter subclass")

    async def _subscribe_index_prices(self, command: SubscribeIndexPrices) -> None:
        raise NotImplementedError("implement `_subscribe_index_prices` in your adapter subclass")

    async def _subscribe_bars(self, command: SubscribeBars) -> None:
        raise NotImplementedError("implement `_subscribe_bars` in your adapter subclass")

    async def _subscribe_funding_rates(self, command: SubscribeFundingRates) -> None:
        raise NotImplementedError("implement `_subscribe_funding_rates` in your adapter subclass")

    async def _subscribe_instrument_status(self, command: SubscribeInstrumentStatus) -> None:
        raise NotImplementedError("implement `_subscribe_instrument_status` in your adapter subclass")

    async def _subscribe_instrument_close(self, command: SubscribeInstrumentClose) -> None:
        raise NotImplementedError("implement `_subscribe_instrument_close` in your adapter subclass")

    async def _subscribe_option_greeks(self, command: SubscribeOptionGreeks) -> None:
        raise NotImplementedError("implement `_subscribe_option_greeks` in your adapter subclass")

    async def _unsubscribe(self, command: UnsubscribeData) -> None:
        raise NotImplementedError("implement `_unsubscribe` in your adapter subclass")

    async def _unsubscribe_instruments(self, command: UnsubscribeInstruments) -> None:
        raise NotImplementedError("implement `_unsubscribe_instruments` in your adapter subclass")

    async def _unsubscribe_instrument(self, command: UnsubscribeInstrument) -> None:
        raise NotImplementedError("implement `_unsubscribe_instrument` in your adapter subclass")

    async def _unsubscribe_order_book_deltas(self, command: UnsubscribeOrderBook) -> None:
        raise NotImplementedError("implement `_unsubscribe_order_book_deltas` in your adapter subclass")

    async def _unsubscribe_order_book_depth(self, command: UnsubscribeOrderBook) -> None:
        raise NotImplementedError("implement `_unsubscribe_order_book_depth` in your adapter subclass")

    async def _unsubscribe_quote_ticks(self, command: UnsubscribeQuoteTicks) -> None:
        raise NotImplementedError("implement `_unsubscribe_quote_ticks` in your adapter subclass")

    async def _unsubscribe_trade_ticks(self, command: UnsubscribeTradeTicks) -> None:
        raise NotImplementedError("implement `_unsubscribe_trade_ticks` in your adapter subclass")

    async def _unsubscribe_mark_prices(self, command: UnsubscribeMarkPrices) -> None:
        raise NotImplementedError("implement `_unsubscribe_mark_prices` in your adapter subclass")

    async def _unsubscribe_index_prices(self, command: UnsubscribeIndexPrices) -> None:
        raise NotImplementedError("implement `_unsubscribe_index_prices` in your adapter subclass")

    async def _unsubscribe_bars(self, command: UnsubscribeBars) -> None:
        raise NotImplementedError("implement `_unsubscribe_bars` in your adapter subclass")

    async def _unsubscribe_funding_rates(self, command: UnsubscribeFundingRates) -> None:
        raise NotImplementedError("implement `_unsubscribe_funding_rates` in your adapter subclass")

    async def _unsubscribe_instrument_status(self, command: UnsubscribeInstrumentStatus) -> None:
        raise NotImplementedError("implement `_unsubscribe_instrument_status` in your adapter subclass")

    async def _unsubscribe_instrument_close(self, command: UnsubscribeInstrumentClose) -> None:
        raise NotImplementedError("implement `_unsubscribe_instrument_close` in your adapter subclass")

    async def _unsubscribe_option_greeks(self, command: UnsubscribeOptionGreeks) -> None:
        raise NotImplementedError("implement `_unsubscribe_option_greeks` in your adapter subclass")

    async def _request(self, request: RequestData) -> None:
        raise NotImplementedError("implement `_request` in your adapter subclass")

    async def _request_instrument(self, request: RequestInstrument) -> None:
        raise NotImplementedError("implement `_request_instrument` in your adapter subclass")

    async def _request_instruments(self, request: RequestInstruments) -> None:
        raise NotImplementedError("implement `_request_instruments` in your adapter subclass")

    async def _request_order_book_deltas(self, request: RequestOrderBookDeltas) -> None:
        raise NotImplementedError("implement `_request_order_book_deltas` in your adapter subclass")

    async def _request_order_book_depth(self, request: RequestOrderBookDepth) -> None:
        raise NotImplementedError("implement `_request_order_book_depth` in your adapter subclass")

    async def _request_order_book_snapshot(self, request: RequestOrderBookSnapshot) -> None:
        raise NotImplementedError("implement `_request_order_book_snapshot` in your adapter subclass")

    async def _request_quote_ticks(self, request: RequestQuoteTicks) -> None:
        raise NotImplementedError("implement `_request_quote_ticks` in your adapter subclass")

    async def _request_trade_ticks(self, request: RequestTradeTicks) -> None:
        raise NotImplementedError("implement `_request_trade_ticks` in your adapter subclass")

    async def _request_bars(self, request: RequestBars) -> None:
        raise NotImplementedError("implement `_request_bars` in your adapter subclass")
```

| Method | Description |
|---|---|
| `_connect` | Establishes a connection to the venue APIs. |
| `_disconnect` | Closes the connection to the venue APIs. |
| `_subscribe` | Subscribes to generic data (base for custom types). |
| `_subscribe_instruments` | Subscribes to market data for multiple instruments. |
| `_subscribe_instrument` | Subscribes to market data for a single instrument. |
| `_subscribe_order_book_deltas` | Subscribes to order book delta updates. |
| `_subscribe_order_book_depth` | Subscribes to order book depth updates. |
| `_subscribe_quote_ticks` | Subscribes to top-of-book quote updates. |
| `_subscribe_trade_ticks` | Subscribes to trade tick updates. |
| `_subscribe_mark_prices` | Subscribes to mark price updates. |
| `_subscribe_index_prices` | Subscribes to index price updates. |
| `_subscribe_bars` | Subscribes to bar/candlestick updates. |
| `_subscribe_funding_rates` | Subscribes to funding rate updates. |
| `_subscribe_instrument_status` | Subscribes to instrument status updates. |
| `_subscribe_instrument_close` | Subscribes to instrument close price updates. |
| `_subscribe_option_greeks` | Subscribes to option greeks updates. |
| `_unsubscribe` | Unsubscribes from generic data (base for custom types). |
| `_unsubscribe_instruments` | Unsubscribes from market data for multiple instruments. |
| `_unsubscribe_instrument` | Unsubscribes from market data for a single instrument. |
| `_unsubscribe_order_book_deltas` | Unsubscribes from order book delta updates. |
| `_unsubscribe_order_book_depth` | Unsubscribes from order book depth updates. |
| `_unsubscribe_quote_ticks` | Unsubscribes from quote tick updates. |
| `_unsubscribe_trade_ticks` | Unsubscribes from trade tick updates. |
| `_unsubscribe_mark_prices` | Unsubscribes from mark price updates. |
| `_unsubscribe_index_prices` | Unsubscribes from index price updates. |
| `_unsubscribe_bars` | Unsubscribes from bar updates. |
| `_unsubscribe_funding_rates` | Unsubscribes from funding rate updates. |
| `_unsubscribe_instrument_status` | Unsubscribes from instrument status updates. |
| `_unsubscribe_instrument_close` | Unsubscribes from instrument close price updates. |
| `_unsubscribe_option_greeks` | Unsubscribes from option greeks updates. |
| `_request` | Requests generic data (base for custom types). |
| `_request_instrument` | Requests historical data for a single instrument. |
| `_request_instruments` | Requests historical data for multiple instruments. |
| `_request_order_book_snapshot` | Requests an order book snapshot. |
| `_request_order_book_depth` | Requests order book depth. |
| `_request_order_book_deltas` | Requests historical order book deltas. |
| `_request_quote_ticks` | Requests historical quote tick data. |
| `_request_trade_ticks` | Requests historical trade tick data. |
| `_request_bars` | Requests historical bar data. |
| `_request_funding_rates` | Requests historical funding rate data. |
Order book delta flag requirements
When implementing `_subscribe_order_book_deltas` or streaming order book data, adapters must set `RecordFlag` flags correctly on each `OrderBookDelta`. See also Delta flags and event boundaries.
- `F_LAST`: Set on the last delta of every logical event group. The `DataEngine` uses this flag as the flush signal when `buffer_deltas` is enabled. Without it, deltas accumulate indefinitely and are never published to subscribers.
- `F_SNAPSHOT`: Set on all deltas that belong to a snapshot sequence (a `Clear` action followed by `Add` actions reconstructing the book).
- Empty book snapshots: When emitting a snapshot for an empty book, the `Clear` delta must have `F_SNAPSHOT | F_LAST`. Otherwise buffered consumers never receive it.
- Incremental updates: Each venue update message ends with a delta that has `F_LAST` set. If the venue batches multiple updates into one message, terminate each logical group with `F_LAST`.
```python
from nautilus_trader.model.enums import RecordFlag

# Incremental update (single event)
delta = OrderBookDelta(
    instrument_id=instrument_id,
    action=BookAction.UPDATE,
    order=order,
    flags=RecordFlag.F_LAST,  # Last (and only) delta in this event
    sequence=sequence,
    ts_event=ts_event,
    ts_init=ts_init,
)

# Snapshot sequence
clear_delta = OrderBookDelta(
    instrument_id=instrument_id,
    action=BookAction.CLEAR,
    order=NULL_ORDER,
    flags=RecordFlag.F_SNAPSHOT,  # Not the last delta
    ...
)

last_add_delta = OrderBookDelta(
    instrument_id=instrument_id,
    action=BookAction.ADD,
    order=last_order,
    flags=RecordFlag.F_SNAPSHOT | RecordFlag.F_LAST,  # End of snapshot
    ...
)
```

A missing `F_LAST` is a silent bug: no error is raised, but subscribers never receive the data when buffering is enabled.
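The flag rule reduces to a small pure function, sketched here with illustrative enum values (use `nautilus_trader.model.enums.RecordFlag` in real code):

```python
from enum import IntFlag


class RecordFlag(IntFlag):
    # Illustrative bit values for the sketch only
    F_LAST = 1 << 7
    F_SNAPSHOT = 1 << 6


def flags_for_group(n_deltas: int, snapshot: bool) -> list[RecordFlag]:
    """Flags for each delta of one logical event group: every snapshot delta
    carries F_SNAPSHOT, and the final delta always adds F_LAST."""
    base = RecordFlag.F_SNAPSHOT if snapshot else RecordFlag(0)
    return [base] * (n_deltas - 1) + [base | RecordFlag.F_LAST]


# Three-delta snapshot (Clear + two Adds): only the last delta flushes
assert flags_for_group(3, snapshot=True)[-1] == RecordFlag.F_SNAPSHOT | RecordFlag.F_LAST
# Empty-book snapshot is a single Clear delta: F_SNAPSHOT | F_LAST
assert flags_for_group(1, snapshot=True) == [RecordFlag.F_SNAPSHOT | RecordFlag.F_LAST]
```

Centralizing the assignment this way makes the empty-book and batched-update cases hard to get wrong.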
ExecutionClient
The ExecutionClient manages order submission, modification, and cancellation against the venue
trading system.
```python
from nautilus_trader.execution.messages import BatchCancelOrders
from nautilus_trader.execution.messages import CancelAllOrders
from nautilus_trader.execution.messages import CancelOrder
from nautilus_trader.execution.messages import GenerateFillReports
from nautilus_trader.execution.messages import GenerateOrderStatusReport
from nautilus_trader.execution.messages import GenerateOrderStatusReports
from nautilus_trader.execution.messages import GeneratePositionStatusReports
from nautilus_trader.execution.messages import ModifyOrder
from nautilus_trader.execution.messages import SubmitOrder
from nautilus_trader.execution.messages import SubmitOrderList
from nautilus_trader.execution.reports import ExecutionMassStatus
from nautilus_trader.execution.reports import FillReport
from nautilus_trader.execution.reports import OrderStatusReport
from nautilus_trader.execution.reports import PositionStatusReport
from nautilus_trader.live.execution_client import LiveExecutionClient


class TemplateLiveExecutionClient(LiveExecutionClient):
    """Example `LiveExecutionClient` outlining the required overrides."""

    async def _connect(self) -> None:
        raise NotImplementedError("implement `_connect` in your adapter subclass")

    async def _disconnect(self) -> None:
        raise NotImplementedError("implement `_disconnect` in your adapter subclass")

    async def _submit_order(self, command: SubmitOrder) -> None:
        raise NotImplementedError("implement `_submit_order` in your adapter subclass")

    async def _submit_order_list(self, command: SubmitOrderList) -> None:
        raise NotImplementedError("implement `_submit_order_list` in your adapter subclass")

    async def _modify_order(self, command: ModifyOrder) -> None:
        raise NotImplementedError("implement `_modify_order` in your adapter subclass")

    async def _cancel_order(self, command: CancelOrder) -> None:
        raise NotImplementedError("implement `_cancel_order` in your adapter subclass")

    async def _cancel_all_orders(self, command: CancelAllOrders) -> None:
        raise NotImplementedError("implement `_cancel_all_orders` in your adapter subclass")

    async def _batch_cancel_orders(self, command: BatchCancelOrders) -> None:
        raise NotImplementedError("implement `_batch_cancel_orders` in your adapter subclass")

    async def generate_order_status_report(
        self,
        command: GenerateOrderStatusReport,
    ) -> OrderStatusReport | None:
        raise NotImplementedError("method `generate_order_status_report` must be implemented in the subclass")

    async def generate_order_status_reports(
        self,
        command: GenerateOrderStatusReports,
    ) -> list[OrderStatusReport]:
        raise NotImplementedError("method `generate_order_status_reports` must be implemented in the subclass")

    async def generate_fill_reports(
        self,
        command: GenerateFillReports,
    ) -> list[FillReport]:
        raise NotImplementedError("method `generate_fill_reports` must be implemented in the subclass")

    async def generate_position_status_reports(
        self,
        command: GeneratePositionStatusReports,
    ) -> list[PositionStatusReport]:
        raise NotImplementedError("method `generate_position_status_reports` must be implemented in the subclass")

    async def generate_mass_status(
        self,
        lookback_mins: int | None = None,
    ) -> ExecutionMassStatus | None:
        raise NotImplementedError("method `generate_mass_status` must be implemented in the subclass")
```

| Method | Description |
|---|---|
| `_connect` | Establishes a connection to the venue APIs. |
| `_disconnect` | Closes the connection to the venue APIs. |
| `_submit_order` | Submits a new order to the venue. |
| `_submit_order_list` | Submits a list of orders to the venue. |
| `_modify_order` | Modifies an existing order on the venue. |
| `_cancel_order` | Cancels a specific order on the venue. |
| `_cancel_all_orders` | Cancels all orders for an instrument on the venue. |
| `_batch_cancel_orders` | Cancels a batch of orders for an instrument on the venue. |
| `generate_order_status_report` | Generates a report for a specific order on the venue. |
| `generate_order_status_reports` | Generates reports for all orders on the venue. |
| `generate_fill_reports` | Generates reports for filled orders on the venue. |
| `generate_position_status_reports` | Generates reports for position status on the venue. |
| `generate_mass_status` | Generates execution mass status reports. |
Configuration
Configuration classes hold adapter-specific settings like API keys and connection details.
```python
from nautilus_trader.config import LiveDataClientConfig
from nautilus_trader.config import LiveExecClientConfig


class TemplateDataClientConfig(LiveDataClientConfig):
    """Configuration for `TemplateDataClient` instances."""

    api_key: str
    api_secret: str
    base_url: str


class TemplateExecClientConfig(LiveExecClientConfig):
    """Configuration for `TemplateExecClient` instances."""

    api_key: str
    api_secret: str
    base_url: str
```

Key attributes:
- `api_key`: The API key for authenticating with the data provider.
- `api_secret`: The API secret for authenticating with the data provider.
- `base_url`: The base URL for connecting to the data provider's API.
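One common pattern, sketched here with a hypothetical helper (the class name, env-var names, and default URL are all illustrative), is resolving these attributes from environment variables so secrets stay out of source control:

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TemplateClientEnv:
    # Hypothetical helper: resolve credentials from the environment so
    # API secrets never appear in config files or logs
    api_key: str
    api_secret: str
    base_url: str

    @classmethod
    def from_env(cls, prefix: str = "TEMPLATE") -> "TemplateClientEnv":
        return cls(
            api_key=os.environ[f"{prefix}_API_KEY"],
            api_secret=os.environ[f"{prefix}_API_SECRET"],
            base_url=os.environ.get(f"{prefix}_BASE_URL", "https://api.example.com"),
        )


os.environ.setdefault("TEMPLATE_API_KEY", "demo-key")
os.environ.setdefault("TEMPLATE_API_SECRET", "demo-secret")
config = TemplateClientEnv.from_env()
assert config.base_url == "https://api.example.com"
```

The resolved values can then be passed into the adapter's actual config classes at factory time.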
Common test scenarios
Exercise adapters across every venue behaviour they claim to support. Incorporate these scenarios into the Rust and Python suites.
Product coverage
Test each supported product family.
- Spot instruments
- Derivatives (perpetuals, futures, swaps)
- Options and structured products
Order flow
- Cover each supported order type (limit, market, stop, conditional, etc.) under every venue time-in-force option, including expiry behaviour and rejection handling.
- Submit buy and sell market orders and assert balance, position, and average-price updates align with venue responses.
- Submit representative buy and sell limit orders, verifying acknowledgements, execution reports, full and partial fills, and cancel flows.
State management
- Start sessions with existing open orders to verify the adapter reconciles state on connect before issuing new commands.
- Seed preloaded positions and confirm position snapshots, valuation, and PnL agree with the venue prior to trading.
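The reconciliation step can be sketched as a diff between cached and venue state (the order IDs and status strings below are illustrative):

```python
def reconcile_open_orders(
    cached: dict[str, str],
    venue: dict[str, str],
) -> tuple[set[str], set[str], set[str]]:
    """Compare locally cached open orders against the venue snapshot on
    connect: orders unknown locally, orders the venue no longer has, and
    orders whose status diverged."""
    external = set(venue) - set(cached)
    stale = set(cached) - set(venue)
    diverged = {oid for oid in set(cached) & set(venue) if cached[oid] != venue[oid]}
    return external, stale, diverged


cached = {"O-1": "ACCEPTED", "O-2": "ACCEPTED"}
venue = {"O-2": "PARTIALLY_FILLED", "O-3": "ACCEPTED"}
external, stale, diverged = reconcile_open_orders(cached, venue)
assert (external, stale, diverged) == ({"O-3"}, {"O-1"}, {"O-2"})
```

Tests seed each of the three buckets and assert the adapter emits the corresponding reports before accepting new commands.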
Data testing spec
See the full Data Testing Spec for the DataTester test matrix.
Execution testing spec
See the full Execution Testing Spec for the ExecTester test matrix.