nautilus_live/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution engine for managing execution state and reconciliation.
17//!
18//! This module provides the orchestration layer for live trading execution,
19//! coordinating between the core execution engine and venue-specific clients
20//! while managing state reconciliation.
21
22use std::{cell::RefCell, fmt::Debug, rc::Rc, time::Duration};
23
24use nautilus_common::{
25    cache::Cache,
26    clock::Clock,
27    logging::{CMD, EVT, RECV},
28    messages::execution::TradingCommand,
29    msgbus::{self, MessageBus, switchboard},
30};
31use nautilus_model::{events::OrderEventAny, reports::ExecutionMassStatus};
32
33use crate::{
34    config::LiveExecEngineConfig,
35    reconciliation::{ExecutionReport, ReconciliationConfig, ReconciliationManager},
36};
37
38/// Live execution engine that manages execution state and reconciliation.
39///
40/// The `LiveExecutionEngine` orchestrates:
41/// - Startup reconciliation with all venues.
42/// - Continuous reconciliation of execution reports.
43/// - Inflight order checking and resolution.
44/// - Message routing between venues and the core execution engine.
45#[allow(dead_code)]
46pub struct LiveExecutionEngine {
47    clock: Rc<RefCell<dyn Clock>>,
48    cache: Rc<RefCell<Cache>>,
49    msgbus: Rc<RefCell<MessageBus>>,
50    reconciliation: ReconciliationManager,
51    config: LiveExecEngineConfig,
52    cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
53    cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<TradingCommand>>,
54    evt_tx: tokio::sync::mpsc::UnboundedSender<OrderEventAny>,
55    evt_rx: Option<tokio::sync::mpsc::UnboundedReceiver<OrderEventAny>>,
56    reconciliation_task: Option<tokio::task::JoinHandle<()>>,
57    inflight_check_task: Option<tokio::task::JoinHandle<()>>,
58    open_check_task: Option<tokio::task::JoinHandle<()>>,
59    is_running: bool,
60    shutdown_initiated: bool,
61}
62
63impl Debug for LiveExecutionEngine {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("LiveExecutionEngine")
66            .field("config", &self.config)
67            .field("is_running", &self.is_running)
68            .field("shutdown_initiated", &self.shutdown_initiated)
69            .finish()
70    }
71}
72
73impl LiveExecutionEngine {
74    /// Creates a new [`LiveExecutionEngine`] instance.
75    pub fn new(
76        clock: Rc<RefCell<dyn Clock>>,
77        cache: Rc<RefCell<Cache>>,
78        msgbus: Rc<RefCell<MessageBus>>,
79        config: LiveExecEngineConfig,
80    ) -> Self {
81        let reconciliation_config = ReconciliationConfig {
82            lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
83            inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
84            inflight_max_retries: config.inflight_check_retries as u8,
85            filter_unclaimed_external: config.filter_unclaimed_external_orders,
86            generate_missing_orders: config.generate_missing_orders,
87        };
88
89        let reconciliation =
90            ReconciliationManager::new(clock.clone(), cache.clone(), reconciliation_config);
91
92        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
93        let (evt_tx, evt_rx) = tokio::sync::mpsc::unbounded_channel();
94
95        Self {
96            clock,
97            cache,
98            msgbus,
99            reconciliation,
100            config,
101            cmd_tx,
102            cmd_rx: Some(cmd_rx),
103            evt_tx,
104            evt_rx: Some(evt_rx),
105            reconciliation_task: None,
106            inflight_check_task: None,
107            open_check_task: None,
108            is_running: false,
109            shutdown_initiated: false,
110        }
111    }
112
113    /// Starts the live execution engine.
114    ///
115    /// This initiates:
116    /// - Startup reconciliation with all venues.
117    /// - Continuous reconciliation tasks.
118    /// - Message processing loops.
119    ///
120    /// # Errors
121    ///
122    /// Returns an error if startup reconciliation fails.
123    pub async fn start(&mut self) -> anyhow::Result<()> {
124        if self.is_running {
125            return Ok(());
126        }
127
128        log::info!("Starting LiveExecutionEngine");
129
130        if self.config.reconciliation {
131            self.reconcile_execution_state().await?;
132        }
133
134        self.start_continuous_tasks();
135
136        self.is_running = true;
137        log::info!("LiveExecutionEngine started");
138
139        Ok(())
140    }
141
142    /// Stops the live execution engine.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if stopping tasks fails.
147    pub async fn stop(&mut self) -> anyhow::Result<()> {
148        if !self.is_running {
149            return Ok(());
150        }
151
152        log::info!("Stopping LiveExecutionEngine");
153        self.shutdown_initiated = true;
154
155        // Cancel all tasks
156        if let Some(task) = self.reconciliation_task.take() {
157            task.abort();
158        }
159        if let Some(task) = self.inflight_check_task.take() {
160            task.abort();
161        }
162        if let Some(task) = self.open_check_task.take() {
163            task.abort();
164        }
165
166        self.is_running = false;
167        log::info!("LiveExecutionEngine stopped");
168
169        Ok(())
170    }
171
172    /// Reconciles execution state at startup.
173    #[allow(dead_code)]
174    async fn reconcile_execution_state(&mut self) -> anyhow::Result<()> {
175        log::info!("Running startup reconciliation");
176
177        // Add startup delay to let connections stabilize
178        if let Some(delay_secs) = self.config.reconciliation_startup_delay_secs {
179            log::info!("Waiting {}s before reconciliation", delay_secs);
180            tokio::time::sleep(Duration::from_secs_f64(delay_secs)).await;
181        }
182
183        // TODO: Get all registered clients from ExecutionEngine
184        // For now, this is a placeholder
185        // let clients = self.get_execution_clients();
186        // for client in clients {
187        //     let lookback_mins = self.config.reconciliation_lookback_mins;
188        //     let mass_status = client.generate_mass_status(lookback_mins).await?;
189        //     if let Some(mass_status) = mass_status {
190        //         self.reconcile_execution_mass_status(mass_status).await?;
191        //     }
192        // }
193
194        log::info!("Startup reconciliation complete");
195        Ok(())
196    }
197
198    /// Reconciles execution mass status report.
199    #[allow(dead_code)]
200    async fn reconcile_execution_mass_status(
201        &mut self,
202        mass_status: ExecutionMassStatus,
203    ) -> anyhow::Result<()> {
204        log::info!(
205            "Processing mass status for {}: {} orders, {} fills",
206            mass_status.venue,
207            mass_status.order_reports().len(),
208            mass_status.fill_reports().len()
209        );
210
211        let events = self
212            .reconciliation
213            .reconcile_execution_mass_status(mass_status)
214            .await;
215
216        // Publish events to execution engine
217        for event in events {
218            self.publish_event(event);
219        }
220
221        Ok(())
222    }
223
224    /// Reconciles an execution report.
225    pub fn reconcile_execution_report(&mut self, report: ExecutionReport) {
226        log::debug!("{RECV} {report:?}");
227
228        let events = self.reconciliation.reconcile_report(report);
229
230        // Publish events to execution engine
231        for event in events {
232            self.publish_event(event);
233        }
234    }
235
236    /// Handles a trading command.
237    pub fn handle_command(&mut self, command: TradingCommand) {
238        log::debug!("{CMD} {command:?}");
239
240        // Commands would be forwarded to appropriate execution client
241    }
242
243    /// Publishes an event to the execution engine.
244    fn publish_event(&mut self, event: OrderEventAny) {
245        log::debug!("{EVT} {event:?}");
246
247        let topic = switchboard::get_event_orders_topic(event.strategy_id());
248        msgbus::publish(topic, &event);
249    }
250
251    /// Starts continuous reconciliation tasks.
252    fn start_continuous_tasks(&mut self) {
253        if self.config.inflight_check_interval_ms > 0 {
254            let interval_ms = self.config.inflight_check_interval_ms as u64;
255            self.start_inflight_check_task(interval_ms);
256        }
257
258        if let Some(interval_secs) = self.config.open_check_interval_secs
259            && interval_secs > 0.0
260        {
261            self.start_open_check_task(interval_secs);
262        }
263    }
264
265    fn start_inflight_check_task(&mut self, _interval_ms: u64) {
266        log::warn!("Inflight check task not yet implemented due to Send constraints");
267    }
268
269    fn start_open_check_task(&mut self, _interval_secs: f64) {
270        log::warn!("Open check task not yet implemented due to Send constraints");
271    }
272
273    // TODO: Implement when LiveExecutionClient is available
274    // fn get_execution_clients(&self) -> Vec<Rc<dyn LiveExecutionClient>> {
275    //     Vec::new()
276    // }
277
278    /// Returns whether the engine is currently running.
279    pub fn is_running(&self) -> bool {
280        self.is_running
281    }
282}