1use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::{
25 cache::Cache,
26 clock::Clock,
27 messages::execution::report::{GenerateOrderStatusReport, GeneratePositionReports},
28};
29use nautilus_core::{UUID4, UnixNanos};
30use nautilus_model::{
31 enums::OrderStatus,
32 events::{
33 OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
34 OrderTriggered,
35 },
36 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
37 instruments::{Instrument, InstrumentAny},
38 orders::{Order, OrderAny},
39 position::Position,
40 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
41 types::Quantity,
42};
43use rust_decimal::Decimal;
44use ustr::Ustr;
45
46use crate::{config::LiveExecEngineConfig, execution::client::LiveExecutionClient};
47
/// Configuration for the [`ExecutionManager`].
///
/// Units are encoded in the field-name suffixes (`_secs`, `_mins`, `_ms`, `_ns`).
/// Several fields are not read anywhere in this file; those are marked with a
/// review note because their exact semantics cannot be confirmed from here.
#[derive(Debug, Clone)]
pub struct ExecutionManagerConfig {
    /// Reconciliation switch. NOTE(review): not read in this file — presumably
    /// consulted by the owning engine; confirm against the caller.
    pub reconciliation: bool,
    /// Startup delay in seconds. NOTE(review): not read in this file.
    pub reconciliation_startup_delay_secs: f64,
    /// Optional lookback in minutes (populated from `reconciliation_lookback_mins`).
    pub lookback_mins: Option<u64>,
    /// When non-empty, open-order and position checks only consider these instruments.
    pub reconciliation_instrument_ids: AHashSet<InstrumentId>,
    /// When `true`, mass-status order reports without a client order ID are skipped.
    pub filter_unclaimed_external: bool,
    /// NOTE(review): not read in this file — presumably filters position reports.
    pub filter_position_reports: bool,
    /// Client order IDs that `check_inflight_orders` never retries or rejects.
    pub filtered_client_order_ids: AHashSet<ClientOrderId>,
    /// NOTE(review): not read in this file.
    pub generate_missing_orders: bool,
    /// NOTE(review): not read in this file — presumably the in-flight check timer interval.
    pub inflight_check_interval_ms: u32,
    /// Milliseconds since submission before an in-flight order is checked; also the
    /// minimum gap between two consecutive checks of the same order.
    pub inflight_threshold_ms: u64,
    /// Retry count at which in-flight tracking is dropped and a cached order is rejected.
    pub inflight_max_retries: u32,
    /// NOTE(review): not read in this file — presumably the open-order check timer interval.
    pub open_check_interval_secs: Option<f64>,
    /// NOTE(review): not read in this file.
    pub open_check_lookback_mins: Option<u64>,
    /// Minimum nanoseconds since an order's last update (and last recorded local
    /// activity) before it can be counted as missing at the venue.
    pub open_check_threshold_ns: u64,
    /// Number of "missing at venue" observations after which the order is rejected.
    pub open_check_missing_retries: u32,
    /// When `false`, `check_open_orders` also looks for cached open orders that the
    /// venue did not report.
    pub open_check_open_only: bool,
    /// NOTE(review): not read in this file.
    pub max_single_order_queries_per_cycle: u32,
    /// NOTE(review): not read in this file.
    pub single_order_query_delay_ms: u32,
    /// NOTE(review): not read in this file — presumably the position check timer interval.
    pub position_check_interval_secs: Option<f64>,
    /// NOTE(review): not read in this file.
    pub position_check_lookback_mins: u64,
    /// Nanoseconds after recorded local position activity during which a quantity
    /// mismatch for that instrument is ignored.
    pub position_check_threshold_ns: u64,
    /// Buffer in minutes passed (as seconds) to the cache when purging closed orders;
    /// `None` disables the purge.
    pub purge_closed_orders_buffer_mins: Option<u32>,
    /// Buffer in minutes passed (as seconds) to the cache when purging closed
    /// positions; `None` disables the purge.
    pub purge_closed_positions_buffer_mins: Option<u32>,
    /// Lookback in minutes passed (as seconds) to the cache when purging account
    /// events; `None` disables the purge.
    pub purge_account_events_lookback_mins: Option<u32>,
    /// NOTE(review): not read in this file.
    pub purge_from_database: bool,
}
102
103impl Default for ExecutionManagerConfig {
104 fn default() -> Self {
105 Self {
106 reconciliation: true,
107 reconciliation_startup_delay_secs: 10.0,
108 lookback_mins: Some(60),
109 reconciliation_instrument_ids: AHashSet::new(),
110 filter_unclaimed_external: false,
111 filter_position_reports: false,
112 filtered_client_order_ids: AHashSet::new(),
113 generate_missing_orders: true,
114 inflight_check_interval_ms: 2_000,
115 inflight_threshold_ms: 5_000,
116 inflight_max_retries: 5,
117 open_check_interval_secs: None,
118 open_check_lookback_mins: Some(60),
119 open_check_threshold_ns: 5_000_000_000,
120 open_check_missing_retries: 5,
121 open_check_open_only: true,
122 max_single_order_queries_per_cycle: 5,
123 single_order_query_delay_ms: 100,
124 position_check_interval_secs: None,
125 position_check_lookback_mins: 60,
126 position_check_threshold_ns: 60_000_000_000,
127 purge_closed_orders_buffer_mins: None,
128 purge_closed_positions_buffer_mins: None,
129 purge_account_events_lookback_mins: None,
130 purge_from_database: false,
131 }
132 }
133}
134
135impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
136 fn from(config: &LiveExecEngineConfig) -> Self {
137 let filtered_client_order_ids: AHashSet<ClientOrderId> = config
138 .filtered_client_order_ids
139 .clone()
140 .unwrap_or_default()
141 .into_iter()
142 .map(|value| ClientOrderId::from(value.as_str()))
143 .collect();
144
145 let reconciliation_instrument_ids: AHashSet<InstrumentId> = config
146 .reconciliation_instrument_ids
147 .clone()
148 .unwrap_or_default()
149 .into_iter()
150 .map(|value| InstrumentId::from(value.as_str()))
151 .collect();
152
153 Self {
154 reconciliation: config.reconciliation,
155 reconciliation_startup_delay_secs: config.reconciliation_startup_delay_secs,
156 lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
157 reconciliation_instrument_ids,
158 filter_unclaimed_external: config.filter_unclaimed_external_orders,
159 filter_position_reports: config.filter_position_reports,
160 filtered_client_order_ids,
161 generate_missing_orders: config.generate_missing_orders,
162 inflight_check_interval_ms: config.inflight_check_interval_ms,
163 inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
164 inflight_max_retries: config.inflight_check_retries,
165 open_check_interval_secs: config.open_check_interval_secs,
166 open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
167 open_check_threshold_ns: (config.open_check_threshold_ms as u64) * 1_000_000,
168 open_check_missing_retries: config.open_check_missing_retries,
169 open_check_open_only: config.open_check_open_only,
170 max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
171 single_order_query_delay_ms: config.single_order_query_delay_ms,
172 position_check_interval_secs: config.position_check_interval_secs,
173 position_check_lookback_mins: config.position_check_lookback_mins as u64,
174 position_check_threshold_ns: (config.position_check_threshold_ms as u64) * 1_000_000,
175 purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
176 purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
177 purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
178 purge_from_database: config.purge_from_database,
179 }
180 }
181}
182
/// Minimal execution report consumed by [`ExecutionManager::reconcile_report`].
#[derive(Debug, Clone)]
pub struct ExecutionReport {
    /// Client order ID the report refers to; used to look the order up in the cache.
    pub client_order_id: ClientOrderId,
    /// Venue order ID, if known (a default ID is substituted when absent).
    pub venue_order_id: Option<VenueOrderId>,
    /// Order status carried by the report.
    pub status: OrderStatus,
    /// Filled quantity carried by the report.
    pub filled_qty: Quantity,
    /// Average fill price, if any.
    pub avg_px: Option<f64>,
    /// Event timestamp; `reconcile_report` passes it for both timestamp arguments that
    /// precede the init timestamp of the generated order status report.
    pub ts_event: UnixNanos,
}
194
/// Tracking state for one order registered via `register_inflight`.
#[derive(Debug, Clone)]
struct InflightCheck {
    /// Client order ID being tracked (also the key of the owning map).
    #[allow(dead_code)]
    pub client_order_id: ClientOrderId,
    /// Clock timestamp captured when the order was registered as in-flight.
    pub ts_submitted: UnixNanos,
    /// Number of in-flight checks counted so far.
    pub retry_count: u32,
    /// Clock timestamp of the most recent counted check, if any.
    pub last_query_ts: Option<UnixNanos>,
}
204
/// Tracks in-flight orders and reconciles cached order/position state against
/// venue reports.
#[derive(Clone)]
pub struct ExecutionManager {
    /// Shared clock used for all timestamps taken by the manager.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache from which orders, instruments and positions are read.
    cache: Rc<RefCell<Cache>>,
    /// Manager configuration.
    config: ExecutionManagerConfig,
    /// In-flight tracking state keyed by client order ID.
    inflight_checks: AHashMap<ClientOrderId, InflightCheck>,
    /// Strategy registered per instrument via `claim_external_orders`.
    external_order_claims: AHashMap<InstrumentId, StrategyId>,
    /// Trade IDs for which a fill event was already generated, with their order ID.
    processed_fills: AHashMap<TradeId, ClientOrderId>,
    /// Retry counters shared by the in-flight and missing-at-venue checks.
    recon_check_retries: AHashMap<ClientOrderId, u32>,
    /// Timestamp of the last counted in-flight check per order.
    ts_last_query: AHashMap<ClientOrderId, UnixNanos>,
    /// Timestamp of the last locally recorded activity per order.
    order_local_activity_ns: AHashMap<ClientOrderId, UnixNanos>,
    /// Timestamp of the last locally recorded activity per instrument position.
    position_local_activity_ns: AHashMap<InstrumentId, UnixNanos>,
    /// Trade IDs marked as processed, with the clock time they were marked at.
    recent_fills_cache: AHashMap<TradeId, UnixNanos>,
}
241
242impl Debug for ExecutionManager {
243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244 f.debug_struct(stringify!(ExecutionManager))
245 .field("config", &self.config)
246 .field("inflight_checks", &self.inflight_checks)
247 .field("external_order_claims", &self.external_order_claims)
248 .field("processed_fills", &self.processed_fills)
249 .field("recon_check_retries", &self.recon_check_retries)
250 .finish()
251 }
252}
253
254impl ExecutionManager {
255 pub fn new(
257 clock: Rc<RefCell<dyn Clock>>,
258 cache: Rc<RefCell<Cache>>,
259 config: ExecutionManagerConfig,
260 ) -> Self {
261 Self {
262 clock,
263 cache,
264 config,
265 inflight_checks: AHashMap::new(),
266 external_order_claims: AHashMap::new(),
267 processed_fills: AHashMap::new(),
268 recon_check_retries: AHashMap::new(),
269 ts_last_query: AHashMap::new(),
270 order_local_activity_ns: AHashMap::new(),
271 position_local_activity_ns: AHashMap::new(),
272 recent_fills_cache: AHashMap::new(),
273 }
274 }
275
276 pub async fn reconcile_execution_mass_status(
278 &mut self,
279 mass_status: ExecutionMassStatus,
280 ) -> Vec<OrderEventAny> {
281 let mut events = Vec::new();
282
283 for report in mass_status.order_reports().values() {
285 if let Some(client_order_id) = &report.client_order_id {
286 if let Some(order) = self.get_order(client_order_id) {
287 let mut order = order;
288 if let Some(event) = self.reconcile_order_report(&mut order, report) {
289 events.push(event);
290 }
291 }
292 } else if !self.config.filter_unclaimed_external
293 && let Some(event) = self.handle_external_order(report, &mass_status.account_id)
294 {
295 events.push(event);
296 }
297 }
298
299 for fills in mass_status.fill_reports().values() {
301 for fill in fills {
302 if let Some(client_order_id) = &fill.client_order_id
303 && let Some(order) = self.get_order(client_order_id)
304 {
305 let mut order = order;
306 let instrument_id = order.instrument_id();
307
308 if let Some(instrument) = self.get_instrument(&instrument_id)
309 && let Some(event) = self.create_order_fill(&mut order, fill, &instrument)
310 {
311 events.push(event);
312 }
313 }
314 }
315 }
316
317 events
318 }
319
320 pub fn reconcile_report(
326 &mut self,
327 report: ExecutionReport,
328 ) -> anyhow::Result<Vec<OrderEventAny>> {
329 let mut events = Vec::new();
330
331 self.clear_recon_tracking(&report.client_order_id, true);
332
333 if let Some(order) = self.get_order(&report.client_order_id) {
334 let mut order = order;
335 let mut order_report = OrderStatusReport::new(
336 order.account_id().unwrap_or_default(),
337 order.instrument_id(),
338 Some(report.client_order_id),
339 report.venue_order_id.unwrap_or_default(),
340 order.order_side(),
341 order.order_type(),
342 order.time_in_force(),
343 report.status,
344 order.quantity(),
345 report.filled_qty,
346 report.ts_event, report.ts_event, self.clock.borrow().timestamp_ns(),
349 Some(UUID4::new()),
350 );
351
352 if let Some(avg_px) = report.avg_px {
353 order_report = order_report.with_avg_px(avg_px)?;
354 }
355
356 if let Some(event) = self.reconcile_order_report(&mut order, &order_report) {
357 events.push(event);
358 }
359 }
360
361 Ok(events)
362 }
363
364 pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
366 let mut events = Vec::new();
367 let current_time = self.clock.borrow().timestamp_ns();
368 let threshold_ns = self.config.inflight_threshold_ms * 1_000_000;
369
370 let mut to_check = Vec::new();
371 for (client_order_id, check) in &self.inflight_checks {
372 if current_time - check.ts_submitted > threshold_ns {
373 to_check.push(*client_order_id);
374 }
375 }
376
377 for client_order_id in to_check {
378 if self
379 .config
380 .filtered_client_order_ids
381 .contains(&client_order_id)
382 {
383 continue;
384 }
385
386 if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
387 if let Some(last_query_ts) = check.last_query_ts
388 && current_time - last_query_ts < threshold_ns
389 {
390 continue;
391 }
392
393 check.retry_count += 1;
394 check.last_query_ts = Some(current_time);
395 self.ts_last_query.insert(client_order_id, current_time);
396 self.recon_check_retries
397 .insert(client_order_id, check.retry_count);
398
399 if check.retry_count >= self.config.inflight_max_retries {
400 if let Some(order) = self.get_order(&client_order_id) {
402 events.push(self.create_order_rejected(&order, Some("INFLIGHT_TIMEOUT")));
403 }
404 self.clear_recon_tracking(&client_order_id, true);
406 }
407 }
408 }
409
410 events
411 }
412
413 pub async fn check_open_orders(
423 &mut self,
424 clients: &[Rc<dyn LiveExecutionClient>],
425 ) -> Vec<OrderEventAny> {
426 log::debug!("Checking order consistency between cached-state and venues");
427
428 let filtered_orders: Vec<OrderAny> = {
429 let cache = self.cache.borrow();
430 let open_orders = cache.orders_open(None, None, None, None);
431
432 if !self.config.reconciliation_instrument_ids.is_empty() {
433 open_orders
434 .iter()
435 .filter(|o| {
436 self.config
437 .reconciliation_instrument_ids
438 .contains(&o.instrument_id())
439 })
440 .map(|o| (*o).clone())
441 .collect()
442 } else {
443 open_orders.iter().map(|o| (*o).clone()).collect()
444 }
445 };
446
447 log::debug!(
448 "Found {} order{} open in cache",
449 filtered_orders.len(),
450 if filtered_orders.len() == 1 { "" } else { "s" }
451 );
452
453 let mut all_reports = Vec::new();
454 let mut venue_reported_ids = AHashSet::new();
455
456 for client in clients {
457 let cmd = GenerateOrderStatusReport::new(
458 UUID4::new(),
459 self.clock.borrow().timestamp_ns(),
460 None, None, None, );
464
465 match client.generate_order_status_reports(&cmd).await {
466 Ok(reports) => {
467 for report in reports {
468 if let Some(client_order_id) = &report.client_order_id {
469 venue_reported_ids.insert(*client_order_id);
470 }
471 all_reports.push(report);
472 }
473 }
474 Err(e) => {
475 log::error!(
476 "Failed to query order reports from {}: {e}",
477 client.client_id()
478 );
479 }
480 }
481 }
482
483 let mut events = Vec::new();
485 for report in all_reports {
486 if let Some(client_order_id) = &report.client_order_id
487 && let Some(mut order) = self.get_order(client_order_id)
488 && let Some(event) = self.reconcile_order_report(&mut order, &report)
489 {
490 events.push(event);
491 }
492 }
493
494 if !self.config.open_check_open_only {
496 let cached_ids: AHashSet<ClientOrderId> = filtered_orders
497 .iter()
498 .map(|o| o.client_order_id())
499 .collect();
500 let missing_at_venue: AHashSet<ClientOrderId> = cached_ids
501 .difference(&venue_reported_ids)
502 .copied()
503 .collect();
504
505 for client_order_id in missing_at_venue {
506 events.extend(self.handle_missing_order(client_order_id));
507 }
508 }
509
510 events
511 }
512
513 pub async fn check_positions_consistency(
523 &mut self,
524 clients: &[Rc<dyn LiveExecutionClient>],
525 ) -> Vec<OrderEventAny> {
526 log::debug!("Checking position consistency between cached-state and venues");
527
528 let open_positions = {
529 let cache = self.cache.borrow();
530 let positions = cache.positions_open(None, None, None, None);
531
532 if !self.config.reconciliation_instrument_ids.is_empty() {
533 positions
534 .iter()
535 .filter(|p| {
536 self.config
537 .reconciliation_instrument_ids
538 .contains(&p.instrument_id)
539 })
540 .map(|p| (*p).clone())
541 .collect::<Vec<_>>()
542 } else {
543 positions.iter().map(|p| (*p).clone()).collect()
544 }
545 };
546
547 log::debug!(
548 "Found {} position{} to check",
549 open_positions.len(),
550 if open_positions.len() == 1 { "" } else { "s" }
551 );
552
553 let mut venue_positions = AHashMap::new();
555
556 for client in clients {
557 let cmd = GeneratePositionReports::new(
558 UUID4::new(),
559 self.clock.borrow().timestamp_ns(),
560 None, None, None, );
564
565 match client.generate_position_status_reports(&cmd).await {
566 Ok(reports) => {
567 for report in reports {
568 venue_positions.insert(report.instrument_id, report);
569 }
570 }
571 Err(e) => {
572 log::error!(
573 "Failed to query position reports from {}: {e}",
574 client.client_id()
575 );
576 }
577 }
578 }
579
580 let mut events = Vec::new();
582
583 for position in &open_positions {
584 if !self.config.reconciliation_instrument_ids.is_empty()
586 && !self
587 .config
588 .reconciliation_instrument_ids
589 .contains(&position.instrument_id)
590 {
591 continue;
592 }
593
594 let venue_report = venue_positions.get(&position.instrument_id);
595
596 if let Some(discrepancy_events) =
597 self.check_position_discrepancy(position, venue_report)
598 {
599 events.extend(discrepancy_events);
600 }
601 }
602
603 events
604 }
605
606 pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
608 let ts_submitted = self.clock.borrow().timestamp_ns();
609 self.inflight_checks.insert(
610 client_order_id,
611 InflightCheck {
612 client_order_id,
613 ts_submitted,
614 retry_count: 0,
615 last_query_ts: None,
616 },
617 );
618 self.recon_check_retries.insert(client_order_id, 0);
619 self.ts_last_query.remove(&client_order_id);
620 self.order_local_activity_ns.remove(&client_order_id);
621 }
622
623 pub fn record_local_activity(&mut self, client_order_id: ClientOrderId, ts_event: UnixNanos) {
625 self.order_local_activity_ns
626 .insert(client_order_id, ts_event);
627 }
628
629 pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
631 self.inflight_checks.remove(client_order_id);
632 self.recon_check_retries.remove(client_order_id);
633 if drop_last_query {
634 self.ts_last_query.remove(client_order_id);
635 }
636 self.order_local_activity_ns.remove(client_order_id);
637 }
638
639 pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
641 self.external_order_claims
642 .insert(instrument_id, strategy_id);
643 }
644
645 pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
647 self.position_local_activity_ns
648 .insert(instrument_id, ts_event);
649 }
650
651 pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
653 self.recent_fills_cache.contains_key(trade_id)
654 }
655
656 pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
658 let ts_now = self.clock.borrow().timestamp_ns();
659 self.recent_fills_cache.insert(trade_id, ts_now);
660 }
661
662 pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
666 let ts_now = self.clock.borrow().timestamp_ns();
667 let ttl_ns = (ttl_secs * 1_000_000_000.0) as u64;
668
669 self.recent_fills_cache
670 .retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
671 }
672
673 pub fn purge_closed_orders(&mut self) {
675 let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins else {
676 return;
677 };
678
679 let ts_now = self.clock.borrow().timestamp_ns();
680 let buffer_secs = (buffer_mins as u64) * 60;
681
682 self.cache
683 .borrow_mut()
684 .purge_closed_orders(ts_now, buffer_secs);
685 }
686
687 pub fn purge_closed_positions(&mut self) {
689 let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins else {
690 return;
691 };
692
693 let ts_now = self.clock.borrow().timestamp_ns();
694 let buffer_secs = (buffer_mins as u64) * 60;
695
696 self.cache
697 .borrow_mut()
698 .purge_closed_positions(ts_now, buffer_secs);
699 }
700
701 pub fn purge_account_events(&mut self) {
703 let Some(lookback_mins) = self.config.purge_account_events_lookback_mins else {
704 return;
705 };
706
707 let ts_now = self.clock.borrow().timestamp_ns();
708 let lookback_secs = (lookback_mins as u64) * 60;
709
710 self.cache
711 .borrow_mut()
712 .purge_account_events(ts_now, lookback_secs);
713 }
714
715 fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
718 self.cache.borrow().order(client_order_id).cloned()
719 }
720
721 fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
722 self.cache.borrow().instrument(instrument_id).cloned()
723 }
724
725 fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
726 let mut events = Vec::new();
727
728 let Some(order) = self.get_order(&client_order_id) else {
729 return events;
730 };
731
732 let ts_now = self.clock.borrow().timestamp_ns();
733 let ts_last = order.ts_last();
734
735 if (ts_now - ts_last) < self.config.open_check_threshold_ns {
737 return events;
738 }
739
740 if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
742 && (ts_now - last_activity) < self.config.open_check_threshold_ns
743 {
744 return events;
745 }
746
747 let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
749 *retries += 1;
750
751 if *retries >= self.config.open_check_missing_retries {
753 log::warn!(
754 "Order {} not found at venue after {} retries, marking as REJECTED",
755 client_order_id,
756 retries
757 );
758
759 let rejected = self.create_order_rejected(&order, Some("NOT_FOUND_AT_VENUE"));
760 events.push(rejected);
761
762 self.clear_recon_tracking(&client_order_id, true);
763 } else {
764 log::debug!(
765 "Order {} not found at venue, retry {}/{}",
766 client_order_id,
767 retries,
768 self.config.open_check_missing_retries
769 );
770 }
771
772 events
773 }
774
775 fn check_position_discrepancy(
776 &mut self,
777 position: &Position,
778 venue_report: Option<&PositionStatusReport>,
779 ) -> Option<Vec<OrderEventAny>> {
780 let cached_qty = position.quantity.as_decimal();
781
782 let venue_qty = if let Some(report) = venue_report {
783 report.quantity.as_decimal()
784 } else {
785 Decimal::ZERO
786 };
787
788 let tolerance = Decimal::from_str("0.00000001").unwrap();
790 if (cached_qty - venue_qty).abs() <= tolerance {
791 return None; }
793
794 let ts_now = self.clock.borrow().timestamp_ns();
796 if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
797 && (ts_now - last_activity) < self.config.position_check_threshold_ns
798 {
799 log::debug!(
800 "Skipping position reconciliation for {}: recent activity within threshold",
801 position.instrument_id
802 );
803 return None;
804 }
805
806 log::warn!(
807 "Position discrepancy detected for {}: cached_qty={}, venue_qty={}",
808 position.instrument_id,
809 cached_qty,
810 venue_qty
811 );
812
813 None
816 }
817
818 fn reconcile_order_report(
819 &mut self,
820 order: &mut OrderAny,
821 report: &OrderStatusReport,
822 ) -> Option<OrderEventAny> {
823 if order.status() == report.order_status && order.filled_qty() == report.filled_qty {
825 return None; }
827
828 let event = match report.order_status {
829 OrderStatus::Accepted => self.create_order_accepted(order, report),
830 OrderStatus::Rejected => {
831 self.create_order_rejected(order, report.cancel_reason.as_deref())
832 }
833 OrderStatus::Triggered => self.create_order_triggered(order, report),
834 OrderStatus::Canceled => self.create_order_canceled(order, report),
835 OrderStatus::Expired => self.create_order_expired(order, report),
836 _ => return None,
837 };
838
839 Some(event)
840 }
841
    /// Handles an order report that carries no client order ID.
    ///
    /// Currently a stub: both arguments are ignored and `None` is always returned, so
    /// no event is ever generated for such reports.
    fn handle_external_order(
        &self,
        _report: &OrderStatusReport,
        _account_id: &AccountId,
    ) -> Option<OrderEventAny> {
        None
    }
