nautilus_live/execution/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Execution state manager for live trading.
17//!
18//! This module provides the execution manager for reconciling execution state between
19//! the local cache and connected venues, as well as purging old state during live trading.
20
21use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::{
25    cache::Cache,
26    clock::Clock,
27    messages::execution::report::{GenerateOrderStatusReport, GeneratePositionReports},
28};
29use nautilus_core::{UUID4, UnixNanos};
30use nautilus_model::{
31    enums::OrderStatus,
32    events::{
33        OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
34        OrderTriggered,
35    },
36    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
37    instruments::{Instrument, InstrumentAny},
38    orders::{Order, OrderAny},
39    position::Position,
40    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
41    types::Quantity,
42};
43use rust_decimal::Decimal;
44use ustr::Ustr;
45
46use crate::{config::LiveExecEngineConfig, execution::client::LiveExecutionClient};
47
48/// Configuration for execution manager.
49#[derive(Debug, Clone)]
50pub struct ExecutionManagerConfig {
51    /// If reconciliation is active at start-up.
52    pub reconciliation: bool,
53    /// The delay (seconds) before starting reconciliation at startup.
54    pub reconciliation_startup_delay_secs: f64,
55    /// Number of minutes to look back during reconciliation.
56    pub lookback_mins: Option<u64>,
57    /// Instrument IDs to include during reconciliation (empty => all).
58    pub reconciliation_instrument_ids: AHashSet<InstrumentId>,
59    /// Whether to filter unclaimed external orders.
60    pub filter_unclaimed_external: bool,
61    /// Whether to filter position status reports during reconciliation.
62    pub filter_position_reports: bool,
63    /// Client order IDs excluded from reconciliation.
64    pub filtered_client_order_ids: AHashSet<ClientOrderId>,
65    /// Whether to generate missing orders from reports.
66    pub generate_missing_orders: bool,
67    /// The interval (milliseconds) between checking whether in-flight orders have exceeded their threshold.
68    pub inflight_check_interval_ms: u32,
69    /// Threshold in milliseconds for inflight order checks.
70    pub inflight_threshold_ms: u64,
71    /// Maximum number of retries for inflight checks.
72    pub inflight_max_retries: u32,
73    /// The interval (seconds) between checks for open orders at the venue.
74    pub open_check_interval_secs: Option<f64>,
75    /// The lookback minutes for open order checks.
76    pub open_check_lookback_mins: Option<u64>,
77    /// Threshold in nanoseconds before acting on venue discrepancies for open orders.
78    pub open_check_threshold_ns: u64,
79    /// Maximum retries before resolving an open order missing at the venue.
80    pub open_check_missing_retries: u32,
81    /// Whether open-order polling should only request open orders from the venue.
82    pub open_check_open_only: bool,
83    /// The maximum number of single-order queries per consistency check cycle.
84    pub max_single_order_queries_per_cycle: u32,
85    /// The delay (milliseconds) between consecutive single-order queries.
86    pub single_order_query_delay_ms: u32,
87    /// The interval (seconds) between checks for open positions at the venue.
88    pub position_check_interval_secs: Option<f64>,
89    /// The lookback minutes for position consistency checks.
90    pub position_check_lookback_mins: u64,
91    /// Threshold in nanoseconds before acting on venue discrepancies for positions.
92    pub position_check_threshold_ns: u64,
93    /// The time buffer (minutes) before closed orders can be purged.
94    pub purge_closed_orders_buffer_mins: Option<u32>,
95    /// The time buffer (minutes) before closed positions can be purged.
96    pub purge_closed_positions_buffer_mins: Option<u32>,
97    /// The time buffer (minutes) before account events can be purged.
98    pub purge_account_events_lookback_mins: Option<u32>,
99    /// If purge operations should also delete from the backing database.
100    pub purge_from_database: bool,
101}
102
103impl Default for ExecutionManagerConfig {
104    fn default() -> Self {
105        Self {
106            reconciliation: true,
107            reconciliation_startup_delay_secs: 10.0,
108            lookback_mins: Some(60),
109            reconciliation_instrument_ids: AHashSet::new(),
110            filter_unclaimed_external: false,
111            filter_position_reports: false,
112            filtered_client_order_ids: AHashSet::new(),
113            generate_missing_orders: true,
114            inflight_check_interval_ms: 2_000,
115            inflight_threshold_ms: 5_000,
116            inflight_max_retries: 5,
117            open_check_interval_secs: None,
118            open_check_lookback_mins: Some(60),
119            open_check_threshold_ns: 5_000_000_000,
120            open_check_missing_retries: 5,
121            open_check_open_only: true,
122            max_single_order_queries_per_cycle: 5,
123            single_order_query_delay_ms: 100,
124            position_check_interval_secs: None,
125            position_check_lookback_mins: 60,
126            position_check_threshold_ns: 60_000_000_000,
127            purge_closed_orders_buffer_mins: None,
128            purge_closed_positions_buffer_mins: None,
129            purge_account_events_lookback_mins: None,
130            purge_from_database: false,
131        }
132    }
133}
134
135impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
136    fn from(config: &LiveExecEngineConfig) -> Self {
137        let filtered_client_order_ids: AHashSet<ClientOrderId> = config
138            .filtered_client_order_ids
139            .clone()
140            .unwrap_or_default()
141            .into_iter()
142            .map(|value| ClientOrderId::from(value.as_str()))
143            .collect();
144
145        let reconciliation_instrument_ids: AHashSet<InstrumentId> = config
146            .reconciliation_instrument_ids
147            .clone()
148            .unwrap_or_default()
149            .into_iter()
150            .map(|value| InstrumentId::from(value.as_str()))
151            .collect();
152
153        Self {
154            reconciliation: config.reconciliation,
155            reconciliation_startup_delay_secs: config.reconciliation_startup_delay_secs,
156            lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
157            reconciliation_instrument_ids,
158            filter_unclaimed_external: config.filter_unclaimed_external_orders,
159            filter_position_reports: config.filter_position_reports,
160            filtered_client_order_ids,
161            generate_missing_orders: config.generate_missing_orders,
162            inflight_check_interval_ms: config.inflight_check_interval_ms,
163            inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
164            inflight_max_retries: config.inflight_check_retries,
165            open_check_interval_secs: config.open_check_interval_secs,
166            open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
167            open_check_threshold_ns: (config.open_check_threshold_ms as u64) * 1_000_000,
168            open_check_missing_retries: config.open_check_missing_retries,
169            open_check_open_only: config.open_check_open_only,
170            max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
171            single_order_query_delay_ms: config.single_order_query_delay_ms,
172            position_check_interval_secs: config.position_check_interval_secs,
173            position_check_lookback_mins: config.position_check_lookback_mins as u64,
174            position_check_threshold_ns: (config.position_check_threshold_ms as u64) * 1_000_000,
175            purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
176            purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
177            purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
178            purge_from_database: config.purge_from_database,
179        }
180    }
181}
182
183/// Execution report for continuous reconciliation.
184/// This is a simplified report type used during runtime reconciliation.
185#[derive(Debug, Clone)]
186pub struct ExecutionReport {
187    pub client_order_id: ClientOrderId,
188    pub venue_order_id: Option<VenueOrderId>,
189    pub status: OrderStatus,
190    pub filled_qty: Quantity,
191    pub avg_px: Option<f64>,
192    pub ts_event: UnixNanos,
193}
194
195/// Information about an inflight order check.
196#[derive(Debug, Clone)]
197struct InflightCheck {
198    #[allow(dead_code)]
199    pub client_order_id: ClientOrderId,
200    pub ts_submitted: UnixNanos,
201    pub retry_count: u32,
202    pub last_query_ts: Option<UnixNanos>,
203}
204
205/// Manager for execution state.
206///
207/// The `ExecutionManager` handles:
208/// - Startup reconciliation to align state on system start.
209/// - Continuous reconciliation of inflight orders.
210/// - External order discovery and claiming.
211/// - Fill report processing and validation.
212/// - Purging of old orders, positions, and account events.
213///
214/// # Thread Safety
215///
216/// This struct is **not thread-safe** and is designed for single-threaded use within
217/// an async runtime. Internal state is managed using `AHashMap` without synchronization,
218/// and the `clock` and `cache` use `Rc<RefCell<>>` which provide runtime borrow checking
219/// but no thread-safety guarantees.
220///
221/// If concurrent access is required, this struct must be wrapped in `Arc<Mutex<>>` or
222/// similar synchronization primitives. Alternatively, ensure that all methods are called
223/// from the same thread/task in the async runtime.
224///
225/// **Warning:** Concurrent mutable access to internal AHashMaps or concurrent borrows
226/// of `RefCell` contents will cause runtime panics.
227#[derive(Clone)]
228pub struct ExecutionManager {
229    clock: Rc<RefCell<dyn Clock>>,
230    cache: Rc<RefCell<Cache>>,
231    config: ExecutionManagerConfig,
232    inflight_checks: AHashMap<ClientOrderId, InflightCheck>,
233    external_order_claims: AHashMap<InstrumentId, StrategyId>,
234    processed_fills: AHashMap<TradeId, ClientOrderId>,
235    recon_check_retries: AHashMap<ClientOrderId, u32>,
236    ts_last_query: AHashMap<ClientOrderId, UnixNanos>,
237    order_local_activity_ns: AHashMap<ClientOrderId, UnixNanos>,
238    position_local_activity_ns: AHashMap<InstrumentId, UnixNanos>,
239    recent_fills_cache: AHashMap<TradeId, UnixNanos>,
240}
241
242impl Debug for ExecutionManager {
243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244        f.debug_struct(stringify!(ExecutionManager))
245            .field("config", &self.config)
246            .field("inflight_checks", &self.inflight_checks)
247            .field("external_order_claims", &self.external_order_claims)
248            .field("processed_fills", &self.processed_fills)
249            .field("recon_check_retries", &self.recon_check_retries)
250            .finish()
251    }
252}
253
254impl ExecutionManager {
255    /// Creates a new [`ExecutionManager`] instance.
256    pub fn new(
257        clock: Rc<RefCell<dyn Clock>>,
258        cache: Rc<RefCell<Cache>>,
259        config: ExecutionManagerConfig,
260    ) -> Self {
261        Self {
262            clock,
263            cache,
264            config,
265            inflight_checks: AHashMap::new(),
266            external_order_claims: AHashMap::new(),
267            processed_fills: AHashMap::new(),
268            recon_check_retries: AHashMap::new(),
269            ts_last_query: AHashMap::new(),
270            order_local_activity_ns: AHashMap::new(),
271            position_local_activity_ns: AHashMap::new(),
272            recent_fills_cache: AHashMap::new(),
273        }
274    }
275
276    /// Reconciles orders and fills from a mass status report.
277    pub async fn reconcile_execution_mass_status(
278        &mut self,
279        mass_status: ExecutionMassStatus,
280    ) -> Vec<OrderEventAny> {
281        let mut events = Vec::new();
282
283        // Process order status reports first
284        for report in mass_status.order_reports().values() {
285            if let Some(client_order_id) = &report.client_order_id {
286                if let Some(order) = self.get_order(client_order_id) {
287                    let mut order = order;
288                    if let Some(event) = self.reconcile_order_report(&mut order, report) {
289                        events.push(event);
290                    }
291                }
292            } else if !self.config.filter_unclaimed_external
293                && let Some(event) = self.handle_external_order(report, &mass_status.account_id)
294            {
295                events.push(event);
296            }
297        }
298
299        // Process fill reports
300        for fills in mass_status.fill_reports().values() {
301            for fill in fills {
302                if let Some(client_order_id) = &fill.client_order_id
303                    && let Some(order) = self.get_order(client_order_id)
304                {
305                    let mut order = order;
306                    let instrument_id = order.instrument_id();
307
308                    if let Some(instrument) = self.get_instrument(&instrument_id)
309                        && let Some(event) = self.create_order_fill(&mut order, fill, &instrument)
310                    {
311                        events.push(event);
312                    }
313                }
314            }
315        }
316
317        events
318    }
319
320    /// Reconciles a single execution report during runtime.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if the average price cannot be converted to a valid `Decimal`.
325    pub fn reconcile_report(
326        &mut self,
327        report: ExecutionReport,
328    ) -> anyhow::Result<Vec<OrderEventAny>> {
329        let mut events = Vec::new();
330
331        self.clear_recon_tracking(&report.client_order_id, true);
332
333        if let Some(order) = self.get_order(&report.client_order_id) {
334            let mut order = order;
335            let mut order_report = OrderStatusReport::new(
336                order.account_id().unwrap_or_default(),
337                order.instrument_id(),
338                Some(report.client_order_id),
339                report.venue_order_id.unwrap_or_default(),
340                order.order_side(),
341                order.order_type(),
342                order.time_in_force(),
343                report.status,
344                order.quantity(),
345                report.filled_qty,
346                report.ts_event, // Use ts_event as ts_accepted
347                report.ts_event, // Use ts_event as ts_last
348                self.clock.borrow().timestamp_ns(),
349                Some(UUID4::new()),
350            );
351
352            if let Some(avg_px) = report.avg_px {
353                order_report = order_report.with_avg_px(avg_px)?;
354            }
355
356            if let Some(event) = self.reconcile_order_report(&mut order, &order_report) {
357                events.push(event);
358            }
359        }
360
361        Ok(events)
362    }
363
364    /// Checks inflight orders and returns events for any that need reconciliation.
365    pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
366        let mut events = Vec::new();
367        let current_time = self.clock.borrow().timestamp_ns();
368        let threshold_ns = self.config.inflight_threshold_ms * 1_000_000;
369
370        let mut to_check = Vec::new();
371        for (client_order_id, check) in &self.inflight_checks {
372            if current_time - check.ts_submitted > threshold_ns {
373                to_check.push(*client_order_id);
374            }
375        }
376
377        for client_order_id in to_check {
378            if self
379                .config
380                .filtered_client_order_ids
381                .contains(&client_order_id)
382            {
383                continue;
384            }
385
386            if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
387                if let Some(last_query_ts) = check.last_query_ts
388                    && current_time - last_query_ts < threshold_ns
389                {
390                    continue;
391                }
392
393                check.retry_count += 1;
394                check.last_query_ts = Some(current_time);
395                self.ts_last_query.insert(client_order_id, current_time);
396                self.recon_check_retries
397                    .insert(client_order_id, check.retry_count);
398
399                if check.retry_count >= self.config.inflight_max_retries {
400                    // Generate rejection after max retries
401                    if let Some(order) = self.get_order(&client_order_id) {
402                        events.push(self.create_order_rejected(&order, Some("INFLIGHT_TIMEOUT")));
403                    }
404                    // Remove from inflight checks regardless of whether order exists
405                    self.clear_recon_tracking(&client_order_id, true);
406                }
407            }
408        }
409
410        events
411    }
412
413    /// Checks open orders consistency between cache and venue.
414    ///
415    /// This method validates that open orders in the cache match the venue's state,
416    /// comparing order status and filled quantities, and generating reconciliation
417    /// events for any discrepancies detected.
418    ///
419    /// # Returns
420    ///
421    /// A vector of order events generated to reconcile discrepancies.
422    pub async fn check_open_orders(
423        &mut self,
424        clients: &[Rc<dyn LiveExecutionClient>],
425    ) -> Vec<OrderEventAny> {
426        log::debug!("Checking order consistency between cached-state and venues");
427
428        let filtered_orders: Vec<OrderAny> = {
429            let cache = self.cache.borrow();
430            let open_orders = cache.orders_open(None, None, None, None);
431
432            if !self.config.reconciliation_instrument_ids.is_empty() {
433                open_orders
434                    .iter()
435                    .filter(|o| {
436                        self.config
437                            .reconciliation_instrument_ids
438                            .contains(&o.instrument_id())
439                    })
440                    .map(|o| (*o).clone())
441                    .collect()
442            } else {
443                open_orders.iter().map(|o| (*o).clone()).collect()
444            }
445        };
446
447        log::debug!(
448            "Found {} order{} open in cache",
449            filtered_orders.len(),
450            if filtered_orders.len() == 1 { "" } else { "s" }
451        );
452
453        let mut all_reports = Vec::new();
454        let mut venue_reported_ids = AHashSet::new();
455
456        for client in clients {
457            let cmd = GenerateOrderStatusReport::new(
458                UUID4::new(),
459                self.clock.borrow().timestamp_ns(),
460                None, // instrument_id - query all
461                None, // client_order_id
462                None, // venue_order_id
463            );
464
465            match client.generate_order_status_reports(&cmd).await {
466                Ok(reports) => {
467                    for report in reports {
468                        if let Some(client_order_id) = &report.client_order_id {
469                            venue_reported_ids.insert(*client_order_id);
470                        }
471                        all_reports.push(report);
472                    }
473                }
474                Err(e) => {
475                    log::error!(
476                        "Failed to query order reports from {}: {e}",
477                        client.client_id()
478                    );
479                }
480            }
481        }
482
483        // Reconcile reports against cached orders
484        let mut events = Vec::new();
485        for report in all_reports {
486            if let Some(client_order_id) = &report.client_order_id
487                && let Some(mut order) = self.get_order(client_order_id)
488                && let Some(event) = self.reconcile_order_report(&mut order, &report)
489            {
490                events.push(event);
491            }
492        }
493
494        // Handle orders missing at venue
495        if !self.config.open_check_open_only {
496            let cached_ids: AHashSet<ClientOrderId> = filtered_orders
497                .iter()
498                .map(|o| o.client_order_id())
499                .collect();
500            let missing_at_venue: AHashSet<ClientOrderId> = cached_ids
501                .difference(&venue_reported_ids)
502                .copied()
503                .collect();
504
505            for client_order_id in missing_at_venue {
506                events.extend(self.handle_missing_order(client_order_id));
507            }
508        }
509
510        events
511    }
512
513    /// Checks position consistency between cache and venue.
514    ///
515    /// This method validates that positions in the cache match the venue's state,
516    /// detecting position drift and querying for missing fills when discrepancies
517    /// are found.
518    ///
519    /// # Returns
520    ///
521    /// A vector of fill events generated to reconcile position discrepancies.
522    pub async fn check_positions_consistency(
523        &mut self,
524        clients: &[Rc<dyn LiveExecutionClient>],
525    ) -> Vec<OrderEventAny> {
526        log::debug!("Checking position consistency between cached-state and venues");
527
528        let open_positions = {
529            let cache = self.cache.borrow();
530            let positions = cache.positions_open(None, None, None, None);
531
532            if !self.config.reconciliation_instrument_ids.is_empty() {
533                positions
534                    .iter()
535                    .filter(|p| {
536                        self.config
537                            .reconciliation_instrument_ids
538                            .contains(&p.instrument_id)
539                    })
540                    .map(|p| (*p).clone())
541                    .collect::<Vec<_>>()
542            } else {
543                positions.iter().map(|p| (*p).clone()).collect()
544            }
545        };
546
547        log::debug!(
548            "Found {} position{} to check",
549            open_positions.len(),
550            if open_positions.len() == 1 { "" } else { "s" }
551        );
552
553        // Query venue for position reports
554        let mut venue_positions = AHashMap::new();
555
556        for client in clients {
557            let cmd = GeneratePositionReports::new(
558                UUID4::new(),
559                self.clock.borrow().timestamp_ns(),
560                None, // instrument_id - query all
561                None, // start
562                None, // end
563            );
564
565            match client.generate_position_status_reports(&cmd).await {
566                Ok(reports) => {
567                    for report in reports {
568                        venue_positions.insert(report.instrument_id, report);
569                    }
570                }
571                Err(e) => {
572                    log::error!(
573                        "Failed to query position reports from {}: {e}",
574                        client.client_id()
575                    );
576                }
577            }
578        }
579
580        // Check for discrepancies
581        let mut events = Vec::new();
582
583        for position in &open_positions {
584            // Skip if not in filter
585            if !self.config.reconciliation_instrument_ids.is_empty()
586                && !self
587                    .config
588                    .reconciliation_instrument_ids
589                    .contains(&position.instrument_id)
590            {
591                continue;
592            }
593
594            let venue_report = venue_positions.get(&position.instrument_id);
595
596            if let Some(discrepancy_events) =
597                self.check_position_discrepancy(position, venue_report)
598            {
599                events.extend(discrepancy_events);
600            }
601        }
602
603        events
604    }
605
606    /// Registers an order as inflight for tracking.
607    pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
608        let ts_submitted = self.clock.borrow().timestamp_ns();
609        self.inflight_checks.insert(
610            client_order_id,
611            InflightCheck {
612                client_order_id,
613                ts_submitted,
614                retry_count: 0,
615                last_query_ts: None,
616            },
617        );
618        self.recon_check_retries.insert(client_order_id, 0);
619        self.ts_last_query.remove(&client_order_id);
620        self.order_local_activity_ns.remove(&client_order_id);
621    }
622
623    /// Records local activity for the specified order.
624    pub fn record_local_activity(&mut self, client_order_id: ClientOrderId, ts_event: UnixNanos) {
625        self.order_local_activity_ns
626            .insert(client_order_id, ts_event);
627    }
628
629    /// Clears reconciliation tracking state for an order.
630    pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
631        self.inflight_checks.remove(client_order_id);
632        self.recon_check_retries.remove(client_order_id);
633        if drop_last_query {
634            self.ts_last_query.remove(client_order_id);
635        }
636        self.order_local_activity_ns.remove(client_order_id);
637    }
638
639    /// Claims external orders for a specific strategy and instrument.
640    pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
641        self.external_order_claims
642            .insert(instrument_id, strategy_id);
643    }
644
645    /// Records position activity for reconciliation tracking.
646    pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
647        self.position_local_activity_ns
648            .insert(instrument_id, ts_event);
649    }
650
651    /// Checks if a fill has been recently processed (for deduplication).
652    pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
653        self.recent_fills_cache.contains_key(trade_id)
654    }
655
656    /// Marks a fill as recently processed with current timestamp.
657    pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
658        let ts_now = self.clock.borrow().timestamp_ns();
659        self.recent_fills_cache.insert(trade_id, ts_now);
660    }
661
662    /// Prunes expired fills from the recent fills cache.
663    ///
664    /// Default TTL is 60 seconds.
665    pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
666        let ts_now = self.clock.borrow().timestamp_ns();
667        let ttl_ns = (ttl_secs * 1_000_000_000.0) as u64;
668
669        self.recent_fills_cache
670            .retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
671    }
672
673    /// Purges closed orders from the cache that are older than the configured buffer.
674    pub fn purge_closed_orders(&mut self) {
675        let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins else {
676            return;
677        };
678
679        let ts_now = self.clock.borrow().timestamp_ns();
680        let buffer_secs = (buffer_mins as u64) * 60;
681
682        self.cache
683            .borrow_mut()
684            .purge_closed_orders(ts_now, buffer_secs);
685    }
686
687    /// Purges closed positions from the cache that are older than the configured buffer.
688    pub fn purge_closed_positions(&mut self) {
689        let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins else {
690            return;
691        };
692
693        let ts_now = self.clock.borrow().timestamp_ns();
694        let buffer_secs = (buffer_mins as u64) * 60;
695
696        self.cache
697            .borrow_mut()
698            .purge_closed_positions(ts_now, buffer_secs);
699    }
700
701    /// Purges old account events from the cache based on the configured lookback.
702    pub fn purge_account_events(&mut self) {
703        let Some(lookback_mins) = self.config.purge_account_events_lookback_mins else {
704            return;
705        };
706
707        let ts_now = self.clock.borrow().timestamp_ns();
708        let lookback_secs = (lookback_mins as u64) * 60;
709
710        self.cache
711            .borrow_mut()
712            .purge_account_events(ts_now, lookback_secs);
713    }
714
715    // Private helper methods
716
717    fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
718        self.cache.borrow().order(client_order_id).cloned()
719    }
720
721    fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
722        self.cache.borrow().instrument(instrument_id).cloned()
723    }
724
725    fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
726        let mut events = Vec::new();
727
728        let Some(order) = self.get_order(&client_order_id) else {
729            return events;
730        };
731
732        let ts_now = self.clock.borrow().timestamp_ns();
733        let ts_last = order.ts_last();
734
735        // Check if order is too recent
736        if (ts_now - ts_last) < self.config.open_check_threshold_ns {
737            return events;
738        }
739
740        // Check local activity threshold
741        if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
742            && (ts_now - last_activity) < self.config.open_check_threshold_ns
743        {
744            return events;
745        }
746
747        // Increment retry counter
748        let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
749        *retries += 1;
750
751        // If max retries exceeded, generate rejection event
752        if *retries >= self.config.open_check_missing_retries {
753            log::warn!(
754                "Order {} not found at venue after {} retries, marking as REJECTED",
755                client_order_id,
756                retries
757            );
758
759            let rejected = self.create_order_rejected(&order, Some("NOT_FOUND_AT_VENUE"));
760            events.push(rejected);
761
762            self.clear_recon_tracking(&client_order_id, true);
763        } else {
764            log::debug!(
765                "Order {} not found at venue, retry {}/{}",
766                client_order_id,
767                retries,
768                self.config.open_check_missing_retries
769            );
770        }
771
772        events
773    }
774
775    fn check_position_discrepancy(
776        &mut self,
777        position: &Position,
778        venue_report: Option<&PositionStatusReport>,
779    ) -> Option<Vec<OrderEventAny>> {
780        let cached_qty = position.quantity.as_decimal();
781
782        let venue_qty = if let Some(report) = venue_report {
783            report.quantity.as_decimal()
784        } else {
785            Decimal::ZERO
786        };
787
788        // Check if quantities match (within tolerance)
789        let tolerance = Decimal::from_str("0.00000001").unwrap();
790        if (cached_qty - venue_qty).abs() <= tolerance {
791            return None; // No discrepancy
792        }
793
794        // Check activity threshold
795        let ts_now = self.clock.borrow().timestamp_ns();
796        if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
797            && (ts_now - last_activity) < self.config.position_check_threshold_ns
798        {
799            log::debug!(
800                "Skipping position reconciliation for {}: recent activity within threshold",
801                position.instrument_id
802            );
803            return None;
804        }
805
806        log::warn!(
807            "Position discrepancy detected for {}: cached_qty={}, venue_qty={}",
808            position.instrument_id,
809            cached_qty,
810            venue_qty
811        );
812
813        // TODO: Query for missing fills to reconcile the discrepancy
814        // For now, just log the discrepancy
815        None
816    }
817
818    fn reconcile_order_report(
819        &mut self,
820        order: &mut OrderAny,
821        report: &OrderStatusReport,
822    ) -> Option<OrderEventAny> {
823        // Check if reconciliation is needed
824        if order.status() == report.order_status && order.filled_qty() == report.filled_qty {
825            return None; // Already in sync
826        }
827
828        let event = match report.order_status {
829            OrderStatus::Accepted => self.create_order_accepted(order, report),
830            OrderStatus::Rejected => {
831                self.create_order_rejected(order, report.cancel_reason.as_deref())
832            }
833            OrderStatus::Triggered => self.create_order_triggered(order, report),
834            OrderStatus::Canceled => self.create_order_canceled(order, report),
835            OrderStatus::Expired => self.create_order_expired(order, report),
836            _ => return None,
837        };
838
839        Some(event)
840    }
841
842    fn handle_external_order(
843        &self,
844        _report: &OrderStatusReport,
845        _account_id: &AccountId,
846    ) -> Option<OrderEventAny> {
847        // TODO: This would need to create a new order from the report
848        // For now, we'll skip external order handling - WIP
849        None
850    }
851
852    fn create_order_accepted(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
853        OrderEventAny::Accepted(OrderAccepted::new(
854            order.trader_id(),
855            order.strategy_id(),
856            order.instrument_id(),
857            order.client_order_id(),
858            order.venue_order_id().unwrap_or(report.venue_order_id),
859            order.account_id().unwrap_or_default(),
860            UUID4::new(),
861            report.ts_accepted,
862            self.clock.borrow().timestamp_ns(),
863            false,
864        ))
865    }
866
867    fn create_order_rejected(&self, order: &OrderAny, reason: Option<&str>) -> OrderEventAny {
868        let reason = reason.unwrap_or("UNKNOWN");
869        OrderEventAny::Rejected(OrderRejected::new(
870            order.trader_id(),
871            order.strategy_id(),
872            order.instrument_id(),
873            order.client_order_id(),
874            order.account_id().unwrap_or_default(),
875            Ustr::from(reason),
876            UUID4::new(),
877            self.clock.borrow().timestamp_ns(),
878            self.clock.borrow().timestamp_ns(),
879            false,
880            false, // due_post_only
881        ))
882    }
883
884    fn create_order_triggered(
885        &self,
886        order: &OrderAny,
887        report: &OrderStatusReport,
888    ) -> OrderEventAny {
889        OrderEventAny::Triggered(OrderTriggered::new(
890            order.trader_id(),
891            order.strategy_id(),
892            order.instrument_id(),
893            order.client_order_id(),
894            UUID4::new(),
895            report
896                .ts_triggered
897                .unwrap_or(self.clock.borrow().timestamp_ns()),
898            self.clock.borrow().timestamp_ns(),
899            false,
900            order.venue_order_id(),
901            order.account_id(),
902        ))
903    }
904
905    fn create_order_canceled(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
906        OrderEventAny::Canceled(OrderCanceled::new(
907            order.trader_id(),
908            order.strategy_id(),
909            order.instrument_id(),
910            order.client_order_id(),
911            UUID4::new(),
912            report.ts_last,
913            self.clock.borrow().timestamp_ns(),
914            false,
915            order.venue_order_id(),
916            order.account_id(),
917        ))
918    }
919
920    fn create_order_expired(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
921        OrderEventAny::Expired(OrderExpired::new(
922            order.trader_id(),
923            order.strategy_id(),
924            order.instrument_id(),
925            order.client_order_id(),
926            UUID4::new(),
927            report.ts_last,
928            self.clock.borrow().timestamp_ns(),
929            false,
930            order.venue_order_id(),
931            order.account_id(),
932        ))
933    }
934
935    fn create_order_fill(
936        &mut self,
937        order: &mut OrderAny,
938        fill: &FillReport,
939        instrument: &InstrumentAny,
940    ) -> Option<OrderEventAny> {
941        if self.processed_fills.contains_key(&fill.trade_id) {
942            return None;
943        }
944
945        self.processed_fills
946            .insert(fill.trade_id, order.client_order_id());
947
948        Some(OrderEventAny::Filled(OrderFilled::new(
949            order.trader_id(),
950            order.strategy_id(),
951            order.instrument_id(),
952            order.client_order_id(),
953            fill.venue_order_id,
954            order.account_id().unwrap_or_default(),
955            fill.trade_id,
956            fill.order_side,
957            order.order_type(),
958            fill.last_qty,
959            fill.last_px,
960            instrument.quote_currency(),
961            fill.liquidity_side,
962            fill.report_id,
963            fill.ts_event,
964            self.clock.borrow().timestamp_ns(),
965            false,
966            fill.venue_position_id,
967            Some(fill.commission),
968        )))
969    }
970}
971
972////////////////////////////////////////////////////////////////////////////////
973// Tests
974////////////////////////////////////////////////////////////////////////////////
975
976#[cfg(test)]
977mod tests {
978    use std::{cell::RefCell, rc::Rc};
979
980    use nautilus_common::{cache::Cache, clock::TestClock};
981    use nautilus_core::{UUID4, UnixNanos};
982    use nautilus_model::{
983        enums::OrderStatus,
984        identifiers::{AccountId, ClientId, ClientOrderId, Venue, VenueOrderId},
985        reports::ExecutionMassStatus,
986        types::Quantity,
987    };
988    use rstest::rstest;
989
990    use super::*;
991
992    fn create_test_manager() -> ExecutionManager {
993        let clock = Rc::new(RefCell::new(TestClock::new()));
994        let cache = Rc::new(RefCell::new(Cache::default()));
995        let config = ExecutionManagerConfig::default();
996        ExecutionManager::new(clock, cache, config)
997    }
998
999    #[rstest]
1000    fn test_reconciliation_manager_new() {
1001        let manager = create_test_manager();
1002        assert_eq!(manager.inflight_checks.len(), 0);
1003        assert_eq!(manager.external_order_claims.len(), 0);
1004        assert_eq!(manager.processed_fills.len(), 0);
1005    }
1006
1007    #[rstest]
1008    fn test_register_inflight() {
1009        let mut manager = create_test_manager();
1010        let client_order_id = ClientOrderId::from("O-123456");
1011
1012        manager.register_inflight(client_order_id);
1013
1014        assert_eq!(manager.inflight_checks.len(), 1);
1015        assert!(manager.inflight_checks.contains_key(&client_order_id));
1016    }
1017
1018    #[rstest]
1019    fn test_claim_external_orders() {
1020        let mut manager = create_test_manager();
1021        let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
1022        let strategy_id = StrategyId::from("STRATEGY-001");
1023
1024        manager.claim_external_orders(instrument_id, strategy_id);
1025
1026        assert_eq!(manager.external_order_claims.len(), 1);
1027        assert_eq!(
1028            manager.external_order_claims.get(&instrument_id),
1029            Some(&strategy_id)
1030        );
1031    }
1032
1033    #[rstest]
1034    fn test_reconcile_report_removes_from_inflight() {
1035        let mut manager = create_test_manager();
1036        let client_order_id = ClientOrderId::from("O-123456");
1037
1038        manager.register_inflight(client_order_id);
1039        assert_eq!(manager.inflight_checks.len(), 1);
1040
1041        let report = ExecutionReport {
1042            client_order_id,
1043            venue_order_id: Some(VenueOrderId::from("V-123456")),
1044            status: OrderStatus::Accepted,
1045            filled_qty: Quantity::from(0),
1046            avg_px: None,
1047            ts_event: UnixNanos::default(),
1048        };
1049
1050        // Reconcile should remove from inflight checks
1051        manager.reconcile_report(report).unwrap();
1052        assert_eq!(manager.inflight_checks.len(), 0);
1053    }
1054
1055    #[rstest]
1056    fn test_check_inflight_orders_generates_rejection_after_max_retries() {
1057        let clock = Rc::new(RefCell::new(TestClock::new()));
1058        let cache = Rc::new(RefCell::new(Cache::default()));
1059        let config = ExecutionManagerConfig {
1060            inflight_threshold_ms: 100,
1061            inflight_max_retries: 2,
1062            ..ExecutionManagerConfig::default()
1063        };
1064        let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1065
1066        let client_order_id = ClientOrderId::from("O-123456");
1067        manager.register_inflight(client_order_id);
1068
1069        // First check - should increment retry count
1070        clock
1071            .borrow_mut()
1072            .advance_time(UnixNanos::from(200_000_000), true);
1073        let events = manager.check_inflight_orders();
1074        assert_eq!(events.len(), 0);
1075        let first_check = manager
1076            .inflight_checks
1077            .get(&client_order_id)
1078            .expect("inflight check present");
1079        assert_eq!(first_check.retry_count, 1);
1080        let first_query_ts = first_check.last_query_ts.expect("last query recorded");
1081
1082        // Second check - should hit max retries and generate rejection
1083        clock
1084            .borrow_mut()
1085            .advance_time(UnixNanos::from(400_000_000), true);
1086        let events = manager.check_inflight_orders();
1087        assert_eq!(events.len(), 0); // Would generate rejection if order existed in cache
1088        assert!(!manager.inflight_checks.contains_key(&client_order_id));
1089        // Ensure last query timestamp progressed prior to removal
1090        assert!(clock.borrow().timestamp_ns() > first_query_ts);
1091    }
1092
1093    #[rstest]
1094    fn test_check_inflight_orders_skips_recent_query() {
1095        let clock = Rc::new(RefCell::new(TestClock::new()));
1096        let cache = Rc::new(RefCell::new(Cache::default()));
1097        let config = ExecutionManagerConfig {
1098            inflight_threshold_ms: 100,
1099            inflight_max_retries: 3,
1100            ..ExecutionManagerConfig::default()
1101        };
1102        let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1103
1104        let client_order_id = ClientOrderId::from("O-ABCDEF");
1105        manager.register_inflight(client_order_id);
1106
1107        // First pass triggers a venue query and records timestamp
1108        clock
1109            .borrow_mut()
1110            .advance_time(UnixNanos::from(200_000_000), true);
1111        let events = manager.check_inflight_orders();
1112        assert!(events.is_empty());
1113        let initial_check = manager
1114            .inflight_checks
1115            .get(&client_order_id)
1116            .expect("inflight check retained");
1117        assert_eq!(initial_check.retry_count, 1);
1118        let last_query_ts = initial_check.last_query_ts.expect("last query recorded");
1119
1120        // Subsequent pass within threshold should be skipped entirely
1121        clock
1122            .borrow_mut()
1123            .advance_time(UnixNanos::from(250_000_000), true);
1124        let events = manager.check_inflight_orders();
1125        assert!(events.is_empty());
1126        let second_check = manager
1127            .inflight_checks
1128            .get(&client_order_id)
1129            .expect("inflight check retained");
1130        assert_eq!(second_check.retry_count, 1);
1131        assert_eq!(second_check.last_query_ts, Some(last_query_ts));
1132    }
1133
1134    #[rstest]
1135    fn test_check_inflight_orders_skips_filtered_ids() {
1136        let clock = Rc::new(RefCell::new(TestClock::new()));
1137        let cache = Rc::new(RefCell::new(Cache::default()));
1138        let filtered_id = ClientOrderId::from("O-FILTERED");
1139        let mut config = ExecutionManagerConfig::default();
1140        config.filtered_client_order_ids.insert(filtered_id);
1141        config.inflight_threshold_ms = 100;
1142        let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1143
1144        manager.register_inflight(filtered_id);
1145        clock
1146            .borrow_mut()
1147            .advance_time(UnixNanos::from(200_000_000), true);
1148        let events = manager.check_inflight_orders();
1149        assert!(events.is_empty());
1150        assert!(manager.inflight_checks.contains_key(&filtered_id));
1151    }
1152
1153    #[rstest]
1154    fn test_record_and_clear_tracking() {
1155        let mut manager = create_test_manager();
1156        let client_order_id = ClientOrderId::from("O-TRACK");
1157
1158        manager.register_inflight(client_order_id);
1159        let ts_now = UnixNanos::from(1_000_000);
1160        manager.record_local_activity(client_order_id, ts_now);
1161
1162        assert_eq!(
1163            manager
1164                .order_local_activity_ns
1165                .get(&client_order_id)
1166                .copied(),
1167            Some(ts_now)
1168        );
1169
1170        manager.clear_recon_tracking(&client_order_id, true);
1171        assert!(!manager.inflight_checks.contains_key(&client_order_id));
1172        assert!(
1173            !manager
1174                .order_local_activity_ns
1175                .contains_key(&client_order_id)
1176        );
1177        assert!(!manager.recon_check_retries.contains_key(&client_order_id));
1178        assert!(!manager.ts_last_query.contains_key(&client_order_id));
1179    }
1180
1181    #[tokio::test]
1182    async fn test_reconcile_execution_mass_status_with_empty() {
1183        let mut manager = create_test_manager();
1184        let account_id = AccountId::from("ACCOUNT-001");
1185        let venue = Venue::from("BINANCE");
1186
1187        let client_id = ClientId::from("BINANCE");
1188        let mass_status = ExecutionMassStatus::new(
1189            client_id,
1190            account_id,
1191            venue,
1192            UnixNanos::default(),
1193            Some(UUID4::new()),
1194        );
1195
1196        let events = manager.reconcile_execution_mass_status(mass_status).await;
1197        assert_eq!(events.len(), 0);
1198    }
1199
1200    #[rstest]
1201    fn test_reconciliation_config_default() {
1202        let config = ExecutionManagerConfig::default();
1203
1204        assert_eq!(config.lookback_mins, Some(60));
1205        assert_eq!(config.inflight_threshold_ms, 5000);
1206        assert_eq!(config.inflight_max_retries, 5);
1207        assert!(!config.filter_unclaimed_external);
1208        assert!(config.generate_missing_orders);
1209    }
1210}