1use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
22
23use nautilus_common::{cache::Cache, clock::Clock};
24use nautilus_core::{UUID4, UnixNanos};
25use nautilus_model::{
26 enums::OrderStatus,
27 events::{
28 OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
29 OrderTriggered,
30 },
31 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
32 instruments::{Instrument, InstrumentAny},
33 orders::{Order, OrderAny},
34 reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
35 types::Quantity,
36};
37use ustr::Ustr;
38
/// Tunable settings for execution reconciliation.
#[derive(Debug, Clone)]
pub struct ReconciliationConfig {
    /// Lookback window in minutes (`None` when unset).
    // NOTE(review): not read anywhere in this file; presumably bounds how far back
    // venue history is requested — confirm against the caller.
    pub lookback_mins: Option<u64>,
    /// Age in milliseconds after which a registered in-flight order is considered overdue.
    pub inflight_threshold_ms: u64,
    /// Number of overdue checks after which an in-flight order stops being tracked.
    pub inflight_max_retries: u8,
    /// When `true`, order reports that carry no client order ID are skipped.
    pub filter_unclaimed_external: bool,
    /// Whether orders missing locally should be generated.
    // NOTE(review): not read anywhere in this file — confirm intended use.
    pub generate_missing_orders: bool,
}

impl Default for ReconciliationConfig {
    /// Returns the stock configuration: 60 minute lookback, 5 second in-flight
    /// threshold, 3 retries, unclaimed external orders filtered, no order generation.
    fn default() -> Self {
        ReconciliationConfig {
            lookback_mins: Some(60),
            inflight_threshold_ms: 5_000,
            inflight_max_retries: 3,
            filter_unclaimed_external: true,
            generate_missing_orders: false,
        }
    }
}
65
/// Condensed execution report for a single order, consumed by
/// `ReconciliationManager::reconcile_report`.
#[derive(Debug, Clone)]
pub struct ExecutionReport {
    /// Client order ID used to look the order up in the cache.
    pub client_order_id: ClientOrderId,
    /// Venue-assigned order ID, when one was reported.
    pub venue_order_id: Option<VenueOrderId>,
    /// Order status carried by the report.
    pub status: OrderStatus,
    /// Filled quantity carried by the report.
    pub filled_qty: Quantity,
    /// Average fill price, when one was reported.
    pub avg_px: Option<f64>,
    /// Event timestamp carried by the report.
    pub ts_event: UnixNanos,
}
77
/// Tracking record for an order registered via `register_inflight` that has not yet
/// been matched by a report passed to `reconcile_report`.
#[derive(Debug, Clone)]
struct InflightCheck {
    /// Client order ID of the tracked order (duplicates the map key).
    // Only written, never read, within this file — hence the lint allowance.
    #[allow(dead_code)]
    pub client_order_id: ClientOrderId,
    /// Clock timestamp captured when the order was registered as in-flight.
    pub ts_submitted: UnixNanos,
    /// Number of overdue checks recorded so far for this order.
    pub retry_count: u8,
}
86
/// Reconciles locally cached order state against reports received from a venue and
/// produces the order events needed to bring the local state in line.
///
/// Cloning copies the tracking maps but shares the same clock and cache handles.
#[derive(Clone)]
pub struct ReconciliationManager {
    /// Shared clock used to timestamp generated events and in-flight registrations.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache from which orders and instruments are looked up.
    cache: Rc<RefCell<Cache>>,
    /// Reconciliation settings.
    config: ReconciliationConfig,
    /// Orders awaiting confirmation, keyed by client order ID.
    inflight_checks: HashMap<ClientOrderId, InflightCheck>,
    /// Strategy registered to claim external orders, per instrument.
    external_order_claims: HashMap<InstrumentId, StrategyId>,
    /// Trade IDs already turned into fill events, mapped to the owning client order ID.
    processed_fills: HashMap<TradeId, ClientOrderId>,
}
103
104impl Debug for ReconciliationManager {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("ReconciliationManager")
107 .field("config", &self.config)
108 .field("inflight_checks", &self.inflight_checks)
109 .field("external_order_claims", &self.external_order_claims)
110 .field("processed_fills", &self.processed_fills)
111 .finish()
112 }
113}
114
115impl ReconciliationManager {
116 pub fn new(
118 clock: Rc<RefCell<dyn Clock>>,
119 cache: Rc<RefCell<Cache>>,
120 config: ReconciliationConfig,
121 ) -> Self {
122 Self {
123 clock,
124 cache,
125 config,
126 inflight_checks: HashMap::new(),
127 external_order_claims: HashMap::new(),
128 processed_fills: HashMap::new(),
129 }
130 }
131
132 pub async fn reconcile_execution_mass_status(
134 &mut self,
135 mass_status: ExecutionMassStatus,
136 ) -> Vec<OrderEventAny> {
137 let mut events = Vec::new();
138
139 for report in mass_status.order_reports().values() {
141 if let Some(client_order_id) = &report.client_order_id {
142 if let Some(order) = self.get_order(client_order_id) {
143 let mut order = order;
144 if let Some(event) = self.reconcile_order_report(&mut order, report) {
145 events.push(event);
146 }
147 }
148 } else if !self.config.filter_unclaimed_external
149 && let Some(event) = self.handle_external_order(report, &mass_status.account_id)
150 {
151 events.push(event);
152 }
153 }
154
155 for fills in mass_status.fill_reports().values() {
157 for fill in fills {
158 if let Some(client_order_id) = &fill.client_order_id
159 && let Some(order) = self.get_order(client_order_id)
160 {
161 let mut order = order;
162 let instrument_id = order.instrument_id();
164 if let Some(instrument) = self.get_instrument(&instrument_id)
165 && let Some(event) = self.create_order_fill(&mut order, fill, &instrument)
166 {
167 events.push(event);
168 }
169 }
170 }
171 }
172
173 events
174 }
175
176 pub fn reconcile_report(&mut self, report: ExecutionReport) -> Vec<OrderEventAny> {
178 let mut events = Vec::new();
179
180 self.inflight_checks.remove(&report.client_order_id);
182
183 if let Some(order) = self.get_order(&report.client_order_id) {
184 let mut order = order;
185 let order_report = OrderStatusReport::new(
187 order.account_id().unwrap_or_default(),
188 order.instrument_id(),
189 Some(report.client_order_id),
190 report.venue_order_id.unwrap_or_default(),
191 order.order_side(),
192 order.order_type(),
193 order.time_in_force(),
194 report.status,
195 order.quantity(),
196 report.filled_qty,
197 report.ts_event, report.ts_event, self.clock.borrow().timestamp_ns(),
200 Some(UUID4::new()),
201 )
202 .with_avg_px(report.avg_px.unwrap_or(0.0));
203
204 if let Some(event) = self.reconcile_order_report(&mut order, &order_report) {
205 events.push(event);
206 }
207 }
208
209 events
210 }
211
212 pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
214 let mut events = Vec::new();
215 let current_time = self.clock.borrow().timestamp_ns();
216 let threshold_ns = self.config.inflight_threshold_ms * 1_000_000;
217
218 let mut to_check = Vec::new();
219 for (client_order_id, check) in &self.inflight_checks {
220 if current_time - check.ts_submitted > threshold_ns {
221 to_check.push(*client_order_id);
222 }
223 }
224
225 for client_order_id in to_check {
226 if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
227 check.retry_count += 1;
228 if check.retry_count >= self.config.inflight_max_retries {
229 if let Some(order) = self.get_order(&client_order_id) {
231 events.push(self.create_order_rejected(&order));
232 }
233 self.inflight_checks.remove(&client_order_id);
235 }
236 }
238 }
239
240 events
241 }
242
    /// Placeholder for open-order checks: performs no work and always returns an
    /// empty list of events.
    pub async fn check_open_orders(&mut self) -> Vec<OrderEventAny> {
        Vec::new()
    }
