nautilus_live/
reconciliation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Reconciliation managers for live execution state.
17//!
18//! This module provides managers for reconciling execution state between
19//! the local cache and connected venues during live trading.
20
21use std::{
22    cell::RefCell,
23    collections::{HashMap, HashSet},
24    fmt::Debug,
25    rc::Rc,
26};
27
28use nautilus_common::{cache::Cache, clock::Clock};
29use nautilus_core::{UUID4, UnixNanos};
30use nautilus_model::{
31    enums::OrderStatus,
32    events::{
33        OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
34        OrderTriggered,
35    },
36    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
37    instruments::{Instrument, InstrumentAny},
38    orders::{Order, OrderAny},
39    reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
40    types::Quantity,
41};
42use ustr::Ustr;
43
44/// Configuration for reconciliation manager.
45#[derive(Debug, Clone)]
46pub struct ReconciliationConfig {
47    /// Number of minutes to look back during reconciliation.
48    pub lookback_mins: Option<u64>,
49    /// Threshold in milliseconds for inflight order checks.
50    pub inflight_threshold_ms: u64,
51    /// Maximum number of retries for inflight checks.
52    pub inflight_max_retries: u32,
53    /// Whether to filter unclaimed external orders.
54    pub filter_unclaimed_external: bool,
55    /// Whether to generate missing orders from reports.
56    pub generate_missing_orders: bool,
57    /// Client order IDs excluded from reconciliation.
58    pub filtered_client_order_ids: HashSet<ClientOrderId>,
59    /// Threshold in nanoseconds before acting on venue discrepancies for open orders.
60    pub open_check_threshold_ns: u64,
61    /// Maximum retries before resolving an open order missing at the venue.
62    pub open_check_missing_retries: u32,
63    /// Whether open-order polling should only request open orders from the venue.
64    pub open_check_open_only: bool,
65    /// Lookback window (minutes) for venue order status polling.
66    pub open_check_lookback_mins: Option<u64>,
67    /// Whether to filter position status reports during reconciliation.
68    pub filter_position_reports: bool,
69    /// Instrument IDs to include during reconciliation (empty => all).
70    pub reconciliation_instrument_ids: HashSet<InstrumentId>,
71}
72
73impl Default for ReconciliationConfig {
74    fn default() -> Self {
75        Self {
76            lookback_mins: Some(60),
77            inflight_threshold_ms: 5000,
78            inflight_max_retries: 5,
79            filter_unclaimed_external: false,
80            generate_missing_orders: true,
81            filtered_client_order_ids: HashSet::new(),
82            open_check_threshold_ns: 5_000_000_000,
83            open_check_missing_retries: 5,
84            open_check_open_only: true,
85            open_check_lookback_mins: Some(60),
86            filter_position_reports: false,
87            reconciliation_instrument_ids: HashSet::new(),
88        }
89    }
90}
91
92/// Execution report for continuous reconciliation.
93/// This is a simplified report type used during runtime reconciliation.
94#[derive(Debug, Clone)]
95pub struct ExecutionReport {
96    pub client_order_id: ClientOrderId,
97    pub venue_order_id: Option<VenueOrderId>,
98    pub status: OrderStatus,
99    pub filled_qty: Quantity,
100    pub avg_px: Option<f64>,
101    pub ts_event: UnixNanos,
102}
103
104/// Information about an inflight order check.
105#[derive(Debug, Clone)]
106struct InflightCheck {
107    #[allow(dead_code)]
108    pub client_order_id: ClientOrderId,
109    pub ts_submitted: UnixNanos,
110    pub retry_count: u32,
111    pub last_query_ts: Option<UnixNanos>,
112}
113
114/// Manager for reconciling execution state between local cache and venues.
115///
116/// The `ReconciliationManager` handles:
117/// - Startup reconciliation to align state on system start
118/// - Continuous reconciliation of inflight orders
119/// - External order discovery and claiming
120/// - Fill report processing and validation
121#[derive(Clone)]
122pub struct ReconciliationManager {
123    clock: Rc<RefCell<dyn Clock>>,
124    cache: Rc<RefCell<Cache>>,
125    config: ReconciliationConfig,
126    inflight_checks: HashMap<ClientOrderId, InflightCheck>,
127    external_order_claims: HashMap<InstrumentId, StrategyId>,
128    processed_fills: HashMap<TradeId, ClientOrderId>,
129    recon_check_retries: HashMap<ClientOrderId, u32>,
130    ts_last_query: HashMap<ClientOrderId, UnixNanos>,
131    order_local_activity_ns: HashMap<ClientOrderId, UnixNanos>,
132}
133
134impl Debug for ReconciliationManager {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        f.debug_struct(stringify!(ReconciliationManager))
137            .field("config", &self.config)
138            .field("inflight_checks", &self.inflight_checks)
139            .field("external_order_claims", &self.external_order_claims)
140            .field("processed_fills", &self.processed_fills)
141            .field("recon_check_retries", &self.recon_check_retries)
142            .finish()
143    }
144}
145
146impl ReconciliationManager {
147    /// Creates a new [`ReconciliationManager`] instance.
148    pub fn new(
149        clock: Rc<RefCell<dyn Clock>>,
150        cache: Rc<RefCell<Cache>>,
151        config: ReconciliationConfig,
152    ) -> Self {
153        Self {
154            clock,
155            cache,
156            config,
157            inflight_checks: HashMap::new(),
158            external_order_claims: HashMap::new(),
159            processed_fills: HashMap::new(),
160            recon_check_retries: HashMap::new(),
161            ts_last_query: HashMap::new(),
162            order_local_activity_ns: HashMap::new(),
163        }
164    }
165
166    /// Reconciles orders and fills from a mass status report.
167    pub async fn reconcile_execution_mass_status(
168        &mut self,
169        mass_status: ExecutionMassStatus,
170    ) -> Vec<OrderEventAny> {
171        let mut events = Vec::new();
172
173        // Process order status reports first
174        for report in mass_status.order_reports().values() {
175            if let Some(client_order_id) = &report.client_order_id {
176                if let Some(order) = self.get_order(client_order_id) {
177                    let mut order = order;
178                    if let Some(event) = self.reconcile_order_report(&mut order, report) {
179                        events.push(event);
180                    }
181                }
182            } else if !self.config.filter_unclaimed_external
183                && let Some(event) = self.handle_external_order(report, &mass_status.account_id)
184            {
185                events.push(event);
186            }
187        }
188
189        // Process fill reports
190        for fills in mass_status.fill_reports().values() {
191            for fill in fills {
192                if let Some(client_order_id) = &fill.client_order_id
193                    && let Some(order) = self.get_order(client_order_id)
194                {
195                    let mut order = order;
196                    // Get instrument for the order
197                    let instrument_id = order.instrument_id();
198                    if let Some(instrument) = self.get_instrument(&instrument_id)
199                        && let Some(event) = self.create_order_fill(&mut order, fill, &instrument)
200                    {
201                        events.push(event);
202                    }
203                }
204            }
205        }
206
207        events
208    }
209
210    /// Reconciles a single execution report during runtime.
211    pub fn reconcile_report(&mut self, report: ExecutionReport) -> Vec<OrderEventAny> {
212        let mut events = Vec::new();
213
214        // Remove from inflight checks if present
215        self.clear_recon_tracking(&report.client_order_id, true);
216
217        if let Some(order) = self.get_order(&report.client_order_id) {
218            let mut order = order;
219            // Create an OrderStatusReport from the ExecutionReport
220            let order_report = OrderStatusReport::new(
221                order.account_id().unwrap_or_default(),
222                order.instrument_id(),
223                Some(report.client_order_id),
224                report.venue_order_id.unwrap_or_default(),
225                order.order_side(),
226                order.order_type(),
227                order.time_in_force(),
228                report.status,
229                order.quantity(),
230                report.filled_qty,
231                report.ts_event, // Use ts_event as ts_accepted
232                report.ts_event, // Use ts_event as ts_last
233                self.clock.borrow().timestamp_ns(),
234                Some(UUID4::new()),
235            )
236            .with_avg_px(report.avg_px.unwrap_or(0.0));
237
238            if let Some(event) = self.reconcile_order_report(&mut order, &order_report) {
239                events.push(event);
240            }
241        }
242
243        events
244    }
245
246    /// Checks inflight orders and returns events for any that need reconciliation.
247    pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
248        let mut events = Vec::new();
249        let current_time = self.clock.borrow().timestamp_ns();
250        let threshold_ns = self.config.inflight_threshold_ms * 1_000_000;
251
252        let mut to_check = Vec::new();
253        for (client_order_id, check) in &self.inflight_checks {
254            if current_time - check.ts_submitted > threshold_ns {
255                to_check.push(*client_order_id);
256            }
257        }
258
259        for client_order_id in to_check {
260            if self
261                .config
262                .filtered_client_order_ids
263                .contains(&client_order_id)
264            {
265                continue;
266            }
267
268            if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
269                if let Some(last_query_ts) = check.last_query_ts
270                    && current_time - last_query_ts < threshold_ns
271                {
272                    continue;
273                }
274
275                check.retry_count += 1;
276                check.last_query_ts = Some(current_time);
277                self.ts_last_query.insert(client_order_id, current_time);
278                self.recon_check_retries
279                    .insert(client_order_id, check.retry_count);
280
281                if check.retry_count >= self.config.inflight_max_retries {
282                    // Generate rejection after max retries
283                    if let Some(order) = self.get_order(&client_order_id) {
284                        events.push(self.create_order_rejected(&order, Some("INFLIGHT_TIMEOUT")));
285                    }
286                    // Remove from inflight checks regardless of whether order exists
287                    self.clear_recon_tracking(&client_order_id, true);
288                }
289            }
290        }
291
292        events
293    }
294
295    /// Checks open orders against the venue state.
296    pub async fn check_open_orders(&mut self) -> Vec<OrderEventAny> {
297        // This would need to query the venue for open orders
298        // and reconcile any discrepancies
299        Vec::new()
300    }
301
302    /// Registers an order as inflight for tracking.
303    pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
304        let ts_submitted = self.clock.borrow().timestamp_ns();
305        self.inflight_checks.insert(
306            client_order_id,
307            InflightCheck {
308                client_order_id,
309                ts_submitted,
310                retry_count: 0,
311                last_query_ts: None,
312            },
313        );
314        self.recon_check_retries.insert(client_order_id, 0);
315        self.ts_last_query.remove(&client_order_id);
316        self.order_local_activity_ns.remove(&client_order_id);
317    }
318
319    /// Records local activity for the specified order.
320    pub fn record_local_activity(&mut self, client_order_id: ClientOrderId, ts_event: UnixNanos) {
321        self.order_local_activity_ns
322            .insert(client_order_id, ts_event);
323    }
324
325    /// Clears reconciliation tracking state for an order.
326    pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
327        self.inflight_checks.remove(client_order_id);
328        self.recon_check_retries.remove(client_order_id);
329        if drop_last_query {
330            self.ts_last_query.remove(client_order_id);
331        }
332        self.order_local_activity_ns.remove(client_order_id);
333    }
334
335    /// Claims external orders for a specific strategy and instrument.
336    pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
337        self.external_order_claims
338            .insert(instrument_id, strategy_id);
339    }
340
341    // Private helper methods
342
343    fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
344        self.cache.borrow().order(client_order_id).cloned()
345    }
346
347    fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
348        self.cache.borrow().instrument(instrument_id).cloned()
349    }
350
351    fn reconcile_order_report(
352        &mut self,
353        order: &mut OrderAny,
354        report: &OrderStatusReport,
355    ) -> Option<OrderEventAny> {
356        // Check if reconciliation is needed
357        if order.status() == report.order_status && order.filled_qty() == report.filled_qty {
358            return None; // Already in sync
359        }
360
361        // Generate appropriate event based on status
362        match report.order_status {
363            OrderStatus::Accepted => Some(self.create_order_accepted(order, report)),
364            OrderStatus::Rejected => {
365                Some(self.create_order_rejected(order, report.cancel_reason.as_deref()))
366            }
367            OrderStatus::Triggered => Some(self.create_order_triggered(order, report)),
368            OrderStatus::Canceled => Some(self.create_order_canceled(order, report)),
369            OrderStatus::Expired => Some(self.create_order_expired(order, report)),
370            _ => None,
371        }
372    }
373
374    fn handle_external_order(
375        &self,
376        _report: &OrderStatusReport,
377        _account_id: &AccountId,
378    ) -> Option<OrderEventAny> {
379        // This would need to create a new order from the report
380        // For now, we'll skip external order handling
381        None
382    }
383
384    fn create_order_accepted(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
385        OrderEventAny::Accepted(OrderAccepted::new(
386            order.trader_id(),
387            order.strategy_id(),
388            order.instrument_id(),
389            order.client_order_id(),
390            order.venue_order_id().unwrap_or(report.venue_order_id),
391            order.account_id().unwrap_or_default(),
392            UUID4::new(),
393            report.ts_accepted,
394            self.clock.borrow().timestamp_ns(),
395            false,
396        ))
397    }
398
399    fn create_order_rejected(&self, order: &OrderAny, reason: Option<&str>) -> OrderEventAny {
400        let reason = reason.unwrap_or("UNKNOWN");
401        OrderEventAny::Rejected(OrderRejected::new(
402            order.trader_id(),
403            order.strategy_id(),
404            order.instrument_id(),
405            order.client_order_id(),
406            order.account_id().unwrap_or_default(),
407            Ustr::from(reason),
408            UUID4::new(),
409            self.clock.borrow().timestamp_ns(),
410            self.clock.borrow().timestamp_ns(),
411            false,
412            false, // due_post_only
413        ))
414    }
415
416    fn create_order_triggered(
417        &self,
418        order: &OrderAny,
419        report: &OrderStatusReport,
420    ) -> OrderEventAny {
421        OrderEventAny::Triggered(OrderTriggered::new(
422            order.trader_id(),
423            order.strategy_id(),
424            order.instrument_id(),
425            order.client_order_id(),
426            UUID4::new(),
427            report
428                .ts_triggered
429                .unwrap_or(self.clock.borrow().timestamp_ns()),
430            self.clock.borrow().timestamp_ns(),
431            false,
432            order.venue_order_id(),
433            order.account_id(),
434        ))
435    }
436
437    fn create_order_canceled(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
438        OrderEventAny::Canceled(OrderCanceled::new(
439            order.trader_id(),
440            order.strategy_id(),
441            order.instrument_id(),
442            order.client_order_id(),
443            UUID4::new(),
444            report.ts_last,
445            self.clock.borrow().timestamp_ns(),
446            false,
447            order.venue_order_id(),
448            order.account_id(),
449        ))
450    }
451
452    #[allow(dead_code)]
453    fn create_order_canceled_simple(&self, order: &OrderAny, ts_event: UnixNanos) -> OrderEventAny {
454        OrderEventAny::Canceled(OrderCanceled::new(
455            order.trader_id(),
456            order.strategy_id(),
457            order.instrument_id(),
458            order.client_order_id(),
459            UUID4::new(),
460            ts_event,
461            self.clock.borrow().timestamp_ns(),
462            false,
463            order.venue_order_id(),
464            order.account_id(),
465        ))
466    }
467
468    fn create_order_expired(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
469        OrderEventAny::Expired(OrderExpired::new(
470            order.trader_id(),
471            order.strategy_id(),
472            order.instrument_id(),
473            order.client_order_id(),
474            UUID4::new(),
475            report.ts_last,
476            self.clock.borrow().timestamp_ns(),
477            false,
478            order.venue_order_id(),
479            order.account_id(),
480        ))
481    }
482
483    fn create_order_fill(
484        &mut self,
485        order: &mut OrderAny,
486        fill: &FillReport,
487        instrument: &InstrumentAny,
488    ) -> Option<OrderEventAny> {
489        // Check if this fill was already processed
490        if self.processed_fills.contains_key(&fill.trade_id) {
491            return None;
492        }
493
494        // Mark this fill as processed
495        self.processed_fills
496            .insert(fill.trade_id, order.client_order_id());
497
498        Some(OrderEventAny::Filled(OrderFilled::new(
499            order.trader_id(),
500            order.strategy_id(),
501            order.instrument_id(),
502            order.client_order_id(),
503            fill.venue_order_id,
504            order.account_id().unwrap_or_default(),
505            fill.trade_id,
506            fill.order_side,
507            order.order_type(),
508            fill.last_qty,
509            fill.last_px,
510            instrument.quote_currency(),
511            fill.liquidity_side,
512            fill.report_id,
513            fill.ts_event,
514            self.clock.borrow().timestamp_ns(),
515            false,
516            fill.venue_position_id,
517            Some(fill.commission),
518        )))
519    }
520}
521
522////////////////////////////////////////////////////////////////////////////////
523// Tests
524////////////////////////////////////////////////////////////////////////////////
525
526#[cfg(test)]
527mod tests {
528    use std::{cell::RefCell, rc::Rc};
529
530    use nautilus_common::{cache::Cache, clock::TestClock};
531    use nautilus_core::{UUID4, UnixNanos};
532    use nautilus_model::{
533        enums::OrderStatus,
534        identifiers::{AccountId, ClientId, ClientOrderId, VenueOrderId},
535        reports::ExecutionMassStatus,
536        types::Quantity,
537    };
538    use rstest::rstest;
539
540    use super::*;
541
542    fn create_test_manager() -> ReconciliationManager {
543        let clock = Rc::new(RefCell::new(TestClock::new()));
544        let cache = Rc::new(RefCell::new(Cache::default()));
545        let config = ReconciliationConfig::default();
546        ReconciliationManager::new(clock, cache, config)
547    }
548
549    #[rstest]
550    fn test_reconciliation_manager_new() {
551        let manager = create_test_manager();
552        assert_eq!(manager.inflight_checks.len(), 0);
553        assert_eq!(manager.external_order_claims.len(), 0);
554        assert_eq!(manager.processed_fills.len(), 0);
555    }
556
557    #[rstest]
558    fn test_register_inflight() {
559        let mut manager = create_test_manager();
560        let client_order_id = ClientOrderId::from("O-123456");
561
562        manager.register_inflight(client_order_id);
563
564        assert_eq!(manager.inflight_checks.len(), 1);
565        assert!(manager.inflight_checks.contains_key(&client_order_id));
566    }
567
568    #[rstest]
569    fn test_claim_external_orders() {
570        let mut manager = create_test_manager();
571        let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
572        let strategy_id = StrategyId::from("STRATEGY-001");
573
574        manager.claim_external_orders(instrument_id, strategy_id);
575
576        assert_eq!(manager.external_order_claims.len(), 1);
577        assert_eq!(
578            manager.external_order_claims.get(&instrument_id),
579            Some(&strategy_id)
580        );
581    }
582
583    #[rstest]
584    fn test_reconcile_report_removes_from_inflight() {
585        let mut manager = create_test_manager();
586        let client_order_id = ClientOrderId::from("O-123456");
587
588        // Register as inflight
589        manager.register_inflight(client_order_id);
590        assert_eq!(manager.inflight_checks.len(), 1);
591
592        // Create execution report
593        let report = ExecutionReport {
594            client_order_id,
595            venue_order_id: Some(VenueOrderId::from("V-123456")),
596            status: OrderStatus::Accepted,
597            filled_qty: Quantity::from(0),
598            avg_px: None,
599            ts_event: UnixNanos::default(),
600        };
601
602        // Reconcile should remove from inflight checks
603        manager.reconcile_report(report);
604        assert_eq!(manager.inflight_checks.len(), 0);
605    }
606
607    #[rstest]
608    fn test_check_inflight_orders_generates_rejection_after_max_retries() {
609        let clock = Rc::new(RefCell::new(TestClock::new()));
610        let cache = Rc::new(RefCell::new(Cache::default()));
611        let config = ReconciliationConfig {
612            inflight_threshold_ms: 100,
613            inflight_max_retries: 2,
614            ..ReconciliationConfig::default()
615        };
616        let mut manager = ReconciliationManager::new(clock.clone(), cache.clone(), config);
617
618        let client_order_id = ClientOrderId::from("O-123456");
619        manager.register_inflight(client_order_id);
620
621        // First check - should increment retry count
622        clock
623            .borrow_mut()
624            .advance_time(UnixNanos::from(200_000_000), true);
625        let events = manager.check_inflight_orders();
626        assert_eq!(events.len(), 0);
627        let first_check = manager
628            .inflight_checks
629            .get(&client_order_id)
630            .expect("inflight check present");
631        assert_eq!(first_check.retry_count, 1);
632        let first_query_ts = first_check.last_query_ts.expect("last query recorded");
633
634        // Second check - should hit max retries and generate rejection
635        clock
636            .borrow_mut()
637            .advance_time(UnixNanos::from(400_000_000), true);
638        let events = manager.check_inflight_orders();
639        assert_eq!(events.len(), 0); // Would generate rejection if order existed in cache
640        assert!(!manager.inflight_checks.contains_key(&client_order_id));
641        // Ensure last query timestamp progressed prior to removal
642        assert!(clock.borrow().timestamp_ns() > first_query_ts);
643    }
644
645    #[rstest]
646    fn test_check_inflight_orders_skips_recent_query() {
647        let clock = Rc::new(RefCell::new(TestClock::new()));
648        let cache = Rc::new(RefCell::new(Cache::default()));
649        let config = ReconciliationConfig {
650            inflight_threshold_ms: 100,
651            inflight_max_retries: 3,
652            ..ReconciliationConfig::default()
653        };
654        let mut manager = ReconciliationManager::new(clock.clone(), cache, config);
655
656        let client_order_id = ClientOrderId::from("O-ABCDEF");
657        manager.register_inflight(client_order_id);
658
659        // First pass triggers a venue query and records timestamp
660        clock
661            .borrow_mut()
662            .advance_time(UnixNanos::from(200_000_000), true);
663        let events = manager.check_inflight_orders();
664        assert!(events.is_empty());
665        let initial_check = manager
666            .inflight_checks
667            .get(&client_order_id)
668            .expect("inflight check retained");
669        assert_eq!(initial_check.retry_count, 1);
670        let last_query_ts = initial_check.last_query_ts.expect("last query recorded");
671
672        // Subsequent pass within threshold should be skipped entirely
673        clock
674            .borrow_mut()
675            .advance_time(UnixNanos::from(250_000_000), true);
676        let events = manager.check_inflight_orders();
677        assert!(events.is_empty());
678        let second_check = manager
679            .inflight_checks
680            .get(&client_order_id)
681            .expect("inflight check retained");
682        assert_eq!(second_check.retry_count, 1);
683        assert_eq!(second_check.last_query_ts, Some(last_query_ts));
684    }
685
686    #[rstest]
687    fn test_check_inflight_orders_skips_filtered_ids() {
688        let clock = Rc::new(RefCell::new(TestClock::new()));
689        let cache = Rc::new(RefCell::new(Cache::default()));
690        let filtered_id = ClientOrderId::from("O-FILTERED");
691        let mut config = ReconciliationConfig::default();
692        config.filtered_client_order_ids.insert(filtered_id);
693        config.inflight_threshold_ms = 100;
694        let mut manager = ReconciliationManager::new(clock.clone(), cache, config);
695
696        manager.register_inflight(filtered_id);
697        clock
698            .borrow_mut()
699            .advance_time(UnixNanos::from(200_000_000), true);
700        let events = manager.check_inflight_orders();
701        assert!(events.is_empty());
702        assert!(manager.inflight_checks.contains_key(&filtered_id));
703    }
704
705    #[rstest]
706    fn test_record_and_clear_tracking() {
707        let mut manager = create_test_manager();
708        let client_order_id = ClientOrderId::from("O-TRACK");
709
710        manager.register_inflight(client_order_id);
711        let ts_now = UnixNanos::from(1_000_000);
712        manager.record_local_activity(client_order_id, ts_now);
713
714        assert_eq!(
715            manager
716                .order_local_activity_ns
717                .get(&client_order_id)
718                .copied(),
719            Some(ts_now)
720        );
721
722        manager.clear_recon_tracking(&client_order_id, true);
723        assert!(!manager.inflight_checks.contains_key(&client_order_id));
724        assert!(
725            !manager
726                .order_local_activity_ns
727                .contains_key(&client_order_id)
728        );
729        assert!(!manager.recon_check_retries.contains_key(&client_order_id));
730        assert!(!manager.ts_last_query.contains_key(&client_order_id));
731    }
732
733    #[tokio::test]
734    async fn test_reconcile_execution_mass_status_with_empty() {
735        let mut manager = create_test_manager();
736        let account_id = AccountId::from("ACCOUNT-001");
737        let venue = nautilus_model::identifiers::Venue::from("BINANCE");
738
739        let client_id = ClientId::from("BINANCE");
740        let mass_status = ExecutionMassStatus::new(
741            client_id,
742            account_id,
743            venue,
744            UnixNanos::default(),
745            Some(UUID4::new()),
746        );
747
748        let events = manager.reconcile_execution_mass_status(mass_status).await;
749        assert_eq!(events.len(), 0);
750    }
751
752    #[rstest]
753    fn test_reconciliation_config_default() {
754        let config = ReconciliationConfig::default();
755
756        assert_eq!(config.lookback_mins, Some(60));
757        assert_eq!(config.inflight_threshold_ms, 5000);
758        assert_eq!(config.inflight_max_retries, 5);
759        assert!(!config.filter_unclaimed_external);
760        assert!(config.generate_missing_orders);
761    }
762}