1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use async_trait::async_trait;
21use nautilus_common::{
22 cache::Cache,
23 clients::ExecutionClient,
24 clock::Clock,
25 factories::OrderEventFactory,
26 messages::execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
28 SubmitOrder, SubmitOrderList, TradingCommand,
29 },
30 msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{SharedCell, UnixNanos, WeakCell};
33use nautilus_execution::client::core::ExecutionClientCore;
34use nautilus_model::{
35 accounts::AccountAny,
36 enums::OmsType,
37 events::OrderEventAny,
38 identifiers::{AccountId, ClientId, ClientOrderId, TraderId, Venue},
39 orders::OrderAny,
40 types::{AccountBalance, MarginBalance},
41};
42
43use crate::exchange::SimulatedExchange;
44
45#[derive(Clone)]
52pub struct BacktestExecutionClient {
53 core: ExecutionClientCore,
54 factory: OrderEventFactory,
55 cache: Rc<RefCell<Cache>>,
56 clock: Rc<RefCell<dyn Clock>>,
57 exchange: WeakCell<SimulatedExchange>,
58 queued_events: Rc<RefCell<Vec<OrderEventAny>>>,
65 routing: bool,
66 _frozen_account: bool,
67}
68
69impl Debug for BacktestExecutionClient {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct(stringify!(BacktestExecutionClient))
72 .field("client_id", &self.core.client_id)
73 .field("routing", &self.routing)
74 .finish()
75 }
76}
77
78impl BacktestExecutionClient {
79 #[allow(clippy::too_many_arguments)]
80 pub fn new(
81 trader_id: TraderId,
82 account_id: AccountId,
83 exchange: Rc<RefCell<SimulatedExchange>>,
84 cache: Rc<RefCell<Cache>>,
85 clock: Rc<RefCell<dyn Clock>>,
86 routing: Option<bool>,
87 frozen_account: Option<bool>,
88 ) -> Self {
89 let routing = routing.unwrap_or(false);
90 let frozen_account = frozen_account.unwrap_or(false);
91 let exchange_shared: SharedCell<SimulatedExchange> = SharedCell::from(exchange.clone());
92 let exchange_id = exchange_shared.borrow().id;
93 let account_type = exchange.borrow().account_type;
94 let base_currency = exchange.borrow().base_currency;
95
96 let core = ExecutionClientCore::new(
97 trader_id,
98 ClientId::from(exchange_id.as_str()),
99 Venue::from(exchange_id.as_str()),
100 exchange.borrow().oms_type,
101 account_id,
102 account_type,
103 base_currency,
104 cache.clone(),
105 );
106
107 let factory = OrderEventFactory::new(trader_id, account_id, account_type, base_currency);
108
109 if !frozen_account {
110 }
112
113 Self {
114 core,
115 factory,
116 exchange: exchange_shared.downgrade(),
117 cache,
118 clock,
119 queued_events: Rc::new(RefCell::new(Vec::new())),
120 routing,
121 _frozen_account: frozen_account,
122 }
123 }
124
125 fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
126 self.cache
127 .borrow()
128 .order(client_order_id)
129 .cloned()
130 .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
131 }
132
133 pub fn drain_queued_events(&self) {
135 let events: Vec<OrderEventAny> = self.queued_events.borrow_mut().drain(..).collect();
136 let endpoint = MessagingSwitchboard::exec_engine_process();
137 for event in events {
138 msgbus::send_order_event(endpoint, event);
139 }
140 }
141}
142
143#[async_trait(?Send)]
144impl ExecutionClient for BacktestExecutionClient {
145 fn is_connected(&self) -> bool {
146 self.core.is_connected()
147 }
148
149 fn client_id(&self) -> ClientId {
150 self.core.client_id
151 }
152
153 fn account_id(&self) -> AccountId {
154 self.core.account_id
155 }
156
157 fn venue(&self) -> Venue {
158 self.core.venue
159 }
160
161 fn oms_type(&self) -> OmsType {
162 self.core.oms_type
163 }
164
165 fn get_account(&self) -> Option<AccountAny> {
166 self.cache.borrow().account(&self.core.account_id).cloned()
167 }
168
169 fn generate_account_state(
170 &self,
171 balances: Vec<AccountBalance>,
172 margins: Vec<MarginBalance>,
173 reported: bool,
174 ts_event: UnixNanos,
175 ) -> anyhow::Result<()> {
176 let ts_init = self.clock.borrow().timestamp_ns();
177 let state = self
178 .factory
179 .generate_account_state(balances, margins, reported, ts_event, ts_init);
180 let endpoint = MessagingSwitchboard::portfolio_update_account();
181 msgbus::send_account_state(endpoint, &state);
182 Ok(())
183 }
184
185 fn start(&mut self) -> anyhow::Result<()> {
186 self.core.set_connected();
187 log::info!("Backtest execution client started");
188 Ok(())
189 }
190
191 fn stop(&mut self) -> anyhow::Result<()> {
192 self.core.set_disconnected();
193 log::info!("Backtest execution client stopped");
194 Ok(())
195 }
196
197 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
198 let order = self.get_order(&cmd.client_order_id)?;
201 let ts_init = self.clock.borrow().timestamp_ns();
202 let event = self.factory.generate_order_submitted(&order, ts_init);
203 self.queued_events.borrow_mut().push(event);
204
205 if let Some(exchange) = self.exchange.upgrade() {
206 exchange
207 .borrow_mut()
208 .send(TradingCommand::SubmitOrder(cmd.clone()));
209 } else {
210 log::error!("submit_order: SimulatedExchange has been dropped");
211 }
212 Ok(())
213 }
214
215 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
216 let ts_init = self.clock.borrow().timestamp_ns();
217
218 let orders: Vec<OrderAny> = self
219 .cache
220 .borrow()
221 .orders_for_ids(&cmd.order_list.client_order_ids, cmd);
222
223 let mut queued = self.queued_events.borrow_mut();
225 for order in &orders {
226 let event = self.factory.generate_order_submitted(order, ts_init);
227 queued.push(event);
228 }
229 drop(queued);
230
231 if let Some(exchange) = self.exchange.upgrade() {
232 exchange
233 .borrow_mut()
234 .send(TradingCommand::SubmitOrderList(cmd.clone()));
235 } else {
236 log::error!("submit_order_list: SimulatedExchange has been dropped");
237 }
238 Ok(())
239 }
240
241 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
242 if let Some(exchange) = self.exchange.upgrade() {
243 exchange
244 .borrow_mut()
245 .send(TradingCommand::ModifyOrder(cmd.clone()));
246 } else {
247 log::error!("modify_order: SimulatedExchange has been dropped");
248 }
249 Ok(())
250 }
251
252 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
253 if let Some(exchange) = self.exchange.upgrade() {
254 exchange
255 .borrow_mut()
256 .send(TradingCommand::CancelOrder(cmd.clone()));
257 } else {
258 log::error!("cancel_order: SimulatedExchange has been dropped");
259 }
260 Ok(())
261 }
262
263 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
264 if let Some(exchange) = self.exchange.upgrade() {
265 exchange
266 .borrow_mut()
267 .send(TradingCommand::CancelAllOrders(cmd.clone()));
268 } else {
269 log::error!("cancel_all_orders: SimulatedExchange has been dropped");
270 }
271 Ok(())
272 }
273
274 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
275 if let Some(exchange) = self.exchange.upgrade() {
276 exchange
277 .borrow_mut()
278 .send(TradingCommand::BatchCancelOrders(cmd.clone()));
279 } else {
280 log::error!("batch_cancel_orders: SimulatedExchange has been dropped");
281 }
282 Ok(())
283 }
284
285 fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
286 if let Some(exchange) = self.exchange.upgrade() {
287 exchange
288 .borrow_mut()
289 .send(TradingCommand::QueryAccount(cmd.clone()));
290 } else {
291 log::error!("query_account: SimulatedExchange has been dropped");
292 }
293 Ok(())
294 }
295
296 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
297 if let Some(exchange) = self.exchange.upgrade() {
298 exchange
299 .borrow_mut()
300 .send(TradingCommand::QueryOrder(cmd.clone()));
301 } else {
302 log::error!("query_order: SimulatedExchange has been dropped");
303 }
304 Ok(())
305 }
306}