249
250 pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
252 let ts_submitted = self.clock.borrow().timestamp_ns();
253 self.inflight_checks.insert(
254 client_order_id,
255 InflightCheck {
256 client_order_id,
257 ts_submitted,
258 retry_count: 0,
259 },
260 );
261 }
262
263 pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
265 self.external_order_claims
266 .insert(instrument_id, strategy_id);
267 }
268
269 fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
272 self.cache.borrow().order(client_order_id).cloned()
273 }
274
275 fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
276 self.cache.borrow().instrument(instrument_id).cloned()
277 }
278
279 fn reconcile_order_report(
280 &mut self,
281 order: &mut OrderAny,
282 report: &OrderStatusReport,
283 ) -> Option<OrderEventAny> {
284 if order.status() == report.order_status && order.filled_qty() == report.filled_qty {
286 return None; }
288
289 match report.order_status {
291 OrderStatus::Accepted => Some(self.create_order_accepted(order, report)),
292 OrderStatus::Rejected => Some(self.create_order_rejected(order)),
293 OrderStatus::Triggered => Some(self.create_order_triggered(order, report)),
294 OrderStatus::Canceled => Some(self.create_order_canceled(order, report)),
295 OrderStatus::Expired => Some(self.create_order_expired(order, report)),
296 _ => None,
297 }
298 }
299
    /// Placeholder for handling an order report that carries no client order ID:
    /// both arguments are ignored and no event is ever produced.
    fn handle_external_order(
        &self,
        _report: &OrderStatusReport,
        _account_id: &AccountId,
    ) -> Option<OrderEventAny> {
        None
    }
