1use std::{
23 cell::{Ref, RefCell},
24 collections::HashSet,
25 fmt::{Debug, Display},
26 rc::Rc,
27 time::Duration,
28};
29
30use nautilus_common::{
31 cache::Cache,
32 clock::Clock,
33 logging::{CMD, EVT, RECV},
34 messages::{ExecutionEvent, ExecutionReport as ExecReportEnum, execution::TradingCommand},
35 msgbus::{self, MessageBus, switchboard},
36};
37use nautilus_execution::client::ExecutionClient;
38use nautilus_model::{
39 events::OrderEventAny,
40 identifiers::{ClientId, ClientOrderId, InstrumentId},
41 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
42};
43
44use crate::{
45 config::LiveExecEngineConfig,
46 reconciliation::{ExecutionReport, ReconciliationConfig, ReconciliationManager},
47};
48
49#[allow(dead_code)]
57pub struct LiveExecutionEngine {
58 clock: Rc<RefCell<dyn Clock>>,
59 cache: Rc<RefCell<Cache>>,
60 msgbus: Rc<RefCell<MessageBus>>,
61 reconciliation: ReconciliationManager,
62 config: LiveExecEngineConfig,
63 cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
64 cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<TradingCommand>>,
65 evt_tx: tokio::sync::mpsc::UnboundedSender<OrderEventAny>,
66 evt_rx: Option<tokio::sync::mpsc::UnboundedReceiver<OrderEventAny>>,
67 reconciliation_task: Option<tokio::task::JoinHandle<()>>,
68 inflight_check_task: Option<tokio::task::JoinHandle<()>>,
69 open_check_task: Option<tokio::task::JoinHandle<()>>,
70 is_running: bool,
71 shutdown_initiated: bool,
72}
73
74impl Debug for LiveExecutionEngine {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct(stringify!(LiveExecutionEngine))
77 .field("config", &self.config)
78 .field("is_running", &self.is_running)
79 .field("shutdown_initiated", &self.shutdown_initiated)
80 .finish()
81 }
82}
83
84impl LiveExecutionEngine {
85 pub fn new(
87 clock: Rc<RefCell<dyn Clock>>,
88 cache: Rc<RefCell<Cache>>,
89 msgbus: Rc<RefCell<MessageBus>>,
90 config: LiveExecEngineConfig,
91 ) -> Self {
92 let filtered_client_order_ids: HashSet<ClientOrderId> = config
93 .filtered_client_order_ids
94 .clone()
95 .unwrap_or_default()
96 .into_iter()
97 .map(|value| ClientOrderId::from(value.as_str()))
98 .collect();
99
100 let reconciliation_instrument_ids: HashSet<InstrumentId> = config
101 .reconciliation_instrument_ids
102 .clone()
103 .unwrap_or_default()
104 .into_iter()
105 .map(|value| InstrumentId::from(value.as_str()))
106 .collect();
107
108 let reconciliation_config = ReconciliationConfig {
109 lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
110 inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
111 inflight_max_retries: config.inflight_check_retries,
112 filter_unclaimed_external: config.filter_unclaimed_external_orders,
113 generate_missing_orders: config.generate_missing_orders,
114 filtered_client_order_ids,
115 open_check_threshold_ns: (config.open_check_threshold_ms as u64) * 1_000_000,
116 open_check_missing_retries: config.open_check_missing_retries,
117 open_check_open_only: config.open_check_open_only,
118 open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
119 filter_position_reports: config.filter_position_reports,
120 reconciliation_instrument_ids,
121 };
122
123 let reconciliation =
124 ReconciliationManager::new(clock.clone(), cache.clone(), reconciliation_config);
125
126 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
127 let (evt_tx, evt_rx) = tokio::sync::mpsc::unbounded_channel();
128
129 Self {
130 clock,
131 cache,
132 msgbus,
133 reconciliation,
134 config,
135 cmd_tx,
136 cmd_rx: Some(cmd_rx),
137 evt_tx,
138 evt_rx: Some(evt_rx),
139 reconciliation_task: None,
140 inflight_check_task: None,
141 open_check_task: None,
142 is_running: false,
143 shutdown_initiated: false,
144 }
145 }
146
147 pub async fn start(&mut self) -> anyhow::Result<()> {
158 if self.is_running {
159 return Ok(());
160 }
161
162 log::info!("Starting LiveExecutionEngine");
163
164 if self.config.reconciliation {
165 self.reconcile_execution_state().await?;
166 }
167
168 self.start_continuous_tasks();
169
170 self.is_running = true;
171 log::info!("LiveExecutionEngine started");
172
173 Ok(())
174 }
175
176 pub async fn stop(&mut self) -> anyhow::Result<()> {
182 if !self.is_running {
183 return Ok(());
184 }
185
186 log::info!("Stopping LiveExecutionEngine");
187 self.shutdown_initiated = true;
188
189 if let Some(task) = self.reconciliation_task.take() {
191 task.abort();
192 }
193 if let Some(task) = self.inflight_check_task.take() {
194 task.abort();
195 }
196 if let Some(task) = self.open_check_task.take() {
197 task.abort();
198 }
199
200 self.is_running = false;
201 log::info!("LiveExecutionEngine stopped");
202
203 Ok(())
204 }
205
206 #[allow(dead_code)]
208 async fn reconcile_execution_state(&mut self) -> anyhow::Result<()> {
209 log::info!("Running startup reconciliation");
210
211 if self.config.reconciliation_startup_delay_secs > 0.0 {
213 let delay_secs = self.config.reconciliation_startup_delay_secs;
214 log::info!("Waiting {}s before reconciliation", delay_secs);
215 tokio::time::sleep(Duration::from_secs_f64(delay_secs)).await;
216 }
217
218 log::info!("Startup reconciliation complete");
230 Ok(())
231 }
232
233 #[allow(dead_code)]
235 async fn reconcile_execution_mass_status(
236 &mut self,
237 mass_status: ExecutionMassStatus,
238 ) -> anyhow::Result<()> {
239 log::info!(
240 "Processing mass status for {}: {} orders, {} fills",
241 mass_status.venue,
242 mass_status.order_reports().len(),
243 mass_status.fill_reports().len()
244 );
245
246 let events = self
247 .reconciliation
248 .reconcile_execution_mass_status(mass_status)
249 .await;
250
251 for event in events {
253 self.publish_event(event);
254 }
255
256 Ok(())
257 }
258
259 pub fn reconcile_execution_report(&mut self, report: ExecutionReport) {
261 log::debug!("{RECV} {report:?}");
262
263 let events = self.reconciliation.reconcile_report(report);
264
265 for event in events {
267 self.publish_event(event);
268 }
269 }
270
271 pub fn handle_command(&mut self, command: TradingCommand) {
273 log::debug!("{CMD} {command:?}");
274
275 }
277
278 fn publish_event(&mut self, event: OrderEventAny) {
280 log::debug!("{EVT} {event:?}");
281
282 let topic = switchboard::get_event_orders_topic(event.strategy_id());
283 msgbus::publish(topic, &event);
284 }
285
286 pub fn record_local_activity(&mut self, event: &OrderEventAny) {
288 let client_order_id = event.client_order_id();
289 let mut ts_event = event.ts_event();
290 if ts_event.is_zero() {
291 ts_event = self.clock.borrow().timestamp_ns();
292 }
293 self.reconciliation
294 .record_local_activity(client_order_id, ts_event);
295 }
296
297 pub fn clear_reconciliation_tracking(
299 &mut self,
300 client_order_id: &ClientOrderId,
301 drop_last_query: bool,
302 ) {
303 self.reconciliation
304 .clear_recon_tracking(client_order_id, drop_last_query);
305 }
306
307 fn start_continuous_tasks(&mut self) {
309 if self.config.inflight_check_interval_ms > 0 {
310 let interval_ms = self.config.inflight_check_interval_ms as u64;
311 self.start_inflight_check_task(interval_ms);
312 }
313
314 if let Some(interval_secs) = self.config.open_check_interval_secs
315 && interval_secs > 0.0
316 {
317 self.start_open_check_task(interval_secs);
318 }
319 }
320
321 fn start_inflight_check_task(&mut self, _interval_ms: u64) {
322 log::warn!("Inflight check task not yet implemented due to Send constraints");
323 }
324
325 fn start_open_check_task(&mut self, _interval_secs: f64) {
326 log::warn!("Open check task not yet implemented due to Send constraints");
327 }
328
329 pub fn is_running(&self) -> bool {
336 self.is_running
337 }
338}
339
340pub trait LiveExecutionClientExt: ExecutionClient {
342 fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent>;
344
345 fn get_clock(&self) -> Ref<'_, dyn Clock>;
347
348 fn send_order_event(&self, event: OrderEventAny) {
350 if let Err(e) = self
351 .get_message_channel()
352 .send(ExecutionEvent::Order(event))
353 {
354 log_send_error(&self.client_id(), &e);
355 }
356 }
357
358 fn send_order_status_report(&self, report: OrderStatusReport) {
360 let exec_report = ExecReportEnum::OrderStatus(Box::new(report));
361 if let Err(e) = self
362 .get_message_channel()
363 .send(ExecutionEvent::Report(exec_report))
364 {
365 log_send_error(&self.client_id(), &e);
366 }
367 }
368
369 fn send_fill_report(&self, report: FillReport) {
371 let exec_report = ExecReportEnum::Fill(Box::new(report));
372 if let Err(e) = self
373 .get_message_channel()
374 .send(ExecutionEvent::Report(exec_report))
375 {
376 log_send_error(&self.client_id(), &e);
377 }
378 }
379
380 fn send_position_status_report(&self, report: PositionStatusReport) {
382 let exec_report = ExecReportEnum::Position(Box::new(report));
383 if let Err(e) = self
384 .get_message_channel()
385 .send(ExecutionEvent::Report(exec_report))
386 {
387 log_send_error(&self.client_id(), &e);
388 }
389 }
390
391 fn send_mass_status(&self, mass_status: ExecutionMassStatus) {
393 let exec_report = ExecReportEnum::Mass(Box::new(mass_status));
394 if let Err(e) = self
395 .get_message_channel()
396 .send(ExecutionEvent::Report(exec_report))
397 {
398 log_send_error(&self.client_id(), &e);
399 }
400 }
401}
402
403#[inline(always)]
404fn log_send_error<E: Display>(client_id: &ClientId, e: &E) {
405 log::error!("ExecutionClient-{client_id} failed to send message: {e}");
406}