nautilus_live/
reconciliation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Reconciliation managers for live execution state.
17//!
18//! This module provides managers for reconciling execution state between
19//! the local cache and connected venues during live trading.
20
21use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
22
23use nautilus_common::{cache::Cache, clock::Clock};
24use nautilus_core::{UUID4, UnixNanos};
25use nautilus_model::{
26    enums::OrderStatus,
27    events::{
28        OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
29        OrderTriggered,
30    },
31    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
32    instruments::{Instrument, InstrumentAny},
33    orders::{Order, OrderAny},
34    reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
35    types::Quantity,
36};
37use ustr::Ustr;
38
39/// Configuration for reconciliation manager.
40#[derive(Debug, Clone)]
41pub struct ReconciliationConfig {
42    /// Number of minutes to look back during reconciliation.
43    pub lookback_mins: Option<u64>,
44    /// Threshold in milliseconds for inflight order checks.
45    pub inflight_threshold_ms: u64,
46    /// Maximum number of retries for inflight checks.
47    pub inflight_max_retries: u8,
48    /// Whether to filter unclaimed external orders.
49    pub filter_unclaimed_external: bool,
50    /// Whether to generate missing orders from reports.
51    pub generate_missing_orders: bool,
52}
53
54impl Default for ReconciliationConfig {
55    fn default() -> Self {
56        Self {
57            lookback_mins: Some(60),
58            inflight_threshold_ms: 5000,
59            inflight_max_retries: 3,
60            filter_unclaimed_external: true,
61            generate_missing_orders: false,
62        }
63    }
64}
65
66/// Execution report for continuous reconciliation.
67/// This is a simplified report type used during runtime reconciliation.
68#[derive(Debug, Clone)]
69pub struct ExecutionReport {
70    pub client_order_id: ClientOrderId,
71    pub venue_order_id: Option<VenueOrderId>,
72    pub status: OrderStatus,
73    pub filled_qty: Quantity,
74    pub avg_px: Option<f64>,
75    pub ts_event: UnixNanos,
76}
77
78/// Information about an inflight order check.
79#[derive(Debug, Clone)]
80struct InflightCheck {
81    #[allow(dead_code)]
82    pub client_order_id: ClientOrderId,
83    pub ts_submitted: UnixNanos,
84    pub retry_count: u8,
85}
86
87/// Manager for reconciling execution state between local cache and venues.
88///
89/// The `ReconciliationManager` handles:
90/// - Startup reconciliation to align state on system start
91/// - Continuous reconciliation of inflight orders
92/// - External order discovery and claiming
93/// - Fill report processing and validation
94#[derive(Clone)]
95pub struct ReconciliationManager {
96    clock: Rc<RefCell<dyn Clock>>,
97    cache: Rc<RefCell<Cache>>,
98    config: ReconciliationConfig,
99    inflight_checks: HashMap<ClientOrderId, InflightCheck>,
100    external_order_claims: HashMap<InstrumentId, StrategyId>,
101    processed_fills: HashMap<TradeId, ClientOrderId>,
102}
103
104impl Debug for ReconciliationManager {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("ReconciliationManager")
107            .field("config", &self.config)
108            .field("inflight_checks", &self.inflight_checks)
109            .field("external_order_claims", &self.external_order_claims)
110            .field("processed_fills", &self.processed_fills)
111            .finish()
112    }
113}
114
115impl ReconciliationManager {
116    /// Creates a new [`ReconciliationManager`] instance.
117    pub fn new(
118        clock: Rc<RefCell<dyn Clock>>,
119        cache: Rc<RefCell<Cache>>,
120        config: ReconciliationConfig,
121    ) -> Self {
122        Self {
123            clock,
124            cache,
125            config,
126            inflight_checks: HashMap::new(),
127            external_order_claims: HashMap::new(),
128            processed_fills: HashMap::new(),
129        }
130    }
131
132    /// Reconciles orders and fills from a mass status report.
133    pub async fn reconcile_execution_mass_status(
134        &mut self,
135        mass_status: ExecutionMassStatus,
136    ) -> Vec<OrderEventAny> {
137        let mut events = Vec::new();
138
139        // Process order status reports first
140        for report in mass_status.order_reports().values() {
141            if let Some(client_order_id) = &report.client_order_id {
142                if let Some(order) = self.get_order(client_order_id) {
143                    let mut order = order;
144                    if let Some(event) = self.reconcile_order_report(&mut order, report) {
145                        events.push(event);
146                    }
147                }
148            } else if !self.config.filter_unclaimed_external
149                && let Some(event) = self.handle_external_order(report, &mass_status.account_id)
150            {
151                events.push(event);
152            }
153        }
154
155        // Process fill reports
156        for fills in mass_status.fill_reports().values() {
157            for fill in fills {
158                if let Some(client_order_id) = &fill.client_order_id
159                    && let Some(order) = self.get_order(client_order_id)
160                {
161                    let mut order = order;
162                    // Get instrument for the order
163                    let instrument_id = order.instrument_id();
164                    if let Some(instrument) = self.get_instrument(&instrument_id)
165                        && let Some(event) = self.create_order_fill(&mut order, fill, &instrument)
166                    {
167                        events.push(event);
168                    }
169                }
170            }
171        }
172
173        events
174    }
175
176    /// Reconciles a single execution report during runtime.
177    pub fn reconcile_report(&mut self, report: ExecutionReport) -> Vec<OrderEventAny> {
178        let mut events = Vec::new();
179
180        // Remove from inflight checks if present
181        self.inflight_checks.remove(&report.client_order_id);
182
183        if let Some(order) = self.get_order(&report.client_order_id) {
184            let mut order = order;
185            // Create an OrderStatusReport from the ExecutionReport
186            let order_report = OrderStatusReport::new(
187                order.account_id().unwrap_or_default(),
188                order.instrument_id(),
189                Some(report.client_order_id),
190                report.venue_order_id.unwrap_or_default(),
191                order.order_side(),
192                order.order_type(),
193                order.time_in_force(),
194                report.status,
195                order.quantity(),
196                report.filled_qty,
197                report.ts_event, // Use ts_event as ts_accepted
198                report.ts_event, // Use ts_event as ts_last
199                self.clock.borrow().timestamp_ns(),
200                Some(UUID4::new()),
201            )
202            .with_avg_px(report.avg_px.unwrap_or(0.0));
203
204            if let Some(event) = self.reconcile_order_report(&mut order, &order_report) {
205                events.push(event);
206            }
207        }
208
209        events
210    }
211
212    /// Checks inflight orders and returns events for any that need reconciliation.
213    pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
214        let mut events = Vec::new();
215        let current_time = self.clock.borrow().timestamp_ns();
216        let threshold_ns = self.config.inflight_threshold_ms * 1_000_000;
217
218        let mut to_check = Vec::new();
219        for (client_order_id, check) in &self.inflight_checks {
220            if current_time - check.ts_submitted > threshold_ns {
221                to_check.push(*client_order_id);
222            }
223        }
224
225        for client_order_id in to_check {
226            if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
227                check.retry_count += 1;
228                if check.retry_count >= self.config.inflight_max_retries {
229                    // Generate rejection after max retries
230                    if let Some(order) = self.get_order(&client_order_id) {
231                        events.push(self.create_order_rejected(&order));
232                    }
233                    // Remove from inflight checks regardless of whether order exists
234                    self.inflight_checks.remove(&client_order_id);
235                }
236                // Otherwise, the check remains for the next cycle
237            }
238        }
239
240        events
241    }
242
243    /// Checks open orders against the venue state.
244    pub async fn check_open_orders(&mut self) -> Vec<OrderEventAny> {
245        // This would need to query the venue for open orders
246        // and reconcile any discrepancies
247        Vec::new()
248    }
249
250    /// Registers an order as inflight for tracking.
251    pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
252        let ts_submitted = self.clock.borrow().timestamp_ns();
253        self.inflight_checks.insert(
254            client_order_id,
255            InflightCheck {
256                client_order_id,
257                ts_submitted,
258                retry_count: 0,
259            },
260        );
261    }
262
263    /// Claims external orders for a specific strategy and instrument.
264    pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
265        self.external_order_claims
266            .insert(instrument_id, strategy_id);
267    }
268
269    // Private helper methods
270
271    fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
272        self.cache.borrow().order(client_order_id).cloned()
273    }
274
275    fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
276        self.cache.borrow().instrument(instrument_id).cloned()
277    }
278
279    fn reconcile_order_report(
280        &mut self,
281        order: &mut OrderAny,
282        report: &OrderStatusReport,
283    ) -> Option<OrderEventAny> {
284        // Check if reconciliation is needed
285        if order.status() == report.order_status && order.filled_qty() == report.filled_qty {
286            return None; // Already in sync
287        }
288
289        // Generate appropriate event based on status
290        match report.order_status {
291            OrderStatus::Accepted => Some(self.create_order_accepted(order, report)),
292            OrderStatus::Rejected => Some(self.create_order_rejected(order)),
293            OrderStatus::Triggered => Some(self.create_order_triggered(order, report)),
294            OrderStatus::Canceled => Some(self.create_order_canceled(order, report)),
295            OrderStatus::Expired => Some(self.create_order_expired(order, report)),
296            _ => None,
297        }
298    }
299
300    fn handle_external_order(
301        &self,
302        _report: &OrderStatusReport,
303        _account_id: &AccountId,
304    ) -> Option<OrderEventAny> {
305        // This would need to create a new order from the report
306        // For now, we'll skip external order handling
307        None
308    }
309
310    fn create_order_accepted(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
311        OrderEventAny::Accepted(OrderAccepted::new(
312            order.trader_id(),
313            order.strategy_id(),
314            order.instrument_id(),
315            order.client_order_id(),
316            order.venue_order_id().unwrap_or(report.venue_order_id),
317            order.account_id().unwrap_or_default(),
318            UUID4::new(),
319            report.ts_accepted,
320            self.clock.borrow().timestamp_ns(),
321            false,
322        ))
323    }
324
325    fn create_order_rejected(&self, order: &OrderAny) -> OrderEventAny {
326        OrderEventAny::Rejected(OrderRejected::new(
327            order.trader_id(),
328            order.strategy_id(),
329            order.instrument_id(),
330            order.client_order_id(),
331            order.account_id().unwrap_or_default(),
332            Ustr::from("Inflight check timeout"),
333            UUID4::new(),
334            self.clock.borrow().timestamp_ns(),
335            self.clock.borrow().timestamp_ns(),
336            false,
337            false, // due_post_only
338        ))
339    }
340
341    fn create_order_triggered(
342        &self,
343        order: &OrderAny,
344        report: &OrderStatusReport,
345    ) -> OrderEventAny {
346        OrderEventAny::Triggered(OrderTriggered::new(
347            order.trader_id(),
348            order.strategy_id(),
349            order.instrument_id(),
350            order.client_order_id(),
351            UUID4::new(),
352            report
353                .ts_triggered
354                .unwrap_or(self.clock.borrow().timestamp_ns()),
355            self.clock.borrow().timestamp_ns(),
356            false,
357            order.venue_order_id(),
358            order.account_id(),
359        ))
360    }
361
362    fn create_order_canceled(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
363        OrderEventAny::Canceled(OrderCanceled::new(
364            order.trader_id(),
365            order.strategy_id(),
366            order.instrument_id(),
367            order.client_order_id(),
368            UUID4::new(),
369            report.ts_last,
370            self.clock.borrow().timestamp_ns(),
371            false,
372            order.venue_order_id(),
373            order.account_id(),
374        ))
375    }
376
377    #[allow(dead_code)]
378    fn create_order_canceled_simple(&self, order: &OrderAny, ts_event: UnixNanos) -> OrderEventAny {
379        OrderEventAny::Canceled(OrderCanceled::new(
380            order.trader_id(),
381            order.strategy_id(),
382            order.instrument_id(),
383            order.client_order_id(),
384            UUID4::new(),
385            ts_event,
386            self.clock.borrow().timestamp_ns(),
387            false,
388            order.venue_order_id(),
389            order.account_id(),
390        ))
391    }
392
393    fn create_order_expired(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
394        OrderEventAny::Expired(OrderExpired::new(
395            order.trader_id(),
396            order.strategy_id(),
397            order.instrument_id(),
398            order.client_order_id(),
399            UUID4::new(),
400            report.ts_last,
401            self.clock.borrow().timestamp_ns(),
402            false,
403            order.venue_order_id(),
404            order.account_id(),
405        ))
406    }
407
408    fn create_order_fill(
409        &mut self,
410        order: &mut OrderAny,
411        fill: &FillReport,
412        instrument: &InstrumentAny,
413    ) -> Option<OrderEventAny> {
414        // Check if this fill was already processed
415        if self.processed_fills.contains_key(&fill.trade_id) {
416            return None;
417        }
418
419        // Mark this fill as processed
420        self.processed_fills
421            .insert(fill.trade_id, order.client_order_id());
422
423        Some(OrderEventAny::Filled(OrderFilled::new(
424            order.trader_id(),
425            order.strategy_id(),
426            order.instrument_id(),
427            order.client_order_id(),
428            fill.venue_order_id,
429            order.account_id().unwrap_or_default(),
430            fill.trade_id,
431            fill.order_side,
432            order.order_type(),
433            fill.last_qty,
434            fill.last_px,
435            instrument.quote_currency(),
436            fill.liquidity_side,
437            fill.report_id,
438            fill.ts_event,
439            self.clock.borrow().timestamp_ns(),
440            false,
441            fill.venue_position_id,
442            Some(fill.commission),
443        )))
444    }
445}
446
447////////////////////////////////////////////////////////////////////////////////
448// Tests
449////////////////////////////////////////////////////////////////////////////////
450
451#[cfg(test)]
452mod tests {
453    use std::{cell::RefCell, rc::Rc};
454
455    use nautilus_common::{cache::Cache, clock::TestClock};
456    use nautilus_core::{UUID4, UnixNanos};
457    use nautilus_model::{
458        enums::OrderStatus,
459        identifiers::{AccountId, ClientId, ClientOrderId, VenueOrderId},
460        reports::ExecutionMassStatus,
461        types::Quantity,
462    };
463    use rstest::rstest;
464
465    use super::*;
466
467    fn create_test_manager() -> ReconciliationManager {
468        let clock = Rc::new(RefCell::new(TestClock::new()));
469        let cache = Rc::new(RefCell::new(Cache::default()));
470        let config = ReconciliationConfig::default();
471        ReconciliationManager::new(clock, cache, config)
472    }
473
474    #[rstest]
475    fn test_reconciliation_manager_new() {
476        let manager = create_test_manager();
477        assert_eq!(manager.inflight_checks.len(), 0);
478        assert_eq!(manager.external_order_claims.len(), 0);
479        assert_eq!(manager.processed_fills.len(), 0);
480    }
481
482    #[rstest]
483    fn test_register_inflight() {
484        let mut manager = create_test_manager();
485        let client_order_id = ClientOrderId::from("O-123456");
486
487        manager.register_inflight(client_order_id);
488
489        assert_eq!(manager.inflight_checks.len(), 1);
490        assert!(manager.inflight_checks.contains_key(&client_order_id));
491    }
492
493    #[rstest]
494    fn test_claim_external_orders() {
495        let mut manager = create_test_manager();
496        let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
497        let strategy_id = StrategyId::from("STRATEGY-001");
498
499        manager.claim_external_orders(instrument_id, strategy_id);
500
501        assert_eq!(manager.external_order_claims.len(), 1);
502        assert_eq!(
503            manager.external_order_claims.get(&instrument_id),
504            Some(&strategy_id)
505        );
506    }
507
508    #[rstest]
509    fn test_reconcile_report_removes_from_inflight() {
510        let mut manager = create_test_manager();
511        let client_order_id = ClientOrderId::from("O-123456");
512
513        // Register as inflight
514        manager.register_inflight(client_order_id);
515        assert_eq!(manager.inflight_checks.len(), 1);
516
517        // Create execution report
518        let report = ExecutionReport {
519            client_order_id,
520            venue_order_id: Some(VenueOrderId::from("V-123456")),
521            status: OrderStatus::Accepted,
522            filled_qty: Quantity::from(0),
523            avg_px: None,
524            ts_event: UnixNanos::default(),
525        };
526
527        // Reconcile should remove from inflight checks
528        manager.reconcile_report(report);
529        assert_eq!(manager.inflight_checks.len(), 0);
530    }
531
532    #[rstest]
533    fn test_check_inflight_orders_generates_rejection_after_max_retries() {
534        let clock = Rc::new(RefCell::new(TestClock::new()));
535        let cache = Rc::new(RefCell::new(Cache::default()));
536        let config = ReconciliationConfig {
537            inflight_threshold_ms: 100,
538            inflight_max_retries: 2,
539            ..ReconciliationConfig::default()
540        };
541        let mut manager = ReconciliationManager::new(clock.clone(), cache.clone(), config);
542
543        let client_order_id = ClientOrderId::from("O-123456");
544        manager.register_inflight(client_order_id);
545
546        // First check - should increment retry count
547        clock
548            .borrow_mut()
549            .advance_time(UnixNanos::from(200_000_000), true);
550        let events = manager.check_inflight_orders();
551        assert_eq!(events.len(), 0);
552        assert_eq!(
553            manager
554                .inflight_checks
555                .get(&client_order_id)
556                .unwrap()
557                .retry_count,
558            1
559        );
560
561        // Second check - should hit max retries and generate rejection
562        clock
563            .borrow_mut()
564            .advance_time(UnixNanos::from(400_000_000), true);
565        let events = manager.check_inflight_orders();
566        assert_eq!(events.len(), 0); // Would generate rejection if order existed in cache
567        assert!(!manager.inflight_checks.contains_key(&client_order_id));
568    }
569
570    #[tokio::test]
571    async fn test_reconcile_execution_mass_status_with_empty() {
572        let mut manager = create_test_manager();
573        let account_id = AccountId::from("ACCOUNT-001");
574        let venue = nautilus_model::identifiers::Venue::from("BINANCE");
575
576        let client_id = ClientId::from("BINANCE");
577        let mass_status = ExecutionMassStatus::new(
578            client_id,
579            account_id,
580            venue,
581            UnixNanos::default(),
582            Some(UUID4::new()),
583        );
584
585        let events = manager.reconcile_execution_mass_status(mass_status).await;
586        assert_eq!(events.len(), 0);
587    }
588
589    #[rstest]
590    fn test_reconciliation_config_default() {
591        let config = ReconciliationConfig::default();
592
593        assert_eq!(config.lookback_mins, Some(60));
594        assert_eq!(config.inflight_threshold_ms, 5000);
595        assert_eq!(config.inflight_max_retries, 3);
596        assert!(config.filter_unclaimed_external);
597        assert!(!config.generate_missing_orders);
598    }
599}