851
852 fn create_order_accepted(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
853 OrderEventAny::Accepted(OrderAccepted::new(
854 order.trader_id(),
855 order.strategy_id(),
856 order.instrument_id(),
857 order.client_order_id(),
858 order.venue_order_id().unwrap_or(report.venue_order_id),
859 order.account_id().unwrap_or_default(),
860 UUID4::new(),
861 report.ts_accepted,
862 self.clock.borrow().timestamp_ns(),
863 false,
864 ))
865 }
866
867 fn create_order_rejected(&self, order: &OrderAny, reason: Option<&str>) -> OrderEventAny {
868 let reason = reason.unwrap_or("UNKNOWN");
869 OrderEventAny::Rejected(OrderRejected::new(
870 order.trader_id(),
871 order.strategy_id(),
872 order.instrument_id(),
873 order.client_order_id(),
874 order.account_id().unwrap_or_default(),
875 Ustr::from(reason),
876 UUID4::new(),
877 self.clock.borrow().timestamp_ns(),
878 self.clock.borrow().timestamp_ns(),
879 false,
880 false, ))
882 }
883
884 fn create_order_triggered(
885 &self,
886 order: &OrderAny,
887 report: &OrderStatusReport,
888 ) -> OrderEventAny {
889 OrderEventAny::Triggered(OrderTriggered::new(
890 order.trader_id(),
891 order.strategy_id(),
892 order.instrument_id(),
893 order.client_order_id(),
894 UUID4::new(),
895 report
896 .ts_triggered
897 .unwrap_or(self.clock.borrow().timestamp_ns()),
898 self.clock.borrow().timestamp_ns(),
899 false,
900 order.venue_order_id(),
901 order.account_id(),
902 ))
903 }
904
905 fn create_order_canceled(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
906 OrderEventAny::Canceled(OrderCanceled::new(
907 order.trader_id(),
908 order.strategy_id(),
909 order.instrument_id(),
910 order.client_order_id(),
911 UUID4::new(),
912 report.ts_last,
913 self.clock.borrow().timestamp_ns(),
914 false,
915 order.venue_order_id(),
916 order.account_id(),
917 ))
918 }
919
920 fn create_order_expired(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
921 OrderEventAny::Expired(OrderExpired::new(
922 order.trader_id(),
923 order.strategy_id(),
924 order.instrument_id(),
925 order.client_order_id(),
926 UUID4::new(),
927 report.ts_last,
928 self.clock.borrow().timestamp_ns(),
929 false,
930 order.venue_order_id(),
931 order.account_id(),
932 ))
933 }
934
935 fn create_order_fill(
936 &mut self,
937 order: &mut OrderAny,
938 fill: &FillReport,
939 instrument: &InstrumentAny,
940 ) -> Option<OrderEventAny> {
941 if self.processed_fills.contains_key(&fill.trade_id) {
942 return None;
943 }
944
945 self.processed_fills
946 .insert(fill.trade_id, order.client_order_id());
947
948 Some(OrderEventAny::Filled(OrderFilled::new(
949 order.trader_id(),
950 order.strategy_id(),
951 order.instrument_id(),
952 order.client_order_id(),
953 fill.venue_order_id,
954 order.account_id().unwrap_or_default(),
955 fill.trade_id,
956 fill.order_side,
957 order.order_type(),
958 fill.last_qty,
959 fill.last_px,
960 instrument.quote_currency(),
961 fill.liquidity_side,
962 fill.report_id,
963 fill.ts_event,
964 self.clock.borrow().timestamp_ns(),
965 false,
966 fill.venue_position_id,
967 Some(fill.commission),
968 )))
969 }
970}
971
972#[cfg(test)]
977mod tests {
978 use std::{cell::RefCell, rc::Rc};
979
980 use nautilus_common::{cache::Cache, clock::TestClock};
981 use nautilus_core::{UUID4, UnixNanos};
982 use nautilus_model::{
983 enums::OrderStatus,
984 identifiers::{AccountId, ClientId, ClientOrderId, Venue, VenueOrderId},
985 reports::ExecutionMassStatus,
986 types::Quantity,
987 };
988 use rstest::rstest;
989
990 use super::*;
991
992 fn create_test_manager() -> ExecutionManager {
993 let clock = Rc::new(RefCell::new(TestClock::new()));
994 let cache = Rc::new(RefCell::new(Cache::default()));
995 let config = ExecutionManagerConfig::default();
996 ExecutionManager::new(clock, cache, config)
997 }
998
999 #[rstest]
1000 fn test_reconciliation_manager_new() {
1001 let manager = create_test_manager();
1002 assert_eq!(manager.inflight_checks.len(), 0);
1003 assert_eq!(manager.external_order_claims.len(), 0);
1004 assert_eq!(manager.processed_fills.len(), 0);
1005 }
1006
1007 #[rstest]
1008 fn test_register_inflight() {
1009 let mut manager = create_test_manager();
1010 let client_order_id = ClientOrderId::from("O-123456");
1011
1012 manager.register_inflight(client_order_id);
1013
1014 assert_eq!(manager.inflight_checks.len(), 1);
1015 assert!(manager.inflight_checks.contains_key(&client_order_id));
1016 }
1017
1018 #[rstest]
1019 fn test_claim_external_orders() {
1020 let mut manager = create_test_manager();
1021 let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
1022 let strategy_id = StrategyId::from("STRATEGY-001");
1023
1024 manager.claim_external_orders(instrument_id, strategy_id);
1025
1026 assert_eq!(manager.external_order_claims.len(), 1);
1027 assert_eq!(
1028 manager.external_order_claims.get(&instrument_id),
1029 Some(&strategy_id)
1030 );
1031 }
1032
1033 #[rstest]
1034 fn test_reconcile_report_removes_from_inflight() {
1035 let mut manager = create_test_manager();
1036 let client_order_id = ClientOrderId::from("O-123456");
1037
1038 manager.register_inflight(client_order_id);
1039 assert_eq!(manager.inflight_checks.len(), 1);
1040
1041 let report = ExecutionReport {
1042 client_order_id,
1043 venue_order_id: Some(VenueOrderId::from("V-123456")),
1044 status: OrderStatus::Accepted,
1045 filled_qty: Quantity::from(0),
1046 avg_px: None,
1047 ts_event: UnixNanos::default(),
1048 };
1049
1050 manager.reconcile_report(report).unwrap();
1052 assert_eq!(manager.inflight_checks.len(), 0);
1053 }
1054
1055 #[rstest]
1056 fn test_check_inflight_orders_generates_rejection_after_max_retries() {
1057 let clock = Rc::new(RefCell::new(TestClock::new()));
1058 let cache = Rc::new(RefCell::new(Cache::default()));
1059 let config = ExecutionManagerConfig {
1060 inflight_threshold_ms: 100,
1061 inflight_max_retries: 2,
1062 ..ExecutionManagerConfig::default()
1063 };
1064 let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1065
1066 let client_order_id = ClientOrderId::from("O-123456");
1067 manager.register_inflight(client_order_id);
1068
1069 clock
1071 .borrow_mut()
1072 .advance_time(UnixNanos::from(200_000_000), true);
1073 let events = manager.check_inflight_orders();
1074 assert_eq!(events.len(), 0);
1075 let first_check = manager
1076 .inflight_checks
1077 .get(&client_order_id)
1078 .expect("inflight check present");
1079 assert_eq!(first_check.retry_count, 1);
1080 let first_query_ts = first_check.last_query_ts.expect("last query recorded");
1081
1082 clock
1084 .borrow_mut()
1085 .advance_time(UnixNanos::from(400_000_000), true);
1086 let events = manager.check_inflight_orders();
1087 assert_eq!(events.len(), 0); assert!(!manager.inflight_checks.contains_key(&client_order_id));
1089 assert!(clock.borrow().timestamp_ns() > first_query_ts);
1091 }
1092
1093 #[rstest]
1094 fn test_check_inflight_orders_skips_recent_query() {
1095 let clock = Rc::new(RefCell::new(TestClock::new()));
1096 let cache = Rc::new(RefCell::new(Cache::default()));
1097 let config = ExecutionManagerConfig {
1098 inflight_threshold_ms: 100,
1099 inflight_max_retries: 3,
1100 ..ExecutionManagerConfig::default()
1101 };
1102 let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1103
1104 let client_order_id = ClientOrderId::from("O-ABCDEF");
1105 manager.register_inflight(client_order_id);
1106
1107 clock
1109 .borrow_mut()
1110 .advance_time(UnixNanos::from(200_000_000), true);
1111 let events = manager.check_inflight_orders();
1112 assert!(events.is_empty());
1113 let initial_check = manager
1114 .inflight_checks
1115 .get(&client_order_id)
1116 .expect("inflight check retained");
1117 assert_eq!(initial_check.retry_count, 1);
1118 let last_query_ts = initial_check.last_query_ts.expect("last query recorded");
1119
1120 clock
1122 .borrow_mut()
1123 .advance_time(UnixNanos::from(250_000_000), true);
1124 let events = manager.check_inflight_orders();
1125 assert!(events.is_empty());
1126 let second_check = manager
1127 .inflight_checks
1128 .get(&client_order_id)
1129 .expect("inflight check retained");
1130 assert_eq!(second_check.retry_count, 1);
1131 assert_eq!(second_check.last_query_ts, Some(last_query_ts));
1132 }
1133
1134 #[rstest]
1135 fn test_check_inflight_orders_skips_filtered_ids() {
1136 let clock = Rc::new(RefCell::new(TestClock::new()));
1137 let cache = Rc::new(RefCell::new(Cache::default()));
1138 let filtered_id = ClientOrderId::from("O-FILTERED");
1139 let mut config = ExecutionManagerConfig::default();
1140 config.filtered_client_order_ids.insert(filtered_id);
1141 config.inflight_threshold_ms = 100;
1142 let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1143
1144 manager.register_inflight(filtered_id);
1145 clock
1146 .borrow_mut()
1147 .advance_time(UnixNanos::from(200_000_000), true);
1148 let events = manager.check_inflight_orders();
1149 assert!(events.is_empty());
1150 assert!(manager.inflight_checks.contains_key(&filtered_id));
1151 }
1152
1153 #[rstest]
1154 fn test_record_and_clear_tracking() {
1155 let mut manager = create_test_manager();
1156 let client_order_id = ClientOrderId::from("O-TRACK");
1157
1158 manager.register_inflight(client_order_id);
1159 let ts_now = UnixNanos::from(1_000_000);
1160 manager.record_local_activity(client_order_id, ts_now);
1161
1162 assert_eq!(
1163 manager
1164 .order_local_activity_ns
1165 .get(&client_order_id)
1166 .copied(),
1167 Some(ts_now)
1168 );
1169
1170 manager.clear_recon_tracking(&client_order_id, true);
1171 assert!(!manager.inflight_checks.contains_key(&client_order_id));
1172 assert!(
1173 !manager
1174 .order_local_activity_ns
1175 .contains_key(&client_order_id)
1176 );
1177 assert!(!manager.recon_check_retries.contains_key(&client_order_id));
1178 assert!(!manager.ts_last_query.contains_key(&client_order_id));
1179 }
1180
1181 #[tokio::test]
1182 async fn test_reconcile_execution_mass_status_with_empty() {
1183 let mut manager = create_test_manager();
1184 let account_id = AccountId::from("ACCOUNT-001");
1185 let venue = Venue::from("BINANCE");
1186
1187 let client_id = ClientId::from("BINANCE");
1188 let mass_status = ExecutionMassStatus::new(
1189 client_id,
1190 account_id,
1191 venue,
1192 UnixNanos::default(),
1193 Some(UUID4::new()),
1194 );
1195
1196 let events = manager.reconcile_execution_mass_status(mass_status).await;
1197 assert_eq!(events.len(), 0);
1198 }
1199
1200 #[rstest]
1201 fn test_reconciliation_config_default() {
1202 let config = ExecutionManagerConfig::default();
1203
1204 assert_eq!(config.lookback_mins, Some(60));
1205 assert_eq!(config.inflight_threshold_ms, 5000);
1206 assert_eq!(config.inflight_max_retries, 5);
1207 assert!(!config.filter_unclaimed_external);
1208 assert!(config.generate_missing_orders);
1209 }
1210}