309
310 fn create_order_accepted(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
311 OrderEventAny::Accepted(OrderAccepted::new(
312 order.trader_id(),
313 order.strategy_id(),
314 order.instrument_id(),
315 order.client_order_id(),
316 order.venue_order_id().unwrap_or(report.venue_order_id),
317 order.account_id().unwrap_or_default(),
318 UUID4::new(),
319 report.ts_accepted,
320 self.clock.borrow().timestamp_ns(),
321 false,
322 ))
323 }
324
325 fn create_order_rejected(&self, order: &OrderAny) -> OrderEventAny {
326 OrderEventAny::Rejected(OrderRejected::new(
327 order.trader_id(),
328 order.strategy_id(),
329 order.instrument_id(),
330 order.client_order_id(),
331 order.account_id().unwrap_or_default(),
332 Ustr::from("Inflight check timeout"),
333 UUID4::new(),
334 self.clock.borrow().timestamp_ns(),
335 self.clock.borrow().timestamp_ns(),
336 false,
337 false, ))
339 }
340
341 fn create_order_triggered(
342 &self,
343 order: &OrderAny,
344 report: &OrderStatusReport,
345 ) -> OrderEventAny {
346 OrderEventAny::Triggered(OrderTriggered::new(
347 order.trader_id(),
348 order.strategy_id(),
349 order.instrument_id(),
350 order.client_order_id(),
351 UUID4::new(),
352 report
353 .ts_triggered
354 .unwrap_or(self.clock.borrow().timestamp_ns()),
355 self.clock.borrow().timestamp_ns(),
356 false,
357 order.venue_order_id(),
358 order.account_id(),
359 ))
360 }
361
362 fn create_order_canceled(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
363 OrderEventAny::Canceled(OrderCanceled::new(
364 order.trader_id(),
365 order.strategy_id(),
366 order.instrument_id(),
367 order.client_order_id(),
368 UUID4::new(),
369 report.ts_last,
370 self.clock.borrow().timestamp_ns(),
371 false,
372 order.venue_order_id(),
373 order.account_id(),
374 ))
375 }
376
377 #[allow(dead_code)]
378 fn create_order_canceled_simple(&self, order: &OrderAny, ts_event: UnixNanos) -> OrderEventAny {
379 OrderEventAny::Canceled(OrderCanceled::new(
380 order.trader_id(),
381 order.strategy_id(),
382 order.instrument_id(),
383 order.client_order_id(),
384 UUID4::new(),
385 ts_event,
386 self.clock.borrow().timestamp_ns(),
387 false,
388 order.venue_order_id(),
389 order.account_id(),
390 ))
391 }
392
393 fn create_order_expired(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
394 OrderEventAny::Expired(OrderExpired::new(
395 order.trader_id(),
396 order.strategy_id(),
397 order.instrument_id(),
398 order.client_order_id(),
399 UUID4::new(),
400 report.ts_last,
401 self.clock.borrow().timestamp_ns(),
402 false,
403 order.venue_order_id(),
404 order.account_id(),
405 ))
406 }
407
408 fn create_order_fill(
409 &mut self,
410 order: &mut OrderAny,
411 fill: &FillReport,
412 instrument: &InstrumentAny,
413 ) -> Option<OrderEventAny> {
414 if self.processed_fills.contains_key(&fill.trade_id) {
416 return None;
417 }
418
419 self.processed_fills
421 .insert(fill.trade_id, order.client_order_id());
422
423 Some(OrderEventAny::Filled(OrderFilled::new(
424 order.trader_id(),
425 order.strategy_id(),
426 order.instrument_id(),
427 order.client_order_id(),
428 fill.venue_order_id,
429 order.account_id().unwrap_or_default(),
430 fill.trade_id,
431 fill.order_side,
432 order.order_type(),
433 fill.last_qty,
434 fill.last_px,
435 instrument.quote_currency(),
436 fill.liquidity_side,
437 fill.report_id,
438 fill.ts_event,
439 self.clock.borrow().timestamp_ns(),
440 false,
441 fill.venue_position_id,
442 Some(fill.commission),
443 )))
444 }
445}
446
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use nautilus_common::{cache::Cache, clock::TestClock};
    use nautilus_core::{UUID4, UnixNanos};
    use nautilus_model::{
        enums::OrderStatus,
        identifiers::{AccountId, ClientId, ClientOrderId, VenueOrderId},
        reports::ExecutionMassStatus,
        types::Quantity,
    };
    use rstest::rstest;

    use super::*;

    /// Builds a manager over a fresh test clock, an empty cache and default config.
    fn create_test_manager() -> ReconciliationManager {
        let clock = Rc::new(RefCell::new(TestClock::new()));
        let cache = Rc::new(RefCell::new(Cache::default()));
        ReconciliationManager::new(clock, cache, ReconciliationConfig::default())
    }

    #[rstest]
    fn test_reconciliation_manager_new() {
        let manager = create_test_manager();

        assert!(manager.inflight_checks.is_empty());
        assert!(manager.external_order_claims.is_empty());
        assert!(manager.processed_fills.is_empty());
    }

    #[rstest]
    fn test_register_inflight() {
        let mut manager = create_test_manager();
        let client_order_id = ClientOrderId::from("O-123456");

        manager.register_inflight(client_order_id);

        assert_eq!(manager.inflight_checks.len(), 1);
        assert!(manager.inflight_checks.contains_key(&client_order_id));
    }

    #[rstest]
    fn test_claim_external_orders() {
        let mut manager = create_test_manager();
        let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
        let strategy_id = StrategyId::from("STRATEGY-001");

        manager.claim_external_orders(instrument_id, strategy_id);

        assert_eq!(manager.external_order_claims.len(), 1);
        assert_eq!(manager.external_order_claims[&instrument_id], strategy_id);
    }

    #[rstest]
    fn test_reconcile_report_removes_from_inflight() {
        let mut manager = create_test_manager();
        let client_order_id = ClientOrderId::from("O-123456");

        manager.register_inflight(client_order_id);
        assert_eq!(manager.inflight_checks.len(), 1);

        let report = ExecutionReport {
            client_order_id,
            venue_order_id: Some(VenueOrderId::from("V-123456")),
            status: OrderStatus::Accepted,
            filled_qty: Quantity::from(0),
            avg_px: None,
            ts_event: UnixNanos::default(),
        };
        manager.reconcile_report(report);

        assert!(manager.inflight_checks.is_empty());
    }

    #[rstest]
    fn test_check_inflight_orders_generates_rejection_after_max_retries() {
        let clock = Rc::new(RefCell::new(TestClock::new()));
        let cache = Rc::new(RefCell::new(Cache::default()));
        let config = ReconciliationConfig {
            inflight_threshold_ms: 100,
            inflight_max_retries: 2,
            ..ReconciliationConfig::default()
        };
        let mut manager = ReconciliationManager::new(clock.clone(), cache.clone(), config);

        let client_order_id = ClientOrderId::from("O-123456");
        manager.register_inflight(client_order_id);

        // First overdue check: the retry is recorded and the order stays tracked.
        clock
            .borrow_mut()
            .advance_time(UnixNanos::from(200_000_000), true);
        let events = manager.check_inflight_orders();
        assert!(events.is_empty());
        assert_eq!(manager.inflight_checks[&client_order_id].retry_count, 1);

        // Second overdue check hits the retry limit: tracking is dropped, and no
        // event is emitted because the order is not in the cache.
        clock
            .borrow_mut()
            .advance_time(UnixNanos::from(400_000_000), true);
        let events = manager.check_inflight_orders();
        assert!(events.is_empty());
        assert!(!manager.inflight_checks.contains_key(&client_order_id));
    }

    #[tokio::test]
    async fn test_reconcile_execution_mass_status_with_empty() {
        let mut manager = create_test_manager();
        let client_id = ClientId::from("BINANCE");
        let account_id = AccountId::from("ACCOUNT-001");
        let venue = nautilus_model::identifiers::Venue::from("BINANCE");

        let mass_status = ExecutionMassStatus::new(
            client_id,
            account_id,
            venue,
            UnixNanos::default(),
            Some(UUID4::new()),
        );

        let events = manager.reconcile_execution_mass_status(mass_status).await;

        assert!(events.is_empty());
    }

    #[rstest]
    fn test_reconciliation_config_default() {
        let config = ReconciliationConfig::default();

        assert_eq!(config.lookback_mins, Some(60));
        assert_eq!(config.inflight_threshold_ms, 5000);
        assert_eq!(config.inflight_max_retries, 3);
        assert!(config.filter_unclaimed_external);
        assert!(!config.generate_missing_orders);
    }
}