1use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr, sync::LazyLock};
22
23use ahash::{AHashMap, AHashSet};
24use indexmap::IndexMap;
25use nautilus_common::{
26 cache::Cache,
27 clients::ExecutionClient,
28 clock::Clock,
29 enums::{LogColor, LogLevel},
30 log_info,
31 messages::execution::report::{GenerateOrderStatusReports, GeneratePositionStatusReports},
32};
33use nautilus_core::{
34 UUID4, UnixNanos,
35 datetime::{NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, nanos_to_millis},
36};
37use nautilus_execution::{
38 engine::ExecutionEngine,
39 reconciliation::{
40 calculate_reconciliation_price, create_inferred_fill_for_qty,
41 create_reconciliation_rejected, create_reconciliation_triggered,
42 create_synthetic_venue_order_id, generate_external_order_status_events,
43 process_mass_status_for_reconciliation, reconcile_order_report,
44 should_reconciliation_update,
45 },
46};
47use nautilus_model::{
48 enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
49 events::{OrderEventAny, OrderFilled, OrderInitialized},
50 identifiers::{
51 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
52 VenueOrderId,
53 },
54 instruments::{Instrument, InstrumentAny},
55 orders::{Order, OrderAny},
56 position::Position,
57 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
58 types::Quantity,
59};
60use rust_decimal::{Decimal, prelude::ToPrimitive};
61use ustr::Ustr;
62
63use crate::config::LiveExecEngineConfig;
64
65static TAG_VENUE: LazyLock<Ustr> = LazyLock::new(|| Ustr::from("VENUE"));
67
68static TAG_RECONCILIATION: LazyLock<Ustr> = LazyLock::new(|| Ustr::from("RECONCILIATION"));
70
71#[derive(Debug, Clone)]
73pub struct ExternalOrderMetadata {
74 pub client_order_id: ClientOrderId,
75 pub venue_order_id: VenueOrderId,
76 pub instrument_id: InstrumentId,
77 pub strategy_id: StrategyId,
78 pub ts_init: UnixNanos,
79}
80
81#[derive(Debug, Default)]
83pub struct ReconciliationResult {
84 pub events: Vec<OrderEventAny>,
86 pub external_orders: Vec<ExternalOrderMetadata>,
88}
89
90#[derive(Debug, Clone)]
92pub struct ExecutionManagerConfig {
93 pub trader_id: TraderId,
95 pub reconciliation: bool,
97 pub reconciliation_startup_delay_secs: f64,
99 pub lookback_mins: Option<u64>,
101 pub reconciliation_instrument_ids: AHashSet<InstrumentId>,
103 pub filter_unclaimed_external: bool,
105 pub filter_position_reports: bool,
107 pub filtered_client_order_ids: AHashSet<ClientOrderId>,
109 pub generate_missing_orders: bool,
111 pub inflight_check_interval_ms: u32,
113 pub inflight_threshold_ms: u64,
115 pub inflight_max_retries: u32,
117 pub open_check_interval_secs: Option<f64>,
119 pub open_check_lookback_mins: Option<u64>,
121 pub open_check_threshold_ns: u64,
123 pub open_check_missing_retries: u32,
125 pub open_check_open_only: bool,
127 pub max_single_order_queries_per_cycle: u32,
129 pub single_order_query_delay_ms: u32,
131 pub position_check_interval_secs: Option<f64>,
133 pub position_check_lookback_mins: u64,
135 pub position_check_threshold_ns: u64,
137 pub purge_closed_orders_buffer_mins: Option<u32>,
139 pub purge_closed_positions_buffer_mins: Option<u32>,
141 pub purge_account_events_lookback_mins: Option<u32>,
143 pub purge_from_database: bool,
145}
146
147impl Default for ExecutionManagerConfig {
148 fn default() -> Self {
149 Self {
150 trader_id: TraderId::default(),
151 reconciliation: true,
152 reconciliation_startup_delay_secs: 10.0,
153 lookback_mins: Some(60),
154 reconciliation_instrument_ids: AHashSet::new(),
155 filter_unclaimed_external: false,
156 filter_position_reports: false,
157 filtered_client_order_ids: AHashSet::new(),
158 generate_missing_orders: true,
159 inflight_check_interval_ms: 2_000,
160 inflight_threshold_ms: 5_000,
161 inflight_max_retries: 5,
162 open_check_interval_secs: None,
163 open_check_lookback_mins: Some(60),
164 open_check_threshold_ns: 5_000_000_000,
165 open_check_missing_retries: 5,
166 open_check_open_only: true,
167 max_single_order_queries_per_cycle: 5,
168 single_order_query_delay_ms: 100,
169 position_check_interval_secs: None,
170 position_check_lookback_mins: 60,
171 position_check_threshold_ns: 60_000_000_000,
172 purge_closed_orders_buffer_mins: None,
173 purge_closed_positions_buffer_mins: None,
174 purge_account_events_lookback_mins: None,
175 purge_from_database: false,
176 }
177 }
178}
179
180impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
181 fn from(config: &LiveExecEngineConfig) -> Self {
182 let filtered_client_order_ids: AHashSet<ClientOrderId> = config
183 .filtered_client_order_ids
184 .clone()
185 .unwrap_or_default()
186 .into_iter()
187 .map(|value| ClientOrderId::from(value.as_str()))
188 .collect();
189
190 let reconciliation_instrument_ids: AHashSet<InstrumentId> = config
191 .reconciliation_instrument_ids
192 .clone()
193 .unwrap_or_default()
194 .into_iter()
195 .map(InstrumentId::from)
196 .collect();
197
198 let open_check_threshold_ns =
199 (config.open_check_threshold_ms as u64) * NANOSECONDS_IN_MILLISECOND;
200 let position_check_threshold_ns =
201 (config.position_check_threshold_ms as u64) * NANOSECONDS_IN_MILLISECOND;
202
203 Self {
204 trader_id: TraderId::default(), reconciliation: config.reconciliation,
206 reconciliation_startup_delay_secs: config.reconciliation_startup_delay_secs,
207 lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
208 reconciliation_instrument_ids,
209 filter_unclaimed_external: config.filter_unclaimed_external_orders,
210 filter_position_reports: config.filter_position_reports,
211 filtered_client_order_ids,
212 generate_missing_orders: config.generate_missing_orders,
213 inflight_check_interval_ms: config.inflight_check_interval_ms,
214 inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
215 inflight_max_retries: config.inflight_check_retries,
216 open_check_interval_secs: config.open_check_interval_secs,
217 open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
218 open_check_threshold_ns,
219 open_check_missing_retries: config.open_check_missing_retries,
220 open_check_open_only: config.open_check_open_only,
221 max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
222 single_order_query_delay_ms: config.single_order_query_delay_ms,
223 position_check_interval_secs: config.position_check_interval_secs,
224 position_check_lookback_mins: config.position_check_lookback_mins as u64,
225 position_check_threshold_ns,
226 purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
227 purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
228 purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
229 purge_from_database: config.purge_from_database,
230 }
231 }
232}
233
234impl ExecutionManagerConfig {
235 #[must_use]
237 pub fn with_trader_id(mut self, trader_id: TraderId) -> Self {
238 self.trader_id = trader_id;
239 self
240 }
241}
242
243#[derive(Debug, Clone)]
246pub struct ExecutionReport {
247 pub client_order_id: ClientOrderId,
248 pub venue_order_id: Option<VenueOrderId>,
249 pub status: OrderStatus,
250 pub filled_qty: Quantity,
251 pub avg_px: Option<f64>,
252 pub ts_event: UnixNanos,
253}
254
255#[derive(Debug, Clone)]
257struct InflightCheck {
258 #[allow(dead_code)]
259 pub client_order_id: ClientOrderId,
260 pub ts_submitted: UnixNanos,
261 pub retry_count: u32,
262 pub last_query_ts: Option<UnixNanos>,
263}
264
265#[derive(Clone)]
288pub struct ExecutionManager {
289 clock: Rc<RefCell<dyn Clock>>,
290 cache: Rc<RefCell<Cache>>,
291 config: ExecutionManagerConfig,
292 inflight_checks: AHashMap<ClientOrderId, InflightCheck>,
293 external_order_claims: AHashMap<InstrumentId, StrategyId>,
294 processed_fills: AHashMap<TradeId, ClientOrderId>,
295 recon_check_retries: AHashMap<ClientOrderId, u32>,
296 ts_last_query: AHashMap<ClientOrderId, UnixNanos>,
297 order_local_activity_ns: AHashMap<ClientOrderId, UnixNanos>,
298 position_local_activity_ns: AHashMap<InstrumentId, UnixNanos>,
299 recent_fills_cache: AHashMap<TradeId, UnixNanos>,
300}
301
302impl Debug for ExecutionManager {
303 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304 f.debug_struct(stringify!(ExecutionManager))
305 .field("config", &self.config)
306 .field("inflight_checks", &self.inflight_checks)
307 .field("external_order_claims", &self.external_order_claims)
308 .field("processed_fills", &self.processed_fills)
309 .field("recon_check_retries", &self.recon_check_retries)
310 .finish()
311 }
312}
313
314impl ExecutionManager {
315 pub fn new(
317 clock: Rc<RefCell<dyn Clock>>,
318 cache: Rc<RefCell<Cache>>,
319 config: ExecutionManagerConfig,
320 ) -> Self {
321 Self {
322 clock,
323 cache,
324 config,
325 inflight_checks: AHashMap::new(),
326 external_order_claims: AHashMap::new(),
327 processed_fills: AHashMap::new(),
328 recon_check_retries: AHashMap::new(),
329 ts_last_query: AHashMap::new(),
330 order_local_activity_ns: AHashMap::new(),
331 position_local_activity_ns: AHashMap::new(),
332 recent_fills_cache: AHashMap::new(),
333 }
334 }
335
336 pub async fn reconcile_execution_mass_status(
342 &mut self,
343 mass_status: ExecutionMassStatus,
344 exec_engine: Rc<RefCell<ExecutionEngine>>,
345 ) -> ReconciliationResult {
346 let venue = mass_status.venue;
347 let order_count = mass_status.order_reports().len();
348 let fill_count: usize = mass_status.fill_reports().values().map(|v| v.len()).sum();
349 let position_count = mass_status.position_reports().len();
350
351 log_info!(
352 "Reconciling ExecutionMassStatus for {venue}",
353 color = LogColor::Blue
354 );
355 log_info!(
356 "Received {order_count} order(s), {fill_count} fill(s), {position_count} position(s)",
357 color = LogColor::Blue
358 );
359
360 let (adjusted_order_reports, adjusted_fill_reports) =
361 self.adjust_mass_status_fills(&mass_status);
362
363 let mut events = Vec::new();
364 let mut external_orders = Vec::new();
365 let mut orders_reconciled = 0usize;
366 let mut external_orders_created = 0usize;
367 let mut open_orders_initialized = 0usize;
368 let mut orders_skipped_no_instrument = 0usize;
369 let mut orders_skipped_duplicate = 0usize;
370 let mut fills_applied = 0usize;
371
372 let fill_reports = &adjusted_fill_reports;
373 let mut seen_trade_ids: AHashSet<TradeId> = AHashSet::new();
374
375 for fills in fill_reports.values() {
376 for fill in fills {
377 if !seen_trade_ids.insert(fill.trade_id) {
378 log::warn!("Duplicate trade_id {} in mass status", fill.trade_id);
379 }
380 }
381 }
382
383 let order_reports = self.deduplicate_order_reports(adjusted_order_reports.values());
385 let mut orders_skipped_filtered = 0usize;
386
387 for report in order_reports.values() {
388 if self.should_skip_order_report(report) {
389 orders_skipped_filtered += 1;
390 continue;
391 }
392
393 if let Some(client_order_id) = &report.client_order_id {
394 if let Some(cached_order) = self.get_order(client_order_id)
395 && self.is_exact_order_match(&cached_order, report)
396 {
397 log::debug!("Skipping order {client_order_id}: already in sync with venue");
398 orders_skipped_duplicate += 1;
399
400 if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
402 client_order_id,
403 &report.venue_order_id,
404 false,
405 ) {
406 log::warn!("Failed to add venue order ID index: {e}");
407 }
408
409 continue;
410 }
411
412 if let Some(cached_order) = self.get_order(client_order_id)
414 && cached_order.is_closed()
415 && cached_order
416 .tags()
417 .is_some_and(|tags| tags.contains(&*TAG_RECONCILIATION))
418 {
419 log::debug!(
420 "Skipping closed reconciliation order {client_order_id}: \
421 synthetic position adjustment from previous session",
422 );
423 orders_skipped_duplicate += 1;
424 continue;
425 }
426
427 if let Some(mut order) = self.get_order(client_order_id) {
428 let instrument = self.get_instrument(&report.instrument_id);
429 log::info!(
430 color = LogColor::Blue as u8;
431 "Reconciling {} {} {} [{}] -> [{}]",
432 client_order_id,
433 report.venue_order_id,
434 report.instrument_id,
435 order.status(),
436 report.order_status,
437 );
438
439 let order_fills: Vec<&FillReport> = fill_reports
440 .get(&report.venue_order_id)
441 .map(|f| f.iter().collect())
442 .unwrap_or_default();
443 let order_events = self.reconcile_order_with_fills(
444 &mut order,
445 report,
446 &order_fills,
447 instrument.as_ref(),
448 );
449 if !order_events.is_empty() {
450 orders_reconciled += 1;
451 fills_applied += order_events
452 .iter()
453 .filter(|e| matches!(e, OrderEventAny::Filled(_)))
454 .count();
455 events.extend(order_events);
456 }
457
458 if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
460 client_order_id,
461 &report.venue_order_id,
462 false,
463 ) {
464 log::warn!("Failed to add venue order ID index: {e}");
465 }
466 } else if let Some(mut order) =
467 self.get_order_by_venue_order_id(&report.venue_order_id)
468 {
469 let instrument = self.get_instrument(&report.instrument_id);
471
472 log::info!(
473 color = LogColor::Blue as u8;
474 "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
475 order.client_order_id(),
476 report.venue_order_id,
477 report.instrument_id,
478 order.status(),
479 report.order_status,
480 );
481
482 let order_fills: Vec<&FillReport> = fill_reports
483 .get(&report.venue_order_id)
484 .map(|f| f.iter().collect())
485 .unwrap_or_default();
486 let order_events = self.reconcile_order_with_fills(
487 &mut order,
488 report,
489 &order_fills,
490 instrument.as_ref(),
491 );
492
493 if !order_events.is_empty() {
494 orders_reconciled += 1;
495 fills_applied += order_events
496 .iter()
497 .filter(|e| matches!(e, OrderEventAny::Filled(_)))
498 .count();
499 events.extend(order_events);
500 }
501
502 if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
503 &order.client_order_id(),
504 &report.venue_order_id,
505 false,
506 ) {
507 log::warn!("Failed to add venue order ID index: {e}");
508 }
509 } else if !self.config.filter_unclaimed_external {
510 if let Some(instrument) = self.get_instrument(&report.instrument_id) {
511 let order_fills: Vec<&FillReport> = fill_reports
512 .get(&report.venue_order_id)
513 .map(|f| f.iter().collect())
514 .unwrap_or_default();
515 let (external_events, metadata) = self.handle_external_order(
516 report,
517 &mass_status.account_id,
518 &instrument,
519 &order_fills,
520 false, );
522
523 if !external_events.is_empty() {
524 external_orders_created += 1;
525 fills_applied += external_events
526 .iter()
527 .filter(|e| matches!(e, OrderEventAny::Filled(_)))
528 .count();
529
530 if report.order_status.is_open() {
531 open_orders_initialized += 1;
532 }
533
534 events.extend(external_events);
535
536 if let Some(m) = metadata {
537 external_orders.push(m);
538 }
539 }
540 } else {
541 orders_skipped_no_instrument += 1;
542 }
543 }
544 } else if let Some(mut order) = self.get_order_by_venue_order_id(&report.venue_order_id)
545 {
546 let instrument = self.get_instrument(&report.instrument_id);
548 log::info!(
549 color = LogColor::Blue as u8;
550 "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
551 order.client_order_id(),
552 report.venue_order_id,
553 report.instrument_id,
554 order.status(),
555 report.order_status,
556 );
557
558 let order_fills: Vec<&FillReport> = fill_reports
559 .get(&report.venue_order_id)
560 .map(|f| f.iter().collect())
561 .unwrap_or_default();
562 let order_events = self.reconcile_order_with_fills(
563 &mut order,
564 report,
565 &order_fills,
566 instrument.as_ref(),
567 );
568
569 if !order_events.is_empty() {
570 orders_reconciled += 1;
571 fills_applied += order_events
572 .iter()
573 .filter(|e| matches!(e, OrderEventAny::Filled(_)))
574 .count();
575 events.extend(order_events);
576 }
577
578 if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
579 &order.client_order_id(),
580 &report.venue_order_id,
581 false,
582 ) {
583 log::warn!("Failed to add venue order ID index: {e}");
584 }
585 } else if let Some(instrument) = self.get_instrument(&report.instrument_id) {
586 let is_synthetic = report.venue_order_id.as_str().starts_with("S-");
588
589 let order_fills: Vec<&FillReport> = fill_reports
590 .get(&report.venue_order_id)
591 .map(|f| f.iter().collect())
592 .unwrap_or_default();
593 let (external_events, metadata) = self.handle_external_order(
594 report,
595 &mass_status.account_id,
596 &instrument,
597 &order_fills,
598 is_synthetic,
599 );
600
601 if !external_events.is_empty() {
602 external_orders_created += 1;
603 fills_applied += external_events
604 .iter()
605 .filter(|e| matches!(e, OrderEventAny::Filled(_)))
606 .count();
607
608 if report.order_status.is_open() {
609 open_orders_initialized += 1;
610 }
611
612 events.extend(external_events);
613
614 if let Some(m) = metadata {
615 external_orders.push(m);
616 }
617 }
618 } else {
619 orders_skipped_no_instrument += 1;
620 }
621 }
622
623 let processed_venue_order_ids: AHashSet<VenueOrderId> =
625 order_reports.keys().copied().collect();
626
627 for (venue_order_id, fills) in fill_reports {
628 if processed_venue_order_ids.contains(venue_order_id) {
629 continue;
630 }
631
632 let Some(first_fill) = fills.first() else {
633 continue;
634 };
635
636 if !self.should_reconcile_instrument(&first_fill.instrument_id) {
637 log::debug!(
638 "Skipping orphan fills for {}: not in reconciliation_instrument_ids",
639 first_fill.instrument_id
640 );
641 continue;
642 }
643
644 if let Some(client_order_id) = &first_fill.client_order_id
646 && self
647 .config
648 .filtered_client_order_ids
649 .contains(client_order_id)
650 {
651 log::debug!(
652 "Skipping orphan fills for {client_order_id}: in filtered_client_order_ids"
653 );
654 continue;
655 }
656
657 let order = first_fill
658 .client_order_id
659 .as_ref()
660 .and_then(|id| self.get_order(id))
661 .or_else(|| self.get_order_by_venue_order_id(venue_order_id));
662
663 if let Some(ref order) = order
665 && self
666 .config
667 .filtered_client_order_ids
668 .contains(&order.client_order_id())
669 {
670 log::debug!(
671 "Skipping orphan fills for {}: in filtered_client_order_ids",
672 order.client_order_id()
673 );
674 continue;
675 }
676
677 if let Some(mut order) = order {
678 let instrument_id = order.instrument_id();
679 if let Some(instrument) = self.get_instrument(&instrument_id) {
680 let mut sorted_fills: Vec<&FillReport> = fills.iter().collect();
681 sorted_fills.sort_by_key(|f| f.ts_event);
682
683 for fill in sorted_fills {
684 if let Some(event) = self.create_order_fill(&mut order, fill, &instrument) {
685 fills_applied += 1;
686 events.push(event);
687 }
688 }
689 }
690 }
691 }
692
693 events.sort_by_key(|e| e.ts_event());
694
695 for event in &events {
696 exec_engine.borrow_mut().process(event.clone());
697 }
698
699 let mut positions_created = 0usize;
700 if !self.config.filter_position_reports {
701 let instruments_with_unattributed_fills: AHashSet<InstrumentId> = mass_status
704 .fill_reports()
705 .values()
706 .flatten()
707 .filter(|f| f.venue_position_id.is_none())
708 .map(|f| f.instrument_id)
709 .chain(
710 mass_status
711 .order_reports()
712 .values()
713 .filter(|r| !r.filled_qty.is_zero() && r.venue_position_id.is_none())
714 .map(|r| r.instrument_id),
715 )
716 .collect();
717
718 let positions_with_fills: AHashSet<PositionId> = mass_status
719 .fill_reports()
720 .values()
721 .flatten()
722 .filter_map(|f| f.venue_position_id)
723 .chain(
724 mass_status
725 .order_reports()
726 .values()
727 .filter(|r| !r.filled_qty.is_zero())
728 .filter_map(|r| r.venue_position_id),
729 )
730 .collect();
731
732 for (instrument_id, reports) in mass_status.position_reports() {
733 if !self.should_reconcile_instrument(&instrument_id) {
734 log::debug!(
735 "Skipping position reports for {instrument_id}: not in reconciliation_instrument_ids"
736 );
737 continue;
738 }
739
740 for report in reports {
741 if let Some(position_events) = self.reconcile_position_report(
742 &report,
743 &mass_status.account_id,
744 &instruments_with_unattributed_fills,
745 &positions_with_fills,
746 ) {
747 for event in position_events {
748 exec_engine.borrow_mut().process(event.clone());
749 events.push(event);
750 }
751 positions_created += 1;
752 }
753 }
754 }
755 }
756
757 if orders_skipped_no_instrument > 0 {
758 log::warn!("{orders_skipped_no_instrument} orders skipped (instrument not in cache)");
759 }
760
761 if orders_skipped_duplicate > 0 {
762 log::debug!("{orders_skipped_duplicate} orders skipped (already in sync)");
763 }
764
765 if orders_skipped_filtered > 0 {
766 log::debug!("{orders_skipped_filtered} orders skipped (filtered by config)");
767 }
768
769 log::info!(
770 color = LogColor::Blue as u8;
771 "Reconciliation complete for {venue}: reconciled={orders_reconciled}, external={external_orders_created}, open={open_orders_initialized}, fills={fills_applied}, positions={positions_created}, skipped={orders_skipped_duplicate}, filtered={orders_skipped_filtered}",
772 );
773
774 ReconciliationResult {
775 events,
776 external_orders,
777 }
778 }
779
780 pub fn reconcile_report(
786 &mut self,
787 report: ExecutionReport,
788 ) -> anyhow::Result<Vec<OrderEventAny>> {
789 let mut events = Vec::new();
790
791 self.clear_recon_tracking(&report.client_order_id, true);
792
793 if let Some(order) = self.get_order(&report.client_order_id) {
794 let Some(account_id) = order.account_id() else {
795 log::error!("Cannot process fill report: order has no account_id");
796 return Ok(vec![]);
797 };
798 let Some(venue_order_id) = report.venue_order_id else {
799 log::error!("Cannot process fill report: report has no venue_order_id");
800 return Ok(vec![]);
801 };
802 let mut order_report = OrderStatusReport::new(
803 account_id,
804 order.instrument_id(),
805 Some(report.client_order_id),
806 venue_order_id,
807 order.order_side(),
808 order.order_type(),
809 order.time_in_force(),
810 report.status,
811 order.quantity(),
812 report.filled_qty,
813 report.ts_event, report.ts_event, self.clock.borrow().timestamp_ns(),
816 Some(UUID4::new()),
817 );
818
819 if let Some(avg_px) = report.avg_px {
820 order_report = order_report.with_avg_px(avg_px)?;
821 }
822
823 let instrument = self.get_instrument(&order.instrument_id());
824 if let Some(event) =
825 self.reconcile_order_report(&order, &order_report, instrument.as_ref())
826 {
827 events.push(event);
828 }
829 }
830
831 Ok(events)
832 }
833
834 pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
836 let mut events = Vec::new();
837 let current_time = self.clock.borrow().timestamp_ns();
838 let threshold_ns = self.config.inflight_threshold_ms * NANOSECONDS_IN_MILLISECOND;
839
840 let mut to_check = Vec::new();
841
842 for (client_order_id, check) in &self.inflight_checks {
843 if current_time - check.ts_submitted > threshold_ns {
844 to_check.push(*client_order_id);
845 }
846 }
847
848 for client_order_id in to_check {
849 if self
850 .config
851 .filtered_client_order_ids
852 .contains(&client_order_id)
853 {
854 continue;
855 }
856
857 if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
858 if let Some(last_query_ts) = check.last_query_ts
859 && current_time - last_query_ts < threshold_ns
860 {
861 continue;
862 }
863
864 check.retry_count += 1;
865 check.last_query_ts = Some(current_time);
866 self.ts_last_query.insert(client_order_id, current_time);
867 self.recon_check_retries
868 .insert(client_order_id, check.retry_count);
869
870 if check.retry_count >= self.config.inflight_max_retries {
871 let ts_now = self.clock.borrow().timestamp_ns();
873 if let Some(order) = self.get_order(&client_order_id)
874 && let Some(event) =
875 create_reconciliation_rejected(&order, Some("INFLIGHT_TIMEOUT"), ts_now)
876 {
877 events.push(event);
878 }
879 self.clear_recon_tracking(&client_order_id, true);
881 }
882 }
883 }
884
885 events
886 }
887
888 pub async fn check_open_orders(
898 &mut self,
899 clients: &[Rc<dyn ExecutionClient>],
900 ) -> Vec<OrderEventAny> {
901 log::debug!("Checking order consistency between cached-state and venues");
902
903 let filtered_orders: Vec<OrderAny> = {
904 let cache = self.cache.borrow();
905 let open_orders = cache.orders_open(None, None, None, None, None);
906
907 if self.config.reconciliation_instrument_ids.is_empty() {
908 open_orders.iter().map(|o| (*o).clone()).collect()
909 } else {
910 open_orders
911 .iter()
912 .filter(|o| {
913 self.config
914 .reconciliation_instrument_ids
915 .contains(&o.instrument_id())
916 })
917 .map(|o| (*o).clone())
918 .collect()
919 }
920 };
921
922 log::debug!(
923 "Found {} order{} open in cache",
924 filtered_orders.len(),
925 if filtered_orders.len() == 1 { "" } else { "s" }
926 );
927
928 let mut all_reports = Vec::new();
929 let mut venue_reported_ids = AHashSet::new();
930
931 for client in clients {
932 let mut cmd = GenerateOrderStatusReports::new(
933 UUID4::new(),
934 self.clock.borrow().timestamp_ns(),
935 true, None, None, None, None, None, );
942 cmd.log_receipt_level = LogLevel::Debug;
943
944 match client.generate_order_status_reports(&cmd).await {
945 Ok(reports) => {
946 for report in reports {
947 if let Some(client_order_id) = &report.client_order_id {
948 venue_reported_ids.insert(*client_order_id);
949 }
950 all_reports.push(report);
951 }
952 }
953 Err(e) => {
954 log::error!(
955 "Failed to query order reports from {}: {e}",
956 client.client_id()
957 );
958 }
959 }
960 }
961
962 let ts_now = self.clock.borrow().timestamp_ns();
964 let mut events = Vec::new();
965
966 for report in all_reports {
967 if let Some(client_order_id) = &report.client_order_id
968 && let Some(order) = self.get_order(client_order_id)
969 {
970 if let Some(&last_activity) = self.order_local_activity_ns.get(client_order_id)
972 && (ts_now - last_activity) < self.config.open_check_threshold_ns
973 {
974 let elapsed_ms = nanos_to_millis((ts_now - last_activity).as_u64());
975 let threshold_ms = nanos_to_millis(self.config.open_check_threshold_ns);
976 log::info!(
977 "Deferring reconciliation for {client_order_id}: recent local activity ({elapsed_ms}ms < threshold={threshold_ms}ms)",
978 );
979 continue;
980 }
981
982 let instrument = self.get_instrument(&report.instrument_id);
983
984 if let Some(event) =
985 self.reconcile_order_report(&order, &report, instrument.as_ref())
986 {
987 events.push(event);
988 }
989 }
990 }
991
992 if !self.config.open_check_open_only {
994 let cached_ids: AHashSet<ClientOrderId> = filtered_orders
995 .iter()
996 .map(|o| o.client_order_id())
997 .collect();
998 let missing_at_venue: AHashSet<ClientOrderId> = cached_ids
999 .difference(&venue_reported_ids)
1000 .copied()
1001 .collect();
1002
1003 for client_order_id in missing_at_venue {
1004 events.extend(self.handle_missing_order(client_order_id));
1005 }
1006 }
1007
1008 events
1009 }
1010
1011 pub async fn check_positions_consistency(
1021 &mut self,
1022 clients: &[Rc<dyn ExecutionClient>],
1023 ) -> Vec<OrderEventAny> {
1024 log::debug!("Checking position consistency between cached-state and venues");
1025
1026 let open_positions = {
1027 let cache = self.cache.borrow();
1028 let positions = cache.positions_open(None, None, None, None, None);
1029
1030 if self.config.reconciliation_instrument_ids.is_empty() {
1031 positions.iter().map(|p| (*p).clone()).collect()
1032 } else {
1033 positions
1034 .iter()
1035 .filter(|p| {
1036 self.config
1037 .reconciliation_instrument_ids
1038 .contains(&p.instrument_id)
1039 })
1040 .map(|p| (*p).clone())
1041 .collect::<Vec<_>>()
1042 }
1043 };
1044
1045 log::debug!(
1046 "Found {} position{} to check",
1047 open_positions.len(),
1048 if open_positions.len() == 1 { "" } else { "s" }
1049 );
1050
1051 let mut venue_positions = AHashMap::new();
1053
1054 for client in clients {
1055 let mut cmd = GeneratePositionStatusReports::new(
1056 UUID4::new(),
1057 self.clock.borrow().timestamp_ns(),
1058 None, None, None, None, None, );
1064 cmd.log_receipt_level = LogLevel::Debug;
1065
1066 match client.generate_position_status_reports(&cmd).await {
1067 Ok(reports) => {
1068 for report in reports {
1069 venue_positions.insert(report.instrument_id, report);
1070 }
1071 }
1072 Err(e) => {
1073 log::error!(
1074 "Failed to query position reports from {}: {e}",
1075 client.client_id()
1076 );
1077 }
1078 }
1079 }
1080
1081 let mut events = Vec::new();
1083
1084 for position in &open_positions {
1085 if !self.config.reconciliation_instrument_ids.is_empty()
1087 && !self
1088 .config
1089 .reconciliation_instrument_ids
1090 .contains(&position.instrument_id)
1091 {
1092 continue;
1093 }
1094
1095 let venue_report = venue_positions.get(&position.instrument_id);
1096
1097 if let Some(discrepancy_events) =
1098 self.check_position_discrepancy(position, venue_report)
1099 {
1100 events.extend(discrepancy_events);
1101 }
1102 }
1103
1104 events
1105 }
1106
1107 pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
1109 let ts_submitted = self.clock.borrow().timestamp_ns();
1110 self.inflight_checks.insert(
1111 client_order_id,
1112 InflightCheck {
1113 client_order_id,
1114 ts_submitted,
1115 retry_count: 0,
1116 last_query_ts: None,
1117 },
1118 );
1119 self.recon_check_retries.insert(client_order_id, 0);
1120 self.ts_last_query.remove(&client_order_id);
1121 self.order_local_activity_ns.remove(&client_order_id);
1122 }
1123
1124 pub fn record_local_activity(&mut self, client_order_id: ClientOrderId) {
1130 let ts_now = self.clock.borrow().timestamp_ns();
1131 self.order_local_activity_ns.insert(client_order_id, ts_now);
1132 }
1133
1134 pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
1136 self.inflight_checks.remove(client_order_id);
1137 self.recon_check_retries.remove(client_order_id);
1138 if drop_last_query {
1139 self.ts_last_query.remove(client_order_id);
1140 }
1141 self.order_local_activity_ns.remove(client_order_id);
1142 }
1143
1144 pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
1146 self.external_order_claims
1147 .insert(instrument_id, strategy_id);
1148 }
1149
1150 pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
1152 self.position_local_activity_ns
1153 .insert(instrument_id, ts_event);
1154 }
1155
1156 pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
1158 self.recent_fills_cache.contains_key(trade_id)
1159 }
1160
1161 pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
1163 let ts_now = self.clock.borrow().timestamp_ns();
1164 self.recent_fills_cache.insert(trade_id, ts_now);
1165 }
1166
1167 pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
1171 let ts_now = self.clock.borrow().timestamp_ns();
1172 let ttl_ns = (ttl_secs * NANOSECONDS_IN_SECOND as f64) as u64;
1173
1174 self.recent_fills_cache
1175 .retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
1176 }
1177
1178 pub fn purge_closed_orders(&mut self) {
1180 let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins else {
1181 return;
1182 };
1183
1184 let ts_now = self.clock.borrow().timestamp_ns();
1185 let buffer_secs = (buffer_mins as u64) * 60;
1186
1187 self.cache
1188 .borrow_mut()
1189 .purge_closed_orders(ts_now, buffer_secs);
1190 }
1191
1192 pub fn purge_closed_positions(&mut self) {
1194 let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins else {
1195 return;
1196 };
1197
1198 let ts_now = self.clock.borrow().timestamp_ns();
1199 let buffer_secs = (buffer_mins as u64) * 60;
1200
1201 self.cache
1202 .borrow_mut()
1203 .purge_closed_positions(ts_now, buffer_secs);
1204 }
1205
1206 pub fn purge_account_events(&mut self) {
1208 let Some(lookback_mins) = self.config.purge_account_events_lookback_mins else {
1209 return;
1210 };
1211
1212 let ts_now = self.clock.borrow().timestamp_ns();
1213 let lookback_secs = (lookback_mins as u64) * 60;
1214
1215 self.cache
1216 .borrow_mut()
1217 .purge_account_events(ts_now, lookback_secs);
1218 }
1219
1220 fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
1223 self.cache.borrow().order(client_order_id).cloned()
1224 }
1225
1226 fn get_order_by_venue_order_id(&self, venue_order_id: &VenueOrderId) -> Option<OrderAny> {
1227 let cache = self.cache.borrow();
1228 cache
1229 .client_order_id(venue_order_id)
1230 .and_then(|client_order_id| cache.order(client_order_id).cloned())
1231 }
1232
1233 fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
1234 self.cache.borrow().instrument(instrument_id).cloned()
1235 }
1236
1237 fn should_skip_order_report(&self, report: &OrderStatusReport) -> bool {
1238 if let Some(client_order_id) = &report.client_order_id
1239 && self
1240 .config
1241 .filtered_client_order_ids
1242 .contains(client_order_id)
1243 {
1244 log::debug!(
1245 "Skipping order report {client_order_id}: in filtered_client_order_ids list"
1246 );
1247 return true;
1248 }
1249
1250 if !self.should_reconcile_instrument(&report.instrument_id) {
1251 log::debug!(
1252 "Skipping order report for {}: not in reconciliation_instrument_ids",
1253 report.instrument_id
1254 );
1255 return true;
1256 }
1257
1258 false
1259 }
1260
1261 fn should_reconcile_instrument(&self, instrument_id: &InstrumentId) -> bool {
1262 self.config.reconciliation_instrument_ids.is_empty()
1263 || self
1264 .config
1265 .reconciliation_instrument_ids
1266 .contains(instrument_id)
1267 }
1268
1269 fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
1270 let mut events = Vec::new();
1271
1272 let Some(order) = self.get_order(&client_order_id) else {
1273 return events;
1274 };
1275
1276 let ts_now = self.clock.borrow().timestamp_ns();
1277 let ts_last = order.ts_last();
1278
1279 if (ts_now - ts_last) < self.config.open_check_threshold_ns {
1281 return events;
1282 }
1283
1284 if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
1286 && (ts_now - last_activity) < self.config.open_check_threshold_ns
1287 {
1288 return events;
1289 }
1290
1291 let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
1293 *retries += 1;
1294
1295 if *retries >= self.config.open_check_missing_retries {
1297 log::warn!(
1298 "Order {client_order_id} not found at venue after {retries} retries, marking as REJECTED"
1299 );
1300
1301 let ts_now = self.clock.borrow().timestamp_ns();
1302 if let Some(rejected) =
1303 create_reconciliation_rejected(&order, Some("NOT_FOUND_AT_VENUE"), ts_now)
1304 {
1305 events.push(rejected);
1306 }
1307
1308 self.clear_recon_tracking(&client_order_id, true);
1309 } else {
1310 log::debug!(
1311 "Order {} not found at venue, retry {}/{}",
1312 client_order_id,
1313 retries,
1314 self.config.open_check_missing_retries
1315 );
1316 }
1317
1318 events
1319 }
1320
1321 fn check_position_discrepancy(
1322 &mut self,
1323 position: &Position,
1324 venue_report: Option<&PositionStatusReport>,
1325 ) -> Option<Vec<OrderEventAny>> {
1326 let cached_signed_qty = position.signed_decimal_qty();
1328 let venue_signed_qty = venue_report.map_or(Decimal::ZERO, |r| r.signed_decimal_qty);
1329
1330 let tolerance = Decimal::from_str("0.00000001").unwrap();
1331 if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
1332 return None; }
1334
1335 let ts_now = self.clock.borrow().timestamp_ns();
1337 if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
1338 && (ts_now - last_activity) < self.config.position_check_threshold_ns
1339 {
1340 log::debug!(
1341 "Skipping position reconciliation for {}: recent activity within threshold",
1342 position.instrument_id
1343 );
1344 return None;
1345 }
1346
1347 log::warn!(
1348 "Position discrepancy detected for {}: cached_signed_qty={}, venue_signed_qty={}",
1349 position.instrument_id,
1350 cached_signed_qty,
1351 venue_signed_qty
1352 );
1353
1354 let instrument = self
1355 .cache
1356 .borrow()
1357 .instrument(&position.instrument_id)?
1358 .clone();
1359
1360 let account_id = position.account_id;
1361 let instrument_id = position.instrument_id;
1362
1363 let cached_avg_px = if position.avg_px_open > 0.0 {
1364 Decimal::from_str(&position.avg_px_open.to_string()).ok()
1365 } else {
1366 None
1367 };
1368 let venue_avg_px = venue_report.and_then(|r| r.avg_px_open);
1369
1370 let crosses_zero = (cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
1372 || (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO);
1373
1374 if crosses_zero {
1375 return self.reconcile_cross_zero_position(
1377 &instrument,
1378 account_id,
1379 instrument_id,
1380 cached_signed_qty,
1381 cached_avg_px,
1382 venue_signed_qty,
1383 venue_avg_px,
1384 ts_now,
1385 );
1386 }
1387
1388 let qty_diff = venue_signed_qty - cached_signed_qty;
1389 let order_side = if qty_diff > Decimal::ZERO {
1390 OrderSide::Buy
1391 } else {
1392 OrderSide::Sell
1393 };
1394
1395 let reconciliation_px = calculate_reconciliation_price(
1396 cached_signed_qty,
1397 cached_avg_px,
1398 venue_signed_qty,
1399 venue_avg_px,
1400 );
1401
1402 let fill_px = reconciliation_px.or(venue_avg_px).or(cached_avg_px)?;
1403 let fill_qty = qty_diff.abs();
1404
1405 let ts_event = ts_now.as_u64();
1406 let venue_order_id = create_synthetic_venue_order_id(ts_event);
1407 let order_qty = Quantity::from_decimal_dp(fill_qty, instrument.size_precision()).ok()?;
1408
1409 let order_report = OrderStatusReport::new(
1410 account_id,
1411 instrument_id,
1412 None,
1413 venue_order_id,
1414 order_side,
1415 OrderType::Market,
1416 TimeInForce::Gtc,
1417 OrderStatus::Filled,
1418 order_qty,
1419 order_qty,
1420 ts_now,
1421 ts_now,
1422 ts_now,
1423 None,
1424 )
1425 .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
1426 .ok()?;
1427
1428 log::info!(
1429 color = LogColor::Blue as u8;
1430 "Generating synthetic fill for position reconciliation {instrument_id}: side={order_side:?}, qty={fill_qty}, px={fill_px}",
1431 );
1432
1433 let (events, _) =
1434 self.handle_external_order(&order_report, &account_id, &instrument, &[], true);
1435 Some(events)
1436 }
1437
1438 #[allow(clippy::too_many_arguments)]
1441 fn reconcile_cross_zero_position(
1442 &mut self,
1443 instrument: &InstrumentAny,
1444 account_id: AccountId,
1445 instrument_id: InstrumentId,
1446 cached_signed_qty: Decimal,
1447 cached_avg_px: Option<Decimal>,
1448 venue_signed_qty: Decimal,
1449 venue_avg_px: Option<Decimal>,
1450 ts_now: UnixNanos,
1451 ) -> Option<Vec<OrderEventAny>> {
1452 log::info!(
1453 color = LogColor::Blue as u8;
1454 "Position crosses zero for {instrument_id}: cached={cached_signed_qty}, venue={venue_signed_qty}. Splitting into two fills",
1455 );
1456
1457 let mut all_events = Vec::new();
1458
1459 let close_qty = cached_signed_qty.abs();
1461 let close_side = if cached_signed_qty < Decimal::ZERO {
1462 OrderSide::Buy } else {
1464 OrderSide::Sell };
1466
1467 if let Some(close_px) = cached_avg_px {
1468 let close_venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64());
1469 let close_order_qty =
1470 Quantity::from_decimal_dp(close_qty, instrument.size_precision()).ok()?;
1471
1472 let close_report = OrderStatusReport::new(
1473 account_id,
1474 instrument_id,
1475 None,
1476 close_venue_order_id,
1477 close_side,
1478 OrderType::Market,
1479 TimeInForce::Gtc,
1480 OrderStatus::Filled,
1481 close_order_qty,
1482 close_order_qty,
1483 ts_now,
1484 ts_now,
1485 ts_now,
1486 None,
1487 )
1488 .with_avg_px(close_px.to_f64().unwrap_or(0.0))
1489 .ok()?;
1490
1491 log::info!(
1492 color = LogColor::Blue as u8;
1493 "Generating close fill for cross-zero {instrument_id}: side={close_side:?}, qty={close_qty}, px={close_px}",
1494 );
1495
1496 let (close_events, _) =
1497 self.handle_external_order(&close_report, &account_id, instrument, &[], true);
1498 all_events.extend(close_events);
1499 } else {
1500 log::warn!("Cannot close position for {instrument_id}: no cached average price");
1501 return None;
1502 }
1503
1504 let open_qty = venue_signed_qty.abs();
1506 let open_side = if venue_signed_qty > Decimal::ZERO {
1507 OrderSide::Buy } else {
1509 OrderSide::Sell };
1511
1512 if let Some(open_px) = venue_avg_px {
1513 let open_venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64() + 1);
1514 let open_order_qty =
1515 Quantity::from_decimal_dp(open_qty, instrument.size_precision()).ok()?;
1516
1517 let open_report = OrderStatusReport::new(
1518 account_id,
1519 instrument_id,
1520 None,
1521 open_venue_order_id,
1522 open_side,
1523 OrderType::Market,
1524 TimeInForce::Gtc,
1525 OrderStatus::Filled,
1526 open_order_qty,
1527 open_order_qty,
1528 ts_now,
1529 ts_now,
1530 ts_now,
1531 None,
1532 )
1533 .with_avg_px(open_px.to_f64().unwrap_or(0.0))
1534 .ok()?;
1535
1536 log::info!(
1537 color = LogColor::Blue as u8;
1538 "Generating open fill for cross-zero {instrument_id}: side={open_side:?}, qty={open_qty}, px={open_px}",
1539 );
1540
1541 let (open_events, _) =
1542 self.handle_external_order(&open_report, &account_id, instrument, &[], true);
1543 all_events.extend(open_events);
1544 } else {
1545 log::warn!("Cannot open new position for {instrument_id}: no venue average price");
1546 return Some(all_events);
1547 }
1548
1549 Some(all_events)
1550 }
1551
1552 fn create_position_from_report(
1557 &mut self,
1558 report: &PositionStatusReport,
1559 account_id: &AccountId,
1560 instrument: &InstrumentAny,
1561 ) -> Option<Vec<OrderEventAny>> {
1562 let instrument_id = report.instrument_id;
1563 let venue_signed_qty = report.signed_decimal_qty;
1564
1565 if venue_signed_qty == Decimal::ZERO {
1566 return None;
1567 }
1568
1569 let order_side = if venue_signed_qty > Decimal::ZERO {
1570 OrderSide::Buy
1571 } else {
1572 OrderSide::Sell
1573 };
1574
1575 let qty_abs = venue_signed_qty.abs();
1576 let venue_avg_px = report.avg_px_open?;
1577
1578 let ts_now = self.clock.borrow().timestamp_ns();
1579 let venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64());
1580 let order_qty = Quantity::from_decimal_dp(qty_abs, instrument.size_precision()).ok()?;
1581
1582 let mut order_report = OrderStatusReport::new(
1583 *account_id,
1584 instrument_id,
1585 None,
1586 venue_order_id,
1587 order_side,
1588 OrderType::Market,
1589 TimeInForce::Gtc,
1590 OrderStatus::Filled,
1591 order_qty,
1592 order_qty,
1593 ts_now,
1594 ts_now,
1595 ts_now,
1596 None,
1597 )
1598 .with_avg_px(venue_avg_px.to_f64().unwrap_or(0.0))
1599 .ok()?;
1600
1601 if let Some(venue_position_id) = report.venue_position_id {
1603 order_report = order_report.with_venue_position_id(venue_position_id);
1604 }
1605
1606 log::info!(
1607 color = LogColor::Blue as u8;
1608 "Creating position from venue report for {instrument_id}: side={order_side:?}, qty={qty_abs}, avg_px={venue_avg_px}",
1609 );
1610
1611 let (events, _) =
1612 self.handle_external_order(&order_report, account_id, instrument, &[], true);
1613 Some(events)
1614 }
1615
1616 fn reconcile_position_report(
1617 &mut self,
1618 report: &PositionStatusReport,
1619 account_id: &AccountId,
1620 instruments_with_unattributed_fills: &AHashSet<InstrumentId>,
1621 positions_with_fills: &AHashSet<PositionId>,
1622 ) -> Option<Vec<OrderEventAny>> {
1623 if report.venue_position_id.is_some() {
1624 self.reconcile_position_report_hedging(
1625 report,
1626 account_id,
1627 instruments_with_unattributed_fills,
1628 positions_with_fills,
1629 )
1630 } else {
1631 self.reconcile_position_report_netting(report, account_id)
1632 }
1633 }
1634
1635 fn reconcile_position_report_hedging(
1636 &mut self,
1637 report: &PositionStatusReport,
1638 account_id: &AccountId,
1639 instruments_with_unattributed_fills: &AHashSet<InstrumentId>,
1640 positions_with_fills: &AHashSet<PositionId>,
1641 ) -> Option<Vec<OrderEventAny>> {
1642 let venue_position_id = report.venue_position_id?;
1643
1644 if positions_with_fills.contains(&venue_position_id) {
1646 log::debug!(
1647 "Skipping hedge position {venue_position_id} reconciliation: fills already in batch"
1648 );
1649 return None;
1650 }
1651
1652 if instruments_with_unattributed_fills.contains(&report.instrument_id) {
1655 log::debug!(
1656 "Skipping hedge position {venue_position_id} reconciliation: unattributed fills in batch"
1657 );
1658 return None;
1659 }
1660
1661 log::info!(
1662 color = LogColor::Blue as u8;
1663 "Reconciling HEDGE position for {}, venue_position_id={}",
1664 report.instrument_id,
1665 venue_position_id
1666 );
1667
1668 let position = {
1669 let cache = self.cache.borrow();
1670 cache.position(&venue_position_id).cloned()
1671 };
1672
1673 match position {
1674 Some(position) => {
1675 let cached_signed_qty = position.signed_decimal_qty();
1676 let venue_signed_qty = report.signed_decimal_qty;
1677
1678 if cached_signed_qty == venue_signed_qty {
1679 log::debug!(
1680 "Hedge position {venue_position_id} matches venue: qty={cached_signed_qty}"
1681 );
1682 return None;
1683 }
1684
1685 if venue_signed_qty == Decimal::ZERO && cached_signed_qty == Decimal::ZERO {
1686 return None;
1687 }
1688
1689 if !self.config.generate_missing_orders {
1690 log::error!(
1691 "Cannot reconcile {} {}: position net qty {} != reported net qty {} \
1692 and `generate_missing_orders` is disabled",
1693 report.instrument_id,
1694 venue_position_id,
1695 cached_signed_qty,
1696 venue_signed_qty
1697 );
1698 return None;
1699 }
1700
1701 self.reconcile_hedge_position_discrepancy(
1702 report,
1703 account_id,
1704 &position,
1705 cached_signed_qty,
1706 )
1707 }
1708 None => {
1709 if report.signed_decimal_qty == Decimal::ZERO {
1710 return None;
1711 }
1712
1713 if !self.config.generate_missing_orders {
1714 log::error!(
1715 "Cannot reconcile position: {venue_position_id} not found and `generate_missing_orders` is disabled"
1716 );
1717 return None;
1718 }
1719
1720 self.reconcile_missing_hedge_position(report, account_id)
1721 }
1722 }
1723 }
1724
1725 fn reconcile_hedge_position_discrepancy(
1726 &mut self,
1727 report: &PositionStatusReport,
1728 account_id: &AccountId,
1729 position: &Position,
1730 cached_signed_qty: Decimal,
1731 ) -> Option<Vec<OrderEventAny>> {
1732 let instrument = self.get_instrument(&report.instrument_id)?;
1733 let venue_signed_qty = report.signed_decimal_qty;
1734
1735 let diff = (cached_signed_qty - venue_signed_qty).abs();
1736 let diff_qty = Quantity::from_decimal_dp(diff, instrument.size_precision()).ok()?;
1737
1738 if diff_qty.is_zero() {
1739 log::debug!(
1740 "Difference quantity rounds to zero for {}, skipping",
1741 instrument.id()
1742 );
1743 return None;
1744 }
1745
1746 let venue_position_id = report.venue_position_id?;
1747 log::warn!(
1748 "Hedge position discrepancy for {} {}: cached={}, venue={}, generating reconciliation order",
1749 report.instrument_id,
1750 venue_position_id,
1751 cached_signed_qty,
1752 venue_signed_qty
1753 );
1754
1755 let current_avg_px = if position.avg_px_open > 0.0 {
1756 Decimal::from_str(&position.avg_px_open.to_string()).ok()
1757 } else {
1758 None
1759 };
1760
1761 self.create_position_reconciliation_order(
1762 report,
1763 account_id,
1764 &instrument,
1765 cached_signed_qty,
1766 diff_qty,
1767 current_avg_px,
1768 )
1769 }
1770
1771 fn reconcile_missing_hedge_position(
1772 &mut self,
1773 report: &PositionStatusReport,
1774 account_id: &AccountId,
1775 ) -> Option<Vec<OrderEventAny>> {
1776 let instrument = self.get_instrument(&report.instrument_id)?;
1777 let venue_signed_qty = report.signed_decimal_qty;
1778
1779 let qty = venue_signed_qty.abs();
1780 let diff_qty = Quantity::from_decimal_dp(qty, instrument.size_precision()).ok()?;
1781
1782 if diff_qty.is_zero() {
1783 return None;
1784 }
1785
1786 let venue_position_id = report.venue_position_id?;
1787 log::warn!(
1788 "Missing hedge position for {} {}: venue reports {}, generating reconciliation order",
1789 report.instrument_id,
1790 venue_position_id,
1791 venue_signed_qty
1792 );
1793
1794 self.create_position_reconciliation_order(
1795 report,
1796 account_id,
1797 &instrument,
1798 Decimal::ZERO,
1799 diff_qty,
1800 None,
1801 )
1802 }
1803
1804 fn reconcile_position_report_netting(
1805 &mut self,
1806 report: &PositionStatusReport,
1807 account_id: &AccountId,
1808 ) -> Option<Vec<OrderEventAny>> {
1809 let instrument_id = report.instrument_id;
1810
1811 log::info!(
1812 color = LogColor::Blue as u8;
1813 "Reconciling NET position for {instrument_id}",
1814 );
1815
1816 let instrument = self.get_instrument(&instrument_id)?;
1817
1818 let (cached_signed_qty, cached_avg_px) = {
1819 let cache = self.cache.borrow();
1820 let positions = cache.positions_open(None, Some(&instrument_id), None, None, None);
1821
1822 if positions.is_empty() {
1823 (Decimal::ZERO, None)
1824 } else {
1825 let mut total_signed_qty = Decimal::ZERO;
1826 let mut total_value = Decimal::ZERO;
1827 let mut total_qty = Decimal::ZERO;
1828
1829 for pos in positions {
1830 total_signed_qty += pos.signed_decimal_qty();
1831 let qty = pos.signed_decimal_qty().abs();
1832 if pos.avg_px_open > 0.0
1833 && qty > Decimal::ZERO
1834 && let Ok(avg_px) = Decimal::from_str(&pos.avg_px_open.to_string())
1835 {
1836 total_value += avg_px * qty;
1837 total_qty += qty;
1838 }
1839 }
1840
1841 let avg_px = if total_qty > Decimal::ZERO {
1842 Some(total_value / total_qty)
1843 } else {
1844 None
1845 };
1846
1847 (total_signed_qty, avg_px)
1848 }
1849 };
1850
1851 let venue_signed_qty = report.signed_decimal_qty;
1852
1853 log::info!(
1854 color = LogColor::Blue as u8;
1855 "venue_signed_qty={venue_signed_qty}, cached_signed_qty={cached_signed_qty}",
1856 );
1857
1858 let tolerance = Decimal::from_str("0.00000001").unwrap_or(Decimal::ZERO);
1859 if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
1860 log::debug!("Position quantities match for {instrument_id}, no reconciliation needed");
1861 return None;
1862 }
1863
1864 if !self.config.generate_missing_orders {
1865 log::warn!(
1866 "Discrepancy for {instrument_id} position when `generate_missing_orders` disabled, skipping"
1867 );
1868 return None;
1869 }
1870
1871 let diff = (cached_signed_qty - venue_signed_qty).abs();
1872 let diff_qty = Quantity::from_decimal_dp(diff, instrument.size_precision()).ok()?;
1873
1874 if diff_qty.is_zero() {
1875 log::debug!(
1876 "Difference quantity rounds to zero for {instrument_id}, skipping order generation"
1877 );
1878 return None;
1879 }
1880
1881 let crosses_zero = cached_signed_qty != Decimal::ZERO
1882 && venue_signed_qty != Decimal::ZERO
1883 && ((cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
1884 || (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO));
1885
1886 if crosses_zero {
1887 let ts_now = self.clock.borrow().timestamp_ns();
1888 return self.reconcile_cross_zero_position(
1889 &instrument,
1890 *account_id,
1891 instrument_id,
1892 cached_signed_qty,
1893 cached_avg_px,
1894 venue_signed_qty,
1895 report.avg_px_open,
1896 ts_now,
1897 );
1898 }
1899
1900 if cached_signed_qty == Decimal::ZERO {
1901 return self.create_position_from_report(report, account_id, &instrument);
1902 }
1903
1904 self.create_position_reconciliation_order(
1905 report,
1906 account_id,
1907 &instrument,
1908 cached_signed_qty,
1909 diff_qty,
1910 cached_avg_px,
1911 )
1912 }
1913
1914 fn create_position_reconciliation_order(
1915 &mut self,
1916 report: &PositionStatusReport,
1917 account_id: &AccountId,
1918 instrument: &InstrumentAny,
1919 cached_signed_qty: Decimal,
1920 diff_qty: Quantity,
1921 current_avg_px: Option<Decimal>,
1922 ) -> Option<Vec<OrderEventAny>> {
1923 let venue_signed_qty = report.signed_decimal_qty;
1924 let instrument_id = report.instrument_id;
1925
1926 let order_side = if venue_signed_qty > cached_signed_qty {
1927 OrderSide::Buy
1928 } else {
1929 OrderSide::Sell
1930 };
1931
1932 let reconciliation_px = calculate_reconciliation_price(
1933 cached_signed_qty,
1934 current_avg_px,
1935 venue_signed_qty,
1936 report.avg_px_open,
1937 );
1938
1939 let fill_px = reconciliation_px
1940 .or(report.avg_px_open)
1941 .or(current_avg_px)?;
1942
1943 let ts_now = self.clock.borrow().timestamp_ns();
1944 let venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64());
1945
1946 let mut order_report = OrderStatusReport::new(
1947 *account_id,
1948 instrument_id,
1949 None,
1950 venue_order_id,
1951 order_side,
1952 OrderType::Market,
1953 TimeInForce::Gtc,
1954 OrderStatus::Filled,
1955 diff_qty,
1956 diff_qty,
1957 ts_now,
1958 ts_now,
1959 ts_now,
1960 None,
1961 )
1962 .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
1963 .ok()?;
1964
1965 if let Some(venue_position_id) = report.venue_position_id {
1966 order_report = order_report.with_venue_position_id(venue_position_id);
1967 }
1968
1969 log::info!(
1970 color = LogColor::Blue as u8;
1971 "Generating reconciliation order for {instrument_id}: side={order_side:?}, qty={diff_qty}, px={fill_px}",
1972 );
1973
1974 let (events, _) =
1975 self.handle_external_order(&order_report, account_id, instrument, &[], true);
1976 Some(events)
1977 }
1978
1979 fn reconcile_order_report(
1980 &self,
1981 order: &OrderAny,
1982 report: &OrderStatusReport,
1983 instrument: Option<&InstrumentAny>,
1984 ) -> Option<OrderEventAny> {
1985 let ts_now = self.clock.borrow().timestamp_ns();
1986 reconcile_order_report(order, report, instrument, ts_now)
1987 }
1988
1989 fn reconcile_order_with_fills(
1994 &mut self,
1995 order: &mut OrderAny,
1996 report: &OrderStatusReport,
1997 fills: &[&FillReport],
1998 instrument: Option<&InstrumentAny>,
1999 ) -> Vec<OrderEventAny> {
2000 let mut events = Vec::new();
2001 let mut sorted_fills: Vec<&FillReport> = fills.to_vec();
2002 sorted_fills.sort_by_key(|f| f.ts_event);
2003
2004 let ts_now = self.clock.borrow().timestamp_ns();
2005
2006 match report.order_status {
2007 OrderStatus::Canceled => {
2008 if report.ts_triggered.is_some() && order.status() != OrderStatus::Triggered {
2010 events.push(create_reconciliation_triggered(order, report, ts_now));
2011 }
2012
2013 if let Some(inst) = instrument {
2016 for fill in &sorted_fills {
2017 if let Some(event) = self.create_order_fill(order, fill, inst) {
2018 events.push(event);
2019 }
2020 }
2021 }
2022 if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2023 events.push(event);
2024 }
2025 }
2026 OrderStatus::Expired => {
2027 if report.ts_triggered.is_some() && order.status() != OrderStatus::Triggered {
2029 events.push(create_reconciliation_triggered(order, report, ts_now));
2030 }
2031
2032 if let Some(inst) = instrument {
2034 for fill in &sorted_fills {
2035 if let Some(event) = self.create_order_fill(order, fill, inst) {
2036 events.push(event);
2037 }
2038 }
2039 }
2040 if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2041 events.push(event);
2042 }
2043 }
2044 _ => {
2045 if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2046 events.push(event);
2047 }
2048 if let Some(inst) = instrument {
2049 for fill in &sorted_fills {
2050 if let Some(event) = self.create_order_fill(order, fill, inst) {
2051 events.push(event);
2052 }
2053 }
2054 }
2055 }
2056 }
2057
2058 events
2059 }
2060
2061 fn handle_external_order(
2062 &mut self,
2063 report: &OrderStatusReport,
2064 account_id: &AccountId,
2065 instrument: &InstrumentAny,
2066 fills: &[&FillReport],
2067 is_synthetic: bool,
2068 ) -> (Vec<OrderEventAny>, Option<ExternalOrderMetadata>) {
2069 let (strategy_id, tags) =
2070 if let Some(claimed_strategy) = self.external_order_claims.get(&report.instrument_id) {
2071 let order_id = report
2072 .client_order_id
2073 .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
2074 log::info!(
2075 color = LogColor::Blue as u8;
2076 "External order {} for {} claimed by strategy {}",
2077 order_id,
2078 report.instrument_id,
2079 claimed_strategy,
2080 );
2081 (*claimed_strategy, None)
2082 } else {
2083 let tag = if is_synthetic {
2085 *TAG_RECONCILIATION
2086 } else {
2087 *TAG_VENUE
2088 };
2089 (StrategyId::from("EXTERNAL"), Some(vec![tag]))
2090 };
2091
2092 if self.config.filter_unclaimed_external && !is_synthetic {
2094 return (Vec::new(), None);
2095 }
2096
2097 let client_order_id = report
2098 .client_order_id
2099 .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
2100
2101 let ts_now = self.clock.borrow().timestamp_ns();
2102
2103 let initialized = OrderInitialized::new(
2104 self.config.trader_id,
2105 strategy_id,
2106 report.instrument_id,
2107 client_order_id,
2108 report.order_side,
2109 report.order_type,
2110 report.quantity,
2111 report.time_in_force,
2112 report.post_only,
2113 report.reduce_only,
2114 false, true, UUID4::new(),
2117 ts_now,
2118 ts_now,
2119 report.price,
2120 report.trigger_price,
2121 report.trigger_type,
2122 report.limit_offset,
2123 report.trailing_offset,
2124 Some(report.trailing_offset_type),
2125 report.expire_time,
2126 report.display_qty,
2127 None, None, Some(report.contingency_type),
2130 report.order_list_id,
2131 report.linked_order_ids.clone(),
2132 report.parent_order_id,
2133 None, None, None, tags,
2137 );
2138
2139 let events = vec![OrderEventAny::Initialized(initialized)];
2140 let order = match OrderAny::from_events(events) {
2141 Ok(order) => order,
2142 Err(e) => {
2143 log::error!("Failed to create order from report: {e}");
2144 return (Vec::new(), None);
2145 }
2146 };
2147
2148 {
2149 let mut cache = self.cache.borrow_mut();
2150 if let Err(e) = cache.add_order(order.clone(), None, None, false) {
2151 log::error!("Failed to add external order to cache: {e}");
2152 return (Vec::new(), None);
2153 }
2154
2155 if let Err(e) =
2156 cache.add_venue_order_id(&client_order_id, &report.venue_order_id, false)
2157 {
2158 log::warn!("Failed to add venue order ID index: {e}");
2159 }
2160 }
2161
2162 log::info!(
2163 color = LogColor::Blue as u8;
2164 "Created external order {} ({}) for {} [{}]",
2165 client_order_id,
2166 report.venue_order_id,
2167 report.instrument_id,
2168 report.order_status,
2169 );
2170
2171 let ts_now = self.clock.borrow().timestamp_ns();
2172
2173 let mut order_events =
2176 generate_external_order_status_events(&order, report, account_id, instrument, ts_now);
2177
2178 if !fills.is_empty() {
2179 let mut cached_order = self.get_order(&client_order_id).unwrap();
2180 let mut sorted_fills: Vec<&FillReport> = fills.to_vec();
2181 sorted_fills.sort_by_key(|f| f.ts_event);
2182
2183 match report.order_status {
2184 OrderStatus::Canceled | OrderStatus::Expired => {
2185 let terminal_event = order_events.pop();
2186 for fill in sorted_fills {
2187 if let Some(fill_event) =
2188 self.create_order_fill(&mut cached_order, fill, instrument)
2189 {
2190 order_events.push(fill_event);
2191 }
2192 }
2193 if let Some(event) = terminal_event {
2194 order_events.push(event);
2195 }
2196 }
2197 OrderStatus::Filled | OrderStatus::PartiallyFilled => {
2198 if order_events
2200 .last()
2201 .is_some_and(|e| matches!(e, OrderEventAny::Filled(_)))
2202 {
2203 order_events.pop();
2204 }
2205
2206 let mut real_fill_total = Decimal::ZERO;
2207 for fill in &sorted_fills {
2208 if let Some(fill_event) =
2209 self.create_order_fill(&mut cached_order, fill, instrument)
2210 {
2211 real_fill_total += fill.last_qty.as_decimal();
2212 order_events.push(fill_event);
2213 }
2214 }
2215
2216 let report_filled = report.filled_qty.as_decimal();
2217 if real_fill_total < report_filled {
2218 let diff_decimal = report_filled - real_fill_total;
2219 if let Ok(diff) =
2220 Quantity::from_decimal_dp(diff_decimal, instrument.size_precision())
2221 && let Some(inferred_fill) = create_inferred_fill_for_qty(
2222 &cached_order,
2223 report,
2224 account_id,
2225 instrument,
2226 diff,
2227 ts_now,
2228 )
2229 {
2230 order_events.push(inferred_fill);
2231 }
2232 }
2233 }
2234 _ => {}
2235 }
2236 }
2237
2238 let metadata = ExternalOrderMetadata {
2239 client_order_id,
2240 venue_order_id: report.venue_order_id,
2241 instrument_id: report.instrument_id,
2242 strategy_id,
2243 ts_init: ts_now,
2244 };
2245
2246 (order_events, Some(metadata))
2247 }
2248
2249 fn adjust_mass_status_fills(
2254 &self,
2255 mass_status: &ExecutionMassStatus,
2256 ) -> (
2257 IndexMap<VenueOrderId, OrderStatusReport>,
2258 IndexMap<VenueOrderId, Vec<FillReport>>,
2259 ) {
2260 let mut final_orders: IndexMap<VenueOrderId, OrderStatusReport> =
2261 mass_status.order_reports();
2262 let mut final_fills: IndexMap<VenueOrderId, Vec<FillReport>> = mass_status.fill_reports();
2263
2264 let mut instruments_to_adjust = Vec::new();
2265 for (instrument_id, position_reports) in mass_status.position_reports() {
2266 if !self.should_reconcile_instrument(&instrument_id) {
2267 log::debug!(
2268 "Skipping fill adjustment for {instrument_id}: not in reconciliation_instrument_ids"
2269 );
2270 continue;
2271 }
2272
2273 let is_hedge_mode = position_reports
2276 .iter()
2277 .any(|r| r.venue_position_id.is_some());
2278 if is_hedge_mode {
2279 log::debug!(
2280 "Skipping fill adjustment for {instrument_id}: hedge mode (has venue_position_id)"
2281 );
2282 continue;
2283 }
2284
2285 if let Some(instrument) = self.get_instrument(&instrument_id) {
2286 instruments_to_adjust.push(instrument);
2287 } else {
2288 log::debug!(
2289 "Skipping fill adjustment for {instrument_id}: instrument not found in cache"
2290 );
2291 }
2292 }
2293
2294 if instruments_to_adjust.is_empty() {
2295 return (final_orders, final_fills);
2296 }
2297
2298 log_info!(
2299 "Adjusting fills for {} instrument(s) with position reports",
2300 instruments_to_adjust.len(),
2301 color = LogColor::Blue
2302 );
2303
2304 for instrument in &instruments_to_adjust {
2305 let instrument_id = instrument.id();
2306
2307 match process_mass_status_for_reconciliation(mass_status, instrument, None) {
2308 Ok(result) => {
2309 final_orders.retain(|_, order| order.instrument_id != instrument_id);
2310 final_fills.retain(|_, fills| {
2311 fills
2312 .first()
2313 .is_none_or(|f| f.instrument_id != instrument_id)
2314 });
2315
2316 for (venue_order_id, order) in result.orders {
2317 final_orders.insert(venue_order_id, order);
2318 }
2319 for (venue_order_id, fills) in result.fills {
2320 final_fills.insert(venue_order_id, fills);
2321 }
2322 }
2323 Err(e) => {
2324 log::warn!("Failed to adjust fills for {instrument_id}: {e}");
2325 }
2326 }
2327 }
2328
2329 log_info!(
2330 "After adjustment: {} order(s), {} fill group(s)",
2331 final_orders.len(),
2332 final_fills.len(),
2333 color = LogColor::Blue
2334 );
2335
2336 (final_orders, final_fills)
2337 }
2338
2339 fn deduplicate_order_reports<'a>(
2344 &self,
2345 reports: impl Iterator<Item = &'a OrderStatusReport>,
2346 ) -> AHashMap<VenueOrderId, &'a OrderStatusReport> {
2347 let mut best_reports: AHashMap<VenueOrderId, &'a OrderStatusReport> = AHashMap::new();
2348
2349 for report in reports {
2350 let dominated = best_reports
2351 .get(&report.venue_order_id)
2352 .is_some_and(|existing| self.is_more_advanced(existing, report));
2353
2354 if !dominated {
2355 best_reports.insert(report.venue_order_id, report);
2356 }
2357 }
2358
2359 best_reports
2360 }
2361
2362 fn is_more_advanced(&self, a: &OrderStatusReport, b: &OrderStatusReport) -> bool {
2363 if a.filled_qty > b.filled_qty {
2364 return true;
2365 }
2366 if a.filled_qty < b.filled_qty {
2367 return false;
2368 }
2369
2370 Self::status_priority(a.order_status) > Self::status_priority(b.order_status)
2372 }
2373
2374 const fn status_priority(status: OrderStatus) -> u8 {
2375 match status {
2376 OrderStatus::Initialized | OrderStatus::Submitted | OrderStatus::Emulated => 0,
2377 OrderStatus::Released | OrderStatus::Denied => 1,
2378 OrderStatus::Accepted | OrderStatus::PendingUpdate | OrderStatus::PendingCancel => 2,
2379 OrderStatus::Triggered => 3,
2380 OrderStatus::PartiallyFilled => 4,
2381 OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected => 5,
2382 OrderStatus::Filled => 6,
2383 }
2384 }
2385
2386 fn is_exact_order_match(&self, order: &OrderAny, report: &OrderStatusReport) -> bool {
2387 order.status() == report.order_status
2388 && order.filled_qty() == report.filled_qty
2389 && !should_reconciliation_update(order, report)
2390 }
2391
2392 fn create_order_fill(
2393 &mut self,
2394 order: &mut OrderAny,
2395 fill: &FillReport,
2396 instrument: &InstrumentAny,
2397 ) -> Option<OrderEventAny> {
2398 if self.processed_fills.contains_key(&fill.trade_id) {
2399 return None;
2400 }
2401
2402 self.processed_fills
2403 .insert(fill.trade_id, order.client_order_id());
2404
2405 Some(OrderEventAny::Filled(OrderFilled::new(
2406 order.trader_id(),
2407 order.strategy_id(),
2408 order.instrument_id(),
2409 order.client_order_id(),
2410 fill.venue_order_id,
2411 fill.account_id,
2412 fill.trade_id,
2413 fill.order_side,
2414 order.order_type(),
2415 fill.last_qty,
2416 fill.last_px,
2417 instrument.quote_currency(),
2418 fill.liquidity_side,
2419 fill.report_id,
2420 fill.ts_event,
2421 self.clock.borrow().timestamp_ns(),
2422 false,
2423 fill.venue_position_id,
2424 Some(fill.commission),
2425 )))
2426 }
2427}