1pub mod config;
24pub mod stubs;
25
26use std::{
27 cell::{RefCell, RefMut},
28 collections::{HashMap, HashSet},
29 fmt::Debug,
30 rc::Rc,
31 time::SystemTime,
32};
33
34use ahash::{AHashMap, AHashSet};
35use config::ExecutionEngineConfig;
36use futures::future::join_all;
37use nautilus_common::{
38 cache::Cache,
39 clients::ExecutionClient,
40 clock::Clock,
41 generators::position_id::PositionIdGenerator,
42 logging::{CMD, EVT, RECV, SEND},
43 messages::{
44 ExecutionReport,
45 execution::{
46 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
47 SubmitOrder, SubmitOrderList, TradingCommand,
48 },
49 },
50 msgbus::{
51 self, get_message_bus,
52 switchboard::{self},
53 },
54};
55use nautilus_core::UUID4;
56use nautilus_model::{
57 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
58 events::{
59 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
60 PositionEvent, PositionOpened,
61 },
62 identifiers::{ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
63 instruments::{Instrument, InstrumentAny},
64 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
65 orders::{Order, OrderAny, OrderError},
66 position::Position,
67 reports::{ExecutionMassStatus, OrderStatusReport},
68 types::{Money, Price, Quantity},
69};
70
71use crate::{client::ExecutionClientAdapter, reconciliation::reconcile_order_report};
72
73pub struct ExecutionEngine {
80 clock: Rc<RefCell<dyn Clock>>,
81 cache: Rc<RefCell<Cache>>,
82 clients: AHashMap<ClientId, ExecutionClientAdapter>,
83 default_client: Option<ExecutionClientAdapter>,
84 routing_map: HashMap<Venue, ClientId>,
85 oms_overrides: HashMap<StrategyId, OmsType>,
86 external_order_claims: HashMap<InstrumentId, StrategyId>,
87 external_clients: HashSet<ClientId>,
88 pos_id_generator: PositionIdGenerator,
89 config: ExecutionEngineConfig,
90}
91
92impl Debug for ExecutionEngine {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 f.debug_struct(stringify!(ExecutionEngine))
95 .field("client_count", &self.clients.len())
96 .finish()
97 }
98}
99
100impl ExecutionEngine {
101 pub fn new(
103 clock: Rc<RefCell<dyn Clock>>,
104 cache: Rc<RefCell<Cache>>,
105 config: Option<ExecutionEngineConfig>,
106 ) -> Self {
107 let trader_id = get_message_bus().borrow().trader_id;
108 Self {
109 clock: clock.clone(),
110 cache,
111 clients: AHashMap::new(),
112 default_client: None,
113 routing_map: HashMap::new(),
114 oms_overrides: HashMap::new(),
115 external_order_claims: HashMap::new(),
116 external_clients: config
117 .as_ref()
118 .and_then(|c| c.external_clients.clone())
119 .unwrap_or_default()
120 .into_iter()
121 .collect(),
122 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
123 config: config.unwrap_or_default(),
124 }
125 }
126
127 #[must_use]
128 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
130 self.pos_id_generator.count(strategy_id)
131 }
132
133 #[must_use]
134 pub fn cache(&self) -> &Rc<RefCell<Cache>> {
136 &self.cache
137 }
138
139 #[must_use]
140 pub const fn config(&self) -> &ExecutionEngineConfig {
142 &self.config
143 }
144
145 #[must_use]
146 pub fn check_integrity(&self) -> bool {
148 self.cache.borrow_mut().check_integrity()
149 }
150
151 #[must_use]
152 pub fn check_connected(&self) -> bool {
154 let clients_connected = self.clients.values().all(|c| c.is_connected());
155 let default_connected = self
156 .default_client
157 .as_ref()
158 .is_none_or(|c| c.is_connected());
159 clients_connected && default_connected
160 }
161
162 #[must_use]
163 pub fn check_disconnected(&self) -> bool {
165 let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
166 let default_disconnected = self
167 .default_client
168 .as_ref()
169 .is_none_or(|c| !c.is_connected());
170 clients_disconnected && default_disconnected
171 }
172
173 #[must_use]
174 pub fn check_residuals(&self) -> bool {
176 self.cache.borrow().check_residuals()
177 }
178
179 #[must_use]
180 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
182 self.external_order_claims.keys().copied().collect()
183 }
184
185 #[must_use]
186 pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
188 self.external_clients.clone()
189 }
190
191 #[must_use]
192 pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
194 self.external_order_claims.get(instrument_id).copied()
195 }
196
197 pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
203 let client_id = client.client_id();
204 let venue = client.venue();
205
206 if self.clients.contains_key(&client_id) {
207 anyhow::bail!("Client already registered with ID {client_id}");
208 }
209
210 let adapter = ExecutionClientAdapter::new(client);
211
212 self.routing_map.insert(venue, client_id);
213
214 log::debug!("Registered client {client_id}");
215 self.clients.insert(client_id, adapter);
216 Ok(())
217 }
218
219 pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
221 let client_id = client.client_id();
222 let adapter = ExecutionClientAdapter::new(client);
223
224 log::debug!("Registered default client {client_id}");
225 self.default_client = Some(adapter);
226 }
227
228 #[must_use]
229 pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
231 self.clients.get(client_id).map(|a| a.client.as_ref())
232 }
233
234 #[must_use]
235 pub fn get_client_adapter_mut(
237 &mut self,
238 client_id: &ClientId,
239 ) -> Option<&mut ExecutionClientAdapter> {
240 if let Some(default) = &self.default_client
241 && &default.client_id == client_id
242 {
243 return self.default_client.as_mut();
244 }
245 self.clients.get_mut(client_id)
246 }
247
248 pub async fn generate_mass_status(
254 &mut self,
255 client_id: &ClientId,
256 lookback_mins: Option<u64>,
257 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
258 if let Some(client) = self.get_client_adapter_mut(client_id) {
259 client.generate_mass_status(lookback_mins).await
260 } else {
261 anyhow::bail!("Client {client_id} not found")
262 }
263 }
264
265 #[must_use]
266 pub fn client_ids(&self) -> Vec<ClientId> {
268 let mut ids: Vec<_> = self.clients.keys().copied().collect();
269
270 if let Some(default) = &self.default_client {
271 ids.push(default.client_id);
272 }
273 ids
274 }
275
276 #[must_use]
277 pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
279 let mut adapters: Vec<_> = self.clients.values_mut().collect();
280 if let Some(default) = &mut self.default_client {
281 adapters.push(default);
282 }
283 adapters
284 }
285
286 #[must_use]
287 pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
292 let mut client_ids: AHashSet<ClientId> = AHashSet::new();
293 let mut venues: AHashSet<Venue> = AHashSet::new();
294
295 for order in orders {
297 venues.insert(order.instrument_id().venue);
298 if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
299 client_ids.insert(*client_id);
300 }
301 }
302
303 let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
304
305 for client_id in &client_ids {
307 if let Some(adapter) = self.clients.get(client_id)
308 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
309 {
310 clients.push(adapter.client.as_ref());
311 }
312 }
313
314 for venue in &venues {
316 if let Some(client_id) = self.routing_map.get(venue) {
317 if let Some(adapter) = self.clients.get(client_id)
318 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
319 {
320 clients.push(adapter.client.as_ref());
321 }
322 } else if let Some(adapter) = &self.default_client
323 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
324 {
325 clients.push(adapter.client.as_ref());
326 }
327 }
328
329 clients
330 }
331
332 pub fn register_venue_routing(
338 &mut self,
339 client_id: ClientId,
340 venue: Venue,
341 ) -> anyhow::Result<()> {
342 if !self.clients.contains_key(&client_id) {
343 anyhow::bail!("No client registered with ID {client_id}");
344 }
345
346 self.routing_map.insert(venue, client_id);
347 log::info!("Set client {client_id} routing for {venue}");
348 Ok(())
349 }
350
351 pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
355 self.oms_overrides.insert(strategy_id, oms_type);
356 log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
357 }
358
359 pub fn register_external_order_claims(
367 &mut self,
368 strategy_id: StrategyId,
369 instrument_ids: HashSet<InstrumentId>,
370 ) -> anyhow::Result<()> {
371 for instrument_id in &instrument_ids {
373 if let Some(existing) = self.external_order_claims.get(instrument_id) {
374 anyhow::bail!(
375 "External order claim for {instrument_id} already exists for {existing}"
376 );
377 }
378 }
379
380 for instrument_id in &instrument_ids {
382 self.external_order_claims
383 .insert(*instrument_id, strategy_id);
384 }
385
386 if !instrument_ids.is_empty() {
387 log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
388 }
389
390 Ok(())
391 }
392
393 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
397 if self.clients.remove(&client_id).is_some() {
398 self.routing_map
400 .retain(|_, mapped_id| mapped_id != &client_id);
401 log::info!("Deregistered client {client_id}");
402 Ok(())
403 } else {
404 anyhow::bail!("No client registered with ID {client_id}")
405 }
406 }
407
408 pub async fn connect(&mut self) -> anyhow::Result<()> {
414 let futures: Vec<_> = self
415 .get_clients_mut()
416 .into_iter()
417 .map(|client| client.connect())
418 .collect();
419
420 let results = join_all(futures).await;
421 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
422
423 if errors.is_empty() {
424 Ok(())
425 } else {
426 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
427 anyhow::bail!(
428 "Failed to connect execution clients: {}",
429 error_msgs.join("; ")
430 )
431 }
432 }
433
434 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
440 let futures: Vec<_> = self
441 .get_clients_mut()
442 .into_iter()
443 .map(|client| client.disconnect())
444 .collect();
445
446 let results = join_all(futures).await;
447 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
448
449 if errors.is_empty() {
450 Ok(())
451 } else {
452 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
453 anyhow::bail!(
454 "Failed to disconnect execution clients: {}",
455 error_msgs.join("; ")
456 )
457 }
458 }
459
460 pub fn set_manage_own_order_books(&mut self, value: bool) {
462 self.config.manage_own_order_books = value;
463 }
464
465 pub fn set_convert_quote_qty_to_base(&mut self, value: bool) {
467 self.config.convert_quote_qty_to_base = value;
468 }
469
470 pub fn start_snapshot_timer(&mut self) {
474 if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
475 log::info!("Starting position snapshots timer at {interval_secs} second intervals");
476 }
477 }
478
479 pub fn stop_snapshot_timer(&mut self) {
481 if self.config.snapshot_positions_interval_secs.is_some() {
482 log::info!("Canceling position snapshots timer");
483 }
484 }
485
486 pub fn snapshot_open_position_states(&self) {
488 let positions: Vec<Position> = self
489 .cache
490 .borrow()
491 .positions_open(None, None, None, None)
492 .into_iter()
493 .cloned()
494 .collect();
495
496 for position in positions {
497 self.create_position_state_snapshot(&position);
498 }
499 }
500
501 #[allow(clippy::await_holding_refcell_ref)]
502 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
508 let ts = SystemTime::now();
509
510 {
511 let mut cache = self.cache.borrow_mut();
512 cache.clear_index();
513 cache.cache_general()?;
514 self.cache.borrow_mut().cache_all().await?;
515 cache.build_index();
516 let _ = cache.check_integrity();
517
518 if self.config.manage_own_order_books {
519 for order in cache.orders(None, None, None, None) {
520 if order.is_closed() || !should_handle_own_book_order(order) {
521 continue;
522 }
523 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
524 own_book.add(order.to_own_book_order());
525 }
526 }
527 }
528
529 self.set_position_id_counts();
530
531 log::info!(
532 "Loaded cache in {}ms",
533 SystemTime::now()
534 .duration_since(ts)
535 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
536 .as_millis()
537 );
538
539 Ok(())
540 }
541
542 pub fn flush_db(&self) {
544 self.cache.borrow_mut().flush_db();
545 }
546
547 pub fn reconcile_execution_report(&mut self, report: &ExecutionReport) {
551 match report {
552 ExecutionReport::Order(order_report) => {
553 self.reconcile_order_status_report(order_report);
554 }
555 ExecutionReport::Fill(fill_report) => {
556 log::debug!("Received fill report: {fill_report:?}");
558 }
559 ExecutionReport::Position(position_report) => {
560 log::debug!("Received position report: {position_report:?}");
562 }
563 ExecutionReport::MassStatus(mass_status) => {
564 log::warn!(
566 "Unexpected mass status in reconcile_report: {:?}",
567 mass_status.account_id
568 );
569 }
570 }
571 }
572
573 pub fn reconcile_order_status_report(&mut self, report: &OrderStatusReport) {
579 let cache = self.cache.borrow();
580
581 let order = report
582 .client_order_id
583 .and_then(|id| cache.order(&id).cloned())
584 .or_else(|| {
585 cache
586 .client_order_id(&report.venue_order_id)
587 .and_then(|cid| cache.order(cid).cloned())
588 });
589
590 let Some(order) = order else {
591 log::debug!(
592 "Order not found in cache for reconciliation: client_order_id={:?}, venue_order_id={}",
593 report.client_order_id,
594 report.venue_order_id
595 );
596 return;
597 };
598
599 let instrument = cache.instrument(&report.instrument_id).cloned();
600
601 drop(cache);
602
603 let ts_now = self.clock.borrow().timestamp_ns();
604
605 if let Some(event) = reconcile_order_report(&order, report, instrument.as_ref(), ts_now) {
606 self.handle_event(&event);
607 }
608 }
609
610 pub fn execute(&self, command: &TradingCommand) {
612 self.execute_command(command);
613 }
614
615 pub fn process(&mut self, event: &OrderEventAny) {
617 self.handle_event(event);
618 }
619
620 pub fn start(&mut self) {
622 self.start_snapshot_timer();
623
624 log::info!("Started");
625 }
626
627 pub fn stop(&mut self) {
629 self.stop_snapshot_timer();
630
631 log::info!("Stopped");
632 }
633
634 pub fn reset(&mut self) {
636 self.pos_id_generator.reset();
637
638 log::info!("Reset");
639 }
640
641 pub fn dispose(&mut self) {
643 log::info!("Disposed");
644 }
645
646 fn execute_command(&self, command: &TradingCommand) {
647 if self.config.debug {
648 log::debug!("{RECV}{CMD} {command:?}");
649 }
650
651 if let Some(cid) = command.client_id()
652 && self.external_clients.contains(&cid)
653 {
654 if self.config.debug {
655 log::debug!("Skipping execution command for external client {cid}: {command:?}");
656 }
657 return;
658 }
659
660 let client = if let Some(adapter) = command
661 .client_id()
662 .and_then(|cid| self.clients.get(&cid))
663 .or_else(|| {
664 self.routing_map
665 .get(&command.instrument_id().venue)
666 .and_then(|client_id| self.clients.get(client_id))
667 })
668 .or(self.default_client.as_ref())
669 {
670 adapter.client.as_ref()
671 } else {
672 log::error!(
673 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
674 command.client_id(),
675 command.instrument_id().venue,
676 );
677 return;
678 };
679
680 match command {
681 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
682 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
683 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
684 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
685 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
686 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
687 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
688 TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
689 }
690 }
691
692 fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: &SubmitOrder) {
693 let mut order = cmd.order.clone();
694 let client_order_id = order.client_order_id();
695 let instrument_id = order.instrument_id();
696
697 if !self.cache.borrow().order_exists(&client_order_id) {
699 {
701 let mut cache = self.cache.borrow_mut();
702 if let Err(e) = cache.add_order(order.clone(), cmd.position_id, cmd.client_id, true)
703 {
704 log::error!("Error adding order to cache: {e}");
705 return;
706 }
707 }
708
709 if self.config.snapshot_orders {
710 self.create_order_state_snapshot(&order);
711 }
712 }
713
714 let instrument = {
716 let cache = self.cache.borrow();
717 if let Some(instrument) = cache.instrument(&instrument_id) {
718 instrument.clone()
719 } else {
720 log::error!(
721 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
722 );
723 return;
724 }
725 };
726
727 if self.config.convert_quote_qty_to_base
729 && !instrument.is_inverse()
730 && order.is_quote_quantity()
731 {
732 log::warn!(
733 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
734 );
735 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
736
737 if let Some(price) = last_px {
738 let base_qty = instrument.get_base_quantity(order.quantity(), price);
739 self.set_order_base_qty(&mut order, base_qty);
740 } else {
741 self.deny_order(
742 &order,
743 &format!("no-price-to-convert-quote-qty {instrument_id}"),
744 );
745 return;
746 }
747 }
748
749 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
750 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
751 own_book.add(order.to_own_book_order());
752 }
753
754 if let Err(e) = client.submit_order(cmd) {
756 log::error!("Error submitting order to client: {e}");
757 self.deny_order(
758 &cmd.order,
759 &format!("failed-to-submit-order-to-client: {e}"),
760 );
761 }
762 }
763
764 fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: &SubmitOrderList) {
765 let orders = cmd.order_list.orders.clone();
766
767 let mut cache = self.cache.borrow_mut();
768 for order in &orders {
769 if !cache.order_exists(&order.client_order_id()) {
770 if let Err(e) = cache.add_order(order.clone(), cmd.position_id, cmd.client_id, true)
771 {
772 log::error!("Error adding order to cache: {e}");
773 return;
774 }
775
776 if self.config.snapshot_orders {
777 self.create_order_state_snapshot(order);
778 }
779 }
780 }
781 drop(cache);
782
783 let instrument = {
784 let cache = self.cache.borrow();
785 if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
786 instrument.clone()
787 } else {
788 log::error!(
789 "Cannot handle submit order list: no instrument found for {}, {cmd}",
790 cmd.instrument_id,
791 );
792 return;
793 }
794 };
795
796 if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
798 let mut conversions: Vec<(ClientOrderId, Quantity)> =
799 Vec::with_capacity(cmd.order_list.orders.len());
800
801 for order in &cmd.order_list.orders {
802 if !order.is_quote_quantity() {
803 continue; }
805
806 let last_px =
807 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
808
809 if let Some(px) = last_px {
810 let base_qty = instrument.get_base_quantity(order.quantity(), px);
811 conversions.push((order.client_order_id(), base_qty));
812 } else {
813 for order in &cmd.order_list.orders {
814 self.deny_order(
815 order,
816 &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
817 );
818 }
819 return; }
821 }
822
823 if !conversions.is_empty() {
824 log::warn!(
825 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
826 );
827
828 let mut cache = self.cache.borrow_mut();
829 for (client_order_id, base_qty) in conversions {
830 if let Some(mut_order) = cache.mut_order(&client_order_id) {
831 self.set_order_base_qty(mut_order, base_qty);
832 }
833 }
834 }
835 }
836
837 if self.config.manage_own_order_books {
838 let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
839 for order in &cmd.order_list.orders {
840 if should_handle_own_book_order(order) {
841 own_book.add(order.to_own_book_order());
842 }
843 }
844 }
845
846 if let Err(e) = client.submit_order_list(cmd) {
848 log::error!("Error submitting order list to client: {e}");
849 for order in &orders {
850 self.deny_order(
851 order,
852 &format!("failed-to-submit-order-list-to-client: {e}"),
853 );
854 }
855 }
856 }
857
858 fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: &ModifyOrder) {
859 if let Err(e) = client.modify_order(cmd) {
860 log::error!("Error modifying order: {e}");
861 }
862 }
863
864 fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: &CancelOrder) {
865 if let Err(e) = client.cancel_order(cmd) {
866 log::error!("Error canceling order: {e}");
867 }
868 }
869
870 fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: &CancelAllOrders) {
871 if let Err(e) = client.cancel_all_orders(cmd) {
872 log::error!("Error canceling all orders: {e}");
873 }
874 }
875
876 fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: &BatchCancelOrders) {
877 if let Err(e) = client.batch_cancel_orders(cmd) {
878 log::error!("Error batch canceling orders: {e}");
879 }
880 }
881
882 fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: &QueryAccount) {
883 if let Err(e) = client.query_account(cmd) {
884 log::error!("Error querying account: {e}");
885 }
886 }
887
888 fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: &QueryOrder) {
889 if let Err(e) = client.query_order(cmd) {
890 log::error!("Error querying order: {e}");
891 }
892 }
893
894 fn create_order_state_snapshot(&self, order: &OrderAny) {
895 if self.config.debug {
896 log::debug!("Creating order state snapshot for {order}");
897 }
898
899 if self.cache.borrow().has_backing()
900 && let Err(e) = self.cache.borrow().snapshot_order_state(order)
901 {
902 log::error!("Failed to snapshot order state: {e}");
903 return;
904 }
905
906 if get_message_bus().borrow().has_backing {
907 let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
908 msgbus::publish(topic, order);
909 }
910 }
911
912 fn create_position_state_snapshot(&self, position: &Position) {
913 if self.config.debug {
914 log::debug!("Creating position state snapshot for {position}");
915 }
916
917 let topic = switchboard::get_positions_snapshots_topic(position.id);
923 msgbus::publish(topic, position);
924 }
925
926 fn handle_event(&mut self, event: &OrderEventAny) {
927 if self.config.debug {
928 log::debug!("{RECV}{EVT} {event:?}");
929 }
930
931 let client_order_id = event.client_order_id();
932 let cache = self.cache.borrow();
933 let mut order = if let Some(order) = cache.order(&client_order_id) {
934 order.clone()
935 } else {
936 log::warn!(
937 "Order with {} not found in the cache to apply {}",
938 event.client_order_id(),
939 event
940 );
941
942 let venue_order_id = if let Some(id) = event.venue_order_id() {
944 id
945 } else {
946 log::error!(
947 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
948 event.client_order_id()
949 );
950 return;
951 };
952
953 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
955 id
956 } else {
957 log::error!(
958 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
959 event.client_order_id(),
960 );
961 return;
962 };
963
964 if let Some(order) = cache.order(client_order_id) {
966 log::info!("Order with {client_order_id} was found in the cache");
967 order.clone()
968 } else {
969 log::error!(
970 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
971 );
972 return;
973 }
974 };
975
976 drop(cache);
977
978 match event {
979 OrderEventAny::Filled(fill) => {
980 let oms_type = self.determine_oms_type(fill);
981 let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
982
983 let mut fill = *fill;
984 if fill.position_id.is_none() {
985 fill.position_id = Some(position_id);
986 }
987
988 if self.apply_fill_to_order(&mut order, fill).is_ok() {
989 self.handle_order_fill(&order, fill, oms_type);
990 }
991 }
992 _ => {
993 let _ = self.apply_event_to_order(&mut order, event.clone());
994 }
995 }
996 }
997
998 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
999 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
1001 return *oms_type;
1002 }
1003
1004 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
1006 && let Some(client) = self.clients.get(client_id)
1007 {
1008 return client.oms_type();
1009 }
1010
1011 if let Some(client) = &self.default_client {
1012 return client.oms_type();
1013 }
1014
1015 OmsType::Netting }
1017
1018 fn determine_position_id(
1019 &mut self,
1020 fill: OrderFilled,
1021 oms_type: OmsType,
1022 order: Option<&OrderAny>,
1023 ) -> PositionId {
1024 match oms_type {
1025 OmsType::Hedging => self.determine_hedging_position_id(fill, order),
1026 OmsType::Netting => self.determine_netting_position_id(fill),
1027 _ => self.determine_netting_position_id(fill), }
1029 }
1030
1031 fn determine_hedging_position_id(
1032 &mut self,
1033 fill: OrderFilled,
1034 order: Option<&OrderAny>,
1035 ) -> PositionId {
1036 if let Some(position_id) = fill.position_id {
1038 if self.config.debug {
1039 log::debug!("Already had a position ID of: {position_id}");
1040 }
1041 return position_id;
1042 }
1043
1044 let cache = self.cache.borrow();
1045
1046 let order = if let Some(o) = order {
1047 o
1048 } else {
1049 match cache.order(&fill.client_order_id()) {
1050 Some(o) => o,
1051 None => {
1052 panic!(
1053 "Order for {} not found to determine position ID",
1054 fill.client_order_id()
1055 );
1056 }
1057 }
1058 };
1059
1060 if let Some(spawn_id) = order.exec_spawn_id() {
1062 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
1063 for spawned_order in spawn_orders {
1064 if let Some(pos_id) = spawned_order.position_id() {
1065 if self.config.debug {
1066 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
1067 }
1068 return pos_id;
1069 }
1070 }
1071 }
1072
1073 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
1075 if self.config.debug {
1076 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
1077 }
1078 position_id
1079 }
1080
1081 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
1082 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
1083 }
1084
1085 fn apply_fill_to_order(&self, order: &mut OrderAny, fill: OrderFilled) -> anyhow::Result<()> {
1086 if order.is_duplicate_fill(&fill) {
1087 log::warn!(
1088 "Duplicate fill: {} trade_id={} already applied, skipping",
1089 order.client_order_id(),
1090 fill.trade_id
1091 );
1092 anyhow::bail!("Duplicate fill");
1093 }
1094
1095 self.check_overfill(order, &fill)?;
1096 let event = OrderEventAny::Filled(fill);
1097 self.apply_order_event(order, event)
1098 }
1099
1100 fn apply_event_to_order(
1101 &self,
1102 order: &mut OrderAny,
1103 event: OrderEventAny,
1104 ) -> anyhow::Result<()> {
1105 self.apply_order_event(order, event)
1106 }
1107
1108 fn apply_order_event(&self, order: &mut OrderAny, event: OrderEventAny) -> anyhow::Result<()> {
1109 if let Err(e) = order.apply(event.clone()) {
1110 match e {
1111 OrderError::InvalidStateTransition => {
1112 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
1115 }
1116 OrderError::DuplicateFill(trade_id) => {
1117 log::warn!(
1119 "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
1120 );
1121 anyhow::bail!("{e}");
1122 }
1123 _ => {
1124 log::error!("Error applying event: {e}, did not apply {event}");
1126 if should_handle_own_book_order(order) {
1127 self.cache.borrow_mut().update_own_order_book(order);
1128 }
1129 anyhow::bail!("{e}");
1130 }
1131 }
1132 }
1133
1134 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1135 log::error!("Error updating order in cache: {e}");
1136 }
1137
1138 if self.config.debug {
1139 log::debug!("{SEND}{EVT} {event}");
1140 }
1141
1142 let topic = switchboard::get_event_orders_topic(event.strategy_id());
1143 msgbus::publish(topic, &event);
1144
1145 if self.config.snapshot_orders {
1146 self.create_order_state_snapshot(order);
1147 }
1148
1149 Ok(())
1150 }
1151
1152 fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
1153 let potential_overfill = order.calculate_overfill(fill.last_qty);
1154
1155 if potential_overfill.is_positive() {
1156 if self.config.allow_overfills {
1157 log::warn!(
1158 "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
1159 order.client_order_id(),
1160 potential_overfill,
1161 order.filled_qty(),
1162 fill.last_qty,
1163 order.quantity()
1164 );
1165 } else {
1166 let msg = format!(
1167 "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
1168 Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
1169 order.client_order_id(),
1170 potential_overfill,
1171 order.filled_qty(),
1172 fill.last_qty,
1173 order.quantity()
1174 );
1175 anyhow::bail!("{msg}");
1176 }
1177 }
1178
1179 Ok(())
1180 }
1181
1182 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
1183 let instrument =
1184 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
1185 instrument.clone()
1186 } else {
1187 log::error!(
1188 "Cannot handle order fill: no instrument found for {}, {fill}",
1189 fill.instrument_id,
1190 );
1191 return;
1192 };
1193
1194 if self.cache.borrow().account(&fill.account_id).is_none() {
1195 log::error!(
1196 "Cannot handle order fill: no account found for {}, {fill}",
1197 fill.instrument_id.venue,
1198 );
1199 return;
1200 }
1201
1202 let position = if instrument.is_spread() {
1205 None
1206 } else {
1207 self.handle_position_update(instrument.clone(), fill, oms_type);
1208 let position_id = fill.position_id.unwrap();
1209 self.cache.borrow().position(&position_id).cloned()
1210 };
1211
1212 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1215 if !instrument.is_spread()
1217 && let Some(ref pos) = position
1218 && pos.is_open()
1219 {
1220 let position_id = pos.id;
1221 for client_order_id in order.linked_order_ids().unwrap_or_default() {
1222 let mut cache = self.cache.borrow_mut();
1223 let contingent_order = cache.mut_order(client_order_id);
1224 if let Some(contingent_order) = contingent_order
1225 && contingent_order.position_id().is_none()
1226 {
1227 contingent_order.set_position_id(Some(position_id));
1228
1229 if let Err(e) = self.cache.borrow_mut().add_position_id(
1230 &position_id,
1231 &contingent_order.instrument_id().venue,
1232 &contingent_order.client_order_id(),
1233 &contingent_order.strategy_id(),
1234 ) {
1235 log::error!("Failed to add position ID: {e}");
1236 }
1237 }
1238 }
1239 }
1240 }
1243 }
1244
1245 fn handle_position_update(
1249 &mut self,
1250 instrument: InstrumentAny,
1251 fill: OrderFilled,
1252 oms_type: OmsType,
1253 ) {
1254 let position_id = if let Some(position_id) = fill.position_id {
1255 position_id
1256 } else {
1257 log::error!("Cannot handle position update: no position ID found for fill {fill}");
1258 return;
1259 };
1260
1261 let position_opt = self.cache.borrow().position(&position_id).cloned();
1262
1263 match position_opt {
1264 None => {
1265 if self.open_position(instrument, None, fill, oms_type).is_ok() {
1267 }
1269 }
1270 Some(pos) if pos.is_closed() => {
1271 if self
1273 .open_position(instrument, Some(&pos), fill, oms_type)
1274 .is_ok()
1275 {
1276 }
1278 }
1279 Some(mut pos) => {
1280 if self.will_flip_position(&pos, fill) {
1281 self.flip_position(instrument, &mut pos, fill, oms_type);
1283 } else {
1284 self.update_position(&mut pos, fill);
1286 }
1287 }
1288 }
1289 }
1290
1291 fn open_position(
1292 &self,
1293 instrument: InstrumentAny,
1294 position: Option<&Position>,
1295 fill: OrderFilled,
1296 oms_type: OmsType,
1297 ) -> anyhow::Result<()> {
1298 if let Some(position) = position {
1299 if Self::is_duplicate_closed_fill(position, &fill) {
1300 log::warn!(
1301 "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
1302 fill.trade_id,
1303 position.id,
1304 fill.order_side,
1305 fill.last_qty,
1306 fill.last_px
1307 );
1308 return Ok(());
1309 }
1310 self.reopen_position(position, oms_type)?;
1311 }
1312
1313 let position = Position::new(&instrument, fill);
1314 self.cache
1315 .borrow_mut()
1316 .add_position(position.clone(), oms_type)?; if self.config.snapshot_positions {
1319 self.create_position_state_snapshot(&position);
1320 }
1321
1322 let ts_init = self.clock.borrow().timestamp_ns();
1323 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
1324 let topic = switchboard::get_event_positions_topic(event.strategy_id);
1325 msgbus::publish(topic, &PositionEvent::PositionOpened(event));
1326
1327 Ok(())
1328 }
1329
1330 fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
1331 position.events.iter().any(|event| {
1332 event.trade_id == fill.trade_id
1333 && event.order_side == fill.order_side
1334 && event.last_px == fill.last_px
1335 && event.last_qty == fill.last_qty
1336 })
1337 }
1338
1339 fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
1340 if oms_type == OmsType::Netting {
1341 if position.is_open() {
1342 anyhow::bail!(
1343 "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
1344 position.id
1345 );
1346 }
1347 self.cache.borrow_mut().snapshot_position(position)?;
1349 } else {
1350 log::warn!(
1352 "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
1353 position.id
1354 );
1355 }
1356 Ok(())
1357 }
1358
1359 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
1360 position.apply(&fill);
1362
1363 let is_closed = position.is_closed();
1365
1366 if let Err(e) = self.cache.borrow_mut().update_position(position) {
1368 log::error!("Failed to update position: {e:?}");
1369 return;
1370 }
1371
1372 let cache = self.cache.borrow();
1374
1375 drop(cache);
1376
1377 if self.config.snapshot_positions {
1379 self.create_position_state_snapshot(position);
1380 }
1381
1382 let topic = switchboard::get_event_positions_topic(position.strategy_id);
1384 let ts_init = self.clock.borrow().timestamp_ns();
1385
1386 if is_closed {
1387 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
1388 msgbus::publish(topic, &PositionEvent::PositionClosed(event));
1389 } else {
1390 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
1391 msgbus::publish(topic, &PositionEvent::PositionChanged(event));
1392 }
1393 }
1394
1395 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
1396 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
1397 }
1398
1399 fn flip_position(
1400 &mut self,
1401 instrument: InstrumentAny,
1402 position: &mut Position,
1403 fill: OrderFilled,
1404 oms_type: OmsType,
1405 ) {
1406 let difference = match position.side {
1407 PositionSide::Long => Quantity::from_raw(
1408 fill.last_qty.raw - position.quantity.raw,
1409 position.size_precision,
1410 ),
1411 PositionSide::Short => Quantity::from_raw(
1412 position.quantity.raw.abs_diff(fill.last_qty.raw), position.size_precision,
1414 ),
1415 _ => fill.last_qty,
1416 };
1417
1418 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
1420 let (commission1, commission2) = if let Some(commission) = fill.commission {
1421 let commission_currency = commission.currency;
1422 let commission1 = Money::new(commission * fill_percent, commission_currency);
1423 let commission2 = commission - commission1;
1424 (Some(commission1), Some(commission2))
1425 } else {
1426 log::error!("Commission is not available.");
1427 (None, None)
1428 };
1429
1430 let mut fill_split1: Option<OrderFilled> = None;
1431 if position.is_open() {
1432 fill_split1 = Some(OrderFilled::new(
1433 fill.trader_id,
1434 fill.strategy_id,
1435 fill.instrument_id,
1436 fill.client_order_id,
1437 fill.venue_order_id,
1438 fill.account_id,
1439 fill.trade_id,
1440 fill.order_side,
1441 fill.order_type,
1442 position.quantity,
1443 fill.last_px,
1444 fill.currency,
1445 fill.liquidity_side,
1446 UUID4::new(),
1447 fill.ts_event,
1448 fill.ts_init,
1449 fill.reconciliation,
1450 fill.position_id,
1451 commission1,
1452 ));
1453
1454 self.update_position(position, fill_split1.unwrap());
1455
1456 if oms_type == OmsType::Netting
1458 && let Err(e) = self.cache.borrow_mut().snapshot_position(position)
1459 {
1460 log::error!("Failed to snapshot position during flip: {e:?}");
1461 }
1462 }
1463
1464 if difference.raw == 0 {
1466 log::warn!(
1467 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
1468 );
1469 return;
1470 }
1471
1472 let position_id_flip = if oms_type == OmsType::Hedging
1473 && let Some(position_id) = fill.position_id
1474 && position_id.is_virtual()
1475 {
1476 Some(self.pos_id_generator.generate(fill.strategy_id, true))
1478 } else {
1479 fill.position_id
1481 };
1482
1483 let fill_split2 = OrderFilled::new(
1484 fill.trader_id,
1485 fill.strategy_id,
1486 fill.instrument_id,
1487 fill.client_order_id,
1488 fill.venue_order_id,
1489 fill.account_id,
1490 fill.trade_id,
1491 fill.order_side,
1492 fill.order_type,
1493 difference,
1494 fill.last_px,
1495 fill.currency,
1496 fill.liquidity_side,
1497 UUID4::new(),
1498 fill.ts_event,
1499 fill.ts_init,
1500 fill.reconciliation,
1501 position_id_flip,
1502 commission2,
1503 );
1504
1505 if oms_type == OmsType::Hedging
1506 && let Some(position_id) = fill.position_id
1507 && position_id.is_virtual()
1508 {
1509 log::warn!("Closing position {fill_split1:?}");
1510 log::warn!("Flipping position {fill_split2:?}");
1511 }
1512
1513 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1515 log::error!("Failed to open flipped position: {e:?}");
1516 }
1517 }
1518
1519 pub fn set_position_id_counts(&mut self) {
1521 let cache = self.cache.borrow();
1522 let positions = cache.positions(None, None, None, None);
1523
1524 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1526
1527 for position in positions {
1528 *counts.entry(position.strategy_id).or_insert(0) += 1;
1529 }
1530
1531 self.pos_id_generator.reset();
1532
1533 for (strategy_id, count) in counts {
1534 self.pos_id_generator.set_count(count, strategy_id);
1535 log::info!("Set PositionId count for {strategy_id} to {count}");
1536 }
1537 }
1538
1539 fn last_px_for_conversion(
1540 &self,
1541 instrument_id: &InstrumentId,
1542 side: OrderSide,
1543 ) -> Option<Price> {
1544 let cache = self.cache.borrow();
1545
1546 if let Some(trade) = cache.trade(instrument_id) {
1548 return Some(trade.price);
1549 }
1550
1551 if let Some(quote) = cache.quote(instrument_id) {
1553 match side {
1554 OrderSide::Buy => Some(quote.ask_price),
1555 OrderSide::Sell => Some(quote.bid_price),
1556 OrderSide::NoOrderSide => None,
1557 }
1558 } else {
1559 None
1560 }
1561 }
1562
1563 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1564 log::info!(
1565 "Setting {} order quote quantity {} to base quantity {}",
1566 order.instrument_id(),
1567 order.quantity(),
1568 base_qty
1569 );
1570
1571 let original_qty = order.quantity();
1572 order.set_quantity(base_qty);
1573 order.set_leaves_qty(base_qty);
1574 order.set_is_quote_quantity(false);
1575
1576 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1577 return;
1578 }
1579
1580 if let Some(linked_order_ids) = order.linked_order_ids() {
1581 for client_order_id in linked_order_ids {
1582 match self.cache.borrow_mut().mut_order(client_order_id) {
1583 Some(contingent_order) => {
1584 if !contingent_order.is_quote_quantity() {
1585 continue; }
1587
1588 if contingent_order.quantity() != original_qty {
1589 log::warn!(
1590 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1591 contingent_order.quantity(),
1592 original_qty,
1593 base_qty
1594 );
1595 }
1596
1597 log::info!(
1598 "Setting {} order quote quantity {} to base quantity {}",
1599 contingent_order.instrument_id(),
1600 contingent_order.quantity(),
1601 base_qty
1602 );
1603
1604 contingent_order.set_quantity(base_qty);
1605 contingent_order.set_leaves_qty(base_qty);
1606 contingent_order.set_is_quote_quantity(false);
1607 }
1608 None => {
1609 log::error!("Contingency order {client_order_id} not found");
1610 }
1611 }
1612 }
1613 } else {
1614 log::warn!(
1615 "No linked order IDs found for order {}",
1616 order.client_order_id()
1617 );
1618 }
1619 }
1620
1621 fn deny_order(&self, order: &OrderAny, reason: &str) {
1622 log::error!(
1623 "Order denied: {reason}, order ID: {}",
1624 order.client_order_id()
1625 );
1626
1627 let denied = OrderDenied::new(
1628 order.trader_id(),
1629 order.strategy_id(),
1630 order.instrument_id(),
1631 order.client_order_id(),
1632 reason.into(),
1633 UUID4::new(),
1634 self.clock.borrow().timestamp_ns(),
1635 self.clock.borrow().timestamp_ns(),
1636 );
1637
1638 let mut order = order.clone();
1639
1640 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1641 log::error!("Failed to apply denied event to order: {e}");
1642 return;
1643 }
1644
1645 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1646 log::error!("Failed to update order in cache: {e}");
1647 return;
1648 }
1649
1650 let topic = switchboard::get_event_orders_topic(order.strategy_id());
1651 msgbus::publish(topic, &OrderEventAny::Denied(denied));
1652
1653 if self.config.snapshot_orders {
1654 self.create_order_state_snapshot(&order);
1655 }
1656 }
1657
1658 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1659 let mut cache = self.cache.borrow_mut();
1660 if cache.own_order_book_mut(instrument_id).is_none() {
1661 let own_book = OwnOrderBook::new(*instrument_id);
1662 cache.add_own_order_book(own_book).unwrap();
1663 }
1664
1665 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1666 }
1667}