1use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::{
25 cache::Cache,
26 clock::Clock,
27 enums::LogColor,
28 log_info,
29 messages::execution::report::{GenerateOrderStatusReports, GeneratePositionStatusReports},
30};
31use nautilus_core::{UUID4, UnixNanos};
32use nautilus_execution::client::ExecutionClient;
33use nautilus_model::{
34 enums::OrderStatus,
35 events::{
36 OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderInitialized,
37 OrderRejected, OrderTriggered,
38 },
39 identifiers::{
40 AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
41 },
42 instruments::{Instrument, InstrumentAny},
43 orders::{Order, OrderAny},
44 position::Position,
45 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
46 types::Quantity,
47};
48use rust_decimal::Decimal;
49use ustr::Ustr;
50
51use crate::config::LiveExecEngineConfig;
52
53#[inline]
55fn is_order_status_open(status: OrderStatus) -> bool {
56 matches!(
57 status,
58 OrderStatus::Accepted
59 | OrderStatus::Triggered
60 | OrderStatus::PendingCancel
61 | OrderStatus::PendingUpdate
62 | OrderStatus::PartiallyFilled
63 )
64}
65
/// Configuration for [`ExecutionManager`].
///
/// Fields documented as "not read in the visible portion of this file" are carried over from
/// `LiveExecEngineConfig` but their consumers are not shown here.
#[derive(Debug, Clone)]
pub struct ExecutionManagerConfig {
    /// Trader ID stamped on orders that are created from external (venue-only) order reports.
    pub trader_id: TraderId,
    /// Reconciliation on/off flag; not read in the visible portion of this file.
    pub reconciliation: bool,
    /// Startup delay in seconds; not read in the visible portion of this file.
    pub reconciliation_startup_delay_secs: f64,
    /// Reconciliation lookback in minutes; not read in the visible portion of this file.
    pub lookback_mins: Option<u64>,
    /// When non-empty, open-order and position consistency checks only consider these instruments.
    pub reconciliation_instrument_ids: AHashSet<InstrumentId>,
    /// When `true`, mass-status order reports that match no cached order are ignored instead of
    /// being materialized as external orders.
    pub filter_unclaimed_external: bool,
    /// Not read in the visible portion of this file.
    pub filter_position_reports: bool,
    /// Client order IDs that the in-flight check skips.
    pub filtered_client_order_ids: AHashSet<ClientOrderId>,
    /// Not read in the visible portion of this file.
    pub generate_missing_orders: bool,
    /// Not read in the visible portion of this file.
    pub inflight_check_interval_ms: u32,
    /// Age in milliseconds after which an in-flight order is counted against its retry budget;
    /// also the minimum spacing between two consecutive retries for the same order.
    pub inflight_threshold_ms: u64,
    /// Retry count at which an in-flight order is rejected with reason `INFLIGHT_TIMEOUT`.
    pub inflight_max_retries: u32,
    /// Not read in the visible portion of this file.
    pub open_check_interval_secs: Option<f64>,
    /// Not read in the visible portion of this file.
    pub open_check_lookback_mins: Option<u64>,
    /// Minimum nanoseconds since an order's last event (and last recorded local activity) before
    /// the order may be counted as missing at the venue.
    pub open_check_threshold_ns: u64,
    /// Number of "missing at venue" observations after which the order is rejected with reason
    /// `NOT_FOUND_AT_VENUE`.
    pub open_check_missing_retries: u32,
    /// When `false`, `check_open_orders` additionally handles cached open orders that are absent
    /// from the venue reports.
    pub open_check_open_only: bool,
    /// Not read in the visible portion of this file.
    pub max_single_order_queries_per_cycle: u32,
    /// Not read in the visible portion of this file.
    pub single_order_query_delay_ms: u32,
    /// Not read in the visible portion of this file.
    pub position_check_interval_secs: Option<f64>,
    /// Not read in the visible portion of this file.
    pub position_check_lookback_mins: u64,
    /// Minimum nanoseconds since the last recorded local position activity before a quantity
    /// discrepancy is reported.
    pub position_check_threshold_ns: u64,
    /// Buffer in minutes (converted to seconds) handed to the cache when purging closed orders;
    /// `None` disables the purge.
    pub purge_closed_orders_buffer_mins: Option<u32>,
    /// Buffer in minutes (converted to seconds) handed to the cache when purging closed
    /// positions; `None` disables the purge.
    pub purge_closed_positions_buffer_mins: Option<u32>,
    /// Lookback in minutes (converted to seconds) handed to the cache when purging account
    /// events; `None` disables the purge.
    pub purge_account_events_lookback_mins: Option<u32>,
    /// Not read in the visible portion of this file.
    pub purge_from_database: bool,
}
122
123impl Default for ExecutionManagerConfig {
124 fn default() -> Self {
125 Self {
126 trader_id: TraderId::default(),
127 reconciliation: true,
128 reconciliation_startup_delay_secs: 10.0,
129 lookback_mins: Some(60),
130 reconciliation_instrument_ids: AHashSet::new(),
131 filter_unclaimed_external: false,
132 filter_position_reports: false,
133 filtered_client_order_ids: AHashSet::new(),
134 generate_missing_orders: true,
135 inflight_check_interval_ms: 2_000,
136 inflight_threshold_ms: 5_000,
137 inflight_max_retries: 5,
138 open_check_interval_secs: None,
139 open_check_lookback_mins: Some(60),
140 open_check_threshold_ns: 5_000_000_000,
141 open_check_missing_retries: 5,
142 open_check_open_only: true,
143 max_single_order_queries_per_cycle: 5,
144 single_order_query_delay_ms: 100,
145 position_check_interval_secs: None,
146 position_check_lookback_mins: 60,
147 position_check_threshold_ns: 60_000_000_000,
148 purge_closed_orders_buffer_mins: None,
149 purge_closed_positions_buffer_mins: None,
150 purge_account_events_lookback_mins: None,
151 purge_from_database: false,
152 }
153 }
154}
155
156impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
157 fn from(config: &LiveExecEngineConfig) -> Self {
158 let filtered_client_order_ids: AHashSet<ClientOrderId> = config
159 .filtered_client_order_ids
160 .clone()
161 .unwrap_or_default()
162 .into_iter()
163 .map(|value| ClientOrderId::from(value.as_str()))
164 .collect();
165
166 let reconciliation_instrument_ids: AHashSet<InstrumentId> = config
167 .reconciliation_instrument_ids
168 .clone()
169 .unwrap_or_default()
170 .into_iter()
171 .map(|value| InstrumentId::from(value.as_str()))
172 .collect();
173
174 Self {
175 trader_id: TraderId::default(), reconciliation: config.reconciliation,
177 reconciliation_startup_delay_secs: config.reconciliation_startup_delay_secs,
178 lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
179 reconciliation_instrument_ids,
180 filter_unclaimed_external: config.filter_unclaimed_external_orders,
181 filter_position_reports: config.filter_position_reports,
182 filtered_client_order_ids,
183 generate_missing_orders: config.generate_missing_orders,
184 inflight_check_interval_ms: config.inflight_check_interval_ms,
185 inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
186 inflight_max_retries: config.inflight_check_retries,
187 open_check_interval_secs: config.open_check_interval_secs,
188 open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
189 open_check_threshold_ns: (config.open_check_threshold_ms as u64) * 1_000_000,
190 open_check_missing_retries: config.open_check_missing_retries,
191 open_check_open_only: config.open_check_open_only,
192 max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
193 single_order_query_delay_ms: config.single_order_query_delay_ms,
194 position_check_interval_secs: config.position_check_interval_secs,
195 position_check_lookback_mins: config.position_check_lookback_mins as u64,
196 position_check_threshold_ns: (config.position_check_threshold_ms as u64) * 1_000_000,
197 purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
198 purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
199 purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
200 purge_from_database: config.purge_from_database,
201 }
202 }
203}
204
205impl ExecutionManagerConfig {
206 #[must_use]
208 pub fn with_trader_id(mut self, trader_id: TraderId) -> Self {
209 self.trader_id = trader_id;
210 self
211 }
212}
213
/// Condensed order status update consumed by `ExecutionManager::reconcile_report`, which expands
/// it into a full `OrderStatusReport` using the cached order's static attributes.
#[derive(Debug, Clone)]
pub struct ExecutionReport {
    /// Client order ID used to look the order up in the cache.
    pub client_order_id: ClientOrderId,
    /// Venue order ID; a report without one is not processed by `reconcile_report`.
    pub venue_order_id: Option<VenueOrderId>,
    /// Order status carried by the report.
    pub status: OrderStatus,
    /// Filled quantity carried by the report.
    pub filled_qty: Quantity,
    /// Optional average fill price, applied to the generated status report when present.
    pub avg_px: Option<f64>,
    /// Event timestamp; used for both the accepted and last-update timestamps of the generated
    /// status report.
    pub ts_event: UnixNanos,
}
225
/// Per-order bookkeeping for in-flight checks, created by `ExecutionManager::register_inflight`.
#[derive(Debug, Clone)]
struct InflightCheck {
    // Written on registration but never read in the visible code, hence the lint allowance.
    #[allow(dead_code)]
    pub client_order_id: ClientOrderId,
    /// Clock timestamp taken when the order was registered as in-flight.
    pub ts_submitted: UnixNanos,
    /// Number of in-flight check passes that have counted against this order.
    pub retry_count: u32,
    /// Timestamp of the most recent counted check pass, if any.
    pub last_query_ts: Option<UnixNanos>,
}
235
/// Reconciles locally cached order and position state against venue reports and tracks
/// in-flight orders, external order claims and already processed fills.
#[derive(Clone)]
pub struct ExecutionManager {
    /// Clock used to timestamp generated events and measure elapsed time.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up orders, instruments and positions.
    cache: Rc<RefCell<Cache>>,
    /// Manager configuration.
    config: ExecutionManagerConfig,
    /// In-flight tracking state keyed by client order ID.
    inflight_checks: AHashMap<ClientOrderId, InflightCheck>,
    /// Strategy that claims external orders for a given instrument.
    external_order_claims: AHashMap<InstrumentId, StrategyId>,
    /// Trade IDs already converted into fill events, mapped to the owning client order ID.
    processed_fills: AHashMap<TradeId, ClientOrderId>,
    /// Retry counters per order, shared by the in-flight and missing-order checks.
    recon_check_retries: AHashMap<ClientOrderId, u32>,
    /// Timestamp of the last counted in-flight check per order.
    ts_last_query: AHashMap<ClientOrderId, UnixNanos>,
    /// Last locally recorded activity timestamp per order.
    order_local_activity_ns: AHashMap<ClientOrderId, UnixNanos>,
    /// Last locally recorded activity timestamp per instrument position.
    position_local_activity_ns: AHashMap<InstrumentId, UnixNanos>,
    /// Trade IDs recently marked as processed, with the time they were marked (pruned by TTL).
    recent_fills_cache: AHashMap<TradeId, UnixNanos>,
}
272
273impl Debug for ExecutionManager {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 f.debug_struct(stringify!(ExecutionManager))
276 .field("config", &self.config)
277 .field("inflight_checks", &self.inflight_checks)
278 .field("external_order_claims", &self.external_order_claims)
279 .field("processed_fills", &self.processed_fills)
280 .field("recon_check_retries", &self.recon_check_retries)
281 .finish()
282 }
283}
284
285impl ExecutionManager {
286 pub fn new(
288 clock: Rc<RefCell<dyn Clock>>,
289 cache: Rc<RefCell<Cache>>,
290 config: ExecutionManagerConfig,
291 ) -> Self {
292 Self {
293 clock,
294 cache,
295 config,
296 inflight_checks: AHashMap::new(),
297 external_order_claims: AHashMap::new(),
298 processed_fills: AHashMap::new(),
299 recon_check_retries: AHashMap::new(),
300 ts_last_query: AHashMap::new(),
301 order_local_activity_ns: AHashMap::new(),
302 position_local_activity_ns: AHashMap::new(),
303 recent_fills_cache: AHashMap::new(),
304 }
305 }
306
307 pub async fn reconcile_execution_mass_status(
309 &mut self,
310 mass_status: ExecutionMassStatus,
311 ) -> Vec<OrderEventAny> {
312 let venue = mass_status.venue;
313 let order_count = mass_status.order_reports().len();
314 let fill_count: usize = mass_status.fill_reports().values().map(|v| v.len()).sum();
315 let position_count = mass_status.position_reports().len();
316
317 log_info!(
318 "Reconciling ExecutionMassStatus for {}",
319 venue,
320 color = LogColor::Blue
321 );
322 log_info!(
323 "Received {} order(s), {} fill(s), {} position(s)",
324 order_count,
325 fill_count,
326 position_count,
327 color = LogColor::Blue
328 );
329
330 let mut events = Vec::new();
331 let mut orders_reconciled = 0usize;
332 let mut external_orders_created = 0usize;
333 let mut open_orders_initialized = 0usize;
334 let mut orders_skipped_no_instrument = 0usize;
335 let mut fills_applied = 0usize;
336
337 for report in mass_status.order_reports().values() {
339 if let Some(client_order_id) = &report.client_order_id {
340 if let Some(order) = self.get_order(client_order_id) {
341 let mut order = order;
342 log::info!(
343 color = LogColor::Blue as u8;
344 "Reconciling {} {} {} [{}] -> [{}]",
345 client_order_id,
346 report.venue_order_id,
347 report.instrument_id,
348 order.status(),
349 report.order_status,
350 );
351 if let Some(event) = self.reconcile_order_report(&mut order, report) {
352 orders_reconciled += 1;
353 events.push(event);
354 }
355 } else if !self.config.filter_unclaimed_external {
356 if self.get_instrument(&report.instrument_id).is_none() {
358 orders_skipped_no_instrument += 1;
359 } else {
360 let external_events =
361 self.handle_external_order(report, &mass_status.account_id);
362 if !external_events.is_empty() {
363 external_orders_created += 1;
364 if is_order_status_open(report.order_status) {
365 open_orders_initialized += 1;
366 }
367 events.extend(external_events);
368 }
369 }
370 }
371 } else if !self.config.filter_unclaimed_external {
372 if self.get_instrument(&report.instrument_id).is_none() {
373 orders_skipped_no_instrument += 1;
374 } else {
375 let external_events =
376 self.handle_external_order(report, &mass_status.account_id);
377 if !external_events.is_empty() {
378 external_orders_created += 1;
379 if is_order_status_open(report.order_status) {
380 open_orders_initialized += 1;
381 }
382 events.extend(external_events);
383 }
384 }
385 }
386 }
387
388 let fill_reports = mass_status.fill_reports();
390 let mut all_fills: Vec<&FillReport> = fill_reports.values().flatten().collect();
391 all_fills.sort_by_key(|f| f.ts_event);
392
393 for fill in all_fills {
394 if let Some(client_order_id) = &fill.client_order_id
395 && let Some(order) = self.get_order(client_order_id)
396 {
397 let mut order = order;
398 let instrument_id = order.instrument_id();
399
400 if let Some(instrument) = self.get_instrument(&instrument_id)
401 && let Some(event) = self.create_order_fill(&mut order, fill, &instrument)
402 {
403 fills_applied += 1;
404 events.push(event);
405 }
406 }
407 }
408
409 if orders_skipped_no_instrument > 0 {
410 log::warn!("{orders_skipped_no_instrument} orders skipped (instrument not in cache)");
411 }
412
413 log::info!(
414 color = LogColor::Blue as u8;
415 "Reconciliation complete for {venue}: reconciled={orders_reconciled}, external={external_orders_created}, open={open_orders_initialized}, fills={fills_applied}",
416 );
417
418 events
419 }
420
421 pub fn reconcile_report(
427 &mut self,
428 report: ExecutionReport,
429 ) -> anyhow::Result<Vec<OrderEventAny>> {
430 let mut events = Vec::new();
431
432 self.clear_recon_tracking(&report.client_order_id, true);
433
434 if let Some(order) = self.get_order(&report.client_order_id) {
435 let mut order = order;
436 let Some(account_id) = order.account_id() else {
437 log::error!("Cannot process fill report: order has no account_id");
438 return Ok(vec![]);
439 };
440 let Some(venue_order_id) = report.venue_order_id else {
441 log::error!("Cannot process fill report: report has no venue_order_id");
442 return Ok(vec![]);
443 };
444 let mut order_report = OrderStatusReport::new(
445 account_id,
446 order.instrument_id(),
447 Some(report.client_order_id),
448 venue_order_id,
449 order.order_side(),
450 order.order_type(),
451 order.time_in_force(),
452 report.status,
453 order.quantity(),
454 report.filled_qty,
455 report.ts_event, report.ts_event, self.clock.borrow().timestamp_ns(),
458 Some(UUID4::new()),
459 );
460
461 if let Some(avg_px) = report.avg_px {
462 order_report = order_report.with_avg_px(avg_px)?;
463 }
464
465 if let Some(event) = self.reconcile_order_report(&mut order, &order_report) {
466 events.push(event);
467 }
468 }
469
470 Ok(events)
471 }
472
473 pub fn check_inflight_orders(&mut self) -> Vec<OrderEventAny> {
475 let mut events = Vec::new();
476 let current_time = self.clock.borrow().timestamp_ns();
477 let threshold_ns = self.config.inflight_threshold_ms * 1_000_000;
478
479 let mut to_check = Vec::new();
480 for (client_order_id, check) in &self.inflight_checks {
481 if current_time - check.ts_submitted > threshold_ns {
482 to_check.push(*client_order_id);
483 }
484 }
485
486 for client_order_id in to_check {
487 if self
488 .config
489 .filtered_client_order_ids
490 .contains(&client_order_id)
491 {
492 continue;
493 }
494
495 if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
496 if let Some(last_query_ts) = check.last_query_ts
497 && current_time - last_query_ts < threshold_ns
498 {
499 continue;
500 }
501
502 check.retry_count += 1;
503 check.last_query_ts = Some(current_time);
504 self.ts_last_query.insert(client_order_id, current_time);
505 self.recon_check_retries
506 .insert(client_order_id, check.retry_count);
507
508 if check.retry_count >= self.config.inflight_max_retries {
509 if let Some(order) = self.get_order(&client_order_id) {
511 events.push(self.create_order_rejected(&order, Some("INFLIGHT_TIMEOUT")));
512 }
513 self.clear_recon_tracking(&client_order_id, true);
515 }
516 }
517 }
518
519 events
520 }
521
522 pub async fn check_open_orders(
532 &mut self,
533 clients: &[Rc<dyn ExecutionClient>],
534 ) -> Vec<OrderEventAny> {
535 log::debug!("Checking order consistency between cached-state and venues");
536
537 let filtered_orders: Vec<OrderAny> = {
538 let cache = self.cache.borrow();
539 let open_orders = cache.orders_open(None, None, None, None);
540
541 if !self.config.reconciliation_instrument_ids.is_empty() {
542 open_orders
543 .iter()
544 .filter(|o| {
545 self.config
546 .reconciliation_instrument_ids
547 .contains(&o.instrument_id())
548 })
549 .map(|o| (*o).clone())
550 .collect()
551 } else {
552 open_orders.iter().map(|o| (*o).clone()).collect()
553 }
554 };
555
556 log::debug!(
557 "Found {} order{} open in cache",
558 filtered_orders.len(),
559 if filtered_orders.len() == 1 { "" } else { "s" }
560 );
561
562 let mut all_reports = Vec::new();
563 let mut venue_reported_ids = AHashSet::new();
564
565 for client in clients {
566 let cmd = GenerateOrderStatusReports::new(
567 UUID4::new(),
568 self.clock.borrow().timestamp_ns(),
569 true, None, None, None, None, None, );
576
577 match client.generate_order_status_reports(&cmd).await {
578 Ok(reports) => {
579 for report in reports {
580 if let Some(client_order_id) = &report.client_order_id {
581 venue_reported_ids.insert(*client_order_id);
582 }
583 all_reports.push(report);
584 }
585 }
586 Err(e) => {
587 log::error!(
588 "Failed to query order reports from {}: {e}",
589 client.client_id()
590 );
591 }
592 }
593 }
594
595 let mut events = Vec::new();
597 for report in all_reports {
598 if let Some(client_order_id) = &report.client_order_id
599 && let Some(mut order) = self.get_order(client_order_id)
600 && let Some(event) = self.reconcile_order_report(&mut order, &report)
601 {
602 events.push(event);
603 }
604 }
605
606 if !self.config.open_check_open_only {
608 let cached_ids: AHashSet<ClientOrderId> = filtered_orders
609 .iter()
610 .map(|o| o.client_order_id())
611 .collect();
612 let missing_at_venue: AHashSet<ClientOrderId> = cached_ids
613 .difference(&venue_reported_ids)
614 .copied()
615 .collect();
616
617 for client_order_id in missing_at_venue {
618 events.extend(self.handle_missing_order(client_order_id));
619 }
620 }
621
622 events
623 }
624
625 pub async fn check_positions_consistency(
635 &mut self,
636 clients: &[Rc<dyn ExecutionClient>],
637 ) -> Vec<OrderEventAny> {
638 log::debug!("Checking position consistency between cached-state and venues");
639
640 let open_positions = {
641 let cache = self.cache.borrow();
642 let positions = cache.positions_open(None, None, None, None);
643
644 if !self.config.reconciliation_instrument_ids.is_empty() {
645 positions
646 .iter()
647 .filter(|p| {
648 self.config
649 .reconciliation_instrument_ids
650 .contains(&p.instrument_id)
651 })
652 .map(|p| (*p).clone())
653 .collect::<Vec<_>>()
654 } else {
655 positions.iter().map(|p| (*p).clone()).collect()
656 }
657 };
658
659 log::debug!(
660 "Found {} position{} to check",
661 open_positions.len(),
662 if open_positions.len() == 1 { "" } else { "s" }
663 );
664
665 let mut venue_positions = AHashMap::new();
667
668 for client in clients {
669 let cmd = GeneratePositionStatusReports::new(
670 UUID4::new(),
671 self.clock.borrow().timestamp_ns(),
672 None, None, None, None, None, );
678
679 match client.generate_position_status_reports(&cmd).await {
680 Ok(reports) => {
681 for report in reports {
682 venue_positions.insert(report.instrument_id, report);
683 }
684 }
685 Err(e) => {
686 log::error!(
687 "Failed to query position reports from {}: {e}",
688 client.client_id()
689 );
690 }
691 }
692 }
693
694 let mut events = Vec::new();
696
697 for position in &open_positions {
698 if !self.config.reconciliation_instrument_ids.is_empty()
700 && !self
701 .config
702 .reconciliation_instrument_ids
703 .contains(&position.instrument_id)
704 {
705 continue;
706 }
707
708 let venue_report = venue_positions.get(&position.instrument_id);
709
710 if let Some(discrepancy_events) =
711 self.check_position_discrepancy(position, venue_report)
712 {
713 events.extend(discrepancy_events);
714 }
715 }
716
717 events
718 }
719
720 pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
722 let ts_submitted = self.clock.borrow().timestamp_ns();
723 self.inflight_checks.insert(
724 client_order_id,
725 InflightCheck {
726 client_order_id,
727 ts_submitted,
728 retry_count: 0,
729 last_query_ts: None,
730 },
731 );
732 self.recon_check_retries.insert(client_order_id, 0);
733 self.ts_last_query.remove(&client_order_id);
734 self.order_local_activity_ns.remove(&client_order_id);
735 }
736
737 pub fn record_local_activity(&mut self, client_order_id: ClientOrderId, ts_event: UnixNanos) {
739 self.order_local_activity_ns
740 .insert(client_order_id, ts_event);
741 }
742
743 pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
745 self.inflight_checks.remove(client_order_id);
746 self.recon_check_retries.remove(client_order_id);
747 if drop_last_query {
748 self.ts_last_query.remove(client_order_id);
749 }
750 self.order_local_activity_ns.remove(client_order_id);
751 }
752
753 pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
755 self.external_order_claims
756 .insert(instrument_id, strategy_id);
757 }
758
759 pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
761 self.position_local_activity_ns
762 .insert(instrument_id, ts_event);
763 }
764
765 pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
767 self.recent_fills_cache.contains_key(trade_id)
768 }
769
770 pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
772 let ts_now = self.clock.borrow().timestamp_ns();
773 self.recent_fills_cache.insert(trade_id, ts_now);
774 }
775
776 pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
780 let ts_now = self.clock.borrow().timestamp_ns();
781 let ttl_ns = (ttl_secs * 1_000_000_000.0) as u64;
782
783 self.recent_fills_cache
784 .retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
785 }
786
787 pub fn purge_closed_orders(&mut self) {
789 let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins else {
790 return;
791 };
792
793 let ts_now = self.clock.borrow().timestamp_ns();
794 let buffer_secs = (buffer_mins as u64) * 60;
795
796 self.cache
797 .borrow_mut()
798 .purge_closed_orders(ts_now, buffer_secs);
799 }
800
801 pub fn purge_closed_positions(&mut self) {
803 let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins else {
804 return;
805 };
806
807 let ts_now = self.clock.borrow().timestamp_ns();
808 let buffer_secs = (buffer_mins as u64) * 60;
809
810 self.cache
811 .borrow_mut()
812 .purge_closed_positions(ts_now, buffer_secs);
813 }
814
815 pub fn purge_account_events(&mut self) {
817 let Some(lookback_mins) = self.config.purge_account_events_lookback_mins else {
818 return;
819 };
820
821 let ts_now = self.clock.borrow().timestamp_ns();
822 let lookback_secs = (lookback_mins as u64) * 60;
823
824 self.cache
825 .borrow_mut()
826 .purge_account_events(ts_now, lookback_secs);
827 }
828
829 fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
832 self.cache.borrow().order(client_order_id).cloned()
833 }
834
835 fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
836 self.cache.borrow().instrument(instrument_id).cloned()
837 }
838
839 fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
840 let mut events = Vec::new();
841
842 let Some(order) = self.get_order(&client_order_id) else {
843 return events;
844 };
845
846 let ts_now = self.clock.borrow().timestamp_ns();
847 let ts_last = order.ts_last();
848
849 if (ts_now - ts_last) < self.config.open_check_threshold_ns {
851 return events;
852 }
853
854 if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
856 && (ts_now - last_activity) < self.config.open_check_threshold_ns
857 {
858 return events;
859 }
860
861 let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
863 *retries += 1;
864
865 if *retries >= self.config.open_check_missing_retries {
867 log::warn!(
868 "Order {client_order_id} not found at venue after {retries} retries, marking as REJECTED"
869 );
870
871 let rejected = self.create_order_rejected(&order, Some("NOT_FOUND_AT_VENUE"));
872 events.push(rejected);
873
874 self.clear_recon_tracking(&client_order_id, true);
875 } else {
876 log::debug!(
877 "Order {} not found at venue, retry {}/{}",
878 client_order_id,
879 retries,
880 self.config.open_check_missing_retries
881 );
882 }
883
884 events
885 }
886
887 fn check_position_discrepancy(
888 &mut self,
889 position: &Position,
890 venue_report: Option<&PositionStatusReport>,
891 ) -> Option<Vec<OrderEventAny>> {
892 let cached_qty = position.quantity.as_decimal();
893
894 let venue_qty = if let Some(report) = venue_report {
895 report.quantity.as_decimal()
896 } else {
897 Decimal::ZERO
898 };
899
900 let tolerance = Decimal::from_str("0.00000001").unwrap();
902 if (cached_qty - venue_qty).abs() <= tolerance {
903 return None; }
905
906 let ts_now = self.clock.borrow().timestamp_ns();
908 if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
909 && (ts_now - last_activity) < self.config.position_check_threshold_ns
910 {
911 log::debug!(
912 "Skipping position reconciliation for {}: recent activity within threshold",
913 position.instrument_id
914 );
915 return None;
916 }
917
918 log::warn!(
919 "Position discrepancy detected for {}: cached_qty={}, venue_qty={}",
920 position.instrument_id,
921 cached_qty,
922 venue_qty
923 );
924
925 None
928 }
929
930 fn reconcile_order_report(
931 &mut self,
932 order: &mut OrderAny,
933 report: &OrderStatusReport,
934 ) -> Option<OrderEventAny> {
935 if order.status() == report.order_status && order.filled_qty() == report.filled_qty {
937 return None; }
939
940 let event = match report.order_status {
941 OrderStatus::Accepted => self.create_order_accepted(order, report),
942 OrderStatus::Rejected => {
943 self.create_order_rejected(order, report.cancel_reason.as_deref())
944 }
945 OrderStatus::Triggered => self.create_order_triggered(order, report),
946 OrderStatus::Canceled => self.create_order_canceled(order, report),
947 OrderStatus::Expired => self.create_order_expired(order, report),
948 _ => return None,
949 };
950
951 Some(event)
952 }
953
954 fn handle_external_order(
955 &mut self,
956 report: &OrderStatusReport,
957 account_id: &AccountId,
958 ) -> Vec<OrderEventAny> {
959 let (strategy_id, tags) =
960 if let Some(claimed_strategy) = self.external_order_claims.get(&report.instrument_id) {
961 let order_id = report
962 .client_order_id
963 .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
964 log::info!(
965 color = LogColor::Blue as u8;
966 "External order {} for {} claimed by strategy {}",
967 order_id,
968 report.instrument_id,
969 claimed_strategy,
970 );
971 (*claimed_strategy, None)
972 } else {
973 (
975 StrategyId::from("EXTERNAL"),
976 Some(vec![Ustr::from("VENUE")]),
977 )
978 };
979
980 let client_order_id = report
981 .client_order_id
982 .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
983
984 let ts_now = self.clock.borrow().timestamp_ns();
985
986 let initialized = OrderInitialized::new(
987 self.config.trader_id,
988 strategy_id,
989 report.instrument_id,
990 client_order_id,
991 report.order_side,
992 report.order_type,
993 report.quantity,
994 report.time_in_force,
995 report.post_only,
996 report.reduce_only,
997 false, true, UUID4::new(),
1000 ts_now,
1001 ts_now,
1002 report.price,
1003 report.trigger_price,
1004 report.trigger_type,
1005 report.limit_offset,
1006 report.trailing_offset,
1007 Some(report.trailing_offset_type),
1008 report.expire_time,
1009 report.display_qty,
1010 None, None, Some(report.contingency_type),
1013 report.order_list_id,
1014 report.linked_order_ids.clone(),
1015 report.parent_order_id,
1016 None, None, None, tags,
1020 );
1021
1022 let events = vec![OrderEventAny::Initialized(initialized)];
1023 let order = match OrderAny::from_events(events) {
1024 Ok(order) => order,
1025 Err(e) => {
1026 log::error!("Failed to create order from report: {e}");
1027 return Vec::new();
1028 }
1029 };
1030
1031 {
1032 let mut cache = self.cache.borrow_mut();
1033 if let Err(e) = cache.add_order(order.clone(), None, None, false) {
1034 log::error!("Failed to add external order to cache: {e}");
1035 return Vec::new();
1036 }
1037
1038 if let Err(e) =
1039 cache.add_venue_order_id(&client_order_id, &report.venue_order_id, false)
1040 {
1041 log::warn!("Failed to add venue order ID index: {e}");
1042 }
1043 }
1044
1045 log::info!(
1046 color = LogColor::Blue as u8;
1047 "Created external order {} ({}) for {} [{}]",
1048 client_order_id,
1049 report.venue_order_id,
1050 report.instrument_id,
1051 report.order_status,
1052 );
1053
1054 self.generate_external_order_status_events(&order, report, account_id)
1055 }
1056
1057 fn generate_external_order_status_events(
1063 &self,
1064 order: &OrderAny,
1065 report: &OrderStatusReport,
1066 account_id: &AccountId,
1067 ) -> Vec<OrderEventAny> {
1068 let ts_now = self.clock.borrow().timestamp_ns();
1069
1070 let accepted = OrderEventAny::Accepted(OrderAccepted::new(
1071 order.trader_id(),
1072 order.strategy_id(),
1073 order.instrument_id(),
1074 order.client_order_id(),
1075 report.venue_order_id,
1076 *account_id,
1077 UUID4::new(),
1078 report.ts_accepted,
1079 ts_now,
1080 true, ));
1082
1083 match report.order_status {
1084 OrderStatus::Accepted | OrderStatus::PartiallyFilled | OrderStatus::Filled => {
1085 vec![accepted]
1087 }
1088 OrderStatus::Canceled => {
1089 let canceled = OrderEventAny::Canceled(OrderCanceled::new(
1091 order.trader_id(),
1092 order.strategy_id(),
1093 order.instrument_id(),
1094 order.client_order_id(),
1095 UUID4::new(),
1096 report.ts_last,
1097 ts_now,
1098 true, Some(report.venue_order_id),
1100 Some(*account_id),
1101 ));
1102 vec![accepted, canceled]
1103 }
1104 OrderStatus::Expired => {
1105 let expired = OrderEventAny::Expired(OrderExpired::new(
1107 order.trader_id(),
1108 order.strategy_id(),
1109 order.instrument_id(),
1110 order.client_order_id(),
1111 UUID4::new(),
1112 report.ts_last,
1113 ts_now,
1114 true, Some(report.venue_order_id),
1116 Some(*account_id),
1117 ));
1118 vec![accepted, expired]
1119 }
1120 OrderStatus::Rejected => {
1121 vec![OrderEventAny::Rejected(OrderRejected::new(
1123 order.trader_id(),
1124 order.strategy_id(),
1125 order.instrument_id(),
1126 order.client_order_id(),
1127 *account_id,
1128 Ustr::from(report.cancel_reason.as_deref().unwrap_or("UNKNOWN")),
1129 UUID4::new(),
1130 report.ts_last,
1131 ts_now,
1132 true, false,
1134 ))]
1135 }
1136 OrderStatus::Triggered => {
1137 vec![accepted]
1139 }
1140 _ => {
1141 log::warn!(
1142 "Unhandled order status {} for external order {}",
1143 report.order_status,
1144 order.client_order_id()
1145 );
1146 Vec::new()
1147 }
1148 }
1149 }
1150
1151 fn create_order_accepted(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
1152 OrderEventAny::Accepted(OrderAccepted::new(
1153 order.trader_id(),
1154 order.strategy_id(),
1155 order.instrument_id(),
1156 order.client_order_id(),
1157 order.venue_order_id().unwrap_or(report.venue_order_id),
1158 order
1159 .account_id()
1160 .expect("Order should have account_id when creating accepted event"),
1161 UUID4::new(),
1162 report.ts_accepted,
1163 self.clock.borrow().timestamp_ns(),
1164 false,
1165 ))
1166 }
1167
1168 fn create_order_rejected(&self, order: &OrderAny, reason: Option<&str>) -> OrderEventAny {
1169 let reason = reason.unwrap_or("UNKNOWN");
1170 OrderEventAny::Rejected(OrderRejected::new(
1171 order.trader_id(),
1172 order.strategy_id(),
1173 order.instrument_id(),
1174 order.client_order_id(),
1175 order
1176 .account_id()
1177 .expect("Order should have account_id when creating rejected event"),
1178 Ustr::from(reason),
1179 UUID4::new(),
1180 self.clock.borrow().timestamp_ns(),
1181 self.clock.borrow().timestamp_ns(),
1182 false,
1183 false, ))
1185 }
1186
1187 fn create_order_triggered(
1188 &self,
1189 order: &OrderAny,
1190 report: &OrderStatusReport,
1191 ) -> OrderEventAny {
1192 OrderEventAny::Triggered(OrderTriggered::new(
1193 order.trader_id(),
1194 order.strategy_id(),
1195 order.instrument_id(),
1196 order.client_order_id(),
1197 UUID4::new(),
1198 report
1199 .ts_triggered
1200 .unwrap_or(self.clock.borrow().timestamp_ns()),
1201 self.clock.borrow().timestamp_ns(),
1202 false,
1203 order.venue_order_id(),
1204 order.account_id(),
1205 ))
1206 }
1207
1208 fn create_order_canceled(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
1209 OrderEventAny::Canceled(OrderCanceled::new(
1210 order.trader_id(),
1211 order.strategy_id(),
1212 order.instrument_id(),
1213 order.client_order_id(),
1214 UUID4::new(),
1215 report.ts_last,
1216 self.clock.borrow().timestamp_ns(),
1217 false,
1218 order.venue_order_id(),
1219 order.account_id(),
1220 ))
1221 }
1222
1223 fn create_order_expired(&self, order: &OrderAny, report: &OrderStatusReport) -> OrderEventAny {
1224 OrderEventAny::Expired(OrderExpired::new(
1225 order.trader_id(),
1226 order.strategy_id(),
1227 order.instrument_id(),
1228 order.client_order_id(),
1229 UUID4::new(),
1230 report.ts_last,
1231 self.clock.borrow().timestamp_ns(),
1232 false,
1233 order.venue_order_id(),
1234 order.account_id(),
1235 ))
1236 }
1237
1238 fn create_order_fill(
1239 &mut self,
1240 order: &mut OrderAny,
1241 fill: &FillReport,
1242 instrument: &InstrumentAny,
1243 ) -> Option<OrderEventAny> {
1244 if self.processed_fills.contains_key(&fill.trade_id) {
1245 return None;
1246 }
1247
1248 self.processed_fills
1249 .insert(fill.trade_id, order.client_order_id());
1250
1251 Some(OrderEventAny::Filled(OrderFilled::new(
1252 order.trader_id(),
1253 order.strategy_id(),
1254 order.instrument_id(),
1255 order.client_order_id(),
1256 fill.venue_order_id,
1257 fill.account_id,
1258 fill.trade_id,
1259 fill.order_side,
1260 order.order_type(),
1261 fill.last_qty,
1262 fill.last_px,
1263 instrument.quote_currency(),
1264 fill.liquidity_side,
1265 fill.report_id,
1266 fill.ts_event,
1267 self.clock.borrow().timestamp_ns(),
1268 false,
1269 fill.venue_position_id,
1270 Some(fill.commission),
1271 )))
1272 }
1273}
1274
1275#[cfg(test)]
1276mod tests {
1277 use std::{cell::RefCell, rc::Rc};
1278
1279 use nautilus_common::{cache::Cache, clock::TestClock};
1280 use nautilus_core::{UUID4, UnixNanos};
1281 use nautilus_model::{
1282 enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, TimeInForce},
1283 events::OrderEventAny,
1284 identifiers::{
1285 AccountId, ClientId, ClientOrderId, InstrumentId, TradeId, Venue, VenueOrderId,
1286 },
1287 instruments::stubs::audusd_sim,
1288 orders::{Order, OrderTestBuilder},
1289 reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
1290 types::{Money, Price, Quantity},
1291 };
1292 use rstest::rstest;
1293
1294 use super::*;
1295
1296 fn create_test_manager() -> ExecutionManager {
1297 let clock = Rc::new(RefCell::new(TestClock::new()));
1298 let cache = Rc::new(RefCell::new(Cache::default()));
1299 let config = ExecutionManagerConfig::default();
1300 ExecutionManager::new(clock, cache, config)
1301 }
1302
1303 #[rstest]
1304 fn test_reconciliation_manager_new() {
1305 let manager = create_test_manager();
1306 assert_eq!(manager.inflight_checks.len(), 0);
1307 assert_eq!(manager.external_order_claims.len(), 0);
1308 assert_eq!(manager.processed_fills.len(), 0);
1309 }
1310
1311 #[rstest]
1312 fn test_register_inflight() {
1313 let mut manager = create_test_manager();
1314 let client_order_id = ClientOrderId::from("O-123456");
1315
1316 manager.register_inflight(client_order_id);
1317
1318 assert_eq!(manager.inflight_checks.len(), 1);
1319 assert!(manager.inflight_checks.contains_key(&client_order_id));
1320 }
1321
1322 #[rstest]
1323 fn test_claim_external_orders() {
1324 let mut manager = create_test_manager();
1325 let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
1326 let strategy_id = StrategyId::from("STRATEGY-001");
1327
1328 manager.claim_external_orders(instrument_id, strategy_id);
1329
1330 assert_eq!(manager.external_order_claims.len(), 1);
1331 assert_eq!(
1332 manager.external_order_claims.get(&instrument_id),
1333 Some(&strategy_id)
1334 );
1335 }
1336
1337 #[rstest]
1338 fn test_reconcile_report_removes_from_inflight() {
1339 let mut manager = create_test_manager();
1340 let client_order_id = ClientOrderId::from("O-123456");
1341
1342 manager.register_inflight(client_order_id);
1343 assert_eq!(manager.inflight_checks.len(), 1);
1344
1345 let report = ExecutionReport {
1346 client_order_id,
1347 venue_order_id: Some(VenueOrderId::from("V-123456")),
1348 status: OrderStatus::Accepted,
1349 filled_qty: Quantity::from(0),
1350 avg_px: None,
1351 ts_event: UnixNanos::default(),
1352 };
1353
1354 manager.reconcile_report(report).unwrap();
1356 assert_eq!(manager.inflight_checks.len(), 0);
1357 }
1358
1359 #[rstest]
1360 fn test_check_inflight_orders_generates_rejection_after_max_retries() {
1361 let clock = Rc::new(RefCell::new(TestClock::new()));
1362 let cache = Rc::new(RefCell::new(Cache::default()));
1363 let config = ExecutionManagerConfig {
1364 inflight_threshold_ms: 100,
1365 inflight_max_retries: 2,
1366 ..ExecutionManagerConfig::default()
1367 };
1368 let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1369
1370 let client_order_id = ClientOrderId::from("O-123456");
1371 manager.register_inflight(client_order_id);
1372
1373 clock
1375 .borrow_mut()
1376 .advance_time(UnixNanos::from(200_000_000), true);
1377 let events = manager.check_inflight_orders();
1378 assert_eq!(events.len(), 0);
1379 let first_check = manager
1380 .inflight_checks
1381 .get(&client_order_id)
1382 .expect("inflight check present");
1383 assert_eq!(first_check.retry_count, 1);
1384 let first_query_ts = first_check.last_query_ts.expect("last query recorded");
1385
1386 clock
1388 .borrow_mut()
1389 .advance_time(UnixNanos::from(400_000_000), true);
1390 let events = manager.check_inflight_orders();
1391 assert_eq!(events.len(), 0); assert!(!manager.inflight_checks.contains_key(&client_order_id));
1393 assert!(clock.borrow().timestamp_ns() > first_query_ts);
1395 }
1396
1397 #[rstest]
1398 fn test_check_inflight_orders_skips_recent_query() {
1399 let clock = Rc::new(RefCell::new(TestClock::new()));
1400 let cache = Rc::new(RefCell::new(Cache::default()));
1401 let config = ExecutionManagerConfig {
1402 inflight_threshold_ms: 100,
1403 inflight_max_retries: 3,
1404 ..ExecutionManagerConfig::default()
1405 };
1406 let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1407
1408 let client_order_id = ClientOrderId::from("O-ABCDEF");
1409 manager.register_inflight(client_order_id);
1410
1411 clock
1413 .borrow_mut()
1414 .advance_time(UnixNanos::from(200_000_000), true);
1415 let events = manager.check_inflight_orders();
1416 assert!(events.is_empty());
1417 let initial_check = manager
1418 .inflight_checks
1419 .get(&client_order_id)
1420 .expect("inflight check retained");
1421 assert_eq!(initial_check.retry_count, 1);
1422 let last_query_ts = initial_check.last_query_ts.expect("last query recorded");
1423
1424 clock
1426 .borrow_mut()
1427 .advance_time(UnixNanos::from(250_000_000), true);
1428 let events = manager.check_inflight_orders();
1429 assert!(events.is_empty());
1430 let second_check = manager
1431 .inflight_checks
1432 .get(&client_order_id)
1433 .expect("inflight check retained");
1434 assert_eq!(second_check.retry_count, 1);
1435 assert_eq!(second_check.last_query_ts, Some(last_query_ts));
1436 }
1437
1438 #[rstest]
1439 fn test_check_inflight_orders_skips_filtered_ids() {
1440 let clock = Rc::new(RefCell::new(TestClock::new()));
1441 let cache = Rc::new(RefCell::new(Cache::default()));
1442 let filtered_id = ClientOrderId::from("O-FILTERED");
1443 let mut config = ExecutionManagerConfig::default();
1444 config.filtered_client_order_ids.insert(filtered_id);
1445 config.inflight_threshold_ms = 100;
1446 let mut manager = ExecutionManager::new(clock.clone(), cache, config);
1447
1448 manager.register_inflight(filtered_id);
1449 clock
1450 .borrow_mut()
1451 .advance_time(UnixNanos::from(200_000_000), true);
1452 let events = manager.check_inflight_orders();
1453 assert!(events.is_empty());
1454 assert!(manager.inflight_checks.contains_key(&filtered_id));
1455 }
1456
1457 #[rstest]
1458 fn test_record_and_clear_tracking() {
1459 let mut manager = create_test_manager();
1460 let client_order_id = ClientOrderId::from("O-TRACK");
1461
1462 manager.register_inflight(client_order_id);
1463 let ts_now = UnixNanos::from(1_000_000);
1464 manager.record_local_activity(client_order_id, ts_now);
1465
1466 assert_eq!(
1467 manager
1468 .order_local_activity_ns
1469 .get(&client_order_id)
1470 .copied(),
1471 Some(ts_now)
1472 );
1473
1474 manager.clear_recon_tracking(&client_order_id, true);
1475 assert!(!manager.inflight_checks.contains_key(&client_order_id));
1476 assert!(
1477 !manager
1478 .order_local_activity_ns
1479 .contains_key(&client_order_id)
1480 );
1481 assert!(!manager.recon_check_retries.contains_key(&client_order_id));
1482 assert!(!manager.ts_last_query.contains_key(&client_order_id));
1483 }
1484
1485 #[tokio::test]
1486 async fn test_reconcile_execution_mass_status_with_empty() {
1487 let mut manager = create_test_manager();
1488 let account_id = AccountId::from("ACCOUNT-001");
1489 let venue = Venue::from("BINANCE");
1490
1491 let client_id = ClientId::from("BINANCE");
1492 let mass_status = ExecutionMassStatus::new(
1493 client_id,
1494 account_id,
1495 venue,
1496 UnixNanos::default(),
1497 Some(UUID4::new()),
1498 );
1499
1500 let events = manager.reconcile_execution_mass_status(mass_status).await;
1501 assert_eq!(events.len(), 0);
1502 }
1503
1504 #[rstest]
1505 fn test_reconciliation_config_default() {
1506 let config = ExecutionManagerConfig::default();
1507
1508 assert_eq!(config.lookback_mins, Some(60));
1509 assert_eq!(config.inflight_threshold_ms, 5000);
1510 assert_eq!(config.inflight_max_retries, 5);
1511 assert!(!config.filter_unclaimed_external);
1512 assert!(config.generate_missing_orders);
1513 }
1514
1515 #[rstest]
1516 fn test_create_order_fill_deduplicates_by_trade_id() {
1517 let mut manager = create_test_manager();
1518 let instrument = audusd_sim();
1519 let mut order = OrderTestBuilder::new(OrderType::Market)
1520 .instrument_id(instrument.id())
1521 .side(OrderSide::Buy)
1522 .quantity(Quantity::from(100_000))
1523 .build();
1524 let trade_id = TradeId::from("T-001");
1525 let fill = FillReport::new(
1526 AccountId::from("SIM-001"),
1527 instrument.id(),
1528 VenueOrderId::from("V-001"),
1529 trade_id,
1530 OrderSide::Buy,
1531 Quantity::from(100_000),
1532 Price::from("1.00000"),
1533 Money::from("1.00 USD"),
1534 LiquiditySide::Maker,
1535 Some(ClientOrderId::from("O-123456")),
1536 None,
1537 UnixNanos::from(1_000_000_000),
1538 UnixNanos::from(1_000_000_000),
1539 None,
1540 );
1541 let event1 = manager.create_order_fill(&mut order, &fill, &InstrumentAny::from(instrument));
1542 assert!(event1.is_some());
1543
1544 let event2 = manager.create_order_fill(&mut order, &fill, &InstrumentAny::from(instrument));
1546 assert!(event2.is_none());
1547 }
1548
1549 #[rstest]
1550 fn test_handle_external_order_uses_claimed_strategy() {
1551 let mut manager = create_test_manager();
1552 let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
1553 let strategy_id = StrategyId::from("STRATEGY-001");
1554 let account_id = AccountId::from("BINANCE-001");
1555 let venue_order_id = VenueOrderId::from("V-EXT-001");
1556 manager.claim_external_orders(instrument_id, strategy_id);
1557 let report = OrderStatusReport::new(
1558 account_id,
1559 instrument_id,
1560 None, venue_order_id,
1562 OrderSide::Buy,
1563 OrderType::Limit,
1564 TimeInForce::Gtc,
1565 OrderStatus::Accepted,
1566 Quantity::from(1),
1567 Quantity::from(0),
1568 UnixNanos::from(1_000_000),
1569 UnixNanos::from(1_000_000),
1570 UnixNanos::from(1_000_000),
1571 None,
1572 )
1573 .with_price(Price::from("50000.00"));
1574 let events = manager.handle_external_order(&report, &account_id);
1575
1576 assert_eq!(events.len(), 1);
1578 assert!(matches!(events[0], OrderEventAny::Accepted(_)));
1579 let client_order_id = ClientOrderId::from(venue_order_id.as_str());
1580 let order = manager.cache.borrow().order(&client_order_id).cloned();
1581 assert!(order.is_some());
1582 assert_eq!(order.unwrap().strategy_id(), strategy_id);
1583 }
1584
1585 #[rstest]
1586 fn test_handle_external_order_uses_external_strategy_when_unclaimed() {
1587 let mut manager = create_test_manager();
1588 let instrument_id = InstrumentId::from("ETHUSDT.BINANCE");
1589 let account_id = AccountId::from("BINANCE-001");
1590 let venue_order_id = VenueOrderId::from("V-EXT-002");
1591 let report = OrderStatusReport::new(
1592 account_id,
1593 instrument_id,
1594 None, venue_order_id,
1596 OrderSide::Sell,
1597 OrderType::Limit,
1598 TimeInForce::Gtc,
1599 OrderStatus::Accepted,
1600 Quantity::from(1),
1601 Quantity::from(0),
1602 UnixNanos::from(1_000_000),
1603 UnixNanos::from(1_000_000),
1604 UnixNanos::from(1_000_000),
1605 None,
1606 )
1607 .with_price(Price::from("3000.00"));
1608 let events = manager.handle_external_order(&report, &account_id);
1609 assert_eq!(events.len(), 1);
1610 assert!(matches!(events[0], OrderEventAny::Accepted(_)));
1611 let client_order_id = ClientOrderId::from(venue_order_id.as_str());
1612 let order = manager.cache.borrow().order(&client_order_id).cloned();
1613 assert!(order.is_some());
1614 let order = order.unwrap();
1615 assert_eq!(order.strategy_id(), StrategyId::from("EXTERNAL"));
1616 assert!(
1617 order
1618 .tags()
1619 .is_some_and(|t| t.iter().any(|s| s.as_str() == "VENUE"))
1620 );
1621 }
1622
1623 #[rstest]
1624 fn test_external_order_canceled_generates_accepted_and_canceled() {
1625 let mut manager = create_test_manager();
1626 let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
1627 let account_id = AccountId::from("BINANCE-001");
1628 let report = OrderStatusReport::new(
1629 account_id,
1630 instrument_id,
1631 None, VenueOrderId::from("V-EXT-003"),
1633 OrderSide::Buy,
1634 OrderType::Limit,
1635 TimeInForce::Gtc,
1636 OrderStatus::Canceled,
1637 Quantity::from(1),
1638 Quantity::from(0),
1639 UnixNanos::from(1_000_000),
1640 UnixNanos::from(1_000_000),
1641 UnixNanos::from(1_000_000),
1642 None,
1643 )
1644 .with_price(Price::from("50000.00"));
1645 let events = manager.handle_external_order(&report, &account_id);
1646 assert_eq!(events.len(), 2);
1647 assert!(matches!(events[0], OrderEventAny::Accepted(_)));
1648 assert!(matches!(events[1], OrderEventAny::Canceled(_)));
1649 }
1650
1651 #[rstest]
1652 fn test_external_order_expired_generates_accepted_and_expired() {
1653 let mut manager = create_test_manager();
1654 let instrument_id = InstrumentId::from("BTCUSDT.BINANCE");
1655 let account_id = AccountId::from("BINANCE-001");
1656 let report = OrderStatusReport::new(
1657 account_id,
1658 instrument_id,
1659 None, VenueOrderId::from("V-EXT-004"),
1661 OrderSide::Buy,
1662 OrderType::Limit,
1663 TimeInForce::Gtc,
1664 OrderStatus::Expired,
1665 Quantity::from(1),
1666 Quantity::from(0),
1667 UnixNanos::from(1_000_000),
1668 UnixNanos::from(1_000_000),
1669 UnixNanos::from(1_000_000),
1670 None,
1671 )
1672 .with_price(Price::from("50000.00"));
1673 let events = manager.handle_external_order(&report, &account_id);
1674 assert_eq!(events.len(), 2);
1675 assert!(matches!(events[0], OrderEventAny::Accepted(_)));
1676 assert!(matches!(events[1], OrderEventAny::Expired(_)));
1677 }
1678}