nautilus_live/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Execution state manager for live trading.
17//!
18//! This module provides the execution manager for reconciling execution state between
19//! the local cache and connected venues, as well as purging old state during live trading.
20
21use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::{
25    cache::Cache,
26    clock::Clock,
27    enums::LogColor,
28    log_info,
29    messages::execution::report::{GenerateOrderStatusReports, GeneratePositionStatusReports},
30};
31use nautilus_core::{UUID4, UnixNanos};
32use nautilus_execution::client::ExecutionClient;
33use nautilus_model::{
34    enums::OrderStatus,
35    events::{
36        OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderInitialized,
37        OrderRejected, OrderTriggered,
38    },
39    identifiers::{
40        AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
41    },
42    instruments::{Instrument, InstrumentAny},
43    orders::{Order, OrderAny},
44    position::Position,
45    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
46    types::Quantity,
47};
48use rust_decimal::Decimal;
49use ustr::Ustr;
50
51use crate::config::LiveExecEngineConfig;
52
53/// Returns true if the order status represents an open/working order.
54#[inline]
55fn is_order_status_open(status: OrderStatus) -> bool {
56    matches!(
57        status,
58        OrderStatus::Accepted
59            | OrderStatus::Triggered
60            | OrderStatus::PendingCancel
61            | OrderStatus::PendingUpdate
62            | OrderStatus::PartiallyFilled
63    )
64}
65
66/// Configuration for execution manager.
67#[derive(Debug, Clone)]
68pub struct ExecutionManagerConfig {
69    /// The trader ID for generated orders.
70    pub trader_id: TraderId,
71    /// If reconciliation is active at start-up.
72    pub reconciliation: bool,
73    /// The delay (seconds) before starting reconciliation at startup.
74    pub reconciliation_startup_delay_secs: f64,
75    /// Number of minutes to look back during reconciliation.
76    pub lookback_mins: Option<u64>,
77    /// Instrument IDs to include during reconciliation (empty => all).
78    pub reconciliation_instrument_ids: AHashSet<InstrumentId>,
79    /// Whether to filter unclaimed external orders.
80    pub filter_unclaimed_external: bool,
81    /// Whether to filter position status reports during reconciliation.
82    pub filter_position_reports: bool,
83    /// Client order IDs excluded from reconciliation.
84    pub filtered_client_order_ids: AHashSet<ClientOrderId>,
85    /// Whether to generate missing orders from reports.
86    pub generate_missing_orders: bool,
87    /// The interval (milliseconds) between checking whether in-flight orders have exceeded their threshold.
88    pub inflight_check_interval_ms: u32,
89    /// Threshold in milliseconds for inflight order checks.
90    pub inflight_threshold_ms: u64,
91    /// Maximum number of retries for inflight checks.
92    pub inflight_max_retries: u32,
93    /// The interval (seconds) between checks for open orders at the venue.
94    pub open_check_interval_secs: Option<f64>,
95    /// The lookback minutes for open order checks.
96    pub open_check_lookback_mins: Option<u64>,
97    /// Threshold in nanoseconds before acting on venue discrepancies for open orders.
98    pub open_check_threshold_ns: u64,
99    /// Maximum retries before resolving an open order missing at the venue.
100    pub open_check_missing_retries: u32,
101    /// Whether open-order polling should only request open orders from the venue.
102    pub open_check_open_only: bool,
103    /// The maximum number of single-order queries per consistency check cycle.
104    pub max_single_order_queries_per_cycle: u32,
105    /// The delay (milliseconds) between consecutive single-order queries.
106    pub single_order_query_delay_ms: u32,
107    /// The interval (seconds) between checks for open positions at the venue.
108    pub position_check_interval_secs: Option<f64>,
109    /// The lookback minutes for position consistency checks.
110    pub position_check_lookback_mins: u64,
111    /// Threshold in nanoseconds before acting on venue discrepancies for positions.
112    pub position_check_threshold_ns: u64,
113    /// The time buffer (minutes) before closed orders can be purged.
114    pub purge_closed_orders_buffer_mins: Option<u32>,
115    /// The time buffer (minutes) before closed positions can be purged.
116    pub purge_closed_positions_buffer_mins: Option<u32>,
117    /// The time buffer (minutes) before account events can be purged.
118    pub purge_account_events_lookback_mins: Option<u32>,
119    /// If purge operations should also delete from the backing database.
120    pub purge_from_database: bool,
121}
122
123impl Default for ExecutionManagerConfig {
124    fn default() -> Self {
125        Self {
126            trader_id: TraderId::default(),
127            reconciliation: true,
128            reconciliation_startup_delay_secs: 10.0,
129            lookback_mins: Some(60),
130            reconciliation_instrument_ids: AHashSet::new(),
131            filter_unclaimed_external: false,
132            filter_position_reports: false,
133            filtered_client_order_ids: AHashSet::new(),
134            generate_missing_orders: true,
135            inflight_check_interval_ms: 2_000,
136            inflight_threshold_ms: 5_000,
137            inflight_max_retries: 5,
138            open_check_interval_secs: None,
139            open_check_lookback_mins: Some(60),
140            open_check_threshold_ns: 5_000_000_000,
141            open_check_missing_retries: 5,
142            open_check_open_only: true,
143            max_single_order_queries_per_cycle: 5,
144            single_order_query_delay_ms: 100,
145            position_check_interval_secs: None,
146            position_check_lookback_mins: 60,
147            position_check_threshold_ns: 60_000_000_000,
148            purge_closed_orders_buffer_mins: None,
149            purge_closed_positions_buffer_mins: None,
150            purge_account_events_lookback_mins: None,
151            purge_from_database: false,
152        }
153    }
154}
155
156impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
157    fn from(config: &LiveExecEngineConfig) -> Self {
158        let filtered_client_order_ids: AHashSet<ClientOrderId> = config
159            .filtered_client_order_ids
160            .clone()
161            .unwrap_or_default()
162            .into_iter()
163            .map(|value| ClientOrderId::from(value.as_str()))
164            .collect();
165
166        let reconciliation_instrument_ids: AHashSet<InstrumentId> = config
167            .reconciliation_instrument_ids
168            .clone()
169            .unwrap_or_default()
170            .into_iter()
171            .map(|value| InstrumentId::from(value.as_str()))
172            .collect();
173
174        Self {
175            trader_id: TraderId::default(), // Must be set separately via with_trader_id
176            reconciliation: config.reconciliation,
177            reconciliation_startup_delay_secs: config.reconciliation_startup_delay_secs,
178            lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
179            reconciliation_instrument_ids,
180            filter_unclaimed_external: config.filter_unclaimed_external_orders,
181            filter_position_reports: config.filter_position_reports,
182            filtered_client_order_ids,
183            generate_missing_orders: config.generate_missing_orders,
184            inflight_check_interval_ms: config.inflight_check_interval_ms,
185            inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
186            inflight_max_retries: config.inflight_check_retries,
187            open_check_interval_secs: config.open_check_interval_secs,
188            open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
189            open_check_threshold_ns: (config.open_check_threshold_ms as u64) * 1_000_000,
190            open_check_missing_retries: config.open_check_missing_retries,
191            open_check_open_only: config.open_check_open_only,
192            max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
193            single_order_query_delay_ms: config.single_order_query_delay_ms,
194            position_check_interval_secs: config.position_check_interval_secs,
195            position_check_lookback_mins: config.position_check_lookback_mins as u64,
196            position_check_threshold_ns: (config.position_check_threshold_ms as u64) * 1_000_000,
197            purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
198            purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
199            purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
200            purge_from_database: config.purge_from_database,
201        }
202    }
203}
204
205impl ExecutionManagerConfig {
206    /// Sets the trader ID on the configuration.
207    #[must_use]
208    pub fn with_trader_id(mut self, trader_id: TraderId) -> Self {
209        self.trader_id = trader_id;
210        self
211    }
212}
213
214/// Execution report for continuous reconciliation.
215/// This is a simplified report type used during runtime reconciliation.
216#[derive(Debug, Clone)]
217pub struct ExecutionReport {
218    pub client_order_id: ClientOrderId,
219    pub venue_order_id: Option<VenueOrderId>,
220    pub status: OrderStatus,
221    pub filled_qty: Quantity,
222    pub avg_px: Option<f64>,
223    pub ts_event: UnixNanos,
224}
225
226/// Information about an inflight order check.
227#[derive(Debug, Clone)]
228struct InflightCheck {
229    #[allow(dead_code)]
230    pub client_order_id: ClientOrderId,
231    pub ts_submitted: UnixNanos,
232    pub retry_count: u32,
233    pub last_query_ts: Option<UnixNanos>,
234}
235
236/// Manager for execution state.
237///
238/// The `ExecutionManager` handles:
239/// - Startup reconciliation to align state on system start.
240/// - Continuous reconciliation of inflight orders.
241/// - External order discovery and claiming.
242/// - Fill report processing and validation.
243/// - Purging of old orders, positions, and account events.
244///
245/// # Thread Safety
246///
247/// This struct is **not thread-safe** and is designed for single-threaded use within
248/// an async runtime. Internal state is managed using `AHashMap` without synchronization,
249/// and the `clock` and `cache` use `Rc<RefCell<>>` which provide runtime borrow checking
250/// but no thread-safety guarantees.
251///
252/// If concurrent access is required, this struct must be wrapped in `Arc<Mutex<>>` or
253/// similar synchronization primitives. Alternatively, ensure that all methods are called
254/// from the same thread/task in the async runtime.
255///
256/// **Warning:** Concurrent mutable access to internal AHashMaps or concurrent borrows
257/// of `RefCell` contents will cause runtime panics.
258#[derive(Clone)]
259pub struct ExecutionManager {
260    clock: Rc<RefCell<dyn Clock>>,
261    cache: Rc<RefCell<Cache>>,
262    config: ExecutionManagerConfig,
263    inflight_checks: AHashMap<ClientOrderId, InflightCheck>,
264    external_order_claims: AHashMap<InstrumentId, StrategyId>,
265    processed_fills: AHashMap<TradeId, ClientOrderId>,
266    recon_check_retries: AHashMap<ClientOrderId, u32>,
267    ts_last_query: AHashMap<ClientOrderId, UnixNanos>,
268    order_local_activity_ns: AHashMap<ClientOrderId, UnixNanos>,
269    position_local_activity_ns: AHashMap<InstrumentId, UnixNanos>,
270    recent_fills_cache: AHashMap<TradeId, UnixNanos>,
271}
272
273impl Debug for ExecutionManager {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        f.debug_struct(stringify!(ExecutionManager))
276            .field("config", &self.config)
277            .field("inflight_checks", &self.inflight_checks)
278            .field("external_order_claims", &self.external_order_claims)
279            .field("processed_fills", &self.processed_fills)
280            .field("recon_check_retries", &self.recon_check_retries)
281            .finish()
282    }
283}
284
285impl ExecutionManager {
286    /// Creates a new [`ExecutionManager`] instance.
287    pub fn new(
288        clock: Rc<RefCell<dyn Clock>>,
289        cache: Rc<RefCell<Cache>>,
290        config: ExecutionManagerConfig,
291    ) -> Self {
292        Self {
293            clock,
294            cache,
295            config,
296            inflight_checks: AHashMap::new(),
297            external_order_claims: AHashMap::new(),
298            processed_fills: AHashMap::new(),
299            recon_check_retries: AHashMap::new(),
300            ts_last_query: AHashMap::new(),
301            order_local_activity_ns: AHashMap::new(),
302            position_local_activity_ns: AHashMap::new(),
303            recent_fills_cache: AHashMap::new(),
304        }
305    }
306
307    /// Reconciles orders and fills from a mass status report.
308    pub async fn reconcile_execution_mass_status(
309        &mut self,
310        mass_status: ExecutionMassStatus,
311    ) -> Vec<OrderEventAny> {
312        let venue = mass_status.venue;
313        let order_count = mass_status.order_reports().len();
314        let fill_count: usize = mass_status.fill_reports().values().map(|v| v.len()).sum();
315        let position_count = mass_status.position_reports().len();
316
317        log_info!(
318            "Reconciling ExecutionMassStatus for {}",
319            venue,
320            color = LogColor::Blue
321        );
322        log_info!(
323            "Received {} order(s), {} fill(s), {} position(s)",
324            order_count,
325            fill_count,
326            position_count,
327            color = LogColor::Blue
328        );
329
330        let mut events = Vec::new();
331        let mut orders_reconciled = 0usize;
332        let mut external_orders_created = 0usize;
333        let mut open_orders_initialized = 0usize;
334        let mut orders_skipped_no_instrument = 0usize;
335        let mut fills_applied = 0usize;
336
337        // Process order status reports first
338        for report in mass_status.order_reports().values() {
339            if let Some(client_order_id) = &report.client_order_id {
340                if let Some(order) = self.get_order(client_order_id) {
341                    let mut order = order;
342                    log::info!(
343                        color = LogColor::Blue as u8;
344                        "Reconciling {} {} {} [{}] -> [{}]",
345                        client_order_id,
346                        report.venue_order_id,
347                        report.instrument_id,
348                        order.status(),
349                        report.order_status,
350                    );
351                    if let Some(event) = self.reconcile_order_report(&mut order, report) {
352                        orders_reconciled += 1;
353                        events.push(event);
354                    }
355                } else if !self.config.filter_unclaimed_external {
356                    // Order has client_order_id but not in cache - external order
357                    if self.get_instrument(&report.instrument_id).is_none() {
358                        orders_skipped_no_instrument += 1;
359                    } else {
360                        let external_events =
361                            self.handle_external_order(report, &mass_status.account_id);
362                        if !external_events.is_empty() {
363                            external_orders_created += 1;
364                            if is_order_status_open(report.order_status) {
365                                open_orders_initialized += 1;
366                            }
367                            events.extend(external_events);
368                        }
369                    }
370                }
371            } else if !self.config.filter_unclaimed_external {
372                if self.get_instrument(&report.instrument_id).is_none() {
373                    orders_skipped_no_instrument += 1;
374                } else {
375                    let external_events =
376                        self.handle_external_order(report, &mass_status.account_id);
377                    if !external_events.is_empty() {
378                        external_orders_created += 1;
379                        if is_order_status_open(report.order_status) {
380                            open_orders_initialized += 1;
381                        }
382                        events.extend(external_events);
383                    }
384                }
385            }
386        }
387
388        // Sort fills chronologically to ensure proper position updates
389        let fill_reports = mass_status.fill_reports();
390        let mut all_fills: Vec<&FillReport> = fill_reports.values().flatten().collect();
391        all_fills.sort_by_key(|f| f.ts_event);
392
393        for fill in all_fills {
394            if let Some(client_order_id) = &fill.client_order_id
395                && let Some(order) = self.get_order(client_order_id)
396            {
397                let mut order = order;
398                let instrument_id = order.instrument_id();
399
400                if let Some(instrument) = self.get_instrument(&instrument_id)
401                    && let Some(event) = self.create_order_fill(&mut order, fill, &instrument)
402                {
403                    fills_applied += 1;
404                    events.push(event);
405                }
406            }
407        }
408
409        if orders_skipped_no_instrument > 0 {
410            log::warn!("{orders_skipped_no_instrument} orders skipped (instrument not in cache)");
411        }
412
413        log::info!(
414            color = LogColor::Blue as u8;
415            "Reconciliation complete for {venue}: reconciled={orders_reconciled}, external={external_orders_created}, open={open_orders_initialized}, fills={fills_applied}",
416        );
417
418        events
419    }
420
421    /// Reconciles a single execution report during runtime.
422    ///
423    /// # Errors
424    ///
425    /// Returns an error if the average price cannot be converted to a valid `Decimal`.
426    pub fn reconcile_report(
427        &mut self,
428        report: ExecutionReport,
429    ) -> anyhow::Result<Vec<OrderEventAny>> {
430        let mut events = Vec::new();
431
432        self.clear_recon_tracking(&report.client_order_id, true);
433
434        if let Some(order) = self.get_order(&report.client_order_id) {
435            let mut order = order;
436            let Some(account_id) = order.account_id() else {
437                log::error!("Cannot process fill report: order has no account_id");
438                return Ok(vec![]);
439            };
440            let Some(venue_order_id) = report.venue_order_id else {
441                log::error!("Cannot process fill report: report has no venue_order_id");
442                return Ok(vec![]);
443            };
444            let mut order_report = OrderStatusReport::new(
445                account_id,
446                order.instrument_id(),
447                Some(report.client_order_id),
448                venue_order_id,
449                order.order_side(),
450                order.order_type(),
451                order.time_in_force(),
452                report.status,
453                order.quantity(),
454                report.filled_qty,
455                report.ts_event, // Use ts_event as ts_accepted
456                report.ts_event, // Use ts_event as ts_last
457                self.clock.borrow().timestamp_ns(),
458                Some(UUID4::new()),
459            );
460
461            if let Some(avg_px) = report.avg_px {
462                order_report = order_report.with_avg_px(avg_px)?;
463            }
464
465            if let Some(event) = self.reconcile_order_report(&mut order, &order_report) {
466                events.push(event);
467            }
468        }
469
470        Ok(events)
471    }
472
473    /// Checks inflight orders and returns events for any that need reconciliation.
474    pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
475        let mut events = Vec::new();
476        let current_time = self.clock.borrow().timestamp_ns();
477        let threshold_ns = self.config.inflight_threshold_ms * 1_000_000;
478
479        let mut to_check = Vec::new();
480        for (client_order_id, check) in &self.inflight_checks {
481            if current_time - check.ts_submitted > threshold_ns {
482                to_check.push(*client_order_id);
483            }
484        }
485
486        for client_order_id in to_check {
487            if self
488                .config
489                .filtered_client_order_ids
490                .contains(&client_order_id)
491            {
492                continue;
493            }
494
495            if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
496                if let Some(last_query_ts) = check.last_query_ts
497                    && current_time - last_query_ts < threshold_ns
498                {
499                    continue;
500                }
501
502                check.retry_count += 1;
503                check.last_query_ts = Some(current_time);
504                self.ts_last_query.insert(client_order_id, current_time);
505                self.recon_check_retries
506                    .insert(client_order_id, check.retry_count);
507
508                if check.retry_count >= self.config.inflight_max_retries {
509                    // Generate rejection after max retries
510                    if let Some(order) = self.get_order(&client_order_id) {
511                        events.push(self.create_order_rejected(&order, Some("INFLIGHT_TIMEOUT")));
512                    }
513                    // Remove from inflight checks regardless of whether order exists
514                    self.clear_recon_tracking(&client_order_id, true);
515                }
516            }
517        }
518
519        events
520    }
521
522    /// Checks open orders consistency between cache and venue.
523    ///
524    /// This method validates that open orders in the cache match the venue's state,
525    /// comparing order status and filled quantities, and generating reconciliation
526    /// events for any discrepancies detected.
527    ///
528    /// # Returns
529    ///
530    /// A vector of order events generated to reconcile discrepancies.
531    pub async fn check_open_orders(
532        &mut self,
533        clients: &[Rc<dyn ExecutionClient>],
534    ) -> Vec<OrderEventAny> {
535        log::debug!("Checking order consistency between cached-state and venues");
536
537        let filtered_orders: Vec<OrderAny> = {
538            let cache = self.cache.borrow();
539            let open_orders = cache.orders_open(None, None, None, None);
540
541            if !self.config.reconciliation_instrument_ids.is_empty() {
542                open_orders
543                    .iter()
544                    .filter(|o| {
545                        self.config
546                            .reconciliation_instrument_ids
547                            .contains(&o.instrument_id())
548                    })
549                    .map(|o| (*o).clone())
550                    .collect()
551            } else {
552                open_orders.iter().map(|o| (*o).clone()).collect()
553            }
554        };
555
556        log::debug!(
557            "Found {} order{} open in cache",
558            filtered_orders.len(),
559            if filtered_orders.len() == 1 { "" } else { "s" }
560        );
561
562        let mut all_reports = Vec::new();
563        let mut venue_reported_ids = AHashSet::new();
564
565        for client in clients {
566            let cmd = GenerateOrderStatusReports::new(
567                UUID4::new(),
568                self.clock.borrow().timestamp_ns(),
569                true, // open_only
570                None, // instrument_id - query all
571                None, // start
572                None, // end
573                None, // params
574                None, // correlation_id
575            );
576
577            match client.generate_order_status_reports(&cmd).await {
578                Ok(reports) => {
579                    for report in reports {
580                        if let Some(client_order_id) = &report.client_order_id {
581                            venue_reported_ids.insert(*client_order_id);
582                        }
583                        all_reports.push(report);
584                    }
585                }
586                Err(e) => {
587                    log::error!(
588                        "Failed to query order reports from {}: {e}",
589                        client.client_id()
590                    );
591                }
592            }
593        }
594
595        // Reconcile reports against cached orders
596        let mut events = Vec::new();
597        for report in all_reports {
598            if let Some(client_order_id) = &report.client_order_id
599                && let Some(mut order) = self.get_order(client_order_id)
600                && let Some(event) = self.reconcile_order_report(&mut order, &report)
601            {
602                events.push(event);
603            }
604        }
605
606        // Handle orders missing at venue
607        if !self.config.open_check_open_only {
608            let cached_ids: AHashSet<ClientOrderId> = filtered_orders
609                .iter()
610                .map(|o| o.client_order_id())
611                .collect();
612            let missing_at_venue: AHashSet<ClientOrderId> = cached_ids
613                .difference(&venue_reported_ids)
614                .copied()
615                .collect();
616
617            for client_order_id in missing_at_venue {
618                events.extend(self.handle_missing_order(client_order_id));
619            }
620        }
621
622        events
623    }
624
625    /// Checks position consistency between cache and venue.
626    ///
627    /// This method validates that positions in the cache match the venue's state,
628    /// detecting position drift and querying for missing fills when discrepancies
629    /// are found.
630    ///
631    /// # Returns
632    ///
633    /// A vector of fill events generated to reconcile position discrepancies.
634    pub async fn check_positions_consistency(
635        &mut self,
636        clients: &[Rc<dyn ExecutionClient>],
637    ) -> Vec<OrderEventAny> {
638        log::debug!("Checking position consistency between cached-state and venues");
639
640        let open_positions = {
641            let cache = self.cache.borrow();
642            let positions = cache.positions_open(None, None, None, None);
643
644            if !self.config.reconciliation_instrument_ids.is_empty() {
645                positions
646                    .iter()
647                    .filter(|p| {
648                        self.config
649                            .reconciliation_instrument_ids
650                            .contains(&p.instrument_id)
651                    })
652                    .map(|p| (*p).clone())
653                    .collect::<Vec<_>>()
654            } else {
655                positions.iter().map(|p| (*p).clone()).collect()
656            }
657        };
658
659        log::debug!(
660            "Found {} position{} to check",
661            open_positions.len(),
662            if open_positions.len() == 1 { "" } else { "s" }
663        );
664
665        // Query venue for position reports
666        let mut venue_positions = AHashMap::new();
667
668        for client in clients {
669            let cmd = GeneratePositionStatusReports::new(
670                UUID4::new(),
671                self.clock.borrow().timestamp_ns(),
672                None, // instrument_id - query all
673                None, // start
674                None, // end
675                None, // params
676                None, // correlation_id
677            );
678
679            match client.generate_position_status_reports(&cmd).await {
680                Ok(reports) => {
681                    for report in reports {
682                        venue_positions.insert(report.instrument_id, report);
683                    }
684                }
685                Err(e) => {
686                    log::error!(
687                        "Failed to query position reports from {}: {e}",
688                        client.client_id()
689                    );
690                }
691            }
692        }
693
694        // Check for discrepancies
695        let mut events = Vec::new();
696
697        for position in &open_positions {
698            // Skip if not in filter
699            if !self.config.reconciliation_instrument_ids.is_empty()
700                && !self
701                    .config
702                    .reconciliation_instrument_ids
703                    .contains(&position.instrument_id)
704            {
705                continue;
706            }
707
708            let venue_report = venue_positions.get(&position.instrument_id);
709
710            if let Some(discrepancy_events) =
711                self.check_position_discrepancy(position, venue_report)
712            {
713                events.extend(discrepancy_events);
714            }
715        }
716
717        events
718    }
719
720    /// Registers an order as inflight for tracking.
721    pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
722        let ts_submitted = self.clock.borrow().timestamp_ns();
723        self.inflight_checks.insert(
724            client_order_id,
725            InflightCheck {
726                client_order_id,
727                ts_submitted,
728                retry_count: 0,
729                last_query_ts: None,
730            },
731        );
732        self.recon_check_retries.insert(client_order_id, 0);
733        self.ts_last_query.remove(&client_order_id);
734        self.order_local_activity_ns.remove(&client_order_id);
735    }
736
737    /// Records local activity for the specified order.
738    pub fn record_local_activity(&mut self, client_order_id: ClientOrderId, ts_event: UnixNanos) {
739        self.order_local_activity_ns
740            .insert(client_order_id, ts_event);
741    }
742
743    /// Clears reconciliation tracking state for an order.
744    pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
745        self.inflight_checks.remove(client_order_id);
746        self.recon_check_retries.remove(client_order_id);
747        if drop_last_query {
748            self.ts_last_query.remove(client_order_id);
749        }
750        self.order_local_activity_ns.remove(client_order_id);
751    }
752
753    /// Claims external orders for a specific strategy and instrument.
754    pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
755        self.external_order_claims
756            .insert(instrument_id, strategy_id);
757    }
758
759    /// Records position activity for reconciliation tracking.
760    pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
761        self.position_local_activity_ns
762            .insert(instrument_id, ts_event);
763    }
764
765    /// Checks if a fill has been recently processed (for deduplication).
766    pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
767        self.recent_fills_cache.contains_key(trade_id)
768    }
769
770    /// Marks a fill as recently processed with current timestamp.
771    pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
772        let ts_now = self.clock.borrow().timestamp_ns();
773        self.recent_fills_cache.insert(trade_id, ts_now);
774    }
775
776    /// Prunes expired fills from the recent fills cache.
777    ///
778    /// Default TTL is 60 seconds.
779    pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
780        let ts_now = self.clock.borrow().timestamp_ns();
781        let ttl_ns = (ttl_secs * 1_000_000_000.0) as u64;
782
783        self.recent_fills_cache
784            .retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
785    }
786
787    /// Purges closed orders from the cache that are older than the configured buffer.
788    pub fn purge_closed_orders(&mut self) {
789        let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins else {
790            return;
791        };
792
793        let ts_now = self.clock.borrow().timestamp_ns();
794        let buffer_secs = (buffer_mins as u64) * 60;
795
796        self.cache
797            .borrow_mut()
798            .purge_closed_orders(ts_now, buffer_secs);
799    }
800
801    /// Purges closed positions from the cache that are older than the configured buffer.
802    pub fn purge_closed_positions(&mut self) {
803        let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins else {
804            return;
805        };
806
807        let ts_now = self.clock.borrow().timestamp_ns();
808        let buffer_secs = (buffer_mins as u64) * 60;
809
810        self.cache
811            .borrow_mut()
812            .purge_closed_positions(ts_now, buffer_secs);
813    }
814
815    /// Purges old account events from the cache based on the configured lookback.
816    pub fn purge_account_events(&mut self) {
817        let Some(lookback_mins) = self.config.purge_account_events_lookback_mins else {
818            return;
819        };
820
821        let ts_now = self.clock.borrow().timestamp_ns();
822        let lookback_secs = (lookback_mins as u64) * 60;
823
824        self.cache
825            .borrow_mut()
826            .purge_account_events(ts_now, lookback_secs);
827    }
828
829    // Private helper methods
830
831    fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
832        self.cache.borrow().order(client_order_id).cloned()
833    }
834
835    fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
836        self.cache.borrow().instrument(instrument_id).cloned()
837    }
838
839    fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
840        let mut events = Vec::new();
841
842        let Some(order) = self.get_order(&client_order_id) else {
843            return events;
844        };
845
846        let ts_now = self.clock.borrow().timestamp_ns();
847        let ts_last = order.ts_last();
848
849        // Check if order is too recent
850        if (ts_now - ts_last) < self.config.open_check_threshold_ns {
851            return events;
852        }
853
854        // Check local activity threshold
855        if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
856            && (ts_now - last_activity) < self.config.open_check_threshold_ns
857        {
858            return events;
859        }
860
861        // Increment retry counter
862        let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
863        *retries += 1;
864
865        // If max retries exceeded, generate rejection event
866        if *retries >= self.config.open_check_missing_retries {
867            log::warn!(
868                "Order {client_order_id} not found at venue after {retries} retries, marking as REJECTED"
869            );
870
871            let rejected = self.create_order_rejected(&order, Some("NOT_FOUND_AT_VENUE"));
872            events.push(rejected);
873
874            self.clear_recon_tracking(&client_order_id, true);
875        } else {
876            log::debug!(
877                "Order {} not found at venue, retry {}/{}",
878                client_order_id,
879                retries,
880                self.config.open_check_missing_retries
881            );
882        }
883
884        events
885    }
886
887    fn check_position_discrepancy(
888        &mut self,
889        position: &Position,
890        venue_report: Option<&PositionStatusReport>,
891    ) -> Option<Vec<OrderEventAny>> {
892        let cached_qty = position.quantity.as_decimal();
893
894        let venue_qty = if let Some(report) = venue_report {
895            report.quantity.as_decimal()
896        } else {
897            Decimal::ZERO
898        };
899
900        // Check if quantities match (within tolerance)
901        let tolerance = Decimal::from_str("0.00000001").unwrap();
902        if (cached_qty - venue_qty).abs() <= tolerance {
903            return None; // No discrepancy
904        }
905
906        // Check activity threshold
907        let ts_now = self.clock.borrow().timestamp_ns();
908        if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
909            && (ts_now - last_activity) < self.config.position_check_threshold_ns
910        {
911            log::debug!(
912                "Skipping position reconciliation for {}: recent activity within threshold",
913                position.instrument_id
914            );
915            return None;
916        }
917
918        log::warn!(
919            "Position discrepancy detected for {}: cached_qty={}, venue_qty={}",
920            position.instrument_id,
921            cached_qty,
922            venue_qty
923        );
924
925        // TODO: Query for missing fills to reconcile the discrepancy
926        // For now, just log the discrepancy
927        None
928    }
929
930    fn reconcile_order_report(
931        &mut self,
932        order: &mut OrderAny,
933        report: &OrderStatusReport,
934    ) -> Option<OrderEventAny> {
935        // Check if reconciliation is needed
936        if order.status() == report.order_status && order.filled_qty() == report.filled_qty {
937            return None; // Already in sync
938        }
939
940        let event = match report.order_status {
941            OrderStatus::Accepted => self.create_order_accepted(order, report),
942            OrderStatus::Rejected => {
943                self.create_order_rejected(order, report.cancel_reason.as_deref())
944            }
945            OrderStatus::Triggered => self.create_order_triggered(order, report),
946            OrderStatus::Canceled => self.create_order_canceled(order, report),
947            OrderStatus::Expired => self.create_order_expired(order, report),
948            _ => return None,
949        };
950
951        Some(event)
952    }
953
954    fn handle_external_order(
955        &mut self,
956        report: &OrderStatusReport,
957        account_id: &AccountId,
958    ) -> Vec<OrderEventAny> {
959        let (strategy_id, tags) =
960            if let Some(claimed_strategy) = self.external_order_claims.get(&report.instrument_id) {
961                let order_id = report
962                    .client_order_id
963                    .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
964                log::info!(
965                    color = LogColor::Blue as u8;
966                    "External order {} for {} claimed by strategy {}",
967                    order_id,
968                    report.instrument_id,
969                    claimed_strategy,
970                );
971                (*claimed_strategy, None)
972            } else {
973                // Unclaimed external orders use EXTERNAL strategy ID with VENUE tag
974                (
975                    StrategyId::from("EXTERNAL"),
976                    Some(vec![Ustr::from("VENUE")]),
977                )
978            };
979
980        let client_order_id = report
981            .client_order_id
982            .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
983
984        let ts_now = self.clock.borrow().timestamp_ns();
985
986        let initialized = OrderInitialized::new(
987            self.config.trader_id,
988            strategy_id,
989            report.instrument_id,
990            client_order_id,
991            report.order_side,
992            report.order_type,
993            report.quantity,
994            report.time_in_force,
995            report.post_only,
996            report.reduce_only,
997            false, // quote_quantity
998            true,  // reconciliation
999            UUID4::new(),
1000            ts_now,
1001            ts_now,
1002            report.price,
1003            report.trigger_price,
1004            report.trigger_type,
1005            report.limit_offset,
1006            report.trailing_offset,
1007            Some(report.trailing_offset_type),
1008            report.expire_time,
1009            report.display_qty,
1010            None, // emulation_trigger
1011            None, // trigger_instrument_id
1012            Some(report.contingency_type),
1013            report.order_list_id,
1014            report.linked_order_ids.clone(),
1015            report.parent_order_id,
1016            None, // exec_algorithm_id
1017            None, // exec_algorithm_params
1018            None, // exec_spawn_id
1019            tags,
1020        );
1021
1022        let events = vec![OrderEventAny::Initialized(initialized)];
1023        let order = match OrderAny::from_events(events) {
1024            Ok(order) => order,
1025            Err(e) => {
1026                log::error!("Failed to create order from report: {e}");
1027                return Vec::new();
1028            }
1029        };
1030
1031        {
1032            let mut cache = self.cache.borrow_mut();
1033            if let Err(e) = cache.add_order(order.clone(), None, None, false) {
1034                log::error!("Failed to add external order to cache: {e}");
1035                return Vec::new();
1036            }
1037
1038            if let Err(e) =
1039                cache.add_venue_order_id(&client_order_id, &report.venue_order_id, false)
1040            {
1041                log::warn!("Failed to add venue order ID index: {e}");
1042            }
1043        }
1044
1045        log::info!(
1046            color = LogColor::Blue as u8;
1047            "Created external order {} ({}) for {} [{}]",
1048            client_order_id,
1049            report.venue_order_id,
1050            report.instrument_id,
1051            report.order_status,
1052        );
1053
1054        self.generate_external_order_status_events(&order, report, account_id)
1055    }
1056
1057    /// Generates the appropriate order status events for an external order.
1058    ///
1059    /// After creating an external order, we need to transition it to its actual state
1060    /// based on the order status report from the venue. For terminal states like
1061    /// Canceled/Expired, we return multiple events to properly transition through states.
1062    fn generate_external_order_status_events(
1063        &self,
1064        order: &OrderAny,
1065        report: &OrderStatusReport,
1066        account_id: &AccountId,
1067    ) -> Vec<OrderEventAny> {
1068        let ts_now = self.clock.borrow().timestamp_ns();
1069
1070        let accepted = OrderEventAny::Accepted(OrderAccepted::new(
1071            order.trader_id(),
1072            order.strategy_id(),
1073            order.instrument_id(),
1074            order.client_order_id(),
1075            report.venue_order_id,
1076            *account_id,
1077            UUID4::new(),
1078            report.ts_accepted,
1079            ts_now,
1080            true, // reconciliation
1081        ));
1082
1083        match report.order_status {
1084            OrderStatus::Accepted | OrderStatus::PartiallyFilled | OrderStatus::Filled => {
1085                // All these states require order to first be accepted
1086                vec![accepted]
1087            }
1088            OrderStatus::Canceled => {
1089                // Accept first, then cancel to reach terminal state
1090                let canceled = OrderEventAny::Canceled(OrderCanceled::new(
1091                    order.trader_id(),
1092                    order.strategy_id(),
1093                    order.instrument_id(),
1094                    order.client_order_id(),
1095                    UUID4::new(),
1096                    report.ts_last,
1097                    ts_now,
1098                    true, // reconciliation
1099                    Some(report.venue_order_id),
1100                    Some(*account_id),
1101                ));
1102                vec![accepted, canceled]
1103            }
1104            OrderStatus::Expired => {
1105                // Accept first, then expire to reach terminal state
1106                let expired = OrderEventAny::Expired(OrderExpired::new(
1107                    order.trader_id(),
1108                    order.strategy_id(),
1109                    order.instrument_id(),
1110                    order.client_order_id(),
1111                    UUID4::new(),
1112                    report.ts_last,
1113                    ts_now,
1114                    true, // reconciliation
1115                    Some(report.venue_order_id),
1116                    Some(*account_id),
1117                ));
1118                vec![accepted, expired]
1119            }
1120            OrderStatus::Rejected => {
1121                // Rejected goes directly to terminal state without acceptance
1122                vec![OrderEventAny::Rejected(OrderRejected::new(
1123                    order.trader_id(),
1124                    order.strategy_id(),
1125                    order.instrument_id(),
1126                    order.client_order_id(),
1127                    *account_id,
1128                    Ustr::from(report.cancel_reason.as_deref().unwrap_or("UNKNOWN")),
1129                    UUID4::new(),
1130                    report.ts_last,
1131                    ts_now,
1132                    true, // reconciliation
1133                    false,
1134                ))]
1135            }
1136            OrderStatus::Triggered => {
1137                // Triggered orders need acceptance first
1138                vec![accepted]
1139            }
1140            _ => {
1141                log::warn!(
1142                    "Unhandled order status {} for external order {}",
1143                    report.order_status,
1144                    order.client_order_id()
1145                );
1146                Vec::new()
1147            }
1148        }
1149    }
1150
1151    fn create_order_accepted(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
1152        OrderEventAny::Accepted(OrderAccepted::new(
1153            order.trader_id(),
1154            order.strategy_id(),
1155            order.instrument_id(),
1156            order.client_order_id(),
1157            order.venue_order_id().unwrap_or(report.venue_order_id),
1158            order
1159                .account_id()
1160                .expect("Order should have account_id when creating accepted event"),
1161            UUID4::new(),
1162            report.ts_accepted,
1163            self.clock.borrow().timestamp_ns(),
1164            false,
1165        ))
1166    }
1167
1168    fn create_order_rejected(&self, order: &OrderAny, reason: Option<&str>) -> OrderEventAny {
1169        let reason = reason.unwrap_or("UNKNOWN");
1170        OrderEventAny::Rejected(OrderRejected::new(
1171            order.trader_id(),
1172            order.strategy_id(),
1173            order.instrument_id(),
1174            order.client_order_id(),
1175            order
1176                .account_id()
1177                .expect("Order should have account_id when creating rejected event"),
1178            Ustr::from(reason),
1179            UUID4::new(),
1180            self.clock.borrow().timestamp_ns(),
1181            self.clock.borrow().timestamp_ns(),
1182            false,
1183            false, // due_post_only
1184        ))
1185    }
1186
1187    fn create_order_triggered(
1188        &self,
1189        order: &OrderAny,
1190        report: &OrderStatusReport,
1191    ) -> OrderEventAny {
1192        OrderEventAny::Triggered(OrderTriggered::new(
1193            order.trader_id(),
1194            order.strategy_id(),
1195            order.instrument_id(),
1196            order.client_order_id(),
1197            UUID4::new(),
1198            report
1199                .ts_triggered
1200                .unwrap_or(self.clock.borrow().timestamp_ns()),
1201            self.clock.borrow().timestamp_ns(),
1202            false,
1203            order.venue_order_id(),
1204            order.account_id(),
1205        ))
1206    }
1207
1208    fn create_order_canceled(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
1209        OrderEventAny::Canceled(OrderCanceled::new(
1210            order.trader_id(),
1211            order.strategy_id(),
1212            order.instrument_id(),
1213            order.client_order_id(),
1214            UUID4::new(),
1215            report.ts_last,
1216            self.clock.borrow().timestamp_ns(),
1217            false,
1218            order.venue_order_id(),
1219            order.account_id(),
1220        ))
1221    }
1222
1223    fn create_order_expired(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
1224        OrderEventAny::Expired(OrderExpired::new(
1225            order.trader_id(),
1226            order.strategy_id(),
1227            order.instrument_id(),
1228            order.client_order_id(),
1229            UUID4::new(),
1230            report.ts_last,
1231            self.clock.borrow().timestamp_ns(),
1232            false,
1233            order.venue_order_id(),
1234            order.account_id(),
1235        ))
1236    }
1237
1238    fn create_order_fill(
1239        &mut self,
1240        order: &mut OrderAny,
1241        fill: &FillReport,
1242        instrument: &InstrumentAny,
1243    ) -> Option<OrderEventAny> {
1244        if self.processed_fills.contains_key(&fill.trade_id) {
1245            return None;
1246        }
1247
1248        self.processed_fills
1249            .insert(fill.trade_id, order.client_order_id());
1250
1251        Some(OrderEventAny::Filled(OrderFilled::new(
1252            order.trader_id(),
1253            order.strategy_id(),
1254            order.instrument_id(),
1255            order.client_order_id(),
1256            fill.venue_order_id,
1257            fill.account_id,
1258            fill.trade_id,
1259            fill.order_side,
1260            order.order_type(),
1261            fill.last_qty,
1262            fill.last_px,
1263            instrument.quote_currency(),
1264            fill.liquidity_side,
1265            fill.report_id,
1266            fill.ts_event,
1267            self.clock.borrow().timestamp_ns(),
1268            false,
1269            fill.venue_position_id,
1270            Some(fill.commission),
1271        )))
1272    }
1273}
1274
1275#[cfg(test)]
1276mod tests {
1277    use std::{cell::RefCell, rc::Rc};
1278
1279    use nautilus_common::{cache::Cache, clock::TestClock};
1280    use nautilus_core::{UUID4, UnixNanos};
1281    use nautilus_model::{
1282        enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, TimeInForce},
1283        events::OrderEventAny,
1284        identifiers::{
1285            AccountId, ClientId, ClientOrderId, InstrumentId, TradeId, Venue, VenueOrderId,
1286        },
1287        instruments::stubs::audusd_sim,
1288        orders::{Order, OrderTestBuilder},
1289        reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
1290        types::{Money, Price, Quantity},
1291    };
1292    use rstest::rstest;
1293
1294    use super::*;
1295
1296    fn create_test_manager() -> ExecutionManager {
1297        let clock = Rc::new(RefCell::new(TestClock::new()));
1298        let cache = Rc::new(RefCell::new(Cache::default()));
1299        let config = ExecutionManagerConfig::default();
1300        ExecutionManager::new(clock, cache, config)
1301    }
1302
1303    #[rstest]
1304    fn test_reconciliation_manager_new() {
1305        let manager = create_test_manager();
1306        assert_eq!(manager.inflight_checks.len(), 0);
1307        assert_eq!(manager.external_order_claims.len(), 0);
1308        assert_eq!(manager.processed_fills.len(), 0);
1309    }
1310
1311    #[rstest]
1312    fn test_register_inflight() {
1313        let mut manager = create_test_manager();
1314        let client_order_id = ClientOrderId::from("O-123456");
1315
1316        manager.register_inflight(client_order_id);
1317
1318        assert_eq!(manager.inflight_checks.len(), 1);
1319        assert!(manager.inflight_checks.contains_key(&client_order_id));
1320    }
1321
1322    #[rstest]
1323    fn test_claim_external_orders() {
1324        let mut manager = create_test_manager();
1325        let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
1326        let strategy_id = StrategyId::from("STRATEGY-001");
1327
1328        manager.claim_external_orders(instrument_id, strategy_id);
1329
1330        assert_eq!(manager.external_order_claims.len(), 1);
1331        assert_eq!(
1332            manager.external_order_claims.get(&instrument_id),
1333            Some(&strategy_id)
1334        );
1335    }
1336
1337    #[rstest]
1338    fn test_reconcile_report_removes_from_inflight() {
1339        let mut manager = create_test_manager();
1340        let client_order_id = ClientOrderId::from("O-123456");
1341
1342        manager.register_inflight(client_order_id);
1343        assert_eq!(manager.inflight_checks.len(), 1);
1344
1345        let report = ExecutionReport {
1346            client_order_id,
1347            venue_order_id: Some(VenueOrderId::from("V-123456")),
1348            status: OrderStatus::Accepted,
1349            filled_qty: Quantity::from(0),
1350            avg_px: None,
1351            ts_event: UnixNanos::default(),
1352        };
1353
1354        // Reconcile should remove from inflight checks
1355        manager.reconcile_report(report).unwrap();
1356        assert_eq!(manager.inflight_checks.len(), 0);
1357    }
1358
1359    #[rstest]
1360    fn test_check_inflight_orders_generates_rejection_after_max_retries() {
1361        let clock = Rc::new(RefCell::new(TestClock::new()));
1362        let cache = Rc::new(RefCell::new(Cache::default()));
1363        let config = ExecutionManagerConfig {
1364            inflight_threshold_ms: 100,
1365            inflight_max_retries: 2,
1366            ..ExecutionManagerConfig::default()
1367        };
1368        let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1369
1370        let client_order_id = ClientOrderId::from("O-123456");
1371        manager.register_inflight(client_order_id);
1372
1373        // First check - should increment retry count
1374        clock
1375            .borrow_mut()
1376            .advance_time(UnixNanos::from(200_000_000), true);
1377        let events = manager.check_inflight_orders();
1378        assert_eq!(events.len(), 0);
1379        let first_check = manager
1380            .inflight_checks
1381            .get(&client_order_id)
1382            .expect("inflight check present");
1383        assert_eq!(first_check.retry_count, 1);
1384        let first_query_ts = first_check.last_query_ts.expect("last query recorded");
1385
1386        // Second check - should hit max retries and generate rejection
1387        clock
1388            .borrow_mut()
1389            .advance_time(UnixNanos::from(400_000_000), true);
1390        let events = manager.check_inflight_orders();
1391        assert_eq!(events.len(), 0); // Would generate rejection if order existed in cache
1392        assert!(!manager.inflight_checks.contains_key(&client_order_id));
1393        // Ensure last query timestamp progressed prior to removal
1394        assert!(clock.borrow().timestamp_ns() > first_query_ts);
1395    }
1396
1397    #[rstest]
1398    fn test_check_inflight_orders_skips_recent_query() {
1399        let clock = Rc::new(RefCell::new(TestClock::new()));
1400        let cache = Rc::new(RefCell::new(Cache::default()));
1401        let config = ExecutionManagerConfig {
1402            inflight_threshold_ms: 100,
1403            inflight_max_retries: 3,
1404            ..ExecutionManagerConfig::default()
1405        };
1406        let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1407
1408        let client_order_id = ClientOrderId::from("O-ABCDEF");
1409        manager.register_inflight(client_order_id);
1410
1411        // First pass triggers a venue query and records timestamp
1412        clock
1413            .borrow_mut()
1414            .advance_time(UnixNanos::from(200_000_000), true);
1415        let events = manager.check_inflight_orders();
1416        assert!(events.is_empty());
1417        let initial_check = manager
1418            .inflight_checks
1419            .get(&client_order_id)
1420            .expect("inflight check retained");
1421        assert_eq!(initial_check.retry_count, 1);
1422        let last_query_ts = initial_check.last_query_ts.expect("last query recorded");
1423
1424        // Subsequent pass within threshold should be skipped entirely
1425        clock
1426            .borrow_mut()
1427            .advance_time(UnixNanos::from(250_000_000), true);
1428        let events = manager.check_inflight_orders();
1429        assert!(events.is_empty());
1430        let second_check = manager
1431            .inflight_checks
1432            .get(&client_order_id)
1433            .expect("inflight check retained");
1434        assert_eq!(second_check.retry_count, 1);
1435        assert_eq!(second_check.last_query_ts, Some(last_query_ts));
1436    }
1437
1438    #[rstest]
1439    fn test_check_inflight_orders_skips_filtered_ids() {
1440        let clock = Rc::new(RefCell::new(TestClock::new()));
1441        let cache = Rc::new(RefCell::new(Cache::default()));
1442        let filtered_id = ClientOrderId::from("O-FILTERED");
1443        let mut config = ExecutionManagerConfig::default();
1444        config.filtered_client_order_ids.insert(filtered_id);
1445        config.inflight_threshold_ms = 100;
1446        let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1447
1448        manager.register_inflight(filtered_id);
1449        clock
1450            .borrow_mut()
1451            .advance_time(UnixNanos::from(200_000_000), true);
1452        let events = manager.check_inflight_orders();
1453        assert!(events.is_empty());
1454        assert!(manager.inflight_checks.contains_key(&filtered_id));
1455    }
1456
1457    #[rstest]
1458    fn test_record_and_clear_tracking() {
1459        let mut manager = create_test_manager();
1460        let client_order_id = ClientOrderId::from("O-TRACK");
1461
1462        manager.register_inflight(client_order_id);
1463        let ts_now = UnixNanos::from(1_000_000);
1464        manager.record_local_activity(client_order_id, ts_now);
1465
1466        assert_eq!(
1467            manager
1468                .order_local_activity_ns
1469                .get(&client_order_id)
1470                .copied(),
1471            Some(ts_now)
1472        );
1473
1474        manager.clear_recon_tracking(&client_order_id, true);
1475        assert!(!manager.inflight_checks.contains_key(&client_order_id));
1476        assert!(
1477            !manager
1478                .order_local_activity_ns
1479                .contains_key(&client_order_id)
1480        );
1481        assert!(!manager.recon_check_retries.contains_key(&client_order_id));
1482        assert!(!manager.ts_last_query.contains_key(&client_order_id));
1483    }
1484
1485    #[tokio::test]
1486    async fn test_reconcile_execution_mass_status_with_empty() {
1487        let mut manager = create_test_manager();
1488        let account_id = AccountId::from("ACCOUNT-001");
1489        let venue = Venue::from("BINANCE");
1490
1491        let client_id = ClientId::from("BINANCE");
1492        let mass_status = ExecutionMassStatus::new(
1493            client_id,
1494            account_id,
1495            venue,
1496            UnixNanos::default(),
1497            Some(UUID4::new()),
1498        );
1499
1500        let events = manager.reconcile_execution_mass_status(mass_status).await;
1501        assert_eq!(events.len(), 0);
1502    }
1503
1504    #[rstest]
1505    fn test_reconciliation_config_default() {
1506        let config = ExecutionManagerConfig::default();
1507
1508        assert_eq!(config.lookback_mins, Some(60));
1509        assert_eq!(config.inflight_threshold_ms, 5000);
1510        assert_eq!(config.inflight_max_retries, 5);
1511        assert!(!config.filter_unclaimed_external);
1512        assert!(config.generate_missing_orders);
1513    }
1514
1515    #[rstest]
1516    fn test_create_order_fill_deduplicates_by_trade_id() {
1517        let mut manager = create_test_manager();
1518        let instrument = audusd_sim();
1519        let mut order = OrderTestBuilder::new(OrderType::Market)
1520            .instrument_id(instrument.id())
1521            .side(OrderSide::Buy)
1522            .quantity(Quantity::from(100_000))
1523            .build();
1524        let trade_id = TradeId::from("T-001");
1525        let fill = FillReport::new(
1526            AccountId::from("SIM-001"),
1527            instrument.id(),
1528            VenueOrderId::from("V-001"),
1529            trade_id,
1530            OrderSide::Buy,
1531            Quantity::from(100_000),
1532            Price::from("1.00000"),
1533            Money::from("1.00 USD"),
1534            LiquiditySide::Maker,
1535            Some(ClientOrderId::from("O-123456")),
1536            None,
1537            UnixNanos::from(1_000_000_000),
1538            UnixNanos::from(1_000_000_000),
1539            None,
1540        );
1541        let event1 = manager.create_order_fill(&mut order, &fill, &InstrumentAny::from(instrument));
1542        assert!(event1.is_some());
1543
1544        // Same trade_id should be skipped
1545        let event2 = manager.create_order_fill(&mut order, &fill, &InstrumentAny::from(instrument));
1546        assert!(event2.is_none());
1547    }
1548
1549    #[rstest]
1550    fn test_handle_external_order_uses_claimed_strategy() {
1551        let mut manager = create_test_manager();
1552        let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
1553        let strategy_id = StrategyId::from("STRATEGY-001");
1554        let account_id = AccountId::from("BINANCE-001");
1555        let venue_order_id = VenueOrderId::from("V-EXT-001");
1556        manager.claim_external_orders(instrument_id, strategy_id);
1557        let report = OrderStatusReport::new(
1558            account_id,
1559            instrument_id,
1560            None, // No client_order_id (external)
1561            venue_order_id,
1562            OrderSide::Buy,
1563            OrderType::Limit,
1564            TimeInForce::Gtc,
1565            OrderStatus::Accepted,
1566            Quantity::from(1),
1567            Quantity::from(0),
1568            UnixNanos::from(1_000_000),
1569            UnixNanos::from(1_000_000),
1570            UnixNanos::from(1_000_000),
1571            None,
1572        )
1573        .with_price(Price::from("50000.00"));
1574        let events = manager.handle_external_order(&report, &account_id);
1575
1576        // Initialized consumed internally, only Accepted returned
1577        assert_eq!(events.len(), 1);
1578        assert!(matches!(events[0], OrderEventAny::Accepted(_)));
1579        let client_order_id = ClientOrderId::from(venue_order_id.as_str());
1580        let order = manager.cache.borrow().order(&client_order_id).cloned();
1581        assert!(order.is_some());
1582        assert_eq!(order.unwrap().strategy_id(), strategy_id);
1583    }
1584
1585    #[rstest]
1586    fn test_handle_external_order_uses_external_strategy_when_unclaimed() {
1587        let mut manager = create_test_manager();
1588        let instrument_id = InstrumentId::from("ETHUSDT.BINANCE");
1589        let account_id = AccountId::from("BINANCE-001");
1590        let venue_order_id = VenueOrderId::from("V-EXT-002");
1591        let report = OrderStatusReport::new(
1592            account_id,
1593            instrument_id,
1594            None, // No client_order_id (external)
1595            venue_order_id,
1596            OrderSide::Sell,
1597            OrderType::Limit,
1598            TimeInForce::Gtc,
1599            OrderStatus::Accepted,
1600            Quantity::from(1),
1601            Quantity::from(0),
1602            UnixNanos::from(1_000_000),
1603            UnixNanos::from(1_000_000),
1604            UnixNanos::from(1_000_000),
1605            None,
1606        )
1607        .with_price(Price::from("3000.00"));
1608        let events = manager.handle_external_order(&report, &account_id);
1609        assert_eq!(events.len(), 1);
1610        assert!(matches!(events[0], OrderEventAny::Accepted(_)));
1611        let client_order_id = ClientOrderId::from(venue_order_id.as_str());
1612        let order = manager.cache.borrow().order(&client_order_id).cloned();
1613        assert!(order.is_some());
1614        let order = order.unwrap();
1615        assert_eq!(order.strategy_id(), StrategyId::from("EXTERNAL"));
1616        assert!(
1617            order
1618                .tags()
1619                .is_some_and(|t| t.iter().any(|s| s.as_str() == "VENUE"))
1620        );
1621    }
1622
1623    #[rstest]
1624    fn test_external_order_canceled_generates_accepted_and_canceled() {
1625        let mut manager = create_test_manager();
1626        let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
1627        let account_id = AccountId::from("BINANCE-001");
1628        let report = OrderStatusReport::new(
1629            account_id,
1630            instrument_id,
1631            None, // No client_order_id (external)
1632            VenueOrderId::from("V-EXT-003"),
1633            OrderSide::Buy,
1634            OrderType::Limit,
1635            TimeInForce::Gtc,
1636            OrderStatus::Canceled,
1637            Quantity::from(1),
1638            Quantity::from(0),
1639            UnixNanos::from(1_000_000),
1640            UnixNanos::from(1_000_000),
1641            UnixNanos::from(1_000_000),
1642            None,
1643        )
1644        .with_price(Price::from("50000.00"));
1645        let events = manager.handle_external_order(&report, &account_id);
1646        assert_eq!(events.len(), 2);
1647        assert!(matches!(events[0], OrderEventAny::Accepted(_)));
1648        assert!(matches!(events[1], OrderEventAny::Canceled(_)));
1649    }
1650
1651    #[rstest]
1652    fn test_external_order_expired_generates_accepted_and_expired() {
1653        let mut manager = create_test_manager();
1654        let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
1655        let account_id = AccountId::from("BINANCE-001");
1656        let report = OrderStatusReport::new(
1657            account_id,
1658            instrument_id,
1659            None, // No client_order_id (external)
1660            VenueOrderId::from("V-EXT-004"),
1661            OrderSide::Buy,
1662            OrderType::Limit,
1663            TimeInForce::Gtc,
1664            OrderStatus::Expired,
1665            Quantity::from(1),
1666            Quantity::from(0),
1667            UnixNanos::from(1_000_000),
1668            UnixNanos::from(1_000_000),
1669            UnixNanos::from(1_000_000),
1670            None,
1671        )
1672        .with_price(Price::from("50000.00"));
1673        let events = manager.handle_external_order(&report, &account_id);
1674        assert_eq!(events.len(), 2);
1675        assert!(matches!(events[0], OrderEventAny::Accepted(_)));
1676        assert!(matches!(events[1], OrderEventAny::Expired(_)));
1677    }
1678}