Skip to main content

nautilus_live/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Execution state manager for live trading.
17//!
18//! This module provides the execution manager for reconciling execution state between
19//! the local cache and connected venues, as well as purging old state during live trading.
20
21use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr, sync::LazyLock};
22
23use ahash::{AHashMap, AHashSet};
24use indexmap::IndexMap;
25use nautilus_common::{
26    cache::Cache,
27    clients::ExecutionClient,
28    clock::Clock,
29    enums::{LogColor, LogLevel},
30    log_info,
31    messages::execution::report::{GenerateOrderStatusReports, GeneratePositionStatusReports},
32};
33use nautilus_core::{
34    UUID4, UnixNanos,
35    datetime::{NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, nanos_to_millis},
36};
37use nautilus_execution::{
38    engine::ExecutionEngine,
39    reconciliation::{
40        calculate_reconciliation_price, create_inferred_fill_for_qty,
41        create_reconciliation_rejected, create_reconciliation_triggered,
42        create_synthetic_venue_order_id, generate_external_order_status_events,
43        process_mass_status_for_reconciliation, reconcile_order_report,
44        should_reconciliation_update,
45    },
46};
47use nautilus_model::{
48    enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
49    events::{OrderEventAny, OrderFilled, OrderInitialized},
50    identifiers::{
51        AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
52        VenueOrderId,
53    },
54    instruments::{Instrument, InstrumentAny},
55    orders::{Order, OrderAny},
56    position::Position,
57    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
58    types::Quantity,
59};
60use rust_decimal::{Decimal, prelude::ToPrimitive};
61use ustr::Ustr;
62
63use crate::config::LiveExecEngineConfig;
64
65/// Tag for orders originating from venue (external orders).
66static TAG_VENUE: LazyLock<Ustr> = LazyLock::new(|| Ustr::from("VENUE"));
67
68/// Tag for orders generated by reconciliation logic (synthetic orders).
69static TAG_RECONCILIATION: LazyLock<Ustr> = LazyLock::new(|| Ustr::from("RECONCILIATION"));
70
/// Metadata for an external order that needs to be registered with the execution client.
///
/// Instances are collected in [`ReconciliationResult::external_orders`] while a mass
/// status is being reconciled.
#[derive(Debug, Clone)]
pub struct ExternalOrderMetadata {
    /// The client order ID of the external order.
    pub client_order_id: ClientOrderId,
    /// The venue order ID of the external order.
    pub venue_order_id: VenueOrderId,
    /// The instrument ID the order is for.
    pub instrument_id: InstrumentId,
    /// The strategy ID associated with the order.
    // NOTE(review): presumably the claiming strategy, or a placeholder for unclaimed
    // orders - confirm at the construction site.
    pub strategy_id: StrategyId,
    /// Timestamp (UNIX nanoseconds).
    // NOTE(review): presumably when the order was initialized - confirm at the
    // construction site.
    pub ts_init: UnixNanos,
}
80
/// Result of reconciliation containing events and external order metadata.
///
/// Derives `Default`, so an empty result (no events, no external orders) is available
/// via `ReconciliationResult::default()`.
#[derive(Debug, Default)]
pub struct ReconciliationResult {
    /// Order events generated during reconciliation.
    pub events: Vec<OrderEventAny>,
    /// External orders that need to be registered with execution clients.
    pub external_orders: Vec<ExternalOrderMetadata>,
}
89
/// Configuration for execution manager.
///
/// The "Default" values quoted on each field are those produced by
/// `ExecutionManagerConfig::default()`. When the configuration is built from a
/// `LiveExecEngineConfig` instead, the values come from that configuration.
#[derive(Debug, Clone)]
pub struct ExecutionManagerConfig {
    /// The trader ID for generated orders.
    ///
    /// The conversion from `LiveExecEngineConfig` leaves this at `TraderId::default()`;
    /// set it afterwards with [`Self::with_trader_id`].
    pub trader_id: TraderId,
    /// If reconciliation is active at start-up. Default: `true`.
    pub reconciliation: bool,
    /// The delay (seconds) before starting reconciliation at startup. Default: `10.0`.
    pub reconciliation_startup_delay_secs: f64,
    /// Number of minutes to look back during reconciliation. Default: `Some(60)`.
    pub lookback_mins: Option<u64>,
    /// Instrument IDs to include during reconciliation (empty => all). Default: empty.
    pub reconciliation_instrument_ids: AHashSet<InstrumentId>,
    /// Whether to filter unclaimed external orders. Default: `false`.
    pub filter_unclaimed_external: bool,
    /// Whether to filter position status reports during reconciliation. Default: `false`.
    pub filter_position_reports: bool,
    /// Client order IDs excluded from reconciliation. Default: empty.
    pub filtered_client_order_ids: AHashSet<ClientOrderId>,
    /// Whether to generate missing orders from reports. Default: `true`.
    pub generate_missing_orders: bool,
    /// The interval (milliseconds) between checking whether in-flight orders have exceeded their threshold.
    /// Default: `2_000`.
    pub inflight_check_interval_ms: u32,
    /// Threshold in milliseconds for inflight order checks. Default: `5_000`.
    pub inflight_threshold_ms: u64,
    /// Maximum number of retries for inflight checks. Default: `5`.
    pub inflight_max_retries: u32,
    /// The interval (seconds) between checks for open orders at the venue. Default: `None`.
    pub open_check_interval_secs: Option<f64>,
    /// The lookback minutes for open order checks. Default: `Some(60)`.
    pub open_check_lookback_mins: Option<u64>,
    /// Threshold in nanoseconds before acting on venue discrepancies for open orders.
    /// Default: `5_000_000_000` (5 seconds).
    pub open_check_threshold_ns: u64,
    /// Maximum retries before resolving an open order missing at the venue. Default: `5`.
    pub open_check_missing_retries: u32,
    /// Whether open-order polling should only request open orders from the venue. Default: `true`.
    pub open_check_open_only: bool,
    /// The maximum number of single-order queries per consistency check cycle. Default: `5`.
    pub max_single_order_queries_per_cycle: u32,
    /// The delay (milliseconds) between consecutive single-order queries. Default: `100`.
    pub single_order_query_delay_ms: u32,
    /// The interval (seconds) between checks for open positions at the venue. Default: `None`.
    pub position_check_interval_secs: Option<f64>,
    /// The lookback minutes for position consistency checks. Default: `60`.
    pub position_check_lookback_mins: u64,
    /// Threshold in nanoseconds before acting on venue discrepancies for positions.
    /// Default: `60_000_000_000` (60 seconds).
    pub position_check_threshold_ns: u64,
    /// The time buffer (minutes) before closed orders can be purged. Default: `None`.
    pub purge_closed_orders_buffer_mins: Option<u32>,
    /// The time buffer (minutes) before closed positions can be purged. Default: `None`.
    pub purge_closed_positions_buffer_mins: Option<u32>,
    /// The time buffer (minutes) before account events can be purged. Default: `None`.
    pub purge_account_events_lookback_mins: Option<u32>,
    /// If purge operations should also delete from the backing database. Default: `false`.
    pub purge_from_database: bool,
}
146
147impl Default for ExecutionManagerConfig {
148    fn default() -> Self {
149        Self {
150            trader_id: TraderId::default(),
151            reconciliation: true,
152            reconciliation_startup_delay_secs: 10.0,
153            lookback_mins: Some(60),
154            reconciliation_instrument_ids: AHashSet::new(),
155            filter_unclaimed_external: false,
156            filter_position_reports: false,
157            filtered_client_order_ids: AHashSet::new(),
158            generate_missing_orders: true,
159            inflight_check_interval_ms: 2_000,
160            inflight_threshold_ms: 5_000,
161            inflight_max_retries: 5,
162            open_check_interval_secs: None,
163            open_check_lookback_mins: Some(60),
164            open_check_threshold_ns: 5_000_000_000,
165            open_check_missing_retries: 5,
166            open_check_open_only: true,
167            max_single_order_queries_per_cycle: 5,
168            single_order_query_delay_ms: 100,
169            position_check_interval_secs: None,
170            position_check_lookback_mins: 60,
171            position_check_threshold_ns: 60_000_000_000,
172            purge_closed_orders_buffer_mins: None,
173            purge_closed_positions_buffer_mins: None,
174            purge_account_events_lookback_mins: None,
175            purge_from_database: false,
176        }
177    }
178}
179
180impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
181    fn from(config: &LiveExecEngineConfig) -> Self {
182        let filtered_client_order_ids: AHashSet<ClientOrderId> = config
183            .filtered_client_order_ids
184            .clone()
185            .unwrap_or_default()
186            .into_iter()
187            .map(|value| ClientOrderId::from(value.as_str()))
188            .collect();
189
190        let reconciliation_instrument_ids: AHashSet<InstrumentId> = config
191            .reconciliation_instrument_ids
192            .clone()
193            .unwrap_or_default()
194            .into_iter()
195            .map(InstrumentId::from)
196            .collect();
197
198        let open_check_threshold_ns =
199            (config.open_check_threshold_ms as u64) * NANOSECONDS_IN_MILLISECOND;
200        let position_check_threshold_ns =
201            (config.position_check_threshold_ms as u64) * NANOSECONDS_IN_MILLISECOND;
202
203        Self {
204            trader_id: TraderId::default(), // Must be set separately via with_trader_id
205            reconciliation: config.reconciliation,
206            reconciliation_startup_delay_secs: config.reconciliation_startup_delay_secs,
207            lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
208            reconciliation_instrument_ids,
209            filter_unclaimed_external: config.filter_unclaimed_external_orders,
210            filter_position_reports: config.filter_position_reports,
211            filtered_client_order_ids,
212            generate_missing_orders: config.generate_missing_orders,
213            inflight_check_interval_ms: config.inflight_check_interval_ms,
214            inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
215            inflight_max_retries: config.inflight_check_retries,
216            open_check_interval_secs: config.open_check_interval_secs,
217            open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
218            open_check_threshold_ns,
219            open_check_missing_retries: config.open_check_missing_retries,
220            open_check_open_only: config.open_check_open_only,
221            max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
222            single_order_query_delay_ms: config.single_order_query_delay_ms,
223            position_check_interval_secs: config.position_check_interval_secs,
224            position_check_lookback_mins: config.position_check_lookback_mins as u64,
225            position_check_threshold_ns,
226            purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
227            purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
228            purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
229            purge_from_database: config.purge_from_database,
230        }
231    }
232}
233
234impl ExecutionManagerConfig {
235    /// Sets the trader ID on the configuration.
236    #[must_use]
237    pub fn with_trader_id(mut self, trader_id: TraderId) -> Self {
238        self.trader_id = trader_id;
239        self
240    }
241}
242
/// Execution report for continuous reconciliation.
/// This is a simplified report type used during runtime reconciliation.
#[derive(Debug, Clone)]
pub struct ExecutionReport {
    /// The client order ID of the order the report refers to.
    pub client_order_id: ClientOrderId,
    /// The venue order ID, if one is available.
    pub venue_order_id: Option<VenueOrderId>,
    /// The reported order status.
    pub status: OrderStatus,
    /// The reported filled quantity.
    // NOTE(review): presumably cumulative rather than incremental - confirm with producers.
    pub filled_qty: Quantity,
    /// The reported average price, if any.
    pub avg_px: Option<f64>,
    /// Event timestamp (UNIX nanoseconds).
    pub ts_event: UnixNanos,
}
254
/// Information about an inflight order check.
///
/// Stored per order in `ExecutionManager::inflight_checks`, keyed by client order ID.
#[derive(Debug, Clone)]
struct InflightCheck {
    /// The client order ID of the order being checked.
    // The lint suppression implies this field is never read directly.
    // NOTE(review): confirm it is still needed (e.g. for `Debug` output) or remove it.
    #[allow(dead_code)]
    pub client_order_id: ClientOrderId,
    /// Timestamp (UNIX nanoseconds) at which the order was submitted.
    pub ts_submitted: UnixNanos,
    /// Retry counter for this check.
    // NOTE(review): presumably compared against `inflight_max_retries` - confirm.
    pub retry_count: u32,
    /// Timestamp (UNIX nanoseconds) of the last query, if one has been made.
    pub last_query_ts: Option<UnixNanos>,
}
264
/// Manager for execution state.
///
/// The `ExecutionManager` handles:
/// - Startup reconciliation to align state on system start.
/// - Continuous reconciliation of inflight orders.
/// - External order discovery and claiming.
/// - Fill report processing and validation.
/// - Purging of old orders, positions, and account events.
///
/// # Thread Safety
///
/// This struct is **not thread-safe** and is designed for single-threaded use within
/// an async runtime. The `clock` and `cache` are held as `Rc<RefCell<>>`, which makes
/// the type neither `Send` nor `Sync`: it cannot be moved to, or shared with, another
/// thread, and wrapping it in `Arc<Mutex<>>` does not change that. All methods must be
/// called from the thread/task that owns the manager.
///
/// **Warning:** `RefCell` only provides runtime borrow checking; overlapping borrows of
/// the `clock` or `cache` contents (a mutable borrow while any other borrow is alive)
/// will cause runtime panics.
///
/// # Cloning
///
/// `Clone` is derived: a clone shares the same `clock` and `cache` (only the `Rc`
/// handles are cloned) but receives its own independent copy of the configuration and
/// of every tracking map.
#[derive(Clone)]
pub struct ExecutionManager {
    /// Shared clock handle.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache handle (used, among other things, to index venue order IDs).
    cache: Rc<RefCell<Cache>>,
    /// The manager configuration.
    config: ExecutionManagerConfig,
    /// Inflight check state keyed by client order ID.
    inflight_checks: AHashMap<ClientOrderId, InflightCheck>,
    /// Strategy ID keyed by instrument ID.
    // NOTE(review): presumably the strategy claiming external orders for the instrument - confirm.
    external_order_claims: AHashMap<InstrumentId, StrategyId>,
    /// Client order ID keyed by trade ID.
    // NOTE(review): presumably the fills already processed, for de-duplication - confirm.
    processed_fills: AHashMap<TradeId, ClientOrderId>,
    /// Retry counter keyed by client order ID.
    recon_check_retries: AHashMap<ClientOrderId, u32>,
    /// Timestamp (UNIX nanoseconds) keyed by client order ID.
    // NOTE(review): presumably the time of the last venue query for the order - confirm.
    ts_last_query: AHashMap<ClientOrderId, UnixNanos>,
    /// Timestamp (UNIX nanoseconds) keyed by client order ID.
    // NOTE(review): presumably the last local activity seen for the order - confirm.
    order_local_activity_ns: AHashMap<ClientOrderId, UnixNanos>,
    /// Timestamp (UNIX nanoseconds) keyed by instrument ID.
    // NOTE(review): presumably the last local position activity for the instrument - confirm.
    position_local_activity_ns: AHashMap<InstrumentId, UnixNanos>,
    /// Timestamp (UNIX nanoseconds) keyed by trade ID.
    // NOTE(review): presumably a short-lived record of recently seen fills - confirm.
    recent_fills_cache: AHashMap<TradeId, UnixNanos>,
}
301
302impl Debug for ExecutionManager {
303    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304        f.debug_struct(stringify!(ExecutionManager))
305            .field("config", &self.config)
306            .field("inflight_checks", &self.inflight_checks)
307            .field("external_order_claims", &self.external_order_claims)
308            .field("processed_fills", &self.processed_fills)
309            .field("recon_check_retries", &self.recon_check_retries)
310            .finish()
311    }
312}
313
314impl ExecutionManager {
315    /// Creates a new [`ExecutionManager`] instance.
316    pub fn new(
317        clock: Rc<RefCell<dyn Clock>>,
318        cache: Rc<RefCell<Cache>>,
319        config: ExecutionManagerConfig,
320    ) -> Self {
321        Self {
322            clock,
323            cache,
324            config,
325            inflight_checks: AHashMap::new(),
326            external_order_claims: AHashMap::new(),
327            processed_fills: AHashMap::new(),
328            recon_check_retries: AHashMap::new(),
329            ts_last_query: AHashMap::new(),
330            order_local_activity_ns: AHashMap::new(),
331            position_local_activity_ns: AHashMap::new(),
332            recent_fills_cache: AHashMap::new(),
333        }
334    }
335
336    /// Reconciles orders and fills from a mass status report.
337    ///
338    /// Order events are collected, sorted globally by ts_event, then processed through
339    /// the execution engine to ensure chronological ordering across all orders.
340    /// Position events are processed after all order events to ensure fills are applied first.
341    pub async fn reconcile_execution_mass_status(
342        &mut self,
343        mass_status: ExecutionMassStatus,
344        exec_engine: Rc<RefCell<ExecutionEngine>>,
345    ) -> ReconciliationResult {
346        let venue = mass_status.venue;
347        let order_count = mass_status.order_reports().len();
348        let fill_count: usize = mass_status.fill_reports().values().map(|v| v.len()).sum();
349        let position_count = mass_status.position_reports().len();
350
351        log_info!(
352            "Reconciling ExecutionMassStatus for {venue}",
353            color = LogColor::Blue
354        );
355        log_info!(
356            "Received {order_count} order(s), {fill_count} fill(s), {position_count} position(s)",
357            color = LogColor::Blue
358        );
359
360        let (adjusted_order_reports, adjusted_fill_reports) =
361            self.adjust_mass_status_fills(&mass_status);
362
363        let mut events = Vec::new();
364        let mut external_orders = Vec::new();
365        let mut orders_reconciled = 0usize;
366        let mut external_orders_created = 0usize;
367        let mut open_orders_initialized = 0usize;
368        let mut orders_skipped_no_instrument = 0usize;
369        let mut orders_skipped_duplicate = 0usize;
370        let mut fills_applied = 0usize;
371
372        let fill_reports = &adjusted_fill_reports;
373        let mut seen_trade_ids: AHashSet<TradeId> = AHashSet::new();
374
375        for fills in fill_reports.values() {
376            for fill in fills {
377                if !seen_trade_ids.insert(fill.trade_id) {
378                    log::warn!("Duplicate trade_id {} in mass status", fill.trade_id);
379                }
380            }
381        }
382
383        // Deduplicate reports by venue_order_id, keeping the most advanced state
384        let order_reports = self.deduplicate_order_reports(adjusted_order_reports.values());
385        let mut orders_skipped_filtered = 0usize;
386
387        for report in order_reports.values() {
388            if self.should_skip_order_report(report) {
389                orders_skipped_filtered += 1;
390                continue;
391            }
392
393            if let Some(client_order_id) = &report.client_order_id {
394                if let Some(cached_order) = self.get_order(client_order_id)
395                    && self.is_exact_order_match(&cached_order, report)
396                {
397                    log::debug!("Skipping order {client_order_id}: already in sync with venue");
398                    orders_skipped_duplicate += 1;
399
400                    // Still ensure venue_order_id is indexed even when skipping
401                    if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
402                        client_order_id,
403                        &report.venue_order_id,
404                        false,
405                    ) {
406                        log::warn!("Failed to add venue order ID index: {e}");
407                    }
408
409                    continue;
410                }
411
412                // Skip closed reconciliation orders to prevent duplicate inferred fills on restart
413                if let Some(cached_order) = self.get_order(client_order_id)
414                    && cached_order.is_closed()
415                    && cached_order
416                        .tags()
417                        .is_some_and(|tags| tags.contains(&*TAG_RECONCILIATION))
418                {
419                    log::debug!(
420                        "Skipping closed reconciliation order {client_order_id}: \
421                         synthetic position adjustment from previous session",
422                    );
423                    orders_skipped_duplicate += 1;
424                    continue;
425                }
426
427                if let Some(mut order) = self.get_order(client_order_id) {
428                    let instrument = self.get_instrument(&report.instrument_id);
429                    log::info!(
430                        color = LogColor::Blue as u8;
431                        "Reconciling {} {} {} [{}] -> [{}]",
432                        client_order_id,
433                        report.venue_order_id,
434                        report.instrument_id,
435                        order.status(),
436                        report.order_status,
437                    );
438
439                    let order_fills: Vec<&FillReport> = fill_reports
440                        .get(&report.venue_order_id)
441                        .map(|f| f.iter().collect())
442                        .unwrap_or_default();
443                    let order_events = self.reconcile_order_with_fills(
444                        &mut order,
445                        report,
446                        &order_fills,
447                        instrument.as_ref(),
448                    );
449                    if !order_events.is_empty() {
450                        orders_reconciled += 1;
451                        fills_applied += order_events
452                            .iter()
453                            .filter(|e| matches!(e, OrderEventAny::Filled(_)))
454                            .count();
455                        events.extend(order_events);
456                    }
457
458                    // Always ensure venue_order_id is indexed after reconciliation
459                    if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
460                        client_order_id,
461                        &report.venue_order_id,
462                        false,
463                    ) {
464                        log::warn!("Failed to add venue order ID index: {e}");
465                    }
466                } else if let Some(mut order) =
467                    self.get_order_by_venue_order_id(&report.venue_order_id)
468                {
469                    // Fallback: match by venue_order_id
470                    let instrument = self.get_instrument(&report.instrument_id);
471
472                    log::info!(
473                        color = LogColor::Blue as u8;
474                        "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
475                        order.client_order_id(),
476                        report.venue_order_id,
477                        report.instrument_id,
478                        order.status(),
479                        report.order_status,
480                    );
481
482                    let order_fills: Vec<&FillReport> = fill_reports
483                        .get(&report.venue_order_id)
484                        .map(|f| f.iter().collect())
485                        .unwrap_or_default();
486                    let order_events = self.reconcile_order_with_fills(
487                        &mut order,
488                        report,
489                        &order_fills,
490                        instrument.as_ref(),
491                    );
492
493                    if !order_events.is_empty() {
494                        orders_reconciled += 1;
495                        fills_applied += order_events
496                            .iter()
497                            .filter(|e| matches!(e, OrderEventAny::Filled(_)))
498                            .count();
499                        events.extend(order_events);
500                    }
501
502                    if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
503                        &order.client_order_id(),
504                        &report.venue_order_id,
505                        false,
506                    ) {
507                        log::warn!("Failed to add venue order ID index: {e}");
508                    }
509                } else if !self.config.filter_unclaimed_external {
510                    if let Some(instrument) = self.get_instrument(&report.instrument_id) {
511                        let order_fills: Vec<&FillReport> = fill_reports
512                            .get(&report.venue_order_id)
513                            .map(|f| f.iter().collect())
514                            .unwrap_or_default();
515                        let (external_events, metadata) = self.handle_external_order(
516                            report,
517                            &mass_status.account_id,
518                            &instrument,
519                            &order_fills,
520                            false, // Not synthetic (venue order)
521                        );
522
523                        if !external_events.is_empty() {
524                            external_orders_created += 1;
525                            fills_applied += external_events
526                                .iter()
527                                .filter(|e| matches!(e, OrderEventAny::Filled(_)))
528                                .count();
529
530                            if report.order_status.is_open() {
531                                open_orders_initialized += 1;
532                            }
533
534                            events.extend(external_events);
535
536                            if let Some(m) = metadata {
537                                external_orders.push(m);
538                            }
539                        }
540                    } else {
541                        orders_skipped_no_instrument += 1;
542                    }
543                }
544            } else if let Some(mut order) = self.get_order_by_venue_order_id(&report.venue_order_id)
545            {
546                // Fallback: match by venue_order_id
547                let instrument = self.get_instrument(&report.instrument_id);
548                log::info!(
549                    color = LogColor::Blue as u8;
550                    "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
551                    order.client_order_id(),
552                    report.venue_order_id,
553                    report.instrument_id,
554                    order.status(),
555                    report.order_status,
556                );
557
558                let order_fills: Vec<&FillReport> = fill_reports
559                    .get(&report.venue_order_id)
560                    .map(|f| f.iter().collect())
561                    .unwrap_or_default();
562                let order_events = self.reconcile_order_with_fills(
563                    &mut order,
564                    report,
565                    &order_fills,
566                    instrument.as_ref(),
567                );
568
569                if !order_events.is_empty() {
570                    orders_reconciled += 1;
571                    fills_applied += order_events
572                        .iter()
573                        .filter(|e| matches!(e, OrderEventAny::Filled(_)))
574                        .count();
575                    events.extend(order_events);
576                }
577
578                if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
579                    &order.client_order_id(),
580                    &report.venue_order_id,
581                    false,
582                ) {
583                    log::warn!("Failed to add venue order ID index: {e}");
584                }
585            } else if let Some(instrument) = self.get_instrument(&report.instrument_id) {
586                // Synthetic orders (S- prefix) are generated by reconciliation logic
587                let is_synthetic = report.venue_order_id.as_str().starts_with("S-");
588
589                let order_fills: Vec<&FillReport> = fill_reports
590                    .get(&report.venue_order_id)
591                    .map(|f| f.iter().collect())
592                    .unwrap_or_default();
593                let (external_events, metadata) = self.handle_external_order(
594                    report,
595                    &mass_status.account_id,
596                    &instrument,
597                    &order_fills,
598                    is_synthetic,
599                );
600
601                if !external_events.is_empty() {
602                    external_orders_created += 1;
603                    fills_applied += external_events
604                        .iter()
605                        .filter(|e| matches!(e, OrderEventAny::Filled(_)))
606                        .count();
607
608                    if report.order_status.is_open() {
609                        open_orders_initialized += 1;
610                    }
611
612                    events.extend(external_events);
613
614                    if let Some(m) = metadata {
615                        external_orders.push(m);
616                    }
617                }
618            } else {
619                orders_skipped_no_instrument += 1;
620            }
621        }
622
623        // Process orphan fills (fills without matching order reports)
624        let processed_venue_order_ids: AHashSet<VenueOrderId> =
625            order_reports.keys().copied().collect();
626
627        for (venue_order_id, fills) in fill_reports {
628            if processed_venue_order_ids.contains(venue_order_id) {
629                continue;
630            }
631
632            let Some(first_fill) = fills.first() else {
633                continue;
634            };
635
636            if !self.should_reconcile_instrument(&first_fill.instrument_id) {
637                log::debug!(
638                    "Skipping orphan fills for {}: not in reconciliation_instrument_ids",
639                    first_fill.instrument_id
640                );
641                continue;
642            }
643
644            // Skip if fill's client_order_id is in filtered list
645            if let Some(client_order_id) = &first_fill.client_order_id
646                && self
647                    .config
648                    .filtered_client_order_ids
649                    .contains(client_order_id)
650            {
651                log::debug!(
652                    "Skipping orphan fills for {client_order_id}: in filtered_client_order_ids"
653                );
654                continue;
655            }
656
657            let order = first_fill
658                .client_order_id
659                .as_ref()
660                .and_then(|id| self.get_order(id))
661                .or_else(|| self.get_order_by_venue_order_id(venue_order_id));
662
663            // Skip if resolved order's client_order_id is filtered (venue_order_id lookup path)
664            if let Some(ref order) = order
665                && self
666                    .config
667                    .filtered_client_order_ids
668                    .contains(&order.client_order_id())
669            {
670                log::debug!(
671                    "Skipping orphan fills for {}: in filtered_client_order_ids",
672                    order.client_order_id()
673                );
674                continue;
675            }
676
677            if let Some(mut order) = order {
678                let instrument_id = order.instrument_id();
679                if let Some(instrument) = self.get_instrument(&instrument_id) {
680                    let mut sorted_fills: Vec<&FillReport> = fills.iter().collect();
681                    sorted_fills.sort_by_key(|f| f.ts_event);
682
683                    for fill in sorted_fills {
684                        if let Some(event) = self.create_order_fill(&mut order, fill, &instrument) {
685                            fills_applied += 1;
686                            events.push(event);
687                        }
688                    }
689                }
690            }
691        }
692
693        events.sort_by_key(|e| e.ts_event());
694
695        for event in &events {
696            exec_engine.borrow_mut().process(event.clone());
697        }
698
699        let mut positions_created = 0usize;
700        if !self.config.filter_position_reports {
701            // Collect instruments with fills that lack venue_position_id (can't attribute to
702            // specific hedge position, so must skip all hedge reports for that instrument)
703            let instruments_with_unattributed_fills: AHashSet<InstrumentId> = mass_status
704                .fill_reports()
705                .values()
706                .flatten()
707                .filter(|f| f.venue_position_id.is_none())
708                .map(|f| f.instrument_id)
709                .chain(
710                    mass_status
711                        .order_reports()
712                        .values()
713                        .filter(|r| !r.filled_qty.is_zero() && r.venue_position_id.is_none())
714                        .map(|r| r.instrument_id),
715                )
716                .collect();
717
718            let positions_with_fills: AHashSet<PositionId> = mass_status
719                .fill_reports()
720                .values()
721                .flatten()
722                .filter_map(|f| f.venue_position_id)
723                .chain(
724                    mass_status
725                        .order_reports()
726                        .values()
727                        .filter(|r| !r.filled_qty.is_zero())
728                        .filter_map(|r| r.venue_position_id),
729                )
730                .collect();
731
732            for (instrument_id, reports) in mass_status.position_reports() {
733                if !self.should_reconcile_instrument(&instrument_id) {
734                    log::debug!(
735                        "Skipping position reports for {instrument_id}: not in reconciliation_instrument_ids"
736                    );
737                    continue;
738                }
739
740                for report in reports {
741                    if let Some(position_events) = self.reconcile_position_report(
742                        &report,
743                        &mass_status.account_id,
744                        &instruments_with_unattributed_fills,
745                        &positions_with_fills,
746                    ) {
747                        for event in position_events {
748                            exec_engine.borrow_mut().process(event.clone());
749                            events.push(event);
750                        }
751                        positions_created += 1;
752                    }
753                }
754            }
755        }
756
757        if orders_skipped_no_instrument > 0 {
758            log::warn!("{orders_skipped_no_instrument} orders skipped (instrument not in cache)");
759        }
760
761        if orders_skipped_duplicate > 0 {
762            log::debug!("{orders_skipped_duplicate} orders skipped (already in sync)");
763        }
764
765        if orders_skipped_filtered > 0 {
766            log::debug!("{orders_skipped_filtered} orders skipped (filtered by config)");
767        }
768
769        log::info!(
770            color = LogColor::Blue as u8;
771            "Reconciliation complete for {venue}: reconciled={orders_reconciled}, external={external_orders_created}, open={open_orders_initialized}, fills={fills_applied}, positions={positions_created}, skipped={orders_skipped_duplicate}, filtered={orders_skipped_filtered}",
772        );
773
774        ReconciliationResult {
775            events,
776            external_orders,
777        }
778    }
779
780    /// Reconciles a single execution report during runtime.
781    ///
782    /// # Errors
783    ///
784    /// Returns an error if the average price cannot be converted to a valid `Decimal`.
785    pub fn reconcile_report(
786        &mut self,
787        report: ExecutionReport,
788    ) -> anyhow::Result<Vec<OrderEventAny>> {
789        let mut events = Vec::new();
790
791        self.clear_recon_tracking(&report.client_order_id, true);
792
793        if let Some(order) = self.get_order(&report.client_order_id) {
794            let Some(account_id) = order.account_id() else {
795                log::error!("Cannot process fill report: order has no account_id");
796                return Ok(vec![]);
797            };
798            let Some(venue_order_id) = report.venue_order_id else {
799                log::error!("Cannot process fill report: report has no venue_order_id");
800                return Ok(vec![]);
801            };
802            let mut order_report = OrderStatusReport::new(
803                account_id,
804                order.instrument_id(),
805                Some(report.client_order_id),
806                venue_order_id,
807                order.order_side(),
808                order.order_type(),
809                order.time_in_force(),
810                report.status,
811                order.quantity(),
812                report.filled_qty,
813                report.ts_event, // Use ts_event as ts_accepted
814                report.ts_event, // Use ts_event as ts_last
815                self.clock.borrow().timestamp_ns(),
816                Some(UUID4::new()),
817            );
818
819            if let Some(avg_px) = report.avg_px {
820                order_report = order_report.with_avg_px(avg_px)?;
821            }
822
823            let instrument = self.get_instrument(&order.instrument_id());
824            if let Some(event) =
825                self.reconcile_order_report(&order, &order_report, instrument.as_ref())
826            {
827                events.push(event);
828            }
829        }
830
831        Ok(events)
832    }
833
834    /// Checks inflight orders and returns events for any that need reconciliation.
835    pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
836        let mut events = Vec::new();
837        let current_time = self.clock.borrow().timestamp_ns();
838        let threshold_ns = self.config.inflight_threshold_ms * NANOSECONDS_IN_MILLISECOND;
839
840        let mut to_check = Vec::new();
841
842        for (client_order_id, check) in &self.inflight_checks {
843            if current_time - check.ts_submitted > threshold_ns {
844                to_check.push(*client_order_id);
845            }
846        }
847
848        for client_order_id in to_check {
849            if self
850                .config
851                .filtered_client_order_ids
852                .contains(&client_order_id)
853            {
854                continue;
855            }
856
857            if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
858                if let Some(last_query_ts) = check.last_query_ts
859                    && current_time - last_query_ts < threshold_ns
860                {
861                    continue;
862                }
863
864                check.retry_count += 1;
865                check.last_query_ts = Some(current_time);
866                self.ts_last_query.insert(client_order_id, current_time);
867                self.recon_check_retries
868                    .insert(client_order_id, check.retry_count);
869
870                if check.retry_count >= self.config.inflight_max_retries {
871                    // Generate rejection after max retries
872                    let ts_now = self.clock.borrow().timestamp_ns();
873                    if let Some(order) = self.get_order(&client_order_id)
874                        && let Some(event) =
875                            create_reconciliation_rejected(&order, Some("INFLIGHT_TIMEOUT"), ts_now)
876                    {
877                        events.push(event);
878                    }
879                    // Remove from inflight checks regardless of whether order exists
880                    self.clear_recon_tracking(&client_order_id, true);
881                }
882            }
883        }
884
885        events
886    }
887
888    /// Checks open orders consistency between cache and venue.
889    ///
890    /// This method validates that open orders in the cache match the venue's state,
891    /// comparing order status and filled quantities, and generating reconciliation
892    /// events for any discrepancies detected.
893    ///
894    /// # Returns
895    ///
896    /// A vector of order events generated to reconcile discrepancies.
897    pub async fn check_open_orders(
898        &mut self,
899        clients: &[Rc<dyn ExecutionClient>],
900    ) -> Vec<OrderEventAny> {
901        log::debug!("Checking order consistency between cached-state and venues");
902
903        let filtered_orders: Vec<OrderAny> = {
904            let cache = self.cache.borrow();
905            let open_orders = cache.orders_open(None, None, None, None, None);
906
907            if self.config.reconciliation_instrument_ids.is_empty() {
908                open_orders.iter().map(|o| (*o).clone()).collect()
909            } else {
910                open_orders
911                    .iter()
912                    .filter(|o| {
913                        self.config
914                            .reconciliation_instrument_ids
915                            .contains(&o.instrument_id())
916                    })
917                    .map(|o| (*o).clone())
918                    .collect()
919            }
920        };
921
922        log::debug!(
923            "Found {} order{} open in cache",
924            filtered_orders.len(),
925            if filtered_orders.len() == 1 { "" } else { "s" }
926        );
927
928        let mut all_reports = Vec::new();
929        let mut venue_reported_ids = AHashSet::new();
930
931        for client in clients {
932            let mut cmd = GenerateOrderStatusReports::new(
933                UUID4::new(),
934                self.clock.borrow().timestamp_ns(),
935                true, // open_only
936                None, // instrument_id - query all
937                None, // start
938                None, // end
939                None, // params
940                None, // correlation_id
941            );
942            cmd.log_receipt_level = LogLevel::Debug;
943
944            match client.generate_order_status_reports(&cmd).await {
945                Ok(reports) => {
946                    for report in reports {
947                        if let Some(client_order_id) = &report.client_order_id {
948                            venue_reported_ids.insert(*client_order_id);
949                        }
950                        all_reports.push(report);
951                    }
952                }
953                Err(e) => {
954                    log::error!(
955                        "Failed to query order reports from {}: {e}",
956                        client.client_id()
957                    );
958                }
959            }
960        }
961
962        // Reconcile reports against cached orders
963        let ts_now = self.clock.borrow().timestamp_ns();
964        let mut events = Vec::new();
965
966        for report in all_reports {
967            if let Some(client_order_id) = &report.client_order_id
968                && let Some(order) = self.get_order(client_order_id)
969            {
970                // Check for recent local activity to avoid race conditions with in-flight fills
971                if let Some(&last_activity) = self.order_local_activity_ns.get(client_order_id)
972                    && (ts_now - last_activity) < self.config.open_check_threshold_ns
973                {
974                    let elapsed_ms = nanos_to_millis((ts_now - last_activity).as_u64());
975                    let threshold_ms = nanos_to_millis(self.config.open_check_threshold_ns);
976                    log::info!(
977                        "Deferring reconciliation for {client_order_id}: recent local activity ({elapsed_ms}ms < threshold={threshold_ms}ms)",
978                    );
979                    continue;
980                }
981
982                let instrument = self.get_instrument(&report.instrument_id);
983
984                if let Some(event) =
985                    self.reconcile_order_report(&order, &report, instrument.as_ref())
986                {
987                    events.push(event);
988                }
989            }
990        }
991
992        // Handle orders missing at venue
993        if !self.config.open_check_open_only {
994            let cached_ids: AHashSet<ClientOrderId> = filtered_orders
995                .iter()
996                .map(|o| o.client_order_id())
997                .collect();
998            let missing_at_venue: AHashSet<ClientOrderId> = cached_ids
999                .difference(&venue_reported_ids)
1000                .copied()
1001                .collect();
1002
1003            for client_order_id in missing_at_venue {
1004                events.extend(self.handle_missing_order(client_order_id));
1005            }
1006        }
1007
1008        events
1009    }
1010
1011    /// Checks position consistency between cache and venue.
1012    ///
1013    /// This method validates that positions in the cache match the venue's state,
1014    /// detecting position drift and querying for missing fills when discrepancies
1015    /// are found.
1016    ///
1017    /// # Returns
1018    ///
1019    /// A vector of fill events generated to reconcile position discrepancies.
1020    pub async fn check_positions_consistency(
1021        &mut self,
1022        clients: &[Rc<dyn ExecutionClient>],
1023    ) -> Vec<OrderEventAny> {
1024        log::debug!("Checking position consistency between cached-state and venues");
1025
1026        let open_positions = {
1027            let cache = self.cache.borrow();
1028            let positions = cache.positions_open(None, None, None, None, None);
1029
1030            if self.config.reconciliation_instrument_ids.is_empty() {
1031                positions.iter().map(|p| (*p).clone()).collect()
1032            } else {
1033                positions
1034                    .iter()
1035                    .filter(|p| {
1036                        self.config
1037                            .reconciliation_instrument_ids
1038                            .contains(&p.instrument_id)
1039                    })
1040                    .map(|p| (*p).clone())
1041                    .collect::<Vec<_>>()
1042            }
1043        };
1044
1045        log::debug!(
1046            "Found {} position{} to check",
1047            open_positions.len(),
1048            if open_positions.len() == 1 { "" } else { "s" }
1049        );
1050
1051        // Query venue for position reports
1052        let mut venue_positions = AHashMap::new();
1053
1054        for client in clients {
1055            let mut cmd = GeneratePositionStatusReports::new(
1056                UUID4::new(),
1057                self.clock.borrow().timestamp_ns(),
1058                None, // instrument_id - query all
1059                None, // start
1060                None, // end
1061                None, // params
1062                None, // correlation_id
1063            );
1064            cmd.log_receipt_level = LogLevel::Debug;
1065
1066            match client.generate_position_status_reports(&cmd).await {
1067                Ok(reports) => {
1068                    for report in reports {
1069                        venue_positions.insert(report.instrument_id, report);
1070                    }
1071                }
1072                Err(e) => {
1073                    log::error!(
1074                        "Failed to query position reports from {}: {e}",
1075                        client.client_id()
1076                    );
1077                }
1078            }
1079        }
1080
1081        // Check for discrepancies
1082        let mut events = Vec::new();
1083
1084        for position in &open_positions {
1085            // Skip if not in filter
1086            if !self.config.reconciliation_instrument_ids.is_empty()
1087                && !self
1088                    .config
1089                    .reconciliation_instrument_ids
1090                    .contains(&position.instrument_id)
1091            {
1092                continue;
1093            }
1094
1095            let venue_report = venue_positions.get(&position.instrument_id);
1096
1097            if let Some(discrepancy_events) =
1098                self.check_position_discrepancy(position, venue_report)
1099            {
1100                events.extend(discrepancy_events);
1101            }
1102        }
1103
1104        events
1105    }
1106
1107    /// Registers an order as inflight for tracking.
1108    pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
1109        let ts_submitted = self.clock.borrow().timestamp_ns();
1110        self.inflight_checks.insert(
1111            client_order_id,
1112            InflightCheck {
1113                client_order_id,
1114                ts_submitted,
1115                retry_count: 0,
1116                last_query_ts: None,
1117            },
1118        );
1119        self.recon_check_retries.insert(client_order_id, 0);
1120        self.ts_last_query.remove(&client_order_id);
1121        self.order_local_activity_ns.remove(&client_order_id);
1122    }
1123
1124    /// Records local activity for the specified order.
1125    ///
1126    /// Uses the current clock time (receipt time) instead of venue time to accurately
1127    /// track when we last processed activity for this order. This avoids race conditions
1128    /// where network/queue latency makes events appear "old" even though they just arrived.
1129    pub fn record_local_activity(&mut self, client_order_id: ClientOrderId) {
1130        let ts_now = self.clock.borrow().timestamp_ns();
1131        self.order_local_activity_ns.insert(client_order_id, ts_now);
1132    }
1133
1134    /// Clears reconciliation tracking state for an order.
1135    pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
1136        self.inflight_checks.remove(client_order_id);
1137        self.recon_check_retries.remove(client_order_id);
1138        if drop_last_query {
1139            self.ts_last_query.remove(client_order_id);
1140        }
1141        self.order_local_activity_ns.remove(client_order_id);
1142    }
1143
1144    /// Claims external orders for a specific strategy and instrument.
1145    pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
1146        self.external_order_claims
1147            .insert(instrument_id, strategy_id);
1148    }
1149
1150    /// Records position activity for reconciliation tracking.
1151    pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
1152        self.position_local_activity_ns
1153            .insert(instrument_id, ts_event);
1154    }
1155
1156    /// Checks if a fill has been recently processed (for deduplication).
1157    pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
1158        self.recent_fills_cache.contains_key(trade_id)
1159    }
1160
1161    /// Marks a fill as recently processed with current timestamp.
1162    pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
1163        let ts_now = self.clock.borrow().timestamp_ns();
1164        self.recent_fills_cache.insert(trade_id, ts_now);
1165    }
1166
1167    /// Prunes expired fills from the recent fills cache.
1168    ///
1169    /// Default TTL is 60 seconds.
1170    pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
1171        let ts_now = self.clock.borrow().timestamp_ns();
1172        let ttl_ns = (ttl_secs * NANOSECONDS_IN_SECOND as f64) as u64;
1173
1174        self.recent_fills_cache
1175            .retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
1176    }
1177
1178    /// Purges closed orders from the cache that are older than the configured buffer.
1179    pub fn purge_closed_orders(&mut self) {
1180        let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins else {
1181            return;
1182        };
1183
1184        let ts_now = self.clock.borrow().timestamp_ns();
1185        let buffer_secs = (buffer_mins as u64) * 60;
1186
1187        self.cache
1188            .borrow_mut()
1189            .purge_closed_orders(ts_now, buffer_secs);
1190    }
1191
1192    /// Purges closed positions from the cache that are older than the configured buffer.
1193    pub fn purge_closed_positions(&mut self) {
1194        let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins else {
1195            return;
1196        };
1197
1198        let ts_now = self.clock.borrow().timestamp_ns();
1199        let buffer_secs = (buffer_mins as u64) * 60;
1200
1201        self.cache
1202            .borrow_mut()
1203            .purge_closed_positions(ts_now, buffer_secs);
1204    }
1205
1206    /// Purges old account events from the cache based on the configured lookback.
1207    pub fn purge_account_events(&mut self) {
1208        let Some(lookback_mins) = self.config.purge_account_events_lookback_mins else {
1209            return;
1210        };
1211
1212        let ts_now = self.clock.borrow().timestamp_ns();
1213        let lookback_secs = (lookback_mins as u64) * 60;
1214
1215        self.cache
1216            .borrow_mut()
1217            .purge_account_events(ts_now, lookback_secs);
1218    }
1219
1220    // Private helper methods
1221
1222    fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
1223        self.cache.borrow().order(client_order_id).cloned()
1224    }
1225
1226    fn get_order_by_venue_order_id(&self, venue_order_id: &VenueOrderId) -> Option<OrderAny> {
1227        let cache = self.cache.borrow();
1228        cache
1229            .client_order_id(venue_order_id)
1230            .and_then(|client_order_id| cache.order(client_order_id).cloned())
1231    }
1232
1233    fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
1234        self.cache.borrow().instrument(instrument_id).cloned()
1235    }
1236
1237    fn should_skip_order_report(&self, report: &OrderStatusReport) -> bool {
1238        if let Some(client_order_id) = &report.client_order_id
1239            && self
1240                .config
1241                .filtered_client_order_ids
1242                .contains(client_order_id)
1243        {
1244            log::debug!(
1245                "Skipping order report {client_order_id}: in filtered_client_order_ids list"
1246            );
1247            return true;
1248        }
1249
1250        if !self.should_reconcile_instrument(&report.instrument_id) {
1251            log::debug!(
1252                "Skipping order report for {}: not in reconciliation_instrument_ids",
1253                report.instrument_id
1254            );
1255            return true;
1256        }
1257
1258        false
1259    }
1260
1261    fn should_reconcile_instrument(&self, instrument_id: &InstrumentId) -> bool {
1262        self.config.reconciliation_instrument_ids.is_empty()
1263            || self
1264                .config
1265                .reconciliation_instrument_ids
1266                .contains(instrument_id)
1267    }
1268
1269    fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
1270        let mut events = Vec::new();
1271
1272        let Some(order) = self.get_order(&client_order_id) else {
1273            return events;
1274        };
1275
1276        let ts_now = self.clock.borrow().timestamp_ns();
1277        let ts_last = order.ts_last();
1278
1279        // Check if order is too recent
1280        if (ts_now - ts_last) < self.config.open_check_threshold_ns {
1281            return events;
1282        }
1283
1284        // Check local activity threshold
1285        if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
1286            && (ts_now - last_activity) < self.config.open_check_threshold_ns
1287        {
1288            return events;
1289        }
1290
1291        // Increment retry counter
1292        let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
1293        *retries += 1;
1294
1295        // If max retries exceeded, generate rejection event
1296        if *retries >= self.config.open_check_missing_retries {
1297            log::warn!(
1298                "Order {client_order_id} not found at venue after {retries} retries, marking as REJECTED"
1299            );
1300
1301            let ts_now = self.clock.borrow().timestamp_ns();
1302            if let Some(rejected) =
1303                create_reconciliation_rejected(&order, Some("NOT_FOUND_AT_VENUE"), ts_now)
1304            {
1305                events.push(rejected);
1306            }
1307
1308            self.clear_recon_tracking(&client_order_id, true);
1309        } else {
1310            log::debug!(
1311                "Order {} not found at venue, retry {}/{}",
1312                client_order_id,
1313                retries,
1314                self.config.open_check_missing_retries
1315            );
1316        }
1317
1318        events
1319    }
1320
1321    fn check_position_discrepancy(
1322        &mut self,
1323        position: &Position,
1324        venue_report: Option<&PositionStatusReport>,
1325    ) -> Option<Vec<OrderEventAny>> {
1326        // Use signed quantities to detect both magnitude and side discrepancies
1327        let cached_signed_qty = position.signed_decimal_qty();
1328        let venue_signed_qty = venue_report.map_or(Decimal::ZERO, |r| r.signed_decimal_qty);
1329
1330        let tolerance = Decimal::from_str("0.00000001").unwrap();
1331        if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
1332            return None; // No discrepancy
1333        }
1334
1335        // Check activity threshold
1336        let ts_now = self.clock.borrow().timestamp_ns();
1337        if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
1338            && (ts_now - last_activity) < self.config.position_check_threshold_ns
1339        {
1340            log::debug!(
1341                "Skipping position reconciliation for {}: recent activity within threshold",
1342                position.instrument_id
1343            );
1344            return None;
1345        }
1346
1347        log::warn!(
1348            "Position discrepancy detected for {}: cached_signed_qty={}, venue_signed_qty={}",
1349            position.instrument_id,
1350            cached_signed_qty,
1351            venue_signed_qty
1352        );
1353
1354        let instrument = self
1355            .cache
1356            .borrow()
1357            .instrument(&position.instrument_id)?
1358            .clone();
1359
1360        let account_id = position.account_id;
1361        let instrument_id = position.instrument_id;
1362
1363        let cached_avg_px = if position.avg_px_open > 0.0 {
1364            Decimal::from_str(&position.avg_px_open.to_string()).ok()
1365        } else {
1366            None
1367        };
1368        let venue_avg_px = venue_report.and_then(|r| r.avg_px_open);
1369
1370        // Check if position crosses zero (flips side)
1371        let crosses_zero = (cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
1372            || (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO);
1373
1374        if crosses_zero {
1375            // Split into two fills: close existing position, then open new position
1376            return self.reconcile_cross_zero_position(
1377                &instrument,
1378                account_id,
1379                instrument_id,
1380                cached_signed_qty,
1381                cached_avg_px,
1382                venue_signed_qty,
1383                venue_avg_px,
1384                ts_now,
1385            );
1386        }
1387
1388        let qty_diff = venue_signed_qty - cached_signed_qty;
1389        let order_side = if qty_diff > Decimal::ZERO {
1390            OrderSide::Buy
1391        } else {
1392            OrderSide::Sell
1393        };
1394
1395        let reconciliation_px = calculate_reconciliation_price(
1396            cached_signed_qty,
1397            cached_avg_px,
1398            venue_signed_qty,
1399            venue_avg_px,
1400        );
1401
1402        let fill_px = reconciliation_px.or(venue_avg_px).or(cached_avg_px)?;
1403        let fill_qty = qty_diff.abs();
1404
1405        let ts_event = ts_now.as_u64();
1406        let venue_order_id = create_synthetic_venue_order_id(ts_event);
1407        let order_qty = Quantity::from_decimal_dp(fill_qty, instrument.size_precision()).ok()?;
1408
1409        let order_report = OrderStatusReport::new(
1410            account_id,
1411            instrument_id,
1412            None,
1413            venue_order_id,
1414            order_side,
1415            OrderType::Market,
1416            TimeInForce::Gtc,
1417            OrderStatus::Filled,
1418            order_qty,
1419            order_qty,
1420            ts_now,
1421            ts_now,
1422            ts_now,
1423            None,
1424        )
1425        .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
1426        .ok()?;
1427
1428        log::info!(
1429            color = LogColor::Blue as u8;
1430            "Generating synthetic fill for position reconciliation {instrument_id}: side={order_side:?}, qty={fill_qty}, px={fill_px}",
1431        );
1432
1433        let (events, _) =
1434            self.handle_external_order(&order_report, &account_id, &instrument, &[], true);
1435        Some(events)
1436    }
1437
1438    /// Handles position reconciliation when position flips sign, splitting into two
1439    /// fills: close existing position then open new position in opposite direction.
1440    #[allow(clippy::too_many_arguments)]
1441    fn reconcile_cross_zero_position(
1442        &mut self,
1443        instrument: &InstrumentAny,
1444        account_id: AccountId,
1445        instrument_id: InstrumentId,
1446        cached_signed_qty: Decimal,
1447        cached_avg_px: Option<Decimal>,
1448        venue_signed_qty: Decimal,
1449        venue_avg_px: Option<Decimal>,
1450        ts_now: UnixNanos,
1451    ) -> Option<Vec<OrderEventAny>> {
1452        log::info!(
1453            color = LogColor::Blue as u8;
1454            "Position crosses zero for {instrument_id}: cached={cached_signed_qty}, venue={venue_signed_qty}. Splitting into two fills",
1455        );
1456
1457        let mut all_events = Vec::new();
1458
1459        // Close existing position first
1460        let close_qty = cached_signed_qty.abs();
1461        let close_side = if cached_signed_qty < Decimal::ZERO {
1462            OrderSide::Buy // Close short by buying
1463        } else {
1464            OrderSide::Sell // Close long by selling
1465        };
1466
1467        if let Some(close_px) = cached_avg_px {
1468            let close_venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64());
1469            let close_order_qty =
1470                Quantity::from_decimal_dp(close_qty, instrument.size_precision()).ok()?;
1471
1472            let close_report = OrderStatusReport::new(
1473                account_id,
1474                instrument_id,
1475                None,
1476                close_venue_order_id,
1477                close_side,
1478                OrderType::Market,
1479                TimeInForce::Gtc,
1480                OrderStatus::Filled,
1481                close_order_qty,
1482                close_order_qty,
1483                ts_now,
1484                ts_now,
1485                ts_now,
1486                None,
1487            )
1488            .with_avg_px(close_px.to_f64().unwrap_or(0.0))
1489            .ok()?;
1490
1491            log::info!(
1492                color = LogColor::Blue as u8;
1493                "Generating close fill for cross-zero {instrument_id}: side={close_side:?}, qty={close_qty}, px={close_px}",
1494            );
1495
1496            let (close_events, _) =
1497                self.handle_external_order(&close_report, &account_id, instrument, &[], true);
1498            all_events.extend(close_events);
1499        } else {
1500            log::warn!("Cannot close position for {instrument_id}: no cached average price");
1501            return None;
1502        }
1503
1504        // Then open new position in opposite direction
1505        let open_qty = venue_signed_qty.abs();
1506        let open_side = if venue_signed_qty > Decimal::ZERO {
1507            OrderSide::Buy // Open long
1508        } else {
1509            OrderSide::Sell // Open short
1510        };
1511
1512        if let Some(open_px) = venue_avg_px {
1513            let open_venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64() + 1);
1514            let open_order_qty =
1515                Quantity::from_decimal_dp(open_qty, instrument.size_precision()).ok()?;
1516
1517            let open_report = OrderStatusReport::new(
1518                account_id,
1519                instrument_id,
1520                None,
1521                open_venue_order_id,
1522                open_side,
1523                OrderType::Market,
1524                TimeInForce::Gtc,
1525                OrderStatus::Filled,
1526                open_order_qty,
1527                open_order_qty,
1528                ts_now,
1529                ts_now,
1530                ts_now,
1531                None,
1532            )
1533            .with_avg_px(open_px.to_f64().unwrap_or(0.0))
1534            .ok()?;
1535
1536            log::info!(
1537                color = LogColor::Blue as u8;
1538                "Generating open fill for cross-zero {instrument_id}: side={open_side:?}, qty={open_qty}, px={open_px}",
1539            );
1540
1541            let (open_events, _) =
1542                self.handle_external_order(&open_report, &account_id, instrument, &[], true);
1543            all_events.extend(open_events);
1544        } else {
1545            log::warn!("Cannot open new position for {instrument_id}: no venue average price");
1546            return Some(all_events);
1547        }
1548
1549        Some(all_events)
1550    }
1551
1552    /// Creates a position from a venue position report when no orders/fills exist.
1553    ///
1554    /// This handles the case where the venue reports an open position but there are
1555    /// no order or fill reports to create it from (e.g., orders are already closed).
1556    fn create_position_from_report(
1557        &mut self,
1558        report: &PositionStatusReport,
1559        account_id: &AccountId,
1560        instrument: &InstrumentAny,
1561    ) -> Option<Vec<OrderEventAny>> {
1562        let instrument_id = report.instrument_id;
1563        let venue_signed_qty = report.signed_decimal_qty;
1564
1565        if venue_signed_qty == Decimal::ZERO {
1566            return None;
1567        }
1568
1569        let order_side = if venue_signed_qty > Decimal::ZERO {
1570            OrderSide::Buy
1571        } else {
1572            OrderSide::Sell
1573        };
1574
1575        let qty_abs = venue_signed_qty.abs();
1576        let venue_avg_px = report.avg_px_open?;
1577
1578        let ts_now = self.clock.borrow().timestamp_ns();
1579        let venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64());
1580        let order_qty = Quantity::from_decimal_dp(qty_abs, instrument.size_precision()).ok()?;
1581
1582        let mut order_report = OrderStatusReport::new(
1583            *account_id,
1584            instrument_id,
1585            None,
1586            venue_order_id,
1587            order_side,
1588            OrderType::Market,
1589            TimeInForce::Gtc,
1590            OrderStatus::Filled,
1591            order_qty,
1592            order_qty,
1593            ts_now,
1594            ts_now,
1595            ts_now,
1596            None,
1597        )
1598        .with_avg_px(venue_avg_px.to_f64().unwrap_or(0.0))
1599        .ok()?;
1600
1601        // Preserve venue_position_id for hedging mode
1602        if let Some(venue_position_id) = report.venue_position_id {
1603            order_report = order_report.with_venue_position_id(venue_position_id);
1604        }
1605
1606        log::info!(
1607            color = LogColor::Blue as u8;
1608            "Creating position from venue report for {instrument_id}: side={order_side:?}, qty={qty_abs}, avg_px={venue_avg_px}",
1609        );
1610
1611        let (events, _) =
1612            self.handle_external_order(&order_report, account_id, instrument, &[], true);
1613        Some(events)
1614    }
1615
1616    fn reconcile_position_report(
1617        &mut self,
1618        report: &PositionStatusReport,
1619        account_id: &AccountId,
1620        instruments_with_unattributed_fills: &AHashSet<InstrumentId>,
1621        positions_with_fills: &AHashSet<PositionId>,
1622    ) -> Option<Vec<OrderEventAny>> {
1623        if report.venue_position_id.is_some() {
1624            self.reconcile_position_report_hedging(
1625                report,
1626                account_id,
1627                instruments_with_unattributed_fills,
1628                positions_with_fills,
1629            )
1630        } else {
1631            self.reconcile_position_report_netting(report, account_id)
1632        }
1633    }
1634
1635    fn reconcile_position_report_hedging(
1636        &mut self,
1637        report: &PositionStatusReport,
1638        account_id: &AccountId,
1639        instruments_with_unattributed_fills: &AHashSet<InstrumentId>,
1640        positions_with_fills: &AHashSet<PositionId>,
1641    ) -> Option<Vec<OrderEventAny>> {
1642        let venue_position_id = report.venue_position_id?;
1643
1644        // Skip if batch already has fills for this position (will be created from fills)
1645        if positions_with_fills.contains(&venue_position_id) {
1646            log::debug!(
1647                "Skipping hedge position {venue_position_id} reconciliation: fills already in batch"
1648            );
1649            return None;
1650        }
1651
1652        // Skip if fills exist for this instrument but lack venue_position_id
1653        // (can't determine which hedge position they belong to)
1654        if instruments_with_unattributed_fills.contains(&report.instrument_id) {
1655            log::debug!(
1656                "Skipping hedge position {venue_position_id} reconciliation: unattributed fills in batch"
1657            );
1658            return None;
1659        }
1660
1661        log::info!(
1662            color = LogColor::Blue as u8;
1663            "Reconciling HEDGE position for {}, venue_position_id={}",
1664            report.instrument_id,
1665            venue_position_id
1666        );
1667
1668        let position = {
1669            let cache = self.cache.borrow();
1670            cache.position(&venue_position_id).cloned()
1671        };
1672
1673        match position {
1674            Some(position) => {
1675                let cached_signed_qty = position.signed_decimal_qty();
1676                let venue_signed_qty = report.signed_decimal_qty;
1677
1678                if cached_signed_qty == venue_signed_qty {
1679                    log::debug!(
1680                        "Hedge position {venue_position_id} matches venue: qty={cached_signed_qty}"
1681                    );
1682                    return None;
1683                }
1684
1685                if venue_signed_qty == Decimal::ZERO && cached_signed_qty == Decimal::ZERO {
1686                    return None;
1687                }
1688
1689                if !self.config.generate_missing_orders {
1690                    log::error!(
1691                        "Cannot reconcile {} {}: position net qty {} != reported net qty {} \
1692                         and `generate_missing_orders` is disabled",
1693                        report.instrument_id,
1694                        venue_position_id,
1695                        cached_signed_qty,
1696                        venue_signed_qty
1697                    );
1698                    return None;
1699                }
1700
1701                self.reconcile_hedge_position_discrepancy(
1702                    report,
1703                    account_id,
1704                    &position,
1705                    cached_signed_qty,
1706                )
1707            }
1708            None => {
1709                if report.signed_decimal_qty == Decimal::ZERO {
1710                    return None;
1711                }
1712
1713                if !self.config.generate_missing_orders {
1714                    log::error!(
1715                        "Cannot reconcile position: {venue_position_id} not found and `generate_missing_orders` is disabled"
1716                    );
1717                    return None;
1718                }
1719
1720                self.reconcile_missing_hedge_position(report, account_id)
1721            }
1722        }
1723    }
1724
1725    fn reconcile_hedge_position_discrepancy(
1726        &mut self,
1727        report: &PositionStatusReport,
1728        account_id: &AccountId,
1729        position: &Position,
1730        cached_signed_qty: Decimal,
1731    ) -> Option<Vec<OrderEventAny>> {
1732        let instrument = self.get_instrument(&report.instrument_id)?;
1733        let venue_signed_qty = report.signed_decimal_qty;
1734
1735        let diff = (cached_signed_qty - venue_signed_qty).abs();
1736        let diff_qty = Quantity::from_decimal_dp(diff, instrument.size_precision()).ok()?;
1737
1738        if diff_qty.is_zero() {
1739            log::debug!(
1740                "Difference quantity rounds to zero for {}, skipping",
1741                instrument.id()
1742            );
1743            return None;
1744        }
1745
1746        let venue_position_id = report.venue_position_id?;
1747        log::warn!(
1748            "Hedge position discrepancy for {} {}: cached={}, venue={}, generating reconciliation order",
1749            report.instrument_id,
1750            venue_position_id,
1751            cached_signed_qty,
1752            venue_signed_qty
1753        );
1754
1755        let current_avg_px = if position.avg_px_open > 0.0 {
1756            Decimal::from_str(&position.avg_px_open.to_string()).ok()
1757        } else {
1758            None
1759        };
1760
1761        self.create_position_reconciliation_order(
1762            report,
1763            account_id,
1764            &instrument,
1765            cached_signed_qty,
1766            diff_qty,
1767            current_avg_px,
1768        )
1769    }
1770
1771    fn reconcile_missing_hedge_position(
1772        &mut self,
1773        report: &PositionStatusReport,
1774        account_id: &AccountId,
1775    ) -> Option<Vec<OrderEventAny>> {
1776        let instrument = self.get_instrument(&report.instrument_id)?;
1777        let venue_signed_qty = report.signed_decimal_qty;
1778
1779        let qty = venue_signed_qty.abs();
1780        let diff_qty = Quantity::from_decimal_dp(qty, instrument.size_precision()).ok()?;
1781
1782        if diff_qty.is_zero() {
1783            return None;
1784        }
1785
1786        let venue_position_id = report.venue_position_id?;
1787        log::warn!(
1788            "Missing hedge position for {} {}: venue reports {}, generating reconciliation order",
1789            report.instrument_id,
1790            venue_position_id,
1791            venue_signed_qty
1792        );
1793
1794        self.create_position_reconciliation_order(
1795            report,
1796            account_id,
1797            &instrument,
1798            Decimal::ZERO,
1799            diff_qty,
1800            None,
1801        )
1802    }
1803
1804    fn reconcile_position_report_netting(
1805        &mut self,
1806        report: &PositionStatusReport,
1807        account_id: &AccountId,
1808    ) -> Option<Vec<OrderEventAny>> {
1809        let instrument_id = report.instrument_id;
1810
1811        log::info!(
1812            color = LogColor::Blue as u8;
1813            "Reconciling NET position for {instrument_id}",
1814        );
1815
1816        let instrument = self.get_instrument(&instrument_id)?;
1817
1818        let (cached_signed_qty, cached_avg_px) = {
1819            let cache = self.cache.borrow();
1820            let positions = cache.positions_open(None, Some(&instrument_id), None, None, None);
1821
1822            if positions.is_empty() {
1823                (Decimal::ZERO, None)
1824            } else {
1825                let mut total_signed_qty = Decimal::ZERO;
1826                let mut total_value = Decimal::ZERO;
1827                let mut total_qty = Decimal::ZERO;
1828
1829                for pos in positions {
1830                    total_signed_qty += pos.signed_decimal_qty();
1831                    let qty = pos.signed_decimal_qty().abs();
1832                    if pos.avg_px_open > 0.0
1833                        && qty > Decimal::ZERO
1834                        && let Ok(avg_px) = Decimal::from_str(&pos.avg_px_open.to_string())
1835                    {
1836                        total_value += avg_px * qty;
1837                        total_qty += qty;
1838                    }
1839                }
1840
1841                let avg_px = if total_qty > Decimal::ZERO {
1842                    Some(total_value / total_qty)
1843                } else {
1844                    None
1845                };
1846
1847                (total_signed_qty, avg_px)
1848            }
1849        };
1850
1851        let venue_signed_qty = report.signed_decimal_qty;
1852
1853        log::info!(
1854            color = LogColor::Blue as u8;
1855            "venue_signed_qty={venue_signed_qty}, cached_signed_qty={cached_signed_qty}",
1856        );
1857
1858        let tolerance = Decimal::from_str("0.00000001").unwrap_or(Decimal::ZERO);
1859        if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
1860            log::debug!("Position quantities match for {instrument_id}, no reconciliation needed");
1861            return None;
1862        }
1863
1864        if !self.config.generate_missing_orders {
1865            log::warn!(
1866                "Discrepancy for {instrument_id} position when `generate_missing_orders` disabled, skipping"
1867            );
1868            return None;
1869        }
1870
1871        let diff = (cached_signed_qty - venue_signed_qty).abs();
1872        let diff_qty = Quantity::from_decimal_dp(diff, instrument.size_precision()).ok()?;
1873
1874        if diff_qty.is_zero() {
1875            log::debug!(
1876                "Difference quantity rounds to zero for {instrument_id}, skipping order generation"
1877            );
1878            return None;
1879        }
1880
1881        let crosses_zero = cached_signed_qty != Decimal::ZERO
1882            && venue_signed_qty != Decimal::ZERO
1883            && ((cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
1884                || (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO));
1885
1886        if crosses_zero {
1887            let ts_now = self.clock.borrow().timestamp_ns();
1888            return self.reconcile_cross_zero_position(
1889                &instrument,
1890                *account_id,
1891                instrument_id,
1892                cached_signed_qty,
1893                cached_avg_px,
1894                venue_signed_qty,
1895                report.avg_px_open,
1896                ts_now,
1897            );
1898        }
1899
1900        if cached_signed_qty == Decimal::ZERO {
1901            return self.create_position_from_report(report, account_id, &instrument);
1902        }
1903
1904        self.create_position_reconciliation_order(
1905            report,
1906            account_id,
1907            &instrument,
1908            cached_signed_qty,
1909            diff_qty,
1910            cached_avg_px,
1911        )
1912    }
1913
1914    fn create_position_reconciliation_order(
1915        &mut self,
1916        report: &PositionStatusReport,
1917        account_id: &AccountId,
1918        instrument: &InstrumentAny,
1919        cached_signed_qty: Decimal,
1920        diff_qty: Quantity,
1921        current_avg_px: Option<Decimal>,
1922    ) -> Option<Vec<OrderEventAny>> {
1923        let venue_signed_qty = report.signed_decimal_qty;
1924        let instrument_id = report.instrument_id;
1925
1926        let order_side = if venue_signed_qty > cached_signed_qty {
1927            OrderSide::Buy
1928        } else {
1929            OrderSide::Sell
1930        };
1931
1932        let reconciliation_px = calculate_reconciliation_price(
1933            cached_signed_qty,
1934            current_avg_px,
1935            venue_signed_qty,
1936            report.avg_px_open,
1937        );
1938
1939        let fill_px = reconciliation_px
1940            .or(report.avg_px_open)
1941            .or(current_avg_px)?;
1942
1943        let ts_now = self.clock.borrow().timestamp_ns();
1944        let venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64());
1945
1946        let mut order_report = OrderStatusReport::new(
1947            *account_id,
1948            instrument_id,
1949            None,
1950            venue_order_id,
1951            order_side,
1952            OrderType::Market,
1953            TimeInForce::Gtc,
1954            OrderStatus::Filled,
1955            diff_qty,
1956            diff_qty,
1957            ts_now,
1958            ts_now,
1959            ts_now,
1960            None,
1961        )
1962        .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
1963        .ok()?;
1964
1965        if let Some(venue_position_id) = report.venue_position_id {
1966            order_report = order_report.with_venue_position_id(venue_position_id);
1967        }
1968
1969        log::info!(
1970            color = LogColor::Blue as u8;
1971            "Generating reconciliation order for {instrument_id}: side={order_side:?}, qty={diff_qty}, px={fill_px}",
1972        );
1973
1974        let (events, _) =
1975            self.handle_external_order(&order_report, account_id, instrument, &[], true);
1976        Some(events)
1977    }
1978
1979    fn reconcile_order_report(
1980        &self,
1981        order: &OrderAny,
1982        report: &OrderStatusReport,
1983        instrument: Option<&InstrumentAny>,
1984    ) -> Option<OrderEventAny> {
1985        let ts_now = self.clock.borrow().timestamp_ns();
1986        reconcile_order_report(order, report, instrument, ts_now)
1987    }
1988
1989    /// Reconciles an order with its associated fills atomically.
1990    ///
1991    /// For terminal statuses (Canceled), fills are applied BEFORE the terminal event
1992    /// to ensure correct state transitions (matching Python behavior).
1993    fn reconcile_order_with_fills(
1994        &mut self,
1995        order: &mut OrderAny,
1996        report: &OrderStatusReport,
1997        fills: &[&FillReport],
1998        instrument: Option<&InstrumentAny>,
1999    ) -> Vec<OrderEventAny> {
2000        let mut events = Vec::new();
2001        let mut sorted_fills: Vec<&FillReport> = fills.to_vec();
2002        sorted_fills.sort_by_key(|f| f.ts_event);
2003
2004        let ts_now = self.clock.borrow().timestamp_ns();
2005
2006        match report.order_status {
2007            OrderStatus::Canceled => {
2008                // Generate Triggered event if ts_triggered is set (matching Python behavior)
2009                if report.ts_triggered.is_some() && order.status() != OrderStatus::Triggered {
2010                    events.push(create_reconciliation_triggered(order, report, ts_now));
2011                }
2012
2013                // Apply fills before Canceled event regardless of current state,
2014                // as the order may have partial fills we haven't seen yet
2015                if let Some(inst) = instrument {
2016                    for fill in &sorted_fills {
2017                        if let Some(event) = self.create_order_fill(order, fill, inst) {
2018                            events.push(event);
2019                        }
2020                    }
2021                }
2022                if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2023                    events.push(event);
2024                }
2025            }
2026            OrderStatus::Expired => {
2027                // Generate Triggered event if ts_triggered is set (matching Python behavior)
2028                if report.ts_triggered.is_some() && order.status() != OrderStatus::Triggered {
2029                    events.push(create_reconciliation_triggered(order, report, ts_now));
2030                }
2031
2032                // Apply fills before Expired event (same as Canceled)
2033                if let Some(inst) = instrument {
2034                    for fill in &sorted_fills {
2035                        if let Some(event) = self.create_order_fill(order, fill, inst) {
2036                            events.push(event);
2037                        }
2038                    }
2039                }
2040                if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2041                    events.push(event);
2042                }
2043            }
2044            _ => {
2045                if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2046                    events.push(event);
2047                }
2048                if let Some(inst) = instrument {
2049                    for fill in &sorted_fills {
2050                        if let Some(event) = self.create_order_fill(order, fill, inst) {
2051                            events.push(event);
2052                        }
2053                    }
2054                }
2055            }
2056        }
2057
2058        events
2059    }
2060
2061    fn handle_external_order(
2062        &mut self,
2063        report: &OrderStatusReport,
2064        account_id: &AccountId,
2065        instrument: &InstrumentAny,
2066        fills: &[&FillReport],
2067        is_synthetic: bool,
2068    ) -> (Vec<OrderEventAny>, Option<ExternalOrderMetadata>) {
2069        let (strategy_id, tags) =
2070            if let Some(claimed_strategy) = self.external_order_claims.get(&report.instrument_id) {
2071                let order_id = report
2072                    .client_order_id
2073                    .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
2074                log::info!(
2075                    color = LogColor::Blue as u8;
2076                    "External order {} for {} claimed by strategy {}",
2077                    order_id,
2078                    report.instrument_id,
2079                    claimed_strategy,
2080                );
2081                (*claimed_strategy, None)
2082            } else {
2083                // Unclaimed orders use EXTERNAL strategy ID with tag distinguishing source
2084                let tag = if is_synthetic {
2085                    *TAG_RECONCILIATION
2086                } else {
2087                    *TAG_VENUE
2088                };
2089                (StrategyId::from("EXTERNAL"), Some(vec![tag]))
2090            };
2091
2092        // Filter unclaimed venue orders (but not synthetic reconciliation orders)
2093        if self.config.filter_unclaimed_external && !is_synthetic {
2094            return (Vec::new(), None);
2095        }
2096
2097        let client_order_id = report
2098            .client_order_id
2099            .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
2100
2101        let ts_now = self.clock.borrow().timestamp_ns();
2102
2103        let initialized = OrderInitialized::new(
2104            self.config.trader_id,
2105            strategy_id,
2106            report.instrument_id,
2107            client_order_id,
2108            report.order_side,
2109            report.order_type,
2110            report.quantity,
2111            report.time_in_force,
2112            report.post_only,
2113            report.reduce_only,
2114            false, // quote_quantity
2115            true,  // reconciliation
2116            UUID4::new(),
2117            ts_now,
2118            ts_now,
2119            report.price,
2120            report.trigger_price,
2121            report.trigger_type,
2122            report.limit_offset,
2123            report.trailing_offset,
2124            Some(report.trailing_offset_type),
2125            report.expire_time,
2126            report.display_qty,
2127            None, // emulation_trigger
2128            None, // trigger_instrument_id
2129            Some(report.contingency_type),
2130            report.order_list_id,
2131            report.linked_order_ids.clone(),
2132            report.parent_order_id,
2133            None, // exec_algorithm_id
2134            None, // exec_algorithm_params
2135            None, // exec_spawn_id
2136            tags,
2137        );
2138
2139        let events = vec![OrderEventAny::Initialized(initialized)];
2140        let order = match OrderAny::from_events(events) {
2141            Ok(order) => order,
2142            Err(e) => {
2143                log::error!("Failed to create order from report: {e}");
2144                return (Vec::new(), None);
2145            }
2146        };
2147
2148        {
2149            let mut cache = self.cache.borrow_mut();
2150            if let Err(e) = cache.add_order(order.clone(), None, None, false) {
2151                log::error!("Failed to add external order to cache: {e}");
2152                return (Vec::new(), None);
2153            }
2154
2155            if let Err(e) =
2156                cache.add_venue_order_id(&client_order_id, &report.venue_order_id, false)
2157            {
2158                log::warn!("Failed to add venue order ID index: {e}");
2159            }
2160        }
2161
2162        log::info!(
2163            color = LogColor::Blue as u8;
2164            "Created external order {} ({}) for {} [{}]",
2165            client_order_id,
2166            report.venue_order_id,
2167            report.instrument_id,
2168            report.order_status,
2169        );
2170
2171        let ts_now = self.clock.borrow().timestamp_ns();
2172
2173        // Generate events for external order: Accepted first, then fills (for terminal statuses),
2174        // then terminal status. This matches Python's behavior.
2175        let mut order_events =
2176            generate_external_order_status_events(&order, report, account_id, instrument, ts_now);
2177
2178        if !fills.is_empty() {
2179            let mut cached_order = self.get_order(&client_order_id).unwrap();
2180            let mut sorted_fills: Vec<&FillReport> = fills.to_vec();
2181            sorted_fills.sort_by_key(|f| f.ts_event);
2182
2183            match report.order_status {
2184                OrderStatus::Canceled | OrderStatus::Expired => {
2185                    let terminal_event = order_events.pop();
2186                    for fill in sorted_fills {
2187                        if let Some(fill_event) =
2188                            self.create_order_fill(&mut cached_order, fill, instrument)
2189                        {
2190                            order_events.push(fill_event);
2191                        }
2192                    }
2193                    if let Some(event) = terminal_event {
2194                        order_events.push(event);
2195                    }
2196                }
2197                OrderStatus::Filled | OrderStatus::PartiallyFilled => {
2198                    // Only pop if the last event is a Filled event (the inferred fill)
2199                    if order_events
2200                        .last()
2201                        .is_some_and(|e| matches!(e, OrderEventAny::Filled(_)))
2202                    {
2203                        order_events.pop();
2204                    }
2205
2206                    let mut real_fill_total = Decimal::ZERO;
2207                    for fill in &sorted_fills {
2208                        if let Some(fill_event) =
2209                            self.create_order_fill(&mut cached_order, fill, instrument)
2210                        {
2211                            real_fill_total += fill.last_qty.as_decimal();
2212                            order_events.push(fill_event);
2213                        }
2214                    }
2215
2216                    let report_filled = report.filled_qty.as_decimal();
2217                    if real_fill_total < report_filled {
2218                        let diff_decimal = report_filled - real_fill_total;
2219                        if let Ok(diff) =
2220                            Quantity::from_decimal_dp(diff_decimal, instrument.size_precision())
2221                            && let Some(inferred_fill) = create_inferred_fill_for_qty(
2222                                &cached_order,
2223                                report,
2224                                account_id,
2225                                instrument,
2226                                diff,
2227                                ts_now,
2228                            )
2229                        {
2230                            order_events.push(inferred_fill);
2231                        }
2232                    }
2233                }
2234                _ => {}
2235            }
2236        }
2237
2238        let metadata = ExternalOrderMetadata {
2239            client_order_id,
2240            venue_order_id: report.venue_order_id,
2241            instrument_id: report.instrument_id,
2242            strategy_id,
2243            ts_init: ts_now,
2244        };
2245
2246        (order_events, Some(metadata))
2247    }
2248
2249    /// Adjusts fills for instruments with incomplete first lifecycle (partial window).
2250    ///
2251    /// When historical fills don't fully explain the current position (e.g., lookback window
2252    /// started mid-position), this creates synthetic fills to align with the venue position.
2253    fn adjust_mass_status_fills(
2254        &self,
2255        mass_status: &ExecutionMassStatus,
2256    ) -> (
2257        IndexMap<VenueOrderId, OrderStatusReport>,
2258        IndexMap<VenueOrderId, Vec<FillReport>>,
2259    ) {
2260        let mut final_orders: IndexMap<VenueOrderId, OrderStatusReport> =
2261            mass_status.order_reports();
2262        let mut final_fills: IndexMap<VenueOrderId, Vec<FillReport>> = mass_status.fill_reports();
2263
2264        let mut instruments_to_adjust = Vec::new();
2265        for (instrument_id, position_reports) in mass_status.position_reports() {
2266            if !self.should_reconcile_instrument(&instrument_id) {
2267                log::debug!(
2268                    "Skipping fill adjustment for {instrument_id}: not in reconciliation_instrument_ids"
2269                );
2270                continue;
2271            }
2272
2273            // Skip hedge mode instruments (have venue_position_id) as partial-window
2274            // adjustment assumes a single net position per instrument
2275            let is_hedge_mode = position_reports
2276                .iter()
2277                .any(|r| r.venue_position_id.is_some());
2278            if is_hedge_mode {
2279                log::debug!(
2280                    "Skipping fill adjustment for {instrument_id}: hedge mode (has venue_position_id)"
2281                );
2282                continue;
2283            }
2284
2285            if let Some(instrument) = self.get_instrument(&instrument_id) {
2286                instruments_to_adjust.push(instrument);
2287            } else {
2288                log::debug!(
2289                    "Skipping fill adjustment for {instrument_id}: instrument not found in cache"
2290                );
2291            }
2292        }
2293
2294        if instruments_to_adjust.is_empty() {
2295            return (final_orders, final_fills);
2296        }
2297
2298        log_info!(
2299            "Adjusting fills for {} instrument(s) with position reports",
2300            instruments_to_adjust.len(),
2301            color = LogColor::Blue
2302        );
2303
2304        for instrument in &instruments_to_adjust {
2305            let instrument_id = instrument.id();
2306
2307            match process_mass_status_for_reconciliation(mass_status, instrument, None) {
2308                Ok(result) => {
2309                    final_orders.retain(|_, order| order.instrument_id != instrument_id);
2310                    final_fills.retain(|_, fills| {
2311                        fills
2312                            .first()
2313                            .is_none_or(|f| f.instrument_id != instrument_id)
2314                    });
2315
2316                    for (venue_order_id, order) in result.orders {
2317                        final_orders.insert(venue_order_id, order);
2318                    }
2319                    for (venue_order_id, fills) in result.fills {
2320                        final_fills.insert(venue_order_id, fills);
2321                    }
2322                }
2323                Err(e) => {
2324                    log::warn!("Failed to adjust fills for {instrument_id}: {e}");
2325                }
2326            }
2327        }
2328
2329        log_info!(
2330            "After adjustment: {} order(s), {} fill group(s)",
2331            final_orders.len(),
2332            final_fills.len(),
2333            color = LogColor::Blue
2334        );
2335
2336        (final_orders, final_fills)
2337    }
2338
2339    /// Deduplicates order reports, keeping the most advanced state per venue_order_id.
2340    ///
2341    /// When a batch contains multiple reports for the same order, we keep the one with
2342    /// the highest filled_qty (most progress), or if equal, the most terminal status.
2343    fn deduplicate_order_reports<'a>(
2344        &self,
2345        reports: impl Iterator<Item = &'a OrderStatusReport>,
2346    ) -> AHashMap<VenueOrderId, &'a OrderStatusReport> {
2347        let mut best_reports: AHashMap<VenueOrderId, &'a OrderStatusReport> = AHashMap::new();
2348
2349        for report in reports {
2350            let dominated = best_reports
2351                .get(&report.venue_order_id)
2352                .is_some_and(|existing| self.is_more_advanced(existing, report));
2353
2354            if !dominated {
2355                best_reports.insert(report.venue_order_id, report);
2356            }
2357        }
2358
2359        best_reports
2360    }
2361
2362    fn is_more_advanced(&self, a: &OrderStatusReport, b: &OrderStatusReport) -> bool {
2363        if a.filled_qty > b.filled_qty {
2364            return true;
2365        }
2366        if a.filled_qty < b.filled_qty {
2367            return false;
2368        }
2369
2370        // Equal filled_qty - compare status (terminal states are more advanced)
2371        Self::status_priority(a.order_status) > Self::status_priority(b.order_status)
2372    }
2373
2374    const fn status_priority(status: OrderStatus) -> u8 {
2375        match status {
2376            OrderStatus::Initialized | OrderStatus::Submitted | OrderStatus::Emulated => 0,
2377            OrderStatus::Released | OrderStatus::Denied => 1,
2378            OrderStatus::Accepted | OrderStatus::PendingUpdate | OrderStatus::PendingCancel => 2,
2379            OrderStatus::Triggered => 3,
2380            OrderStatus::PartiallyFilled => 4,
2381            OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected => 5,
2382            OrderStatus::Filled => 6,
2383        }
2384    }
2385
2386    fn is_exact_order_match(&self, order: &OrderAny, report: &OrderStatusReport) -> bool {
2387        order.status() == report.order_status
2388            && order.filled_qty() == report.filled_qty
2389            && !should_reconciliation_update(order, report)
2390    }
2391
2392    fn create_order_fill(
2393        &mut self,
2394        order: &mut OrderAny,
2395        fill: &FillReport,
2396        instrument: &InstrumentAny,
2397    ) -> Option<OrderEventAny> {
2398        if self.processed_fills.contains_key(&fill.trade_id) {
2399            return None;
2400        }
2401
2402        self.processed_fills
2403            .insert(fill.trade_id, order.client_order_id());
2404
2405        Some(OrderEventAny::Filled(OrderFilled::new(
2406            order.trader_id(),
2407            order.strategy_id(),
2408            order.instrument_id(),
2409            order.client_order_id(),
2410            fill.venue_order_id,
2411            fill.account_id,
2412            fill.trade_id,
2413            fill.order_side,
2414            order.order_type(),
2415            fill.last_qty,
2416            fill.last_px,
2417            instrument.quote_currency(),
2418            fill.liquidity_side,
2419            fill.report_id,
2420            fill.ts_event,
2421            self.clock.borrow().timestamp_ns(),
2422            false,
2423            fill.venue_position_id,
2424            Some(fill.commission),
2425        )))
2426    }
2427}