Skip to main content

nautilus_backtest/
execution_client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `BacktestExecutionClient` implementation for backtesting.
17
18use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use async_trait::async_trait;
21use nautilus_common::{
22    cache::Cache,
23    clients::ExecutionClient,
24    clock::Clock,
25    factories::OrderEventFactory,
26    messages::execution::{
27        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
28        SubmitOrder, SubmitOrderList, TradingCommand,
29    },
30    msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{SharedCell, UnixNanos, WeakCell};
33use nautilus_execution::client::core::ExecutionClientCore;
34use nautilus_model::{
35    accounts::AccountAny,
36    enums::OmsType,
37    events::OrderEventAny,
38    identifiers::{AccountId, ClientId, ClientOrderId, TraderId, Venue},
39    orders::OrderAny,
40    types::{AccountBalance, MarginBalance},
41};
42
43use crate::exchange::SimulatedExchange;
44
45/// Execution client implementation for backtesting trading operations.
46///
47/// The `BacktestExecutionClient` provides an execution client interface for
48/// backtesting environments, handling order management and trade execution
49/// through simulated exchanges. It processes trading commands and coordinates
50/// with the simulation infrastructure to provide realistic execution behavior.
51#[derive(Clone)]
52pub struct BacktestExecutionClient {
53    core: ExecutionClientCore,
54    factory: OrderEventFactory,
55    cache: Rc<RefCell<Cache>>,
56    clock: Rc<RefCell<dyn Clock>>,
57    exchange: WeakCell<SimulatedExchange>,
58    /// Buffered order events for deferred processing.
59    ///
60    /// Events like `OrderSubmitted` cannot be sent synchronously through
61    /// the msgbus during `submit_order` because the exec engine holds a
62    /// borrow via its `execute` handler. Instead, events are buffered here
63    /// and drained by the engine after the execute borrow is released.
64    queued_events: Rc<RefCell<Vec<OrderEventAny>>>,
65    routing: bool,
66    _frozen_account: bool,
67}
68
69impl Debug for BacktestExecutionClient {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct(stringify!(BacktestExecutionClient))
72            .field("client_id", &self.core.client_id)
73            .field("routing", &self.routing)
74            .finish()
75    }
76}
77
78impl BacktestExecutionClient {
79    #[allow(clippy::too_many_arguments)]
80    pub fn new(
81        trader_id: TraderId,
82        account_id: AccountId,
83        exchange: Rc<RefCell<SimulatedExchange>>,
84        cache: Rc<RefCell<Cache>>,
85        clock: Rc<RefCell<dyn Clock>>,
86        routing: Option<bool>,
87        frozen_account: Option<bool>,
88    ) -> Self {
89        let routing = routing.unwrap_or(false);
90        let frozen_account = frozen_account.unwrap_or(false);
91        let exchange_shared: SharedCell<SimulatedExchange> = SharedCell::from(exchange.clone());
92        let exchange_id = exchange_shared.borrow().id;
93        let account_type = exchange.borrow().account_type;
94        let base_currency = exchange.borrow().base_currency;
95
96        let core = ExecutionClientCore::new(
97            trader_id,
98            ClientId::from(exchange_id.as_str()),
99            Venue::from(exchange_id.as_str()),
100            exchange.borrow().oms_type,
101            account_id,
102            account_type,
103            base_currency,
104            cache.clone(),
105        );
106
107        let factory = OrderEventFactory::new(trader_id, account_id, account_type, base_currency);
108
109        if !frozen_account {
110            // TODO Register calculated account
111        }
112
113        Self {
114            core,
115            factory,
116            exchange: exchange_shared.downgrade(),
117            cache,
118            clock,
119            queued_events: Rc::new(RefCell::new(Vec::new())),
120            routing,
121            _frozen_account: frozen_account,
122        }
123    }
124
125    fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
126        self.cache
127            .borrow()
128            .order(client_order_id)
129            .cloned()
130            .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
131    }
132
133    /// Drain buffered order events, sending each to the exec engine.
134    pub fn drain_queued_events(&self) {
135        let events: Vec<OrderEventAny> = self.queued_events.borrow_mut().drain(..).collect();
136        let endpoint = MessagingSwitchboard::exec_engine_process();
137        for event in events {
138            msgbus::send_order_event(endpoint, event);
139        }
140    }
141}
142
143#[async_trait(?Send)]
144impl ExecutionClient for BacktestExecutionClient {
145    fn is_connected(&self) -> bool {
146        self.core.is_connected()
147    }
148
149    fn client_id(&self) -> ClientId {
150        self.core.client_id
151    }
152
153    fn account_id(&self) -> AccountId {
154        self.core.account_id
155    }
156
157    fn venue(&self) -> Venue {
158        self.core.venue
159    }
160
161    fn oms_type(&self) -> OmsType {
162        self.core.oms_type
163    }
164
165    fn get_account(&self) -> Option<AccountAny> {
166        self.cache.borrow().account(&self.core.account_id).cloned()
167    }
168
169    fn generate_account_state(
170        &self,
171        balances: Vec<AccountBalance>,
172        margins: Vec<MarginBalance>,
173        reported: bool,
174        ts_event: UnixNanos,
175    ) -> anyhow::Result<()> {
176        let ts_init = self.clock.borrow().timestamp_ns();
177        let state = self
178            .factory
179            .generate_account_state(balances, margins, reported, ts_event, ts_init);
180        let endpoint = MessagingSwitchboard::portfolio_update_account();
181        msgbus::send_account_state(endpoint, &state);
182        Ok(())
183    }
184
185    fn start(&mut self) -> anyhow::Result<()> {
186        self.core.set_connected();
187        log::info!("Backtest execution client started");
188        Ok(())
189    }
190
191    fn stop(&mut self) -> anyhow::Result<()> {
192        self.core.set_disconnected();
193        log::info!("Backtest execution client stopped");
194        Ok(())
195    }
196
197    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
198        // Buffer the OrderSubmitted event for deferred processing to avoid
199        // RefCell re-entrancy (exec_engine holds a borrow during execute)
200        let order = self.get_order(&cmd.client_order_id)?;
201        let ts_init = self.clock.borrow().timestamp_ns();
202        let event = self.factory.generate_order_submitted(&order, ts_init);
203        self.queued_events.borrow_mut().push(event);
204
205        if let Some(exchange) = self.exchange.upgrade() {
206            exchange
207                .borrow_mut()
208                .send(TradingCommand::SubmitOrder(cmd.clone()));
209        } else {
210            log::error!("submit_order: SimulatedExchange has been dropped");
211        }
212        Ok(())
213    }
214
215    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
216        let ts_init = self.clock.borrow().timestamp_ns();
217
218        let orders: Vec<OrderAny> = self
219            .cache
220            .borrow()
221            .orders_for_ids(&cmd.order_list.client_order_ids, cmd);
222
223        // Buffer events for deferred processing
224        let mut queued = self.queued_events.borrow_mut();
225        for order in &orders {
226            let event = self.factory.generate_order_submitted(order, ts_init);
227            queued.push(event);
228        }
229        drop(queued);
230
231        if let Some(exchange) = self.exchange.upgrade() {
232            exchange
233                .borrow_mut()
234                .send(TradingCommand::SubmitOrderList(cmd.clone()));
235        } else {
236            log::error!("submit_order_list: SimulatedExchange has been dropped");
237        }
238        Ok(())
239    }
240
241    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
242        if let Some(exchange) = self.exchange.upgrade() {
243            exchange
244                .borrow_mut()
245                .send(TradingCommand::ModifyOrder(cmd.clone()));
246        } else {
247            log::error!("modify_order: SimulatedExchange has been dropped");
248        }
249        Ok(())
250    }
251
252    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
253        if let Some(exchange) = self.exchange.upgrade() {
254            exchange
255                .borrow_mut()
256                .send(TradingCommand::CancelOrder(cmd.clone()));
257        } else {
258            log::error!("cancel_order: SimulatedExchange has been dropped");
259        }
260        Ok(())
261    }
262
263    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
264        if let Some(exchange) = self.exchange.upgrade() {
265            exchange
266                .borrow_mut()
267                .send(TradingCommand::CancelAllOrders(cmd.clone()));
268        } else {
269            log::error!("cancel_all_orders: SimulatedExchange has been dropped");
270        }
271        Ok(())
272    }
273
274    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
275        if let Some(exchange) = self.exchange.upgrade() {
276            exchange
277                .borrow_mut()
278                .send(TradingCommand::BatchCancelOrders(cmd.clone()));
279        } else {
280            log::error!("batch_cancel_orders: SimulatedExchange has been dropped");
281        }
282        Ok(())
283    }
284
285    fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
286        if let Some(exchange) = self.exchange.upgrade() {
287            exchange
288                .borrow_mut()
289                .send(TradingCommand::QueryAccount(cmd.clone()));
290        } else {
291            log::error!("query_account: SimulatedExchange has been dropped");
292        }
293        Ok(())
294    }
295
296    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
297        if let Some(exchange) = self.exchange.upgrade() {
298            exchange
299                .borrow_mut()
300                .send(TradingCommand::QueryOrder(cmd.clone()));
301        } else {
302            log::error!("query_order: SimulatedExchange has been dropped");
303        }
304        Ok(())
305    }
306}