1use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::{
25 cache::Cache,
26 clients::ExecutionClient,
27 clock::Clock,
28 enums::LogColor,
29 log_info,
30 messages::execution::report::{GenerateOrderStatusReports, GeneratePositionStatusReports},
31};
32use nautilus_core::{UUID4, UnixNanos};
33use nautilus_execution::reconciliation::{
34 calculate_reconciliation_price, create_reconciliation_rejected,
35 create_synthetic_venue_order_id, generate_external_order_status_events, reconcile_order_report,
36 should_reconciliation_update,
37};
38use nautilus_model::{
39 enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
40 events::{OrderEventAny, OrderFilled, OrderInitialized},
41 identifiers::{
42 AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
43 },
44 instruments::{Instrument, InstrumentAny},
45 orders::{Order, OrderAny},
46 position::Position,
47 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48 types::Quantity,
49};
50use rust_decimal::{Decimal, prelude::ToPrimitive};
51use ustr::Ustr;
52
53use crate::config::LiveExecEngineConfig;
54
/// Configuration for the [`ExecutionManager`].
///
/// Field notes describe how each value is used by the code in this file. Fields that are
/// not read by the code visible here are described only by their type and source.
#[derive(Debug, Clone)]
pub struct ExecutionManagerConfig {
    /// Trader ID stamped on orders initialized from external venue reports.
    pub trader_id: TraderId,
    /// Reconciliation flag copied from `LiveExecEngineConfig::reconciliation`.
    pub reconciliation: bool,
    /// Startup delay in seconds (copied from the engine config).
    pub reconciliation_startup_delay_secs: f64,
    /// Optional lookback window in minutes (from `reconciliation_lookback_mins`).
    pub lookback_mins: Option<u64>,
    /// Instruments to include in open-order and position checks; an empty set means
    /// no filtering is applied.
    pub reconciliation_instrument_ids: AHashSet<InstrumentId>,
    /// When `true`, mass-status order reports that match no cached order are ignored
    /// instead of being turned into external orders.
    pub filter_unclaimed_external: bool,
    /// Flag copied from `LiveExecEngineConfig::filter_position_reports`.
    pub filter_position_reports: bool,
    /// Client order IDs that the in-flight check skips.
    pub filtered_client_order_ids: AHashSet<ClientOrderId>,
    /// Flag copied from `LiveExecEngineConfig::generate_missing_orders`.
    pub generate_missing_orders: bool,
    /// In-flight check interval in milliseconds.
    pub inflight_check_interval_ms: u32,
    /// Age in milliseconds after which a registered in-flight order is checked; also the
    /// minimum spacing between successive checks of the same order.
    pub inflight_threshold_ms: u64,
    /// Number of in-flight checks after which a rejection event is generated for the order.
    pub inflight_max_retries: u32,
    /// Optional open-order check interval in seconds.
    pub open_check_interval_secs: Option<f64>,
    /// Optional open-order check lookback in minutes.
    pub open_check_lookback_mins: Option<u64>,
    /// Minimum time in nanoseconds since an order's last event / recorded local activity
    /// before it can be counted as missing at the venue.
    pub open_check_threshold_ns: u64,
    /// Number of "missing at venue" observations after which a rejection event is generated.
    pub open_check_missing_retries: u32,
    /// When `true`, the open-order check does not look for cached orders missing at the venue.
    pub open_check_open_only: bool,
    /// Maximum single-order queries per cycle (copied from the engine config).
    pub max_single_order_queries_per_cycle: u32,
    /// Delay between single-order queries in milliseconds (copied from the engine config).
    pub single_order_query_delay_ms: u32,
    /// Optional position check interval in seconds.
    pub position_check_interval_secs: Option<f64>,
    /// Position check lookback in minutes.
    pub position_check_lookback_mins: u64,
    /// Minimum time in nanoseconds since recorded local position activity before a
    /// position discrepancy is acted upon.
    pub position_check_threshold_ns: u64,
    /// Buffer in minutes passed (as seconds) to the cache when purging closed orders;
    /// `None` disables the purge.
    pub purge_closed_orders_buffer_mins: Option<u32>,
    /// Buffer in minutes passed (as seconds) to the cache when purging closed positions;
    /// `None` disables the purge.
    pub purge_closed_positions_buffer_mins: Option<u32>,
    /// Lookback in minutes passed (as seconds) to the cache when purging account events;
    /// `None` disables the purge.
    pub purge_account_events_lookback_mins: Option<u32>,
    /// Flag copied from `LiveExecEngineConfig::purge_from_database`.
    pub purge_from_database: bool,
}
111
impl Default for ExecutionManagerConfig {
    /// Returns the default configuration: reconciliation enabled, periodic open-order and
    /// position checks disabled (`None` intervals), and all purges disabled.
    fn default() -> Self {
        Self {
            trader_id: TraderId::default(),
            reconciliation: true,
            reconciliation_startup_delay_secs: 10.0,
            lookback_mins: Some(60),
            // Empty set: no instrument filtering.
            reconciliation_instrument_ids: AHashSet::new(),
            filter_unclaimed_external: false,
            filter_position_reports: false,
            filtered_client_order_ids: AHashSet::new(),
            generate_missing_orders: true,
            inflight_check_interval_ms: 2_000,
            // 5 seconds.
            inflight_threshold_ms: 5_000,
            inflight_max_retries: 5,
            open_check_interval_secs: None,
            open_check_lookback_mins: Some(60),
            // 5 seconds expressed in nanoseconds.
            open_check_threshold_ns: 5_000_000_000,
            open_check_missing_retries: 5,
            open_check_open_only: true,
            max_single_order_queries_per_cycle: 5,
            single_order_query_delay_ms: 100,
            position_check_interval_secs: None,
            position_check_lookback_mins: 60,
            // 60 seconds expressed in nanoseconds.
            position_check_threshold_ns: 60_000_000_000,
            purge_closed_orders_buffer_mins: None,
            purge_closed_positions_buffer_mins: None,
            purge_account_events_lookback_mins: None,
            purge_from_database: false,
        }
    }
}
144
145impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
146 fn from(config: &LiveExecEngineConfig) -> Self {
147 let filtered_client_order_ids: AHashSet<ClientOrderId> = config
148 .filtered_client_order_ids
149 .clone()
150 .unwrap_or_default()
151 .into_iter()
152 .map(|value| ClientOrderId::from(value.as_str()))
153 .collect();
154
155 let reconciliation_instrument_ids: AHashSet<InstrumentId> = config
156 .reconciliation_instrument_ids
157 .clone()
158 .unwrap_or_default()
159 .into_iter()
160 .map(|value| InstrumentId::from(value.as_str()))
161 .collect();
162
163 Self {
164 trader_id: TraderId::default(), reconciliation: config.reconciliation,
166 reconciliation_startup_delay_secs: config.reconciliation_startup_delay_secs,
167 lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
168 reconciliation_instrument_ids,
169 filter_unclaimed_external: config.filter_unclaimed_external_orders,
170 filter_position_reports: config.filter_position_reports,
171 filtered_client_order_ids,
172 generate_missing_orders: config.generate_missing_orders,
173 inflight_check_interval_ms: config.inflight_check_interval_ms,
174 inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
175 inflight_max_retries: config.inflight_check_retries,
176 open_check_interval_secs: config.open_check_interval_secs,
177 open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
178 open_check_threshold_ns: (config.open_check_threshold_ms as u64) * 1_000_000,
179 open_check_missing_retries: config.open_check_missing_retries,
180 open_check_open_only: config.open_check_open_only,
181 max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
182 single_order_query_delay_ms: config.single_order_query_delay_ms,
183 position_check_interval_secs: config.position_check_interval_secs,
184 position_check_lookback_mins: config.position_check_lookback_mins as u64,
185 position_check_threshold_ns: (config.position_check_threshold_ms as u64) * 1_000_000,
186 purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
187 purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
188 purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
189 purge_from_database: config.purge_from_database,
190 }
191 }
192}
193
194impl ExecutionManagerConfig {
195 #[must_use]
197 pub fn with_trader_id(mut self, trader_id: TraderId) -> Self {
198 self.trader_id = trader_id;
199 self
200 }
201}
202
/// A compact execution status update for a single order.
///
/// `ExecutionManager::reconcile_report` expands it into a full `OrderStatusReport` using
/// the cached order's static attributes.
#[derive(Debug, Clone)]
pub struct ExecutionReport {
    /// Client order ID used to look up the cached order.
    pub client_order_id: ClientOrderId,
    /// Venue order ID; `reconcile_report` produces no events when this is `None`.
    pub venue_order_id: Option<VenueOrderId>,
    /// Order status carried into the generated order status report.
    pub status: OrderStatus,
    /// Filled quantity carried into the generated order status report.
    pub filled_qty: Quantity,
    /// Optional average price applied to the generated order status report.
    pub avg_px: Option<f64>,
    /// Event timestamp; passed as both the second-to-last and third-to-last timestamp
    /// arguments of the generated order status report.
    pub ts_event: UnixNanos,
}
214
/// Tracking state for an order registered as in-flight.
#[derive(Debug, Clone)]
struct InflightCheck {
    // Duplicates the key under which the check is stored; not read by the code visible in
    // this file, hence the lint allowance.
    #[allow(dead_code)]
    pub client_order_id: ClientOrderId,
    /// Clock timestamp taken when the order was registered as in-flight.
    pub ts_submitted: UnixNanos,
    /// Number of in-flight checks performed so far.
    pub retry_count: u32,
    /// Clock timestamp of the most recent check, if any.
    pub last_query_ts: Option<UnixNanos>,
}
224
/// Reconciles cached order and position state against venue reports and tracks the
/// bookkeeping needed for periodic consistency checks.
///
/// Cloning shares the underlying clock and cache (both are `Rc<RefCell<_>>`) while copying
/// the tracking maps.
#[derive(Clone)]
pub struct ExecutionManager {
    /// Shared clock used for all "now" timestamps.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache holding orders, positions and instruments.
    cache: Rc<RefCell<Cache>>,
    /// Manager configuration.
    config: ExecutionManagerConfig,
    /// In-flight tracking entries keyed by client order ID.
    inflight_checks: AHashMap<ClientOrderId, InflightCheck>,
    /// Strategy that claims external orders for each instrument.
    external_order_claims: AHashMap<InstrumentId, StrategyId>,
    /// Trade ID -> client order ID (not written or read by the code visible here).
    processed_fills: AHashMap<TradeId, ClientOrderId>,
    /// Per-order retry counters shared by the in-flight and missing-order checks.
    recon_check_retries: AHashMap<ClientOrderId, u32>,
    /// Timestamp of the last in-flight check per order.
    ts_last_query: AHashMap<ClientOrderId, UnixNanos>,
    /// Timestamp of the last recorded local activity per order.
    order_local_activity_ns: AHashMap<ClientOrderId, UnixNanos>,
    /// Timestamp of the last recorded local activity per instrument position.
    position_local_activity_ns: AHashMap<InstrumentId, UnixNanos>,
    /// Trade IDs marked as processed, with the clock time at which they were marked.
    recent_fills_cache: AHashMap<TradeId, UnixNanos>,
}
261
262impl Debug for ExecutionManager {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 f.debug_struct(stringify!(ExecutionManager))
265 .field("config", &self.config)
266 .field("inflight_checks", &self.inflight_checks)
267 .field("external_order_claims", &self.external_order_claims)
268 .field("processed_fills", &self.processed_fills)
269 .field("recon_check_retries", &self.recon_check_retries)
270 .finish()
271 }
272}
273
274impl ExecutionManager {
275 pub fn new(
277 clock: Rc<RefCell<dyn Clock>>,
278 cache: Rc<RefCell<Cache>>,
279 config: ExecutionManagerConfig,
280 ) -> Self {
281 Self {
282 clock,
283 cache,
284 config,
285 inflight_checks: AHashMap::new(),
286 external_order_claims: AHashMap::new(),
287 processed_fills: AHashMap::new(),
288 recon_check_retries: AHashMap::new(),
289 ts_last_query: AHashMap::new(),
290 order_local_activity_ns: AHashMap::new(),
291 position_local_activity_ns: AHashMap::new(),
292 recent_fills_cache: AHashMap::new(),
293 }
294 }
295
    /// Reconciles a venue execution mass status against the cached order state.
    ///
    /// Order reports are deduplicated and processed first, then fill reports are applied in
    /// ascending `ts_event` order. Returns all generated events sorted by `ts_event()`.
    ///
    /// For each order report the lookup order is:
    /// 1. cached order by the report's `client_order_id` (skipped when already an exact match),
    /// 2. cached order by the report's `venue_order_id`,
    /// 3. otherwise treated as an external order, unless `filter_unclaimed_external` is set
    ///    or the instrument is not in the cache.
    ///
    /// NOTE(review): the body contains no `.await`; the function is `async` presumably for
    /// interface reasons — confirm with callers.
    pub async fn reconcile_execution_mass_status(
        &mut self,
        mass_status: ExecutionMassStatus,
    ) -> Vec<OrderEventAny> {
        let venue = mass_status.venue;
        let order_count = mass_status.order_reports().len();
        let fill_count: usize = mass_status.fill_reports().values().map(|v| v.len()).sum();
        let position_count = mass_status.position_reports().len();

        log_info!(
            "Reconciling ExecutionMassStatus for {}",
            venue,
            color = LogColor::Blue
        );
        log_info!(
            "Received {} order(s), {} fill(s), {} position(s)",
            order_count,
            fill_count,
            position_count,
            color = LogColor::Blue
        );

        // Counters are only used for the summary logs at the end.
        let mut events = Vec::new();
        let mut orders_reconciled = 0usize;
        let mut external_orders_created = 0usize;
        let mut open_orders_initialized = 0usize;
        let mut orders_skipped_no_instrument = 0usize;
        let mut orders_skipped_duplicate = 0usize;
        let mut fills_applied = 0usize;

        let reports = mass_status.order_reports();
        // `deduplicate_order_reports` is defined outside the visible part of this file.
        let order_reports = self.deduplicate_order_reports(reports.values());

        for report in order_reports.values() {
            if let Some(client_order_id) = &report.client_order_id {
                // Nothing to do when the cached order already matches the venue report.
                if let Some(cached_order) = self.get_order(client_order_id)
                    && self.is_exact_order_match(&cached_order, report)
                {
                    log::debug!("Skipping order {client_order_id}: already in sync with venue");
                    orders_skipped_duplicate += 1;
                    continue;
                }

                if let Some(order) = self.get_order(client_order_id) {
                    // Known order, matched by client order ID.
                    let instrument = self.get_instrument(&report.instrument_id);
                    log::info!(
                        color = LogColor::Blue as u8;
                        "Reconciling {} {} {} [{}] -> [{}]",
                        client_order_id,
                        report.venue_order_id,
                        report.instrument_id,
                        order.status(),
                        report.order_status,
                    );
                    if let Some(event) =
                        self.reconcile_order_report(&order, report, instrument.as_ref())
                    {
                        orders_reconciled += 1;
                        events.push(event);
                    }
                } else if let Some(order) = self.get_order_by_venue_order_id(&report.venue_order_id)
                {
                    // Client order ID unknown to the cache: fall back to the venue order ID.
                    let instrument = self.get_instrument(&report.instrument_id);
                    log::info!(
                        color = LogColor::Blue as u8;
                        "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
                        order.client_order_id(),
                        report.venue_order_id,
                        report.instrument_id,
                        order.status(),
                        report.order_status,
                    );
                    if let Some(event) =
                        self.reconcile_order_report(&order, report, instrument.as_ref())
                    {
                        orders_reconciled += 1;
                        events.push(event);
                    }
                } else if !self.config.filter_unclaimed_external {
                    // No cached order at all: treat the report as an external order.
                    if let Some(instrument) = self.get_instrument(&report.instrument_id) {
                        let external_events = self.handle_external_order(
                            report,
                            &mass_status.account_id,
                            &instrument,
                        );
                        if !external_events.is_empty() {
                            external_orders_created += 1;
                            if report.order_status.is_open() {
                                open_orders_initialized += 1;
                            }
                            events.extend(external_events);
                        }
                    } else {
                        orders_skipped_no_instrument += 1;
                    }
                }
            } else if let Some(order) = self.get_order_by_venue_order_id(&report.venue_order_id) {
                // Report carries no client order ID: match by venue order ID only.
                let instrument = self.get_instrument(&report.instrument_id);
                log::info!(
                    color = LogColor::Blue as u8;
                    "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
                    order.client_order_id(),
                    report.venue_order_id,
                    report.instrument_id,
                    order.status(),
                    report.order_status,
                );
                if let Some(event) =
                    self.reconcile_order_report(&order, report, instrument.as_ref())
                {
                    orders_reconciled += 1;
                    events.push(event);
                }
            } else if !self.config.filter_unclaimed_external {
                // Same external-order handling as above, for reports without a client order ID.
                if let Some(instrument) = self.get_instrument(&report.instrument_id) {
                    let external_events =
                        self.handle_external_order(report, &mass_status.account_id, &instrument);
                    if !external_events.is_empty() {
                        external_orders_created += 1;
                        if report.order_status.is_open() {
                            open_orders_initialized += 1;
                        }
                        events.extend(external_events);
                    }
                } else {
                    orders_skipped_no_instrument += 1;
                }
            }
        }

        let fill_reports = mass_status.fill_reports();
        // Flatten the per-order fill lists and apply them chronologically.
        let mut all_fills: Vec<&FillReport> = fill_reports.values().flatten().collect();
        all_fills.sort_by_key(|f| f.ts_event);

        for fill in all_fills {
            // Fills without a client order ID, or for orders not in the cache, are ignored here.
            if let Some(client_order_id) = &fill.client_order_id
                && let Some(order) = self.get_order(client_order_id)
            {
                let mut order = order;
                let instrument_id = order.instrument_id();

                // `create_order_fill` is defined outside the visible part of this file.
                if let Some(instrument) = self.get_instrument(&instrument_id)
                    && let Some(event) = self.create_order_fill(&mut order, fill, &instrument)
                {
                    fills_applied += 1;
                    events.push(event);
                }
            }
        }

        if orders_skipped_no_instrument > 0 {
            log::warn!("{orders_skipped_no_instrument} orders skipped (instrument not in cache)");
        }

        if orders_skipped_duplicate > 0 {
            log::debug!("{orders_skipped_duplicate} orders skipped (already in sync)");
        }

        log::info!(
            color = LogColor::Blue as u8;
            "Reconciliation complete for {venue}: reconciled={orders_reconciled}, external={external_orders_created}, open={open_orders_initialized}, fills={fills_applied}, skipped={orders_skipped_duplicate}",
        );

        // Order and fill events were produced in separate passes; return them chronologically.
        events.sort_by_key(|e| e.ts_event());

        events
    }
469
470 pub fn reconcile_report(
476 &mut self,
477 report: ExecutionReport,
478 ) -> anyhow::Result<Vec<OrderEventAny>> {
479 let mut events = Vec::new();
480
481 self.clear_recon_tracking(&report.client_order_id, true);
482
483 if let Some(order) = self.get_order(&report.client_order_id) {
484 let Some(account_id) = order.account_id() else {
485 log::error!("Cannot process fill report: order has no account_id");
486 return Ok(vec![]);
487 };
488 let Some(venue_order_id) = report.venue_order_id else {
489 log::error!("Cannot process fill report: report has no venue_order_id");
490 return Ok(vec![]);
491 };
492 let mut order_report = OrderStatusReport::new(
493 account_id,
494 order.instrument_id(),
495 Some(report.client_order_id),
496 venue_order_id,
497 order.order_side(),
498 order.order_type(),
499 order.time_in_force(),
500 report.status,
501 order.quantity(),
502 report.filled_qty,
503 report.ts_event, report.ts_event, self.clock.borrow().timestamp_ns(),
506 Some(UUID4::new()),
507 );
508
509 if let Some(avg_px) = report.avg_px {
510 order_report = order_report.with_avg_px(avg_px)?;
511 }
512
513 let instrument = self.get_instrument(&order.instrument_id());
514 if let Some(event) =
515 self.reconcile_order_report(&order, &order_report, instrument.as_ref())
516 {
517 events.push(event);
518 }
519 }
520
521 Ok(events)
522 }
523
524 pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
526 let mut events = Vec::new();
527 let current_time = self.clock.borrow().timestamp_ns();
528 let threshold_ns = self.config.inflight_threshold_ms * 1_000_000;
529
530 let mut to_check = Vec::new();
531 for (client_order_id, check) in &self.inflight_checks {
532 if current_time - check.ts_submitted > threshold_ns {
533 to_check.push(*client_order_id);
534 }
535 }
536
537 for client_order_id in to_check {
538 if self
539 .config
540 .filtered_client_order_ids
541 .contains(&client_order_id)
542 {
543 continue;
544 }
545
546 if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
547 if let Some(last_query_ts) = check.last_query_ts
548 && current_time - last_query_ts < threshold_ns
549 {
550 continue;
551 }
552
553 check.retry_count += 1;
554 check.last_query_ts = Some(current_time);
555 self.ts_last_query.insert(client_order_id, current_time);
556 self.recon_check_retries
557 .insert(client_order_id, check.retry_count);
558
559 if check.retry_count >= self.config.inflight_max_retries {
560 let ts_now = self.clock.borrow().timestamp_ns();
562 if let Some(order) = self.get_order(&client_order_id)
563 && let Some(event) =
564 create_reconciliation_rejected(&order, Some("INFLIGHT_TIMEOUT"), ts_now)
565 {
566 events.push(event);
567 }
568 self.clear_recon_tracking(&client_order_id, true);
570 }
571 }
572 }
573
574 events
575 }
576
577 pub async fn check_open_orders(
587 &mut self,
588 clients: &[Rc<dyn ExecutionClient>],
589 ) -> Vec<OrderEventAny> {
590 log::debug!("Checking order consistency between cached-state and venues");
591
592 let filtered_orders: Vec<OrderAny> = {
593 let cache = self.cache.borrow();
594 let open_orders = cache.orders_open(None, None, None, None);
595
596 if self.config.reconciliation_instrument_ids.is_empty() {
597 open_orders.iter().map(|o| (*o).clone()).collect()
598 } else {
599 open_orders
600 .iter()
601 .filter(|o| {
602 self.config
603 .reconciliation_instrument_ids
604 .contains(&o.instrument_id())
605 })
606 .map(|o| (*o).clone())
607 .collect()
608 }
609 };
610
611 log::debug!(
612 "Found {} order{} open in cache",
613 filtered_orders.len(),
614 if filtered_orders.len() == 1 { "" } else { "s" }
615 );
616
617 let mut all_reports = Vec::new();
618 let mut venue_reported_ids = AHashSet::new();
619
620 for client in clients {
621 let cmd = GenerateOrderStatusReports::new(
622 UUID4::new(),
623 self.clock.borrow().timestamp_ns(),
624 true, None, None, None, None, None, );
631
632 match client.generate_order_status_reports(&cmd).await {
633 Ok(reports) => {
634 for report in reports {
635 if let Some(client_order_id) = &report.client_order_id {
636 venue_reported_ids.insert(*client_order_id);
637 }
638 all_reports.push(report);
639 }
640 }
641 Err(e) => {
642 log::error!(
643 "Failed to query order reports from {}: {e}",
644 client.client_id()
645 );
646 }
647 }
648 }
649
650 let mut events = Vec::new();
652 for report in all_reports {
653 if let Some(client_order_id) = &report.client_order_id
654 && let Some(order) = self.get_order(client_order_id)
655 {
656 let instrument = self.get_instrument(&report.instrument_id);
657 if let Some(event) =
658 self.reconcile_order_report(&order, &report, instrument.as_ref())
659 {
660 events.push(event);
661 }
662 }
663 }
664
665 if !self.config.open_check_open_only {
667 let cached_ids: AHashSet<ClientOrderId> = filtered_orders
668 .iter()
669 .map(|o| o.client_order_id())
670 .collect();
671 let missing_at_venue: AHashSet<ClientOrderId> = cached_ids
672 .difference(&venue_reported_ids)
673 .copied()
674 .collect();
675
676 for client_order_id in missing_at_venue {
677 events.extend(self.handle_missing_order(client_order_id));
678 }
679 }
680
681 events
682 }
683
684 pub async fn check_positions_consistency(
694 &mut self,
695 clients: &[Rc<dyn ExecutionClient>],
696 ) -> Vec<OrderEventAny> {
697 log::debug!("Checking position consistency between cached-state and venues");
698
699 let open_positions = {
700 let cache = self.cache.borrow();
701 let positions = cache.positions_open(None, None, None, None);
702
703 if self.config.reconciliation_instrument_ids.is_empty() {
704 positions.iter().map(|p| (*p).clone()).collect()
705 } else {
706 positions
707 .iter()
708 .filter(|p| {
709 self.config
710 .reconciliation_instrument_ids
711 .contains(&p.instrument_id)
712 })
713 .map(|p| (*p).clone())
714 .collect::<Vec<_>>()
715 }
716 };
717
718 log::debug!(
719 "Found {} position{} to check",
720 open_positions.len(),
721 if open_positions.len() == 1 { "" } else { "s" }
722 );
723
724 let mut venue_positions = AHashMap::new();
726
727 for client in clients {
728 let cmd = GeneratePositionStatusReports::new(
729 UUID4::new(),
730 self.clock.borrow().timestamp_ns(),
731 None, None, None, None, None, );
737
738 match client.generate_position_status_reports(&cmd).await {
739 Ok(reports) => {
740 for report in reports {
741 venue_positions.insert(report.instrument_id, report);
742 }
743 }
744 Err(e) => {
745 log::error!(
746 "Failed to query position reports from {}: {e}",
747 client.client_id()
748 );
749 }
750 }
751 }
752
753 let mut events = Vec::new();
755
756 for position in &open_positions {
757 if !self.config.reconciliation_instrument_ids.is_empty()
759 && !self
760 .config
761 .reconciliation_instrument_ids
762 .contains(&position.instrument_id)
763 {
764 continue;
765 }
766
767 let venue_report = venue_positions.get(&position.instrument_id);
768
769 if let Some(discrepancy_events) =
770 self.check_position_discrepancy(position, venue_report)
771 {
772 events.extend(discrepancy_events);
773 }
774 }
775
776 events
777 }
778
779 pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
781 let ts_submitted = self.clock.borrow().timestamp_ns();
782 self.inflight_checks.insert(
783 client_order_id,
784 InflightCheck {
785 client_order_id,
786 ts_submitted,
787 retry_count: 0,
788 last_query_ts: None,
789 },
790 );
791 self.recon_check_retries.insert(client_order_id, 0);
792 self.ts_last_query.remove(&client_order_id);
793 self.order_local_activity_ns.remove(&client_order_id);
794 }
795
    /// Records `ts_event` as the latest local activity for the given order, replacing any
    /// previously recorded timestamp.
    pub fn record_local_activity(&mut self, client_order_id: ClientOrderId, ts_event: UnixNanos) {
        self.order_local_activity_ns
            .insert(client_order_id, ts_event);
    }
