nautilus_live/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution engine for managing execution state and reconciliation.
17//!
18//! This module provides the orchestration layer for live trading execution,
19//! coordinating between the core execution engine and venue-specific clients
20//! while managing state reconciliation.
21
22use std::{
23    cell::{Ref, RefCell},
24    collections::HashSet,
25    fmt::{Debug, Display},
26    rc::Rc,
27    time::Duration,
28};
29
30use nautilus_common::{
31    cache::Cache,
32    clock::Clock,
33    logging::{CMD, EVT, RECV},
34    messages::{ExecutionEvent, ExecutionReport as ExecReportEnum, execution::TradingCommand},
35    msgbus::{self, MessageBus, switchboard},
36};
37use nautilus_execution::client::ExecutionClient;
38use nautilus_model::{
39    events::OrderEventAny,
40    identifiers::{ClientId, ClientOrderId, InstrumentId},
41    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
42};
43
44use crate::{
45    config::LiveExecEngineConfig,
46    reconciliation::{ExecutionReport, ReconciliationConfig, ReconciliationManager},
47};
48
/// Live execution engine that manages execution state and reconciliation.
///
/// The `LiveExecutionEngine` orchestrates:
/// - Startup reconciliation with all venues.
/// - Continuous reconciliation of execution reports.
/// - Inflight order checking and resolution.
/// - Message routing between venues and the core execution engine.
// NOTE(review): `dead_code` is presumably allowed because several fields (message bus handle,
// channel endpoints, task handles) are stored but never read in this file — confirm and narrow
// the allow once they are wired up.
#[allow(dead_code)]
pub struct LiveExecutionEngine {
    /// Shared clock; read for a fallback timestamp in `record_local_activity` and handed to the
    /// reconciliation manager at construction.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache; handed to the reconciliation manager at construction.
    cache: Rc<RefCell<Cache>>,
    /// Shared message bus handle (stored only; not read by the code in this file).
    msgbus: Rc<RefCell<MessageBus>>,
    /// Manager that turns venue reports into order events.
    reconciliation: ReconciliationManager,
    /// Engine configuration supplied to `new`.
    config: LiveExecEngineConfig,
    /// Sending half of the unbounded trading command channel created in `new`.
    cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
    /// Receiving half of the command channel; `Some` after construction.
    cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<TradingCommand>>,
    /// Sending half of the unbounded order event channel created in `new`.
    evt_tx: tokio::sync::mpsc::UnboundedSender<OrderEventAny>,
    /// Receiving half of the event channel; `Some` after construction.
    evt_rx: Option<tokio::sync::mpsc::UnboundedReceiver<OrderEventAny>>,
    /// Handle of the reconciliation task, aborted by `stop` when present.
    reconciliation_task: Option<tokio::task::JoinHandle<()>>,
    /// Handle of the inflight check task, aborted by `stop` when present.
    inflight_check_task: Option<tokio::task::JoinHandle<()>>,
    /// Handle of the open order check task, aborted by `stop` when present.
    open_check_task: Option<tokio::task::JoinHandle<()>>,
    /// Set by `start` and cleared by `stop`.
    is_running: bool,
    /// Set by `stop`; it is never reset by the code in this file.
    shutdown_initiated: bool,
}
73
74impl Debug for LiveExecutionEngine {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct(stringify!(LiveExecutionEngine))
77            .field("config", &self.config)
78            .field("is_running", &self.is_running)
79            .field("shutdown_initiated", &self.shutdown_initiated)
80            .finish()
81    }
82}
83
84impl LiveExecutionEngine {
85    /// Creates a new [`LiveExecutionEngine`] instance.
86    pub fn new(
87        clock: Rc<RefCell<dyn Clock>>,
88        cache: Rc<RefCell<Cache>>,
89        msgbus: Rc<RefCell<MessageBus>>,
90        config: LiveExecEngineConfig,
91    ) -> Self {
92        let filtered_client_order_ids: HashSet<ClientOrderId> = config
93            .filtered_client_order_ids
94            .clone()
95            .unwrap_or_default()
96            .into_iter()
97            .map(|value| ClientOrderId::from(value.as_str()))
98            .collect();
99
100        let reconciliation_instrument_ids: HashSet<InstrumentId> = config
101            .reconciliation_instrument_ids
102            .clone()
103            .unwrap_or_default()
104            .into_iter()
105            .map(|value| InstrumentId::from(value.as_str()))
106            .collect();
107
108        let reconciliation_config = ReconciliationConfig {
109            lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
110            inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
111            inflight_max_retries: config.inflight_check_retries,
112            filter_unclaimed_external: config.filter_unclaimed_external_orders,
113            generate_missing_orders: config.generate_missing_orders,
114            filtered_client_order_ids,
115            open_check_threshold_ns: (config.open_check_threshold_ms as u64) * 1_000_000,
116            open_check_missing_retries: config.open_check_missing_retries,
117            open_check_open_only: config.open_check_open_only,
118            open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
119            filter_position_reports: config.filter_position_reports,
120            reconciliation_instrument_ids,
121        };
122
123        let reconciliation =
124            ReconciliationManager::new(clock.clone(), cache.clone(), reconciliation_config);
125
126        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
127        let (evt_tx, evt_rx) = tokio::sync::mpsc::unbounded_channel();
128
129        Self {
130            clock,
131            cache,
132            msgbus,
133            reconciliation,
134            config,
135            cmd_tx,
136            cmd_rx: Some(cmd_rx),
137            evt_tx,
138            evt_rx: Some(evt_rx),
139            reconciliation_task: None,
140            inflight_check_task: None,
141            open_check_task: None,
142            is_running: false,
143            shutdown_initiated: false,
144        }
145    }
146
147    /// Starts the live execution engine.
148    ///
149    /// This initiates:
150    /// - Startup reconciliation with all venues.
151    /// - Continuous reconciliation tasks.
152    /// - Message processing loops.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if startup reconciliation fails.
157    pub async fn start(&mut self) -> anyhow::Result<()> {
158        if self.is_running {
159            return Ok(());
160        }
161
162        log::info!("Starting LiveExecutionEngine");
163
164        if self.config.reconciliation {
165            self.reconcile_execution_state().await?;
166        }
167
168        self.start_continuous_tasks();
169
170        self.is_running = true;
171        log::info!("LiveExecutionEngine started");
172
173        Ok(())
174    }
175
176    /// Stops the live execution engine.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if stopping tasks fails.
181    pub async fn stop(&mut self) -> anyhow::Result<()> {
182        if !self.is_running {
183            return Ok(());
184        }
185
186        log::info!("Stopping LiveExecutionEngine");
187        self.shutdown_initiated = true;
188
189        // Cancel all tasks
190        if let Some(task) = self.reconciliation_task.take() {
191            task.abort();
192        }
193        if let Some(task) = self.inflight_check_task.take() {
194            task.abort();
195        }
196        if let Some(task) = self.open_check_task.take() {
197            task.abort();
198        }
199
200        self.is_running = false;
201        log::info!("LiveExecutionEngine stopped");
202
203        Ok(())
204    }
205
206    /// Reconciles execution state at startup.
207    #[allow(dead_code)]
208    async fn reconcile_execution_state(&mut self) -> anyhow::Result<()> {
209        log::info!("Running startup reconciliation");
210
211        // Add startup delay to let connections stabilize
212        if self.config.reconciliation_startup_delay_secs > 0.0 {
213            let delay_secs = self.config.reconciliation_startup_delay_secs;
214            log::info!("Waiting {}s before reconciliation", delay_secs);
215            tokio::time::sleep(Duration::from_secs_f64(delay_secs)).await;
216        }
217
218        // TODO: Get all registered clients from ExecutionEngine
219        // For now, this is a placeholder
220        // let clients = self.get_execution_clients();
221        // for client in clients {
222        //     let lookback_mins = self.config.reconciliation_lookback_mins;
223        //     let mass_status = client.generate_mass_status(lookback_mins).await?;
224        //     if let Some(mass_status) = mass_status {
225        //         self.reconcile_execution_mass_status(mass_status).await?;
226        //     }
227        // }
228
229        log::info!("Startup reconciliation complete");
230        Ok(())
231    }
232
233    /// Reconciles execution mass status report.
234    #[allow(dead_code)]
235    async fn reconcile_execution_mass_status(
236        &mut self,
237        mass_status: ExecutionMassStatus,
238    ) -> anyhow::Result<()> {
239        log::info!(
240            "Processing mass status for {}: {} orders, {} fills",
241            mass_status.venue,
242            mass_status.order_reports().len(),
243            mass_status.fill_reports().len()
244        );
245
246        let events = self
247            .reconciliation
248            .reconcile_execution_mass_status(mass_status)
249            .await;
250
251        // Publish events to execution engine
252        for event in events {
253            self.publish_event(event);
254        }
255
256        Ok(())
257    }
258
259    /// Reconciles an execution report.
260    pub fn reconcile_execution_report(&mut self, report: ExecutionReport) {
261        log::debug!("{RECV} {report:?}");
262
263        let events = self.reconciliation.reconcile_report(report);
264
265        // Publish events to execution engine
266        for event in events {
267            self.publish_event(event);
268        }
269    }
270
    /// Handles a trading command.
    ///
    /// Currently this only logs the command at debug level; the command is then dropped
    /// without being forwarded anywhere.
    pub fn handle_command(&mut self, command: TradingCommand) {
        log::debug!("{CMD} {command:?}");

        // Commands would be forwarded to appropriate execution client
        // TODO: forwarding is not implemented yet.
    }
277
278    /// Publishes an event to the execution engine.
279    fn publish_event(&mut self, event: OrderEventAny) {
280        log::debug!("{EVT} {event:?}");
281
282        let topic = switchboard::get_event_orders_topic(event.strategy_id());
283        msgbus::publish(topic, &event);
284    }
285
286    /// Records local order activity for reconciliation tracking.
287    pub fn record_local_activity(&mut self, event: &OrderEventAny) {
288        let client_order_id = event.client_order_id();
289        let mut ts_event = event.ts_event();
290        if ts_event.is_zero() {
291            ts_event = self.clock.borrow().timestamp_ns();
292        }
293        self.reconciliation
294            .record_local_activity(client_order_id, ts_event);
295    }
296
297    /// Clears reconciliation tracking for an order.
298    pub fn clear_reconciliation_tracking(
299        &mut self,
300        client_order_id: &ClientOrderId,
301        drop_last_query: bool,
302    ) {
303        self.reconciliation
304            .clear_recon_tracking(client_order_id, drop_last_query);
305    }
306
307    /// Starts continuous reconciliation tasks.
308    fn start_continuous_tasks(&mut self) {
309        if self.config.inflight_check_interval_ms > 0 {
310            let interval_ms = self.config.inflight_check_interval_ms as u64;
311            self.start_inflight_check_task(interval_ms);
312        }
313
314        if let Some(interval_secs) = self.config.open_check_interval_secs
315            && interval_secs > 0.0
316        {
317            self.start_open_check_task(interval_secs);
318        }
319    }
320
    /// Placeholder for starting the periodic inflight order check.
    ///
    /// No task is spawned: this only logs a warning, and `_interval_ms` is unused.
    fn start_inflight_check_task(&mut self, _interval_ms: u64) {
        log::warn!("Inflight check task not yet implemented due to Send constraints");
    }
324
    /// Placeholder for starting the periodic open order check.
    ///
    /// No task is spawned: this only logs a warning, and `_interval_secs` is unused.
    fn start_open_check_task(&mut self, _interval_secs: f64) {
        log::warn!("Open check task not yet implemented due to Send constraints");
    }
328
329    // TODO: Implement when LiveExecutionClient is available
330    // fn get_execution_clients(&self) -> Vec<Rc<dyn LiveExecutionClient>> {
331    //     Vec::new()
332    // }
333
    /// Returns whether the engine is currently running.
    ///
    /// The flag is set at the end of a successful `start` and cleared by `stop`.
    pub fn is_running(&self) -> bool {
        self.is_running
    }
338}
339
340/// Extension trait for live execution clients with message channel support.
341pub trait LiveExecutionClientExt: ExecutionClient {
342    /// Gets the message channel for sending execution events.
343    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent>;
344
345    /// Gets the clock for timestamp generation.
346    fn get_clock(&self) -> Ref<'_, dyn Clock>;
347
348    /// Sends an order event to the execution engine.
349    fn send_order_event(&self, event: OrderEventAny) {
350        if let Err(e) = self
351            .get_message_channel()
352            .send(ExecutionEvent::Order(event))
353        {
354            log_send_error(&self.client_id(), &e);
355        }
356    }
357
358    /// Sends an order status report to the execution engine.
359    fn send_order_status_report(&self, report: OrderStatusReport) {
360        let exec_report = ExecReportEnum::OrderStatus(Box::new(report));
361        if let Err(e) = self
362            .get_message_channel()
363            .send(ExecutionEvent::Report(exec_report))
364        {
365            log_send_error(&self.client_id(), &e);
366        }
367    }
368
369    /// Sends a fill report to the execution engine.
370    fn send_fill_report(&self, report: FillReport) {
371        let exec_report = ExecReportEnum::Fill(Box::new(report));
372        if let Err(e) = self
373            .get_message_channel()
374            .send(ExecutionEvent::Report(exec_report))
375        {
376            log_send_error(&self.client_id(), &e);
377        }
378    }
379
380    /// Sends a position status report to the execution engine.
381    fn send_position_status_report(&self, report: PositionStatusReport) {
382        let exec_report = ExecReportEnum::Position(Box::new(report));
383        if let Err(e) = self
384            .get_message_channel()
385            .send(ExecutionEvent::Report(exec_report))
386        {
387            log_send_error(&self.client_id(), &e);
388        }
389    }
390
391    /// Sends a mass status report to the execution engine.
392    fn send_mass_status(&self, mass_status: ExecutionMassStatus) {
393        let exec_report = ExecReportEnum::Mass(Box::new(mass_status));
394        if let Err(e) = self
395            .get_message_channel()
396            .send(ExecutionEvent::Report(exec_report))
397        {
398            log_send_error(&self.client_id(), &e);
399        }
400    }
401}
402
403#[inline(always)]
404fn log_send_error<E: Display>(client_id: &ClientId, e: &E) {
405    log::error!("ExecutionClient-{client_id} failed to send message: {e}");
406}