1use std::{
22 cell::RefCell,
23 collections::{HashMap, HashSet},
24 fmt::Debug,
25 rc::Rc,
26};
27
28use nautilus_common::{cache::Cache, clock::Clock};
29use nautilus_core::{UUID4, UnixNanos};
30use nautilus_model::{
31 enums::OrderStatus,
32 events::{
33 OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
34 OrderTriggered,
35 },
36 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
37 instruments::{Instrument, InstrumentAny},
38 orders::{Order, OrderAny},
39 reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
40 types::Quantity,
41};
42use ustr::Ustr;
43
44#[derive(Debug, Clone)]
46pub struct ReconciliationConfig {
47 pub lookback_mins: Option<u64>,
49 pub inflight_threshold_ms: u64,
51 pub inflight_max_retries: u32,
53 pub filter_unclaimed_external: bool,
55 pub generate_missing_orders: bool,
57 pub filtered_client_order_ids: HashSet<ClientOrderId>,
59 pub open_check_threshold_ns: u64,
61 pub open_check_missing_retries: u32,
63 pub open_check_open_only: bool,
65 pub open_check_lookback_mins: Option<u64>,
67 pub filter_position_reports: bool,
69 pub reconciliation_instrument_ids: HashSet<InstrumentId>,
71}
72
73impl Default for ReconciliationConfig {
74 fn default() -> Self {
75 Self {
76 lookback_mins: Some(60),
77 inflight_threshold_ms: 5000,
78 inflight_max_retries: 5,
79 filter_unclaimed_external: false,
80 generate_missing_orders: true,
81 filtered_client_order_ids: HashSet::new(),
82 open_check_threshold_ns: 5_000_000_000,
83 open_check_missing_retries: 5,
84 open_check_open_only: true,
85 open_check_lookback_mins: Some(60),
86 filter_position_reports: false,
87 reconciliation_instrument_ids: HashSet::new(),
88 }
89 }
90}
91
92#[derive(Debug, Clone)]
95pub struct ExecutionReport {
96 pub client_order_id: ClientOrderId,
97 pub venue_order_id: Option<VenueOrderId>,
98 pub status: OrderStatus,
99 pub filled_qty: Quantity,
100 pub avg_px: Option<f64>,
101 pub ts_event: UnixNanos,
102}
103
104#[derive(Debug, Clone)]
106struct InflightCheck {
107 #[allow(dead_code)]
108 pub client_order_id: ClientOrderId,
109 pub ts_submitted: UnixNanos,
110 pub retry_count: u32,
111 pub last_query_ts: Option<UnixNanos>,
112}
113
114#[derive(Clone)]
122pub struct ReconciliationManager {
123 clock: Rc<RefCell<dyn Clock>>,
124 cache: Rc<RefCell<Cache>>,
125 config: ReconciliationConfig,
126 inflight_checks: HashMap<ClientOrderId, InflightCheck>,
127 external_order_claims: HashMap<InstrumentId, StrategyId>,
128 processed_fills: HashMap<TradeId, ClientOrderId>,
129 recon_check_retries: HashMap<ClientOrderId, u32>,
130 ts_last_query: HashMap<ClientOrderId, UnixNanos>,
131 order_local_activity_ns: HashMap<ClientOrderId, UnixNanos>,
132}
133
134impl Debug for ReconciliationManager {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 f.debug_struct(stringify!(ReconciliationManager))
137 .field("config", &self.config)
138 .field("inflight_checks", &self.inflight_checks)
139 .field("external_order_claims", &self.external_order_claims)
140 .field("processed_fills", &self.processed_fills)
141 .field("recon_check_retries", &self.recon_check_retries)
142 .finish()
143 }
144}
145
146impl ReconciliationManager {
147 pub fn new(
149 clock: Rc<RefCell<dyn Clock>>,
150 cache: Rc<RefCell<Cache>>,
151 config: ReconciliationConfig,
152 ) -> Self {
153 Self {
154 clock,
155 cache,
156 config,
157 inflight_checks: HashMap::new(),
158 external_order_claims: HashMap::new(),
159 processed_fills: HashMap::new(),
160 recon_check_retries: HashMap::new(),
161 ts_last_query: HashMap::new(),
162 order_local_activity_ns: HashMap::new(),
163 }
164 }
165
166 pub async fn reconcile_execution_mass_status(
168 &mut self,
169 mass_status: ExecutionMassStatus,
170 ) -> Vec<OrderEventAny> {
171 let mut events = Vec::new();
172
173 for report in mass_status.order_reports().values() {
175 if let Some(client_order_id) = &report.client_order_id {
176 if let Some(order) = self.get_order(client_order_id) {
177 let mut order = order;
178 if let Some(event) = self.reconcile_order_report(&mut order, report) {
179 events.push(event);
180 }
181 }
182 } else if !self.config.filter_unclaimed_external
183 && let Some(event) = self.handle_external_order(report, &mass_status.account_id)
184 {
185 events.push(event);
186 }
187 }
188
189 for fills in mass_status.fill_reports().values() {
191 for fill in fills {
192 if let Some(client_order_id) = &fill.client_order_id
193 && let Some(order) = self.get_order(client_order_id)
194 {
195 let mut order = order;
196 let instrument_id = order.instrument_id();
198 if let Some(instrument) = self.get_instrument(&instrument_id)
199 && let Some(event) = self.create_order_fill(&mut order, fill, &instrument)
200 {
201 events.push(event);
202 }
203 }
204 }
205 }
206
207 events
208 }
209
210 pub fn reconcile_report(&mut self, report: ExecutionReport) -> Vec<OrderEventAny> {
212 let mut events = Vec::new();
213
214 self.clear_recon_tracking(&report.client_order_id, true);
216
217 if let Some(order) = self.get_order(&report.client_order_id) {
218 let mut order = order;
219 let order_report = OrderStatusReport::new(
221 order.account_id().unwrap_or_default(),
222 order.instrument_id(),
223 Some(report.client_order_id),
224 report.venue_order_id.unwrap_or_default(),
225 order.order_side(),
226 order.order_type(),
227 order.time_in_force(),
228 report.status,
229 order.quantity(),
230 report.filled_qty,
231 report.ts_event, report.ts_event, self.clock.borrow().timestamp_ns(),
234 Some(UUID4::new()),
235 )
236 .with_avg_px(report.avg_px.unwrap_or(0.0));
237
238 if let Some(event) = self.reconcile_order_report(&mut order, &order_report) {
239 events.push(event);
240 }
241 }
242
243 events
244 }
245
246 pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
248 let mut events = Vec::new();
249 let current_time = self.clock.borrow().timestamp_ns();
250 let threshold_ns = self.config.inflight_threshold_ms * 1_000_000;
251
252 let mut to_check = Vec::new();
253 for (client_order_id, check) in &self.inflight_checks {
254 if current_time - check.ts_submitted > threshold_ns {
255 to_check.push(*client_order_id);
256 }
257 }
258
259 for client_order_id in to_check {
260 if self
261 .config
262 .filtered_client_order_ids
263 .contains(&client_order_id)
264 {
265 continue;
266 }
267
268 if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
269 if let Some(last_query_ts) = check.last_query_ts
270 && current_time - last_query_ts < threshold_ns
271 {
272 continue;
273 }
274
275 check.retry_count += 1;
276 check.last_query_ts = Some(current_time);
277 self.ts_last_query.insert(client_order_id, current_time);
278 self.recon_check_retries
279 .insert(client_order_id, check.retry_count);
280
281 if check.retry_count >= self.config.inflight_max_retries {
282 if let Some(order) = self.get_order(&client_order_id) {
284 events.push(self.create_order_rejected(&order, Some("INFLIGHT_TIMEOUT")));
285 }
286 self.clear_recon_tracking(&client_order_id, true);
288 }
289 }
290 }
291
292 events
293 }
294
295 pub async fn check_open_orders(&mut self) -> Vec<OrderEventAny> {
297 Vec::new()
300 }
301
302 pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
304 let ts_submitted = self.clock.borrow().timestamp_ns();
305 self.inflight_checks.insert(
306 client_order_id,
307 InflightCheck {
308 client_order_id,
309 ts_submitted,
310 retry_count: 0,
311 last_query_ts: None,
312 },
313 );
314 self.recon_check_retries.insert(client_order_id, 0);
315 self.ts_last_query.remove(&client_order_id);
316 self.order_local_activity_ns.remove(&client_order_id);
317 }
318
319 pub fn record_local_activity(&mut self, client_order_id: ClientOrderId, ts_event: UnixNanos) {
321 self.order_local_activity_ns
322 .insert(client_order_id, ts_event);
323 }
324
325 pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
327 self.inflight_checks.remove(client_order_id);
328 self.recon_check_retries.remove(client_order_id);
329 if drop_last_query {
330 self.ts_last_query.remove(client_order_id);
331 }
332 self.order_local_activity_ns.remove(client_order_id);
333 }
334
335 pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
337 self.external_order_claims
338 .insert(instrument_id, strategy_id);
339 }
340
341 fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
344 self.cache.borrow().order(client_order_id).cloned()
345 }
346
347 fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
348 self.cache.borrow().instrument(instrument_id).cloned()
349 }
350
351 fn reconcile_order_report(
352 &mut self,
353 order: &mut OrderAny,
354 report: &OrderStatusReport,
355 ) -> Option<OrderEventAny> {
356 if order.status() == report.order_status && order.filled_qty() == report.filled_qty {
358 return None; }
360
361 match report.order_status {
363 OrderStatus::Accepted => Some(self.create_order_accepted(order, report)),
364 OrderStatus::Rejected => {
365 Some(self.create_order_rejected(order, report.cancel_reason.as_deref()))
366 }
367 OrderStatus::Triggered => Some(self.create_order_triggered(order, report)),
368 OrderStatus::Canceled => Some(self.create_order_canceled(order, report)),
369 OrderStatus::Expired => Some(self.create_order_expired(order, report)),
370 _ => None,
371 }
372 }
373
374 fn handle_external_order(
375 &self,
376 _report: &OrderStatusReport,
377 _account_id: &AccountId,
378 ) -> Option<OrderEventAny> {
379 None
382 }
383
384 fn create_order_accepted(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
385 OrderEventAny::Accepted(OrderAccepted::new(
386 order.trader_id(),
387 order.strategy_id(),
388 order.instrument_id(),
389 order.client_order_id(),
390 order.venue_order_id().unwrap_or(report.venue_order_id),
391 order.account_id().unwrap_or_default(),
392 UUID4::new(),
393 report.ts_accepted,
394 self.clock.borrow().timestamp_ns(),
395 false,
396 ))
397 }
398
399 fn create_order_rejected(&self, order: &OrderAny, reason: Option<&str>) -> OrderEventAny {
400 let reason = reason.unwrap_or("UNKNOWN");
401 OrderEventAny::Rejected(OrderRejected::new(
402 order.trader_id(),
403 order.strategy_id(),
404 order.instrument_id(),
405 order.client_order_id(),
406 order.account_id().unwrap_or_default(),
407 Ustr::from(reason),
408 UUID4::new(),
409 self.clock.borrow().timestamp_ns(),
410 self.clock.borrow().timestamp_ns(),
411 false,
412 false, ))
414 }
415
416 fn create_order_triggered(
417 &self,
418 order: &OrderAny,
419 report: &OrderStatusReport,
420 ) -> OrderEventAny {
421 OrderEventAny::Triggered(OrderTriggered::new(
422 order.trader_id(),
423 order.strategy_id(),
424 order.instrument_id(),
425 order.client_order_id(),
426 UUID4::new(),
427 report
428 .ts_triggered
429 .unwrap_or(self.clock.borrow().timestamp_ns()),
430 self.clock.borrow().timestamp_ns(),
431 false,
432 order.venue_order_id(),
433 order.account_id(),
434 ))
435 }
436
437 fn create_order_canceled(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
438 OrderEventAny::Canceled(OrderCanceled::new(
439 order.trader_id(),
440 order.strategy_id(),
441 order.instrument_id(),
442 order.client_order_id(),
443 UUID4::new(),
444 report.ts_last,
445 self.clock.borrow().timestamp_ns(),
446 false,
447 order.venue_order_id(),
448 order.account_id(),
449 ))
450 }
451
452 #[allow(dead_code)]
453 fn create_order_canceled_simple(&self, order: &OrderAny, ts_event: UnixNanos) -> OrderEventAny {
454 OrderEventAny::Canceled(OrderCanceled::new(
455 order.trader_id(),
456 order.strategy_id(),
457 order.instrument_id(),
458 order.client_order_id(),
459 UUID4::new(),
460 ts_event,
461 self.clock.borrow().timestamp_ns(),
462 false,
463 order.venue_order_id(),
464 order.account_id(),
465 ))
466 }
467
468 fn create_order_expired(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
469 OrderEventAny::Expired(OrderExpired::new(
470 order.trader_id(),
471 order.strategy_id(),
472 order.instrument_id(),
473 order.client_order_id(),
474 UUID4::new(),
475 report.ts_last,
476 self.clock.borrow().timestamp_ns(),
477 false,
478 order.venue_order_id(),
479 order.account_id(),
480 ))
481 }
482
483 fn create_order_fill(
484 &mut self,
485 order: &mut OrderAny,
486 fill: &FillReport,
487 instrument: &InstrumentAny,
488 ) -> Option<OrderEventAny> {
489 if self.processed_fills.contains_key(&fill.trade_id) {
491 return None;
492 }
493
494 self.processed_fills
496 .insert(fill.trade_id, order.client_order_id());
497
498 Some(OrderEventAny::Filled(OrderFilled::new(
499 order.trader_id(),
500 order.strategy_id(),
501 order.instrument_id(),
502 order.client_order_id(),
503 fill.venue_order_id,
504 order.account_id().unwrap_or_default(),
505 fill.trade_id,
506 fill.order_side,
507 order.order_type(),
508 fill.last_qty,
509 fill.last_px,
510 instrument.quote_currency(),
511 fill.liquidity_side,
512 fill.report_id,
513 fill.ts_event,
514 self.clock.borrow().timestamp_ns(),
515 false,
516 fill.venue_position_id,
517 Some(fill.commission),
518 )))
519 }
520}
521
522#[cfg(test)]
527mod tests {
528 use std::{cell::RefCell, rc::Rc};
529
530 use nautilus_common::{cache::Cache, clock::TestClock};
531 use nautilus_core::{UUID4, UnixNanos};
532 use nautilus_model::{
533 enums::OrderStatus,
534 identifiers::{AccountId, ClientId, ClientOrderId, VenueOrderId},
535 reports::ExecutionMassStatus,
536 types::Quantity,
537 };
538 use rstest::rstest;
539
540 use super::*;
541
542 fn create_test_manager() -> ReconciliationManager {
543 let clock = Rc::new(RefCell::new(TestClock::new()));
544 let cache = Rc::new(RefCell::new(Cache::default()));
545 let config = ReconciliationConfig::default();
546 ReconciliationManager::new(clock, cache, config)
547 }
548
549 #[rstest]
550 fn test_reconciliation_manager_new() {
551 let manager = create_test_manager();
552 assert_eq!(manager.inflight_checks.len(), 0);
553 assert_eq!(manager.external_order_claims.len(), 0);
554 assert_eq!(manager.processed_fills.len(), 0);
555 }
556
557 #[rstest]
558 fn test_register_inflight() {
559 let mut manager = create_test_manager();
560 let client_order_id = ClientOrderId::from("O-123456");
561
562 manager.register_inflight(client_order_id);
563
564 assert_eq!(manager.inflight_checks.len(), 1);
565 assert!(manager.inflight_checks.contains_key(&client_order_id));
566 }
567
568 #[rstest]
569 fn test_claim_external_orders() {
570 let mut manager = create_test_manager();
571 let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
572 let strategy_id = StrategyId::from("STRATEGY-001");
573
574 manager.claim_external_orders(instrument_id, strategy_id);
575
576 assert_eq!(manager.external_order_claims.len(), 1);
577 assert_eq!(
578 manager.external_order_claims.get(&instrument_id),
579 Some(&strategy_id)
580 );
581 }
582
583 #[rstest]
584 fn test_reconcile_report_removes_from_inflight() {
585 let mut manager = create_test_manager();
586 let client_order_id = ClientOrderId::from("O-123456");
587
588 manager.register_inflight(client_order_id);
590 assert_eq!(manager.inflight_checks.len(), 1);
591
592 let report = ExecutionReport {
594 client_order_id,
595 venue_order_id: Some(VenueOrderId::from("V-123456")),
596 status: OrderStatus::Accepted,
597 filled_qty: Quantity::from(0),
598 avg_px: None,
599 ts_event: UnixNanos::default(),
600 };
601
602 manager.reconcile_report(report);
604 assert_eq!(manager.inflight_checks.len(), 0);
605 }
606
607 #[rstest]
608 fn test_check_inflight_orders_generates_rejection_after_max_retries() {
609 let clock = Rc::new(RefCell::new(TestClock::new()));
610 let cache = Rc::new(RefCell::new(Cache::default()));
611 let config = ReconciliationConfig {
612 inflight_threshold_ms: 100,
613 inflight_max_retries: 2,
614 ..ReconciliationConfig::default()
615 };
616 let mut manager = ReconciliationManager::new(clock.clone(), cache.clone(), config);
617
618 let client_order_id = ClientOrderId::from("O-123456");
619 manager.register_inflight(client_order_id);
620
621 clock
623 .borrow_mut()
624 .advance_time(UnixNanos::from(200_000_000), true);
625 let events = manager.check_inflight_orders();
626 assert_eq!(events.len(), 0);
627 let first_check = manager
628 .inflight_checks
629 .get(&client_order_id)
630 .expect("inflight check present");
631 assert_eq!(first_check.retry_count, 1);
632 let first_query_ts = first_check.last_query_ts.expect("last query recorded");
633
634 clock
636 .borrow_mut()
637 .advance_time(UnixNanos::from(400_000_000), true);
638 let events = manager.check_inflight_orders();
639 assert_eq!(events.len(), 0); assert!(!manager.inflight_checks.contains_key(&client_order_id));
641 assert!(clock.borrow().timestamp_ns() > first_query_ts);
643 }
644
645 #[rstest]
646 fn test_check_inflight_orders_skips_recent_query() {
647 let clock = Rc::new(RefCell::new(TestClock::new()));
648 let cache = Rc::new(RefCell::new(Cache::default()));
649 let config = ReconciliationConfig {
650 inflight_threshold_ms: 100,
651 inflight_max_retries: 3,
652 ..ReconciliationConfig::default()
653 };
654 let mut manager = ReconciliationManager::new(clock.clone(), cache, config);
655
656 let client_order_id = ClientOrderId::from("O-ABCDEF");
657 manager.register_inflight(client_order_id);
658
659 clock
661 .borrow_mut()
662 .advance_time(UnixNanos::from(200_000_000), true);
663 let events = manager.check_inflight_orders();
664 assert!(events.is_empty());
665 let initial_check = manager
666 .inflight_checks
667 .get(&client_order_id)
668 .expect("inflight check retained");
669 assert_eq!(initial_check.retry_count, 1);
670 let last_query_ts = initial_check.last_query_ts.expect("last query recorded");
671
672 clock
674 .borrow_mut()
675 .advance_time(UnixNanos::from(250_000_000), true);
676 let events = manager.check_inflight_orders();
677 assert!(events.is_empty());
678 let second_check = manager
679 .inflight_checks
680 .get(&client_order_id)
681 .expect("inflight check retained");
682 assert_eq!(second_check.retry_count, 1);
683 assert_eq!(second_check.last_query_ts, Some(last_query_ts));
684 }
685
686 #[rstest]
687 fn test_check_inflight_orders_skips_filtered_ids() {
688 let clock = Rc::new(RefCell::new(TestClock::new()));
689 let cache = Rc::new(RefCell::new(Cache::default()));
690 let filtered_id = ClientOrderId::from("O-FILTERED");
691 let mut config = ReconciliationConfig::default();
692 config.filtered_client_order_ids.insert(filtered_id);
693 config.inflight_threshold_ms = 100;
694 let mut manager = ReconciliationManager::new(clock.clone(), cache, config);
695
696 manager.register_inflight(filtered_id);
697 clock
698 .borrow_mut()
699 .advance_time(UnixNanos::from(200_000_000), true);
700 let events = manager.check_inflight_orders();
701 assert!(events.is_empty());
702 assert!(manager.inflight_checks.contains_key(&filtered_id));
703 }
704
705 #[rstest]
706 fn test_record_and_clear_tracking() {
707 let mut manager = create_test_manager();
708 let client_order_id = ClientOrderId::from("O-TRACK");
709
710 manager.register_inflight(client_order_id);
711 let ts_now = UnixNanos::from(1_000_000);
712 manager.record_local_activity(client_order_id, ts_now);
713
714 assert_eq!(
715 manager
716 .order_local_activity_ns
717 .get(&client_order_id)
718 .copied(),
719 Some(ts_now)
720 );
721
722 manager.clear_recon_tracking(&client_order_id, true);
723 assert!(!manager.inflight_checks.contains_key(&client_order_id));
724 assert!(
725 !manager
726 .order_local_activity_ns
727 .contains_key(&client_order_id)
728 );
729 assert!(!manager.recon_check_retries.contains_key(&client_order_id));
730 assert!(!manager.ts_last_query.contains_key(&client_order_id));
731 }
732
733 #[tokio::test]
734 async fn test_reconcile_execution_mass_status_with_empty() {
735 let mut manager = create_test_manager();
736 let account_id = AccountId::from("ACCOUNT-001");
737 let venue = nautilus_model::identifiers::Venue::from("BINANCE");
738
739 let client_id = ClientId::from("BINANCE");
740 let mass_status = ExecutionMassStatus::new(
741 client_id,
742 account_id,
743 venue,
744 UnixNanos::default(),
745 Some(UUID4::new()),
746 );
747
748 let events = manager.reconcile_execution_mass_status(mass_status).await;
749 assert_eq!(events.len(), 0);
750 }
751
752 #[rstest]
753 fn test_reconciliation_config_default() {
754 let config = ReconciliationConfig::default();
755
756 assert_eq!(config.lookback_mins, Some(60));
757 assert_eq!(config.inflight_threshold_ms, 5000);
758 assert_eq!(config.inflight_max_retries, 5);
759 assert!(!config.filter_unclaimed_external);
760 assert!(config.generate_missing_orders);
761 }
762}