801
802 pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
804 self.inflight_checks.remove(client_order_id);
805 self.recon_check_retries.remove(client_order_id);
806 if drop_last_query {
807 self.ts_last_query.remove(client_order_id);
808 }
809 self.order_local_activity_ns.remove(client_order_id);
810 }
811
    /// Registers `strategy_id` as the claimant of external orders for `instrument_id`,
    /// replacing any existing claim for that instrument.
    pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
        self.external_order_claims
            .insert(instrument_id, strategy_id);
    }
817
    /// Records `ts_event` as the latest local position activity for the given instrument,
    /// replacing any previously recorded timestamp.
    pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
        self.position_local_activity_ns
            .insert(instrument_id, ts_event);
    }
823
    /// Returns `true` if the trade ID is currently present in the recent-fills cache.
    pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
        self.recent_fills_cache.contains_key(trade_id)
    }
828
829 pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
831 let ts_now = self.clock.borrow().timestamp_ns();
832 self.recent_fills_cache.insert(trade_id, ts_now);
833 }
834
835 pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
839 let ts_now = self.clock.borrow().timestamp_ns();
840 let ttl_ns = (ttl_secs * 1_000_000_000.0) as u64;
841
842 self.recent_fills_cache
843 .retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
844 }
845
846 pub fn purge_closed_orders(&mut self) {
848 let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins else {
849 return;
850 };
851
852 let ts_now = self.clock.borrow().timestamp_ns();
853 let buffer_secs = (buffer_mins as u64) * 60;
854
855 self.cache
856 .borrow_mut()
857 .purge_closed_orders(ts_now, buffer_secs);
858 }
859
860 pub fn purge_closed_positions(&mut self) {
862 let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins else {
863 return;
864 };
865
866 let ts_now = self.clock.borrow().timestamp_ns();
867 let buffer_secs = (buffer_mins as u64) * 60;
868
869 self.cache
870 .borrow_mut()
871 .purge_closed_positions(ts_now, buffer_secs);
872 }
873
874 pub fn purge_account_events(&mut self) {
876 let Some(lookback_mins) = self.config.purge_account_events_lookback_mins else {
877 return;
878 };
879
880 let ts_now = self.clock.borrow().timestamp_ns();
881 let lookback_secs = (lookback_mins as u64) * 60;
882
883 self.cache
884 .borrow_mut()
885 .purge_account_events(ts_now, lookback_secs);
886 }
887
888 fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
891 self.cache.borrow().order(client_order_id).cloned()
892 }
893
894 fn get_order_by_venue_order_id(&self, venue_order_id: &VenueOrderId) -> Option<OrderAny> {
895 let cache = self.cache.borrow();
896 cache
897 .client_order_id(venue_order_id)
898 .and_then(|client_order_id| cache.order(client_order_id).cloned())
899 }
900
901 fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
902 self.cache.borrow().instrument(instrument_id).cloned()
903 }
904
905 fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
906 let mut events = Vec::new();
907
908 let Some(order) = self.get_order(&client_order_id) else {
909 return events;
910 };
911
912 let ts_now = self.clock.borrow().timestamp_ns();
913 let ts_last = order.ts_last();
914
915 if (ts_now - ts_last) < self.config.open_check_threshold_ns {
917 return events;
918 }
919
920 if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
922 && (ts_now - last_activity) < self.config.open_check_threshold_ns
923 {
924 return events;
925 }
926
927 let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
929 *retries += 1;
930
931 if *retries >= self.config.open_check_missing_retries {
933 log::warn!(
934 "Order {client_order_id} not found at venue after {retries} retries, marking as REJECTED"
935 );
936
937 let ts_now = self.clock.borrow().timestamp_ns();
938 if let Some(rejected) =
939 create_reconciliation_rejected(&order, Some("NOT_FOUND_AT_VENUE"), ts_now)
940 {
941 events.push(rejected);
942 }
943
944 self.clear_recon_tracking(&client_order_id, true);
945 } else {
946 log::debug!(
947 "Order {} not found at venue, retry {}/{}",
948 client_order_id,
949 retries,
950 self.config.open_check_missing_retries
951 );
952 }
953
954 events
955 }
956
957 fn check_position_discrepancy(
958 &mut self,
959 position: &Position,
960 venue_report: Option<&PositionStatusReport>,
961 ) -> Option<Vec<OrderEventAny>> {
962 let cached_signed_qty =
964 Decimal::from_f64_retain(position.signed_qty).unwrap_or(Decimal::ZERO);
965 let venue_signed_qty = venue_report.map_or(Decimal::ZERO, |r| r.signed_decimal_qty);
966
967 let tolerance = Decimal::from_str("0.00000001").unwrap();
968 if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
969 return None; }
971
972 let ts_now = self.clock.borrow().timestamp_ns();
974 if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
975 && (ts_now - last_activity) < self.config.position_check_threshold_ns
976 {
977 log::debug!(
978 "Skipping position reconciliation for {}: recent activity within threshold",
979 position.instrument_id
980 );
981 return None;
982 }
983
984 log::warn!(
985 "Position discrepancy detected for {}: cached_signed_qty={}, venue_signed_qty={}",
986 position.instrument_id,
987 cached_signed_qty,
988 venue_signed_qty
989 );
990
991 let instrument = self
992 .cache
993 .borrow()
994 .instrument(&position.instrument_id)?
995 .clone();
996
997 let account_id = position.account_id;
998 let instrument_id = position.instrument_id;
999
1000 let cached_avg_px = if position.avg_px_open > 0.0 {
1001 Some(Decimal::from_f64_retain(position.avg_px_open).unwrap_or(Decimal::ZERO))
1002 } else {
1003 None
1004 };
1005 let venue_avg_px = venue_report.and_then(|r| r.avg_px_open);
1006
1007 let crosses_zero = (cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
1009 || (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO);
1010
1011 if crosses_zero {
1012 return self.reconcile_cross_zero_position(
1014 &instrument,
1015 account_id,
1016 instrument_id,
1017 cached_signed_qty,
1018 cached_avg_px,
1019 venue_signed_qty,
1020 venue_avg_px,
1021 ts_now,
1022 );
1023 }
1024
1025 let qty_diff = venue_signed_qty - cached_signed_qty;
1026 let order_side = if qty_diff > Decimal::ZERO {
1027 OrderSide::Buy
1028 } else {
1029 OrderSide::Sell
1030 };
1031
1032 let reconciliation_px = calculate_reconciliation_price(
1033 cached_signed_qty,
1034 cached_avg_px,
1035 venue_signed_qty,
1036 venue_avg_px,
1037 );
1038
1039 let fill_px = reconciliation_px.or(venue_avg_px).or(cached_avg_px)?;
1040 let fill_qty = qty_diff.abs();
1041
1042 let ts_event = ts_now.as_u64();
1043 let venue_order_id = create_synthetic_venue_order_id(ts_event);
1044 let order_qty = Quantity::from_decimal_dp(fill_qty, instrument.size_precision()).ok()?;
1045
1046 let order_report = OrderStatusReport::new(
1047 account_id,
1048 instrument_id,
1049 None,
1050 venue_order_id,
1051 order_side,
1052 OrderType::Market,
1053 TimeInForce::Gtc,
1054 OrderStatus::Filled,
1055 order_qty,
1056 order_qty,
1057 ts_now,
1058 ts_now,
1059 ts_now,
1060 None,
1061 )
1062 .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
1063 .ok()?;
1064
1065 log::info!(
1066 color = LogColor::Blue as u8;
1067 "Generating synthetic fill for position reconciliation {instrument_id}: side={order_side:?}, qty={fill_qty}, px={fill_px}",
1068 );
1069
1070 let events = self.handle_external_order(&order_report, &account_id, &instrument);
1071 Some(events)
1072 }
1073
1074 #[allow(clippy::too_many_arguments)]
1077 fn reconcile_cross_zero_position(
1078 &mut self,
1079 instrument: &InstrumentAny,
1080 account_id: AccountId,
1081 instrument_id: InstrumentId,
1082 cached_signed_qty: Decimal,
1083 cached_avg_px: Option<Decimal>,
1084 venue_signed_qty: Decimal,
1085 venue_avg_px: Option<Decimal>,
1086 ts_now: UnixNanos,
1087 ) -> Option<Vec<OrderEventAny>> {
1088 log::info!(
1089 color = LogColor::Blue as u8;
1090 "Position crosses zero for {instrument_id}: cached={cached_signed_qty}, venue={venue_signed_qty}. Splitting into two fills",
1091 );
1092
1093 let mut all_events = Vec::new();
1094
1095 let close_qty = cached_signed_qty.abs();
1097 let close_side = if cached_signed_qty < Decimal::ZERO {
1098 OrderSide::Buy } else {
1100 OrderSide::Sell };
1102
1103 if let Some(close_px) = cached_avg_px {
1104 let close_venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64());
1105 let close_order_qty =
1106 Quantity::from_decimal_dp(close_qty, instrument.size_precision()).ok()?;
1107
1108 let close_report = OrderStatusReport::new(
1109 account_id,
1110 instrument_id,
1111 None,
1112 close_venue_order_id,
1113 close_side,
1114 OrderType::Market,
1115 TimeInForce::Gtc,
1116 OrderStatus::Filled,
1117 close_order_qty,
1118 close_order_qty,
1119 ts_now,
1120 ts_now,
1121 ts_now,
1122 None,
1123 )
1124 .with_avg_px(close_px.to_f64().unwrap_or(0.0))
1125 .ok()?;
1126
1127 log::info!(
1128 color = LogColor::Blue as u8;
1129 "Generating close fill for cross-zero {instrument_id}: side={close_side:?}, qty={close_qty}, px={close_px}",
1130 );
1131
1132 let close_events = self.handle_external_order(&close_report, &account_id, instrument);
1133 all_events.extend(close_events);
1134 } else {
1135 log::warn!("Cannot close position for {instrument_id}: no cached average price");
1136 return None;
1137 }
1138
1139 let open_qty = venue_signed_qty.abs();
1141 let open_side = if venue_signed_qty > Decimal::ZERO {
1142 OrderSide::Buy } else {
1144 OrderSide::Sell };
1146
1147 if let Some(open_px) = venue_avg_px {
1148 let open_venue_order_id = create_synthetic_venue_order_id(ts_now.as_u64() + 1);
1149 let open_order_qty =
1150 Quantity::from_decimal_dp(open_qty, instrument.size_precision()).ok()?;
1151
1152 let open_report = OrderStatusReport::new(
1153 account_id,
1154 instrument_id,
1155 None,
1156 open_venue_order_id,
1157 open_side,
1158 OrderType::Market,
1159 TimeInForce::Gtc,
1160 OrderStatus::Filled,
1161 open_order_qty,
1162 open_order_qty,
1163 ts_now,
1164 ts_now,
1165 ts_now,
1166 None,
1167 )
1168 .with_avg_px(open_px.to_f64().unwrap_or(0.0))
1169 .ok()?;
1170
1171 log::info!(
1172 color = LogColor::Blue as u8;
1173 "Generating open fill for cross-zero {instrument_id}: side={open_side:?}, qty={open_qty}, px={open_px}",
1174 );
1175
1176 let open_events = self.handle_external_order(&open_report, &account_id, instrument);
1177 all_events.extend(open_events);
1178 } else {
1179 log::warn!("Cannot open new position for {instrument_id}: no venue average price");
1180 return Some(all_events);
1181 }
1182
1183 Some(all_events)
1184 }
1185
1186 fn reconcile_order_report(
1187 &self,
1188 order: &OrderAny,
1189 report: &OrderStatusReport,
1190 instrument: Option<&InstrumentAny>,
1191 ) -> Option<OrderEventAny> {
1192 let ts_now = self.clock.borrow().timestamp_ns();
1193 reconcile_order_report(order, report, instrument, ts_now)
1194 }
1195
1196 fn handle_external_order(
1197 &mut self,
1198 report: &OrderStatusReport,
1199 account_id: &AccountId,
1200 instrument: &InstrumentAny,
1201 ) -> Vec<OrderEventAny> {
1202 let (strategy_id, tags) =
1203 if let Some(claimed_strategy) = self.external_order_claims.get(&report.instrument_id) {
1204 let order_id = report
1205 .client_order_id
1206 .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
1207 log::info!(
1208 color = LogColor::Blue as u8;
1209 "External order {} for {} claimed by strategy {}",
1210 order_id,
1211 report.instrument_id,
1212 claimed_strategy,
1213 );
1214 (*claimed_strategy, None)
1215 } else {
1216 (
1218 StrategyId::from("EXTERNAL"),
1219 Some(vec![Ustr::from("VENUE")]),
1220 )
1221 };
1222
1223 let client_order_id = report
1224 .client_order_id
1225 .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
1226
1227 let ts_now = self.clock.borrow().timestamp_ns();
1228
1229 let initialized = OrderInitialized::new(
1230 self.config.trader_id,
1231 strategy_id,
1232 report.instrument_id,
1233 client_order_id,
1234 report.order_side,
1235 report.order_type,
1236 report.quantity,
1237 report.time_in_force,
1238 report.post_only,
1239 report.reduce_only,
1240 false, true, UUID4::new(),
1243 ts_now,
1244 ts_now,
1245 report.price,
1246 report.trigger_price,
1247 report.trigger_type,
1248 report.limit_offset,
1249 report.trailing_offset,
1250 Some(report.trailing_offset_type),
1251 report.expire_time,
1252 report.display_qty,
1253 None, None, Some(report.contingency_type),
1256 report.order_list_id,
1257 report.linked_order_ids.clone(),
1258 report.parent_order_id,
1259 None, None, None, tags,
1263 );
1264
1265 let events = vec![OrderEventAny::Initialized(initialized)];
1266 let order = match OrderAny::from_events(events) {
1267 Ok(order) => order,
1268 Err(e) => {
1269 log::error!("Failed to create order from report: {e}");
1270 return Vec::new();
1271 }
1272 };
1273
1274 {
1275 let mut cache = self.cache.borrow_mut();
1276 if let Err(e) = cache.add_order(order.clone(), None, None, false) {
1277 log::error!("Failed to add external order to cache: {e}");
1278 return Vec::new();
1279 }
1280
1281 if let Err(e) =
1282 cache.add_venue_order_id(&client_order_id, &report.venue_order_id, false)
1283 {
1284 log::warn!("Failed to add venue order ID index: {e}");
1285 }
1286 }
1287
1288 log::info!(
1289 color = LogColor::Blue as u8;
1290 "Created external order {} ({}) for {} [{}]",
1291 client_order_id,
1292 report.venue_order_id,
1293 report.instrument_id,
1294 report.order_status,
1295 );
1296
1297 let ts_now = self.clock.borrow().timestamp_ns();
1298 generate_external_order_status_events(&order, report, account_id, instrument, ts_now)
1299 }
1300
1301 fn deduplicate_order_reports<'a>(
1306 &self,
1307 reports: impl Iterator<Item = &'a OrderStatusReport>,
1308 ) -> AHashMap<VenueOrderId, &'a OrderStatusReport> {
1309 let mut best_reports: AHashMap<VenueOrderId, &'a OrderStatusReport> = AHashMap::new();
1310
1311 for report in reports {
1312 let dominated = best_reports
1313 .get(&report.venue_order_id)
1314 .is_some_and(|existing| self.is_more_advanced(existing, report));
1315
1316 if !dominated {
1317 best_reports.insert(report.venue_order_id, report);
1318 }
1319 }
1320
1321 best_reports
1322 }
1323
1324 fn is_more_advanced(&self, a: &OrderStatusReport, b: &OrderStatusReport) -> bool {
1326 if a.filled_qty > b.filled_qty {
1327 return true;
1328 }
1329 if a.filled_qty < b.filled_qty {
1330 return false;
1331 }
1332
1333 Self::status_priority(a.order_status) > Self::status_priority(b.order_status)
1335 }
1336
1337 const fn status_priority(status: OrderStatus) -> u8 {
1339 match status {
1340 OrderStatus::Initialized | OrderStatus::Submitted | OrderStatus::Emulated => 0,
1341 OrderStatus::Released | OrderStatus::Denied => 1,
1342 OrderStatus::Accepted | OrderStatus::PendingUpdate | OrderStatus::PendingCancel => 2,
1343 OrderStatus::Triggered => 3,
1344 OrderStatus::PartiallyFilled => 4,
1345 OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected => 5,
1346 OrderStatus::Filled => 6,
1347 }
1348 }
1349
1350 fn is_exact_order_match(&self, order: &OrderAny, report: &OrderStatusReport) -> bool {
1352 order.status() == report.order_status
1353 && order.filled_qty() == report.filled_qty
1354 && !should_reconciliation_update(order, report)
1355 }
1356
1357 fn create_order_fill(
1358 &mut self,
1359 order: &mut OrderAny,
1360 fill: &FillReport,
1361 instrument: &InstrumentAny,
1362 ) -> Option<OrderEventAny> {
1363 if self.processed_fills.contains_key(&fill.trade_id) {
1364 return None;
1365 }
1366
1367 self.processed_fills
1368 .insert(fill.trade_id, order.client_order_id());
1369
1370 Some(OrderEventAny::Filled(OrderFilled::new(
1371 order.trader_id(),
1372 order.strategy_id(),
1373 order.instrument_id(),
1374 order.client_order_id(),
1375 fill.venue_order_id,
1376 fill.account_id,
1377 fill.trade_id,
1378 fill.order_side,
1379 order.order_type(),
1380 fill.last_qty,
1381 fill.last_px,
1382 instrument.quote_currency(),
1383 fill.liquidity_side,
1384 fill.report_id,
1385 fill.ts_event,
1386 self.clock.borrow().timestamp_ns(),
1387 false,
1388 fill.venue_position_id,
1389 Some(fill.commission),
1390 )))
1391 }
1392}