nautilus_live/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Execution state manager for live trading.
17//!
18//! This module provides the execution manager for reconciling execution state between
19//! the local cache and connected venues, as well as purging old state during live trading.
20
21use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::{
25    cache::Cache,
26    clients::ExecutionClient,
27    clock::Clock,
28    enums::LogColor,
29    log_info,
30    messages::execution::report::{GenerateOrderStatusReports, GeneratePositionStatusReports},
31};
32use nautilus_core::{UUID4, UnixNanos};
33use nautilus_execution::reconciliation::{
34    calculate_reconciliation_price, create_reconciliation_rejected,
35    create_synthetic_venue_order_id, generate_external_order_status_events, reconcile_order_report,
36    should_reconciliation_update,
37};
38use nautilus_model::{
39    enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
40    events::{OrderEventAny, OrderFilled, OrderInitialized},
41    identifiers::{
42        AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
43    },
44    instruments::{Instrument, InstrumentAny},
45    orders::{Order, OrderAny},
46    position::Position,
47    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48    types::Quantity,
49};
50use rust_decimal::{Decimal, prelude::ToPrimitive};
51use ustr::Ustr;
52
53use crate::config::LiveExecEngineConfig;
54
55/// Configuration for execution manager.
56#[derive(Debug, Clone)]
57pub struct ExecutionManagerConfig {
58    /// The trader ID for generated orders.
59    pub trader_id: TraderId,
60    /// If reconciliation is active at start-up.
61    pub reconciliation: bool,
62    /// The delay (seconds) before starting reconciliation at startup.
63    pub reconciliation_startup_delay_secs: f64,
64    /// Number of minutes to look back during reconciliation.
65    pub lookback_mins: Option<u64>,
66    /// Instrument IDs to include during reconciliation (empty => all).
67    pub reconciliation_instrument_ids: AHashSet<InstrumentId>,
68    /// Whether to filter unclaimed external orders.
69    pub filter_unclaimed_external: bool,
70    /// Whether to filter position status reports during reconciliation.
71    pub filter_position_reports: bool,
72    /// Client order IDs excluded from reconciliation.
73    pub filtered_client_order_ids: AHashSet<ClientOrderId>,
74    /// Whether to generate missing orders from reports.
75    pub generate_missing_orders: bool,
76    /// The interval (milliseconds) between checking whether in-flight orders have exceeded their threshold.
77    pub inflight_check_interval_ms: u32,
78    /// Threshold in milliseconds for inflight order checks.
79    pub inflight_threshold_ms: u64,
80    /// Maximum number of retries for inflight checks.
81    pub inflight_max_retries: u32,
82    /// The interval (seconds) between checks for open orders at the venue.
83    pub open_check_interval_secs: Option<f64>,
84    /// The lookback minutes for open order checks.
85    pub open_check_lookback_mins: Option<u64>,
86    /// Threshold in nanoseconds before acting on venue discrepancies for open orders.
87    pub open_check_threshold_ns: u64,
88    /// Maximum retries before resolving an open order missing at the venue.
89    pub open_check_missing_retries: u32,
90    /// Whether open-order polling should only request open orders from the venue.
91    pub open_check_open_only: bool,
92    /// The maximum number of single-order queries per consistency check cycle.
93    pub max_single_order_queries_per_cycle: u32,
94    /// The delay (milliseconds) between consecutive single-order queries.
95    pub single_order_query_delay_ms: u32,
96    /// The interval (seconds) between checks for open positions at the venue.
97    pub position_check_interval_secs: Option<f64>,
98    /// The lookback minutes for position consistency checks.
99    pub position_check_lookback_mins: u64,
100    /// Threshold in nanoseconds before acting on venue discrepancies for positions.
101    pub position_check_threshold_ns: u64,
102    /// The time buffer (minutes) before closed orders can be purged.
103    pub purge_closed_orders_buffer_mins: Option<u32>,
104    /// The time buffer (minutes) before closed positions can be purged.
105    pub purge_closed_positions_buffer_mins: Option<u32>,
106    /// The time buffer (minutes) before account events can be purged.
107    pub purge_account_events_lookback_mins: Option<u32>,
108    /// If purge operations should also delete from the backing database.
109    pub purge_from_database: bool,
110}
111
112impl Default for ExecutionManagerConfig {
113    fn default() -> Self {
114        Self {
115            trader_id: TraderId::default(),
116            reconciliation: true,
117            reconciliation_startup_delay_secs: 10.0,
118            lookback_mins: Some(60),
119            reconciliation_instrument_ids: AHashSet::new(),
120            filter_unclaimed_external: false,
121            filter_position_reports: false,
122            filtered_client_order_ids: AHashSet::new(),
123            generate_missing_orders: true,
124            inflight_check_interval_ms: 2_000,
125            inflight_threshold_ms: 5_000,
126            inflight_max_retries: 5,
127            open_check_interval_secs: None,
128            open_check_lookback_mins: Some(60),
129            open_check_threshold_ns: 5_000_000_000,
130            open_check_missing_retries: 5,
131            open_check_open_only: true,
132            max_single_order_queries_per_cycle: 5,
133            single_order_query_delay_ms: 100,
134            position_check_interval_secs: None,
135            position_check_lookback_mins: 60,
136            position_check_threshold_ns: 60_000_000_000,
137            purge_closed_orders_buffer_mins: None,
138            purge_closed_positions_buffer_mins: None,
139            purge_account_events_lookback_mins: None,
140            purge_from_database: false,
141        }
142    }
143}
144
145impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
146    fn from(config: &LiveExecEngineConfig) -> Self {
147        let filtered_client_order_ids: AHashSet<ClientOrderId> = config
148            .filtered_client_order_ids
149            .clone()
150            .unwrap_or_default()
151            .into_iter()
152            .map(|value| ClientOrderId::from(value.as_str()))
153            .collect();
154
155        let reconciliation_instrument_ids: AHashSet<InstrumentId> = config
156            .reconciliation_instrument_ids
157            .clone()
158            .unwrap_or_default()
159            .into_iter()
160            .map(|value| InstrumentId::from(value.as_str()))
161            .collect();
162
163        Self {
164            trader_id: TraderId::default(), // Must be set separately via with_trader_id
165            reconciliation: config.reconciliation,
166            reconciliation_startup_delay_secs: config.reconciliation_startup_delay_secs,
167            lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
168            reconciliation_instrument_ids,
169            filter_unclaimed_external: config.filter_unclaimed_external_orders,
170            filter_position_reports: config.filter_position_reports,
171            filtered_client_order_ids,
172            generate_missing_orders: config.generate_missing_orders,
173            inflight_check_interval_ms: config.inflight_check_interval_ms,
174            inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
175            inflight_max_retries: config.inflight_check_retries,
176            open_check_interval_secs: config.open_check_interval_secs,
177            open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
178            open_check_threshold_ns: (config.open_check_threshold_ms as u64) * 1_000_000,
179            open_check_missing_retries: config.open_check_missing_retries,
180            open_check_open_only: config.open_check_open_only,
181            max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
182            single_order_query_delay_ms: config.single_order_query_delay_ms,
183            position_check_interval_secs: config.position_check_interval_secs,
184            position_check_lookback_mins: config.position_check_lookback_mins as u64,
185            position_check_threshold_ns: (config.position_check_threshold_ms as u64) * 1_000_000,
186            purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
187            purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
188            purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
189            purge_from_database: config.purge_from_database,
190        }
191    }
192}
193
194impl ExecutionManagerConfig {
195    /// Sets the trader ID on the configuration.
196    #[must_use]
197    pub fn with_trader_id(mut self, trader_id: TraderId) -> Self {
198        self.trader_id = trader_id;
199        self
200    }
201}
202
203/// Execution report for continuous reconciliation.
204/// This is a simplified report type used during runtime reconciliation.
205#[derive(Debug, Clone)]
206pub struct ExecutionReport {
207    pub client_order_id: ClientOrderId,
208    pub venue_order_id: Option<VenueOrderId>,
209    pub status: OrderStatus,
210    pub filled_qty: Quantity,
211    pub avg_px: Option<f64>,
212    pub ts_event: UnixNanos,
213}
214
215/// Information about an inflight order check.
216#[derive(Debug, Clone)]
217struct InflightCheck {
218    #[allow(dead_code)]
219    pub client_order_id: ClientOrderId,
220    pub ts_submitted: UnixNanos,
221    pub retry_count: u32,
222    pub last_query_ts: Option<UnixNanos>,
223}
224
225/// Manager for execution state.
226///
227/// The `ExecutionManager` handles:
228/// - Startup reconciliation to align state on system start.
229/// - Continuous reconciliation of inflight orders.
230/// - External order discovery and claiming.
231/// - Fill report processing and validation.
232/// - Purging of old orders, positions, and account events.
233///
234/// # Thread Safety
235///
236/// This struct is **not thread-safe** and is designed for single-threaded use within
237/// an async runtime. Internal state is managed using `AHashMap` without synchronization,
238/// and the `clock` and `cache` use `Rc<RefCell<>>` which provide runtime borrow checking
239/// but no thread-safety guarantees.
240///
241/// If concurrent access is required, this struct must be wrapped in `Arc<Mutex<>>` or
242/// similar synchronization primitives. Alternatively, ensure that all methods are called
243/// from the same thread/task in the async runtime.
244///
245/// **Warning:** Concurrent mutable access to internal AHashMaps or concurrent borrows
246/// of `RefCell` contents will cause runtime panics.
247#[derive(Clone)]
248pub struct ExecutionManager {
249    clock: Rc<RefCell<dyn Clock>>,
250    cache: Rc<RefCell<Cache>>,
251    config: ExecutionManagerConfig,
252    inflight_checks: AHashMap<ClientOrderId, InflightCheck>,
253    external_order_claims: AHashMap<InstrumentId, StrategyId>,
254    processed_fills: AHashMap<TradeId, ClientOrderId>,
255    recon_check_retries: AHashMap<ClientOrderId, u32>,
256    ts_last_query: AHashMap<ClientOrderId, UnixNanos>,
257    order_local_activity_ns: AHashMap<ClientOrderId, UnixNanos>,
258    position_local_activity_ns: AHashMap<InstrumentId, UnixNanos>,
259    recent_fills_cache: AHashMap<TradeId, UnixNanos>,
260}
261
262impl Debug for ExecutionManager {
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        f.debug_struct(stringify!(ExecutionManager))
265            .field("config", &self.config)
266            .field("inflight_checks", &self.inflight_checks)
267            .field("external_order_claims", &self.external_order_claims)
268            .field("processed_fills", &self.processed_fills)
269            .field("recon_check_retries", &self.recon_check_retries)
270            .finish()
271    }
272}
273
274impl ExecutionManager {
275    /// Creates a new [`ExecutionManager`] instance.
276    pub fn new(
277        clock: Rc<RefCell<dyn Clock>>,
278        cache: Rc<RefCell<Cache>>,
279        config: ExecutionManagerConfig,
280    ) -> Self {
281        Self {
282            clock,
283            cache,
284            config,
285            inflight_checks: AHashMap::new(),
286            external_order_claims: AHashMap::new(),
287            processed_fills: AHashMap::new(),
288            recon_check_retries: AHashMap::new(),
289            ts_last_query: AHashMap::new(),
290            order_local_activity_ns: AHashMap::new(),
291            position_local_activity_ns: AHashMap::new(),
292            recent_fills_cache: AHashMap::new(),
293        }
294    }
295
296    /// Reconciles orders and fills from a mass status report.
297    pub async fn reconcile_execution_mass_status(
298        &mut self,
299        mass_status: ExecutionMassStatus,
300    ) -> Vec<OrderEventAny> {
301        let venue = mass_status.venue;
302        let order_count = mass_status.order_reports().len();
303        let fill_count: usize = mass_status.fill_reports().values().map(|v| v.len()).sum();
304        let position_count = mass_status.position_reports().len();
305
306        log_info!(
307            "Reconciling ExecutionMassStatus for {}",
308            venue,
309            color = LogColor::Blue
310        );
311        log_info!(
312            "Received {} order(s), {} fill(s), {} position(s)",
313            order_count,
314            fill_count,
315            position_count,
316            color = LogColor::Blue
317        );
318
319        let mut events = Vec::new();
320        let mut orders_reconciled = 0usize;
321        let mut external_orders_created = 0usize;
322        let mut open_orders_initialized = 0usize;
323        let mut orders_skipped_no_instrument = 0usize;
324        let mut orders_skipped_duplicate = 0usize;
325        let mut fills_applied = 0usize;
326
327        // Deduplicate reports by venue_order_id, keeping the most advanced state
328        let reports = mass_status.order_reports();
329        let order_reports = self.deduplicate_order_reports(reports.values());
330
331        for report in order_reports.values() {
332            if let Some(client_order_id) = &report.client_order_id {
333                if let Some(cached_order) = self.get_order(client_order_id)
334                    && self.is_exact_order_match(&cached_order, report)
335                {
336                    log::debug!("Skipping order {client_order_id}: already in sync with venue");
337                    orders_skipped_duplicate += 1;
338                    continue;
339                }
340
341                if let Some(order) = self.get_order(client_order_id) {
342                    let instrument = self.get_instrument(&report.instrument_id);
343                    log::info!(
344                        color = LogColor::Blue as u8;
345                        "Reconciling {} {} {} [{}] -> [{}]",
346                        client_order_id,
347                        report.venue_order_id,
348                        report.instrument_id,
349                        order.status(),
350                        report.order_status,
351                    );
352                    if let Some(event) =
353                        self.reconcile_order_report(&order, report, instrument.as_ref())
354                    {
355                        orders_reconciled += 1;
356                        events.push(event);
357                    }
358                } else if let Some(order) = self.get_order_by_venue_order_id(&report.venue_order_id)
359                {
360                    // Fallback: match by venue_order_id
361                    let instrument = self.get_instrument(&report.instrument_id);
362                    log::info!(
363                        color = LogColor::Blue as u8;
364                        "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
365                        order.client_order_id(),
366                        report.venue_order_id,
367                        report.instrument_id,
368                        order.status(),
369                        report.order_status,
370                    );
371                    if let Some(event) =
372                        self.reconcile_order_report(&order, report, instrument.as_ref())
373                    {
374                        orders_reconciled += 1;
375                        events.push(event);
376                    }
377                } else if !self.config.filter_unclaimed_external {
378                    if let Some(instrument) = self.get_instrument(&report.instrument_id) {
379                        let external_events = self.handle_external_order(
380                            report,
381                            &mass_status.account_id,
382                            &instrument,
383                        );
384                        if !external_events.is_empty() {
385                            external_orders_created += 1;
386                            if report.order_status.is_open() {
387                                open_orders_initialized += 1;
388                            }
389                            events.extend(external_events);
390                        }
391                    } else {
392                        orders_skipped_no_instrument += 1;
393                    }
394                }
395            } else if let Some(order) = self.get_order_by_venue_order_id(&report.venue_order_id) {
396                // Fallback: match by venue_order_id
397                let instrument = self.get_instrument(&report.instrument_id);
398                log::info!(
399                    color = LogColor::Blue as u8;
400                    "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
401                    order.client_order_id(),
402                    report.venue_order_id,
403                    report.instrument_id,
404                    order.status(),
405                    report.order_status,
406                );
407                if let Some(event) =
408                    self.reconcile_order_report(&order, report, instrument.as_ref())
409                {
410                    orders_reconciled += 1;
411                    events.push(event);
412                }
413            } else if !self.config.filter_unclaimed_external {
414                if let Some(instrument) = self.get_instrument(&report.instrument_id) {
415                    let external_events =
416                        self.handle_external_order(report, &mass_status.account_id, &instrument);
417                    if !external_events.is_empty() {
418                        external_orders_created += 1;
419                        if report.order_status.is_open() {
420                            open_orders_initialized += 1;
421                        }
422                        events.extend(external_events);
423                    }
424                } else {
425                    orders_skipped_no_instrument += 1;
426                }
427            }
428        }
429
430        // Sort fills chronologically to ensure proper position updates
431        let fill_reports = mass_status.fill_reports();
432        let mut all_fills: Vec<&FillReport> = fill_reports.values().flatten().collect();
433        all_fills.sort_by_key(|f| f.ts_event);
434
435        for fill in all_fills {
436            if let Some(client_order_id) = &fill.client_order_id
437                && let Some(order) = self.get_order(client_order_id)
438            {
439                let mut order = order;
440                let instrument_id = order.instrument_id();
441
442                if let Some(instrument) = self.get_instrument(&instrument_id)
443                    && let Some(event) = self.create_order_fill(&mut order, fill, &instrument)
444                {
445                    fills_applied += 1;
446                    events.push(event);
447                }
448            }
449        }
450
451        if orders_skipped_no_instrument > 0 {
452            log::warn!("{orders_skipped_no_instrument} orders skipped (instrument not in cache)");
453        }
454
455        if orders_skipped_duplicate > 0 {
456            log::debug!("{orders_skipped_duplicate} orders skipped (already in sync)");
457        }
458
459        log::info!(
460            color = LogColor::Blue as u8;
461            "Reconciliation complete for {venue}: reconciled={orders_reconciled}, external={external_orders_created}, open={open_orders_initialized}, fills={fills_applied}, skipped={orders_skipped_duplicate}",
462        );
463
464        // Sort events chronologically to ensure proper position updates
465        events.sort_by_key(|e| e.ts_event());
466
467        events
468    }
469
470    /// Reconciles a single execution report during runtime.
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if the average price cannot be converted to a valid `Decimal`.
475    pub fn reconcile_report(
476        &mut self,
477        report: ExecutionReport,
478    ) -> anyhow::Result<Vec<OrderEventAny>> {
479        let mut events = Vec::new();
480
481        self.clear_recon_tracking(&report.client_order_id, true);
482
483        if let Some(order) = self.get_order(&report.client_order_id) {
484            let Some(account_id) = order.account_id() else {
485                log::error!("Cannot process fill report: order has no account_id");
486                return Ok(vec![]);
487            };
488            let Some(venue_order_id) = report.venue_order_id else {
489                log::error!("Cannot process fill report: report has no venue_order_id");
490                return Ok(vec![]);
491            };
492            let mut order_report = OrderStatusReport::new(
493                account_id,
494                order.instrument_id(),
495                Some(report.client_order_id),
496                venue_order_id,
497                order.order_side(),
498                order.order_type(),
499                order.time_in_force(),
500                report.status,
501                order.quantity(),
502                report.filled_qty,
503                report.ts_event, // Use ts_event as ts_accepted
504                report.ts_event, // Use ts_event as ts_last
505                self.clock.borrow().timestamp_ns(),
506                Some(UUID4::new()),
507            );
508
509            if let Some(avg_px) = report.avg_px {
510                order_report = order_report.with_avg_px(avg_px)?;
511            }
512
513            let instrument = self.get_instrument(&order.instrument_id());
514            if let Some(event) =
515                self.reconcile_order_report(&order, &order_report, instrument.as_ref())
516            {
517                events.push(event);
518            }
519        }
520
521        Ok(events)
522    }
523
524    /// Checks inflight orders and returns events for any that need reconciliation.
525    pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
526        let mut events = Vec::new();
527        let current_time = self.clock.borrow().timestamp_ns();
528        let threshold_ns = self.config.inflight_threshold_ms * 1_000_000;
529
530        let mut to_check = Vec::new();
531        for (client_order_id, check) in &self.inflight_checks {
532            if current_time - check.ts_submitted > threshold_ns {
533                to_check.push(*client_order_id);
534            }
535        }
536
537        for client_order_id in to_check {
538            if self
539                .config
540                .filtered_client_order_ids
541                .contains(&client_order_id)
542            {
543                continue;
544            }
545
546            if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
547                if let Some(last_query_ts) = check.last_query_ts
548                    && current_time - last_query_ts < threshold_ns
549                {
550                    continue;
551                }
552
553                check.retry_count += 1;
554                check.last_query_ts = Some(current_time);
555                self.ts_last_query.insert(client_order_id, current_time);
556                self.recon_check_retries
557                    .insert(client_order_id, check.retry_count);
558
559                if check.retry_count >= self.config.inflight_max_retries {
560                    // Generate rejection after max retries
561                    let ts_now = self.clock.borrow().timestamp_ns();
562                    if let Some(order) = self.get_order(&client_order_id)
563                        && let Some(event) =
564                            create_reconciliation_rejected(&order, Some("INFLIGHT_TIMEOUT"), ts_now)
565                    {
566                        events.push(event);
567                    }
568                    // Remove from inflight checks regardless of whether order exists
569                    self.clear_recon_tracking(&client_order_id, true);
570                }
571            }
572        }
573
574        events
575    }
576
577    /// Checks open orders consistency between cache and venue.
578    ///
579    /// This method validates that open orders in the cache match the venue's state,
580    /// comparing order status and filled quantities, and generating reconciliation
581    /// events for any discrepancies detected.
582    ///
583    /// # Returns
584    ///
585    /// A vector of order events generated to reconcile discrepancies.
586    pub async fn check_open_orders(
587        &mut self,
588        clients: &[Rc<dyn ExecutionClient>],
589    ) -> Vec<OrderEventAny> {
590        log::debug!("Checking order consistency between cached-state and venues");
591
592        let filtered_orders: Vec<OrderAny> = {
593            let cache = self.cache.borrow();
594            let open_orders = cache.orders_open(None, None, None, None);
595
596            if self.config.reconciliation_instrument_ids.is_empty() {
597                open_orders.iter().map(|o| (*o).clone()).collect()
598            } else {
599                open_orders
600                    .iter()
601                    .filter(|o| {
602                        self.config
603                            .reconciliation_instrument_ids
604                            .contains(&o.instrument_id())
605                    })
606                    .map(|o| (*o).clone())
607                    .collect()
608            }
609        };
610
611        log::debug!(
612            "Found {} order{} open in cache",
613            filtered_orders.len(),
614            if filtered_orders.len() == 1 { "" } else { "s" }
615        );
616
617        let mut all_reports = Vec::new();
618        let mut venue_reported_ids = AHashSet::new();
619
620        for client in clients {
621            let cmd = GenerateOrderStatusReports::new(
622                UUID4::new(),
623                self.clock.borrow().timestamp_ns(),
624                true, // open_only
625                None, // instrument_id - query all
626                None, // start
627                None, // end
628                None, // params
629                None, // correlation_id
630            );
631
632            match client.generate_order_status_reports(&cmd).await {
633                Ok(reports) => {
634                    for report in reports {
635                        if let Some(client_order_id) = &report.client_order_id {
636                            venue_reported_ids.insert(*client_order_id);
637                        }
638                        all_reports.push(report);
639                    }
640                }
641                Err(e) => {
642                    log::error!(
643                        "Failed to query order reports from {}: {e}",
644                        client.client_id()
645                    );
646                }
647            }
648        }
649
650        // Reconcile reports against cached orders
651        let mut events = Vec::new();
652        for report in all_reports {
653            if let Some(client_order_id) = &report.client_order_id
654                && let Some(order) = self.get_order(client_order_id)
655            {
656                let instrument = self.get_instrument(&report.instrument_id);
657                if let Some(event) =
658                    self.reconcile_order_report(&order, &report, instrument.as_ref())
659                {
660                    events.push(event);
661                }
662            }
663        }
664
665        // Handle orders missing at venue
666        if !self.config.open_check_open_only {
667            let cached_ids: AHashSet<ClientOrderId> = filtered_orders
668                .iter()
669                .map(|o| o.client_order_id())
670                .collect();
671            let missing_at_venue: AHashSet<ClientOrderId> = cached_ids
672                .difference(&venue_reported_ids)
673                .copied()
674                .collect();
675
676            for client_order_id in missing_at_venue {
677                events.extend(self.handle_missing_order(client_order_id));
678            }
679        }
680
681        events
682    }
683
684    /// Checks position consistency between cache and venue.
685    ///
686    /// This method validates that positions in the cache match the venue's state,
687    /// detecting position drift and querying for missing fills when discrepancies
688    /// are found.
689    ///
690    /// # Returns
691    ///
692    /// A vector of fill events generated to reconcile position discrepancies.
693    pub async fn check_positions_consistency(
694        &mut self,
695        clients: &[Rc<dyn ExecutionClient>],
696    ) -> Vec<OrderEventAny> {
697        log::debug!("Checking position consistency between cached-state and venues");
698
699        let open_positions = {
700            let cache = self.cache.borrow();
701            let positions = cache.positions_open(None, None, None, None);
702
703            if self.config.reconciliation_instrument_ids.is_empty() {
704                positions.iter().map(|p| (*p).clone()).collect()
705            } else {
706                positions
707                    .iter()
708                    .filter(|p| {
709                        self.config
710                            .reconciliation_instrument_ids
711                            .contains(&p.instrument_id)
712                    })
713                    .map(|p| (*p).clone())
714                    .collect::<Vec<_>>()
715            }
716        };
717
718        log::debug!(
719            "Found {} position{} to check",
720            open_positions.len(),
721            if open_positions.len() == 1 { "" } else { "s" }
722        );
723
724        // Query venue for position reports
725        let mut venue_positions = AHashMap::new();
726
727        for client in clients {
728            let cmd = GeneratePositionStatusReports::new(
729                UUID4::new(),
730                self.clock.borrow().timestamp_ns(),
731                None, // instrument_id - query all
732                None, // start
733                None, // end
734                None, // params
735                None, // correlation_id
736            );
737
738            match client.generate_position_status_reports(&cmd).await {
739                Ok(reports) => {
740                    for report in reports {
741                        venue_positions.insert(report.instrument_id, report);
742                    }
743                }
744                Err(e) => {
745                    log::error!(
746                        "Failed to query position reports from {}: {e}",
747                        client.client_id()
748                    );
749                }
750            }
751        }
752
753        // Check for discrepancies
754        let mut events = Vec::new();
755
756        for position in &open_positions {
757            // Skip if not in filter
758            if !self.config.reconciliation_instrument_ids.is_empty()
759                && !self
760                    .config
761                    .reconciliation_instrument_ids
762                    .contains(&position.instrument_id)
763            {
764                continue;
765            }
766
767            let venue_report = venue_positions.get(&position.instrument_id);
768
769            if let Some(discrepancy_events) =
770                self.check_position_discrepancy(position, venue_report)
771            {
772                events.extend(discrepancy_events);
773            }
774        }
775
776        events
777    }
778
779    /// Registers an order as inflight for tracking.
780    pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
781        let ts_submitted = self.clock.borrow().timestamp_ns();
782        self.inflight_checks.insert(
783            client_order_id,
784            InflightCheck {
785                client_order_id,
786                ts_submitted,
787                retry_count: 0,
788                last_query_ts: None,
789            },
790        );
791        self.recon_check_retries.insert(client_order_id, 0);
792        self.ts_last_query.remove(&client_order_id);
793        self.order_local_activity_ns.remove(&client_order_id);
794    }
795
796    /// Records local activity for the specified order.
797    pub fn record_local_activity(&mut self, client_order_id: ClientOrderId, ts_event: UnixNanos) {
798        self.order_local_activity_ns
799            .insert(client_order_id, ts_event);
800    }
801
802    /// Clears reconciliation tracking state for an order.
803    pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
804        self.inflight_checks.remove(client_order_id);
805        self.recon_check_retries.remove(client_order_id);
806        if drop_last_query {
807            self.ts_last_query.remove(client_order_id);
808        }
809        self.order_local_activity_ns.remove(client_order_id);
810    }
811
812    /// Claims external orders for a specific strategy and instrument.
813    pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
814        self.external_order_claims
815            .insert(instrument_id, strategy_id);
816    }
817
818    /// Records position activity for reconciliation tracking.
819    pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
820        self.position_local_activity_ns
821            .insert(instrument_id, ts_event);
822    }
823
824    /// Checks if a fill has been recently processed (for deduplication).
825    pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
826        self.recent_fills_cache.contains_key(trade_id)
827    }
828
829    /// Marks a fill as recently processed with current timestamp.
830    pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
831        let ts_now = self.clock.borrow().timestamp_ns();
832        self.recent_fills_cache.insert(trade_id, ts_now);
833    }
834
835    /// Prunes expired fills from the recent fills cache.
836    ///
837    /// Default TTL is 60 seconds.
838    pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
839        let ts_now = self.clock.borrow().timestamp_ns();
840        let ttl_ns = (ttl_secs * 1_000_000_000.0) as u64;
841
842        self.recent_fills_cache
843            .retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
844    }
845
846    /// Purges closed orders from the cache that are older than the configured buffer.
847    pub fn purge_closed_orders(&mut self) {
848        let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins else {
849            return;
850        };
851
852        let ts_now = self.clock.borrow().timestamp_ns();
853        let buffer_secs = (buffer_mins as u64) * 60;
854
855        self.cache
856            .borrow_mut()
857            .purge_closed_orders(ts_now, buffer_secs);
858    }
859
860    /// Purges closed positions from the cache that are older than the configured buffer.
861    pub fn purge_closed_positions(&mut self) {
862        let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins else {
863            return;
864        };
865
866        let ts_now = self.clock.borrow().timestamp_ns();
867        let buffer_secs = (buffer_mins as u64) * 60;
868
869        self.cache
870            .borrow_mut()
871            .purge_closed_positions(ts_now, buffer_secs);
872    }
873
874    /// Purges old account events from the cache based on the configured lookback.
875    pub fn purge_account_events(&mut self) {
876        let Some(lookback_mins) = self.config.purge_account_events_lookback_mins else {
877            return;
878        };
879
880        let ts_now = self.clock.borrow().timestamp_ns();
881        let lookback_secs = (lookback_mins as u64) * 60;
882
883        self.cache
884            .borrow_mut()
885            .purge_account_events(ts_now, lookback_secs);
886    }
887
888    // Private helper methods
889
890    fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
891        self.cache.borrow().order(client_order_id).cloned()
892    }
893
894    fn get_order_by_venue_order_id(&self, venue_order_id: &VenueOrderId) -> Option<OrderAny> {
895        let cache = self.cache.borrow();
896        cache
897            .client_order_id(venue_order_id)
898            .and_then(|client_order_id| cache.order(client_order_id).cloned())
899    }
900
901    fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
902        self.cache.borrow().instrument(instrument_id).cloned()
903    }
904
905    fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
906        let mut events = Vec::new();
907
908        let Some(order) = self.get_order(&client_order_id) else {
909            return events;
910        };
911
912        let ts_now = self.clock.borrow().timestamp_ns();
913        let ts_last = order.ts_last();
914
915        // Check if order is too recent
916        if (ts_now - ts_last) < self.config.open_check_threshold_ns {
917            return events;
918        }
919
920        // Check local activity threshold
921        if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
922            && (ts_now - last_activity) < self.config.open_check_threshold_ns
923        {
924            return events;
925        }
926
927        // Increment retry counter
928        let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
929        *retries += 1;
930
931        // If max retries exceeded, generate rejection event
932        if *retries >= self.config.open_check_missing_retries {
933            log::warn!(
934                "Order {client_order_id} not found at venue after {retries} retries, marking as REJECTED"
935            );
936
937            let ts_now = self.clock.borrow().timestamp_ns();
938            if let Some(rejected) =
939                create_reconciliation_rejected(&order, Some("NOT_FOUND_AT_VENUE"), ts_now)
940            {
941                events.push(rejected);
942            }
943
944            self.clear_recon_tracking(&client_order_id, true);
945        } else {
946            log::debug!(
947                "Order {} not found at venue, retry {}/{}",
948                client_order_id,
949                retries,
950                self.config.open_check_missing_retries
951            );
952        }
953
954        events
955    }
956
957    fn check_position_discrepancy(
958        &mut self,
959        position: &Position,
960        venue_report: Option<&PositionStatusReport>,
961    ) -> Option<Vec<OrderEventAny>> {
962        // Use signed quantities to detect both magnitude and side discrepancies
963        let cached_signed_qty =
964            Decimal::from_f64_retain(position.signed_qty).unwrap_or(Decimal::ZERO);
965        let venue_signed_qty = venue_report.map_or(Decimal::ZERO, |r| r.signed_decimal_qty);
966
967        let tolerance = Decimal::from_str("0.00000001").unwrap();
968        if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
969            return None; // No discrepancy
970        }
971
972        // Check activity threshold
973        let ts_now = self.clock.borrow().timestamp_ns();
974        if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
975            && (ts_now - last_activity) < self.config.position_check_threshold_ns
976        {
977            log::debug!(
978                "Skipping position reconciliation for {}: recent activity within threshold",
979                position.instrument_id
980            );
981            return None;
982        }
983
984        log::warn!(
985            "Position discrepancy detected for {}: cached_signed_qty={}, venue_signed_qty={}",
986            position.instrument_id,
987            cached_signed_qty,
988            venue_signed_qty
989        );
990
991        let instrument = self
992            .cache
993            .borrow()
994            .instrument(&position.instrument_id)?
995            .clone();
996
997        let account_id = position.account_id;
998        let instrument_id = position.instrument_id;
999
1000        let cached_avg_px = if position.avg_px_open > 0.0 {
1001            Some(Decimal::from_f64_retain(position.avg_px_open).unwrap_or(Decimal::ZERO))
1002        } else {
1003            None
1004        };
1005        let venue_avg_px = venue_report.and_then(|r| r.avg_px_open);
1006
1007        // Check if position crosses zero (flips side)
1008        let crosses_zero = (cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
1009            || (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO);
1010
1011        if crosses_zero {
1012            // Split into two fills: close existing position, then open new position
1013            return self.reconcile_cross_zero_position(
1014                &instrument,
1015                account_id,
1016                instrument_id,
1017                cached_signed_qty,
1018                cached_avg_px,
1019                venue_signed_qty,
1020                venue_avg_px,
1021                ts_now,
1022            );
1023        }
1024
1025        let qty_diff = venue_signed_qty - cached_signed_qty;
1026        let order_side = if qty_diff > Decimal::ZERO {
1027            OrderSide::Buy
1028        } else {
1029            OrderSide::Sell
1030        };
1031
1032        let reconciliation_px = calculate_reconciliation_price(
1033            cached_signed_qty,
1034            cached_avg_px,
1035            venue_signed_qty,
1036            venue_avg_px,
1037        );
1038
1039        let fill_px = reconciliation_px.or(venue_avg_px).or(cached_avg_px)?;
1040        let fill_qty = qty_diff.abs();
1041
1042        let ts_event = ts_now.as_u64();
1043        let venue_order_id = create_synthetic_venue_order_id(ts_event);
1044        let order_qty = Quantity::from_decimal_dp(fill_qty, instrument.size_precision()).ok()?;
1045
1046        let order_report = OrderStatusReport::new(
1047            account_id,
1048            instrument_id,
1049            None,
1050            venue_order_id,
1051            order_side,
1052            OrderType::Market,
1053            TimeInForce::Gtc,
1054            OrderStatus::Filled,
1055            order_qty,
1056            order_qty,
1057            ts_now,
1058            ts_now,
1059            ts_now,
1060            None,
1061        )
1062        .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
1063        .ok()?;
1064
1065        log::info!(
1066            color = LogColor::Blue as u8;
1067            "Generating synthetic fill for position reconciliation {instrument_id}: side={order_side:?}, qty={fill_qty}, px={fill_px}",
1068        );
1069
1070        let events = self.handle_external_order(&order_report, &account_id, &instrument);
1071        Some(events)
1072    }
1073
1074    /// Handles position reconciliation when position flips sign, splitting into two
1075    /// fills: close existing position then open new position in opposite direction.
1076    #[allow(clippy::too_many_arguments)]
1077    fn reconcile_cross_zero_position(
1078        &mut self,
1079        instrument: &InstrumentAny,
1080        account_id: AccountId,
1081        instrument_id: InstrumentId,
1082        cached_signed_qty: Decimal,
1083        cached_avg_px: Option<Decimal>,
1084        venue_signed_qty: Decimal,
1085        venue_avg_px: Option<Decimal>,
1086        ts_now: UnixNanos,
1087    ) -> Option<Vec<OrderEventAny>> {
1088        log::info!(
1089            color = LogColor::Blue as u8;
1090            "Position crosses zero for {instrument_id}: cached={cached_signed_qty}, venue={venue_signed_qty}. Splitting into two fills",
1091        );
1092
1093        let mut all_events = Vec::new();
1094
1095        // Close existing position first
1096        let close_qty = cached_signed_qty.abs();
1097        let close_side = if cached_signed_qty < Decimal::ZERO {
1098            OrderSide::Buy // Close short by buying
1099        } else {
1100            OrderSide::Sell // Close long by selling
1101        };
1102
1103        if let Some(close_px) = cached_avg_px {
1104            let close_venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64());
1105            let close_order_qty =
1106                Quantity::from_decimal_dp(close_qty, instrument.size_precision()).ok()?;
1107
1108            let close_report = OrderStatusReport::new(
1109                account_id,
1110                instrument_id,
1111                None,
1112                close_venue_order_id,
1113                close_side,
1114                OrderType::Market,
1115                TimeInForce::Gtc,
1116                OrderStatus::Filled,
1117                close_order_qty,
1118                close_order_qty,
1119                ts_now,
1120                ts_now,
1121                ts_now,
1122                None,
1123            )
1124            .with_avg_px(close_px.to_f64().unwrap_or(0.0))
1125            .ok()?;
1126
1127            log::info!(
1128                color = LogColor::Blue as u8;
1129                "Generating close fill for cross-zero {instrument_id}: side={close_side:?}, qty={close_qty}, px={close_px}",
1130            );
1131
1132            let close_events = self.handle_external_order(&close_report, &account_id, instrument);
1133            all_events.extend(close_events);
1134        } else {
1135            log::warn!("Cannot close position for {instrument_id}: no cached average price");
1136            return None;
1137        }
1138
1139        // Then open new position in opposite direction
1140        let open_qty = venue_signed_qty.abs();
1141        let open_side = if venue_signed_qty > Decimal::ZERO {
1142            OrderSide::Buy // Open long
1143        } else {
1144            OrderSide::Sell // Open short
1145        };
1146
1147        if let Some(open_px) = venue_avg_px {
1148            let open_venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64() + 1);
1149            let open_order_qty =
1150                Quantity::from_decimal_dp(open_qty, instrument.size_precision()).ok()?;
1151
1152            let open_report = OrderStatusReport::new(
1153                account_id,
1154                instrument_id,
1155                None,
1156                open_venue_order_id,
1157                open_side,
1158                OrderType::Market,
1159                TimeInForce::Gtc,
1160                OrderStatus::Filled,
1161                open_order_qty,
1162                open_order_qty,
1163                ts_now,
1164                ts_now,
1165                ts_now,
1166                None,
1167            )
1168            .with_avg_px(open_px.to_f64().unwrap_or(0.0))
1169            .ok()?;
1170
1171            log::info!(
1172                color = LogColor::Blue as u8;
1173                "Generating open fill for cross-zero {instrument_id}: side={open_side:?}, qty={open_qty}, px={open_px}",
1174            );
1175
1176            let open_events = self.handle_external_order(&open_report, &account_id, instrument);
1177            all_events.extend(open_events);
1178        } else {
1179            log::warn!("Cannot open new position for {instrument_id}: no venue average price");
1180            return Some(all_events);
1181        }
1182
1183        Some(all_events)
1184    }
1185
1186    fn reconcile_order_report(
1187        &self,
1188        order: &OrderAny,
1189        report: &OrderStatusReport,
1190        instrument: Option<&InstrumentAny>,
1191    ) -> Option<OrderEventAny> {
1192        let ts_now = self.clock.borrow().timestamp_ns();
1193        reconcile_order_report(order, report, instrument, ts_now)
1194    }
1195
1196    fn handle_external_order(
1197        &mut self,
1198        report: &OrderStatusReport,
1199        account_id: &AccountId,
1200        instrument: &InstrumentAny,
1201    ) -> Vec<OrderEventAny> {
1202        let (strategy_id, tags) =
1203            if let Some(claimed_strategy) = self.external_order_claims.get(&report.instrument_id) {
1204                let order_id = report
1205                    .client_order_id
1206                    .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
1207                log::info!(
1208                    color = LogColor::Blue as u8;
1209                    "External order {} for {} claimed by strategy {}",
1210                    order_id,
1211                    report.instrument_id,
1212                    claimed_strategy,
1213                );
1214                (*claimed_strategy, None)
1215            } else {
1216                // Unclaimed external orders use EXTERNAL strategy ID with VENUE tag
1217                (
1218                    StrategyId::from("EXTERNAL"),
1219                    Some(vec![Ustr::from("VENUE")]),
1220                )
1221            };
1222
1223        let client_order_id = report
1224            .client_order_id
1225            .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
1226
1227        let ts_now = self.clock.borrow().timestamp_ns();
1228
1229        let initialized = OrderInitialized::new(
1230            self.config.trader_id,
1231            strategy_id,
1232            report.instrument_id,
1233            client_order_id,
1234            report.order_side,
1235            report.order_type,
1236            report.quantity,
1237            report.time_in_force,
1238            report.post_only,
1239            report.reduce_only,
1240            false, // quote_quantity
1241            true,  // reconciliation
1242            UUID4::new(),
1243            ts_now,
1244            ts_now,
1245            report.price,
1246            report.trigger_price,
1247            report.trigger_type,
1248            report.limit_offset,
1249            report.trailing_offset,
1250            Some(report.trailing_offset_type),
1251            report.expire_time,
1252            report.display_qty,
1253            None, // emulation_trigger
1254            None, // trigger_instrument_id
1255            Some(report.contingency_type),
1256            report.order_list_id,
1257            report.linked_order_ids.clone(),
1258            report.parent_order_id,
1259            None, // exec_algorithm_id
1260            None, // exec_algorithm_params
1261            None, // exec_spawn_id
1262            tags,
1263        );
1264
1265        let events = vec![OrderEventAny::Initialized(initialized)];
1266        let order = match OrderAny::from_events(events) {
1267            Ok(order) => order,
1268            Err(e) => {
1269                log::error!("Failed to create order from report: {e}");
1270                return Vec::new();
1271            }
1272        };
1273
1274        {
1275            let mut cache = self.cache.borrow_mut();
1276            if let Err(e) = cache.add_order(order.clone(), None, None, false) {
1277                log::error!("Failed to add external order to cache: {e}");
1278                return Vec::new();
1279            }
1280
1281            if let Err(e) =
1282                cache.add_venue_order_id(&client_order_id, &report.venue_order_id, false)
1283            {
1284                log::warn!("Failed to add venue order ID index: {e}");
1285            }
1286        }
1287
1288        log::info!(
1289            color = LogColor::Blue as u8;
1290            "Created external order {} ({}) for {} [{}]",
1291            client_order_id,
1292            report.venue_order_id,
1293            report.instrument_id,
1294            report.order_status,
1295        );
1296
1297        let ts_now = self.clock.borrow().timestamp_ns();
1298        generate_external_order_status_events(&order, report, account_id, instrument, ts_now)
1299    }
1300
1301    /// Deduplicates order reports, keeping the most advanced state per venue_order_id.
1302    ///
1303    /// When a batch contains multiple reports for the same order, we keep the one with
1304    /// the highest filled_qty (most progress), or if equal, the most terminal status.
1305    fn deduplicate_order_reports<'a>(
1306        &self,
1307        reports: impl Iterator<Item = &'a OrderStatusReport>,
1308    ) -> AHashMap<VenueOrderId, &'a OrderStatusReport> {
1309        let mut best_reports: AHashMap<VenueOrderId, &'a OrderStatusReport> = AHashMap::new();
1310
1311        for report in reports {
1312            let dominated = best_reports
1313                .get(&report.venue_order_id)
1314                .is_some_and(|existing| self.is_more_advanced(existing, report));
1315
1316            if !dominated {
1317                best_reports.insert(report.venue_order_id, report);
1318            }
1319        }
1320
1321        best_reports
1322    }
1323
1324    /// Returns true if `a` is more advanced than `b` (higher filled_qty or more terminal).
1325    fn is_more_advanced(&self, a: &OrderStatusReport, b: &OrderStatusReport) -> bool {
1326        if a.filled_qty > b.filled_qty {
1327            return true;
1328        }
1329        if a.filled_qty < b.filled_qty {
1330            return false;
1331        }
1332
1333        // Equal filled_qty - compare status (terminal states are more advanced)
1334        Self::status_priority(a.order_status) > Self::status_priority(b.order_status)
1335    }
1336
1337    /// Returns priority for order status (higher = more terminal/advanced).
1338    const fn status_priority(status: OrderStatus) -> u8 {
1339        match status {
1340            OrderStatus::Initialized | OrderStatus::Submitted | OrderStatus::Emulated => 0,
1341            OrderStatus::Released | OrderStatus::Denied => 1,
1342            OrderStatus::Accepted | OrderStatus::PendingUpdate | OrderStatus::PendingCancel => 2,
1343            OrderStatus::Triggered => 3,
1344            OrderStatus::PartiallyFilled => 4,
1345            OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected => 5,
1346            OrderStatus::Filled => 6,
1347        }
1348    }
1349
1350    /// Checks if a cached order exactly matches a venue report (already in sync).
1351    fn is_exact_order_match(&self, order: &OrderAny, report: &OrderStatusReport) -> bool {
1352        order.status() == report.order_status
1353            && order.filled_qty() == report.filled_qty
1354            && !should_reconciliation_update(order, report)
1355    }
1356
1357    fn create_order_fill(
1358        &mut self,
1359        order: &mut OrderAny,
1360        fill: &FillReport,
1361        instrument: &InstrumentAny,
1362    ) -> Option<OrderEventAny> {
1363        if self.processed_fills.contains_key(&fill.trade_id) {
1364            return None;
1365        }
1366
1367        self.processed_fills
1368            .insert(fill.trade_id, order.client_order_id());
1369
1370        Some(OrderEventAny::Filled(OrderFilled::new(
1371            order.trader_id(),
1372            order.strategy_id(),
1373            order.instrument_id(),
1374            order.client_order_id(),
1375            fill.venue_order_id,
1376            fill.account_id,
1377            fill.trade_id,
1378            fill.order_side,
1379            order.order_type(),
1380            fill.last_qty,
1381            fill.last_px,
1382            instrument.quote_currency(),
1383            fill.liquidity_side,
1384            fill.report_id,
1385            fill.ts_event,
1386            self.clock.borrow().timestamp_ns(),
1387            false,
1388            fill.venue_position_id,
1389            Some(fill.commission),
1390        )))
1391    }
1392}