nautilus_live/
execution.rs1use std::{cell::RefCell, fmt::Debug, rc::Rc, time::Duration};
23
24use nautilus_common::{
25 cache::Cache,
26 clock::Clock,
27 logging::{CMD, EVT, RECV},
28 messages::execution::TradingCommand,
29 msgbus::{self, MessageBus, switchboard},
30};
31use nautilus_model::{events::OrderEventAny, reports::ExecutionMassStatus};
32
33use crate::{
34 config::LiveExecEngineConfig,
35 reconciliation::{ExecutionReport, ReconciliationConfig, ReconciliationManager},
36};
37
38#[allow(dead_code)]
46pub struct LiveExecutionEngine {
47 clock: Rc<RefCell<dyn Clock>>,
48 cache: Rc<RefCell<Cache>>,
49 msgbus: Rc<RefCell<MessageBus>>,
50 reconciliation: ReconciliationManager,
51 config: LiveExecEngineConfig,
52 cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
53 cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<TradingCommand>>,
54 evt_tx: tokio::sync::mpsc::UnboundedSender<OrderEventAny>,
55 evt_rx: Option<tokio::sync::mpsc::UnboundedReceiver<OrderEventAny>>,
56 reconciliation_task: Option<tokio::task::JoinHandle<()>>,
57 inflight_check_task: Option<tokio::task::JoinHandle<()>>,
58 open_check_task: Option<tokio::task::JoinHandle<()>>,
59 is_running: bool,
60 shutdown_initiated: bool,
61}
62
63impl Debug for LiveExecutionEngine {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct("LiveExecutionEngine")
66 .field("config", &self.config)
67 .field("is_running", &self.is_running)
68 .field("shutdown_initiated", &self.shutdown_initiated)
69 .finish()
70 }
71}
72
73impl LiveExecutionEngine {
74 pub fn new(
76 clock: Rc<RefCell<dyn Clock>>,
77 cache: Rc<RefCell<Cache>>,
78 msgbus: Rc<RefCell<MessageBus>>,
79 config: LiveExecEngineConfig,
80 ) -> Self {
81 let reconciliation_config = ReconciliationConfig {
82 lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
83 inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
84 inflight_max_retries: config.inflight_check_retries as u8,
85 filter_unclaimed_external: config.filter_unclaimed_external_orders,
86 generate_missing_orders: config.generate_missing_orders,
87 };
88
89 let reconciliation =
90 ReconciliationManager::new(clock.clone(), cache.clone(), reconciliation_config);
91
92 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
93 let (evt_tx, evt_rx) = tokio::sync::mpsc::unbounded_channel();
94
95 Self {
96 clock,
97 cache,
98 msgbus,
99 reconciliation,
100 config,
101 cmd_tx,
102 cmd_rx: Some(cmd_rx),
103 evt_tx,
104 evt_rx: Some(evt_rx),
105 reconciliation_task: None,
106 inflight_check_task: None,
107 open_check_task: None,
108 is_running: false,
109 shutdown_initiated: false,
110 }
111 }
112
113 pub async fn start(&mut self) -> anyhow::Result<()> {
124 if self.is_running {
125 return Ok(());
126 }
127
128 log::info!("Starting LiveExecutionEngine");
129
130 if self.config.reconciliation {
131 self.reconcile_execution_state().await?;
132 }
133
134 self.start_continuous_tasks();
135
136 self.is_running = true;
137 log::info!("LiveExecutionEngine started");
138
139 Ok(())
140 }
141
142 pub async fn stop(&mut self) -> anyhow::Result<()> {
148 if !self.is_running {
149 return Ok(());
150 }
151
152 log::info!("Stopping LiveExecutionEngine");
153 self.shutdown_initiated = true;
154
155 if let Some(task) = self.reconciliation_task.take() {
157 task.abort();
158 }
159 if let Some(task) = self.inflight_check_task.take() {
160 task.abort();
161 }
162 if let Some(task) = self.open_check_task.take() {
163 task.abort();
164 }
165
166 self.is_running = false;
167 log::info!("LiveExecutionEngine stopped");
168
169 Ok(())
170 }
171
172 #[allow(dead_code)]
174 async fn reconcile_execution_state(&mut self) -> anyhow::Result<()> {
175 log::info!("Running startup reconciliation");
176
177 if let Some(delay_secs) = self.config.reconciliation_startup_delay_secs {
179 log::info!("Waiting {}s before reconciliation", delay_secs);
180 tokio::time::sleep(Duration::from_secs_f64(delay_secs)).await;
181 }
182
183 log::info!("Startup reconciliation complete");
195 Ok(())
196 }
197
198 #[allow(dead_code)]
200 async fn reconcile_execution_mass_status(
201 &mut self,
202 mass_status: ExecutionMassStatus,
203 ) -> anyhow::Result<()> {
204 log::info!(
205 "Processing mass status for {}: {} orders, {} fills",
206 mass_status.venue,
207 mass_status.order_reports().len(),
208 mass_status.fill_reports().len()
209 );
210
211 let events = self
212 .reconciliation
213 .reconcile_execution_mass_status(mass_status)
214 .await;
215
216 for event in events {
218 self.publish_event(event);
219 }
220
221 Ok(())
222 }
223
224 pub fn reconcile_execution_report(&mut self, report: ExecutionReport) {
226 log::debug!("{RECV} {report:?}");
227
228 let events = self.reconciliation.reconcile_report(report);
229
230 for event in events {
232 self.publish_event(event);
233 }
234 }
235
236 pub fn handle_command(&mut self, command: TradingCommand) {
238 log::debug!("{CMD} {command:?}");
239
240 }
242
243 fn publish_event(&mut self, event: OrderEventAny) {
245 log::debug!("{EVT} {event:?}");
246
247 let topic = switchboard::get_event_orders_topic(event.strategy_id());
248 msgbus::publish(topic, &event);
249 }
250
251 fn start_continuous_tasks(&mut self) {
253 if self.config.inflight_check_interval_ms > 0 {
254 let interval_ms = self.config.inflight_check_interval_ms as u64;
255 self.start_inflight_check_task(interval_ms);
256 }
257
258 if let Some(interval_secs) = self.config.open_check_interval_secs
259 && interval_secs > 0.0
260 {
261 self.start_open_check_task(interval_secs);
262 }
263 }
264
265 fn start_inflight_check_task(&mut self, _interval_ms: u64) {
266 log::warn!("Inflight check task not yet implemented due to Send constraints");
267 }
268
269 fn start_open_check_task(&mut self, _interval_secs: f64) {
270 log::warn!("Open check task not yet implemented due to Send constraints");
271 }
272
273 pub fn is_running(&self) -> bool {
280 self.is_running
281 }
282}