1pub mod config;
24pub mod stubs;
25
26#[cfg(test)]
27mod tests;
28
29use std::{
30 cell::{RefCell, RefMut},
31 collections::{HashMap, HashSet},
32 fmt::Debug,
33 rc::Rc,
34 time::SystemTime,
35};
36
37use ahash::{AHashMap, AHashSet};
38use config::ExecutionEngineConfig;
39use futures::future::join_all;
40use nautilus_common::{
41 cache::Cache,
42 clock::Clock,
43 generators::position_id::PositionIdGenerator,
44 logging::{CMD, EVT, RECV, SEND},
45 messages::execution::{
46 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
47 SubmitOrder, SubmitOrderList, TradingCommand,
48 },
49 msgbus::{
50 self, get_message_bus,
51 switchboard::{self},
52 },
53};
54use nautilus_core::UUID4;
55use nautilus_model::{
56 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
57 events::{
58 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
59 PositionOpened,
60 },
61 identifiers::{ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
62 instruments::{Instrument, InstrumentAny},
63 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
64 orders::{Order, OrderAny, OrderError},
65 position::Position,
66 types::{Money, Price, Quantity},
67};
68
69use crate::client::{ExecutionClient, ExecutionClientAdapter};
70
71pub struct ExecutionEngine {
78 clock: Rc<RefCell<dyn Clock>>,
79 cache: Rc<RefCell<Cache>>,
80 clients: AHashMap<ClientId, ExecutionClientAdapter>,
81 default_client: Option<ExecutionClientAdapter>,
82 routing_map: HashMap<Venue, ClientId>,
83 oms_overrides: HashMap<StrategyId, OmsType>,
84 external_order_claims: HashMap<InstrumentId, StrategyId>,
85 external_clients: HashSet<ClientId>,
86 pos_id_generator: PositionIdGenerator,
87 config: ExecutionEngineConfig,
88}
89
90impl Debug for ExecutionEngine {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct(stringify!(ExecutionEngine))
93 .field("client_count", &self.clients.len())
94 .finish()
95 }
96}
97
98impl ExecutionEngine {
99 pub fn new(
101 clock: Rc<RefCell<dyn Clock>>,
102 cache: Rc<RefCell<Cache>>,
103 config: Option<ExecutionEngineConfig>,
104 ) -> Self {
105 let trader_id = get_message_bus().borrow().trader_id;
106 Self {
107 clock: clock.clone(),
108 cache,
109 clients: AHashMap::new(),
110 default_client: None,
111 routing_map: HashMap::new(),
112 oms_overrides: HashMap::new(),
113 external_order_claims: HashMap::new(),
114 external_clients: config
115 .as_ref()
116 .and_then(|c| c.external_clients.clone())
117 .unwrap_or_default()
118 .into_iter()
119 .collect(),
120 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
121 config: config.unwrap_or_default(),
122 }
123 }
124
125 #[must_use]
126 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
128 self.pos_id_generator.count(strategy_id)
129 }
130
131 #[must_use]
132 pub fn check_integrity(&self) -> bool {
134 self.cache.borrow_mut().check_integrity()
135 }
136
137 #[must_use]
138 pub fn check_connected(&self) -> bool {
140 let clients_connected = self.clients.values().all(|c| c.is_connected());
141 let default_connected = self
142 .default_client
143 .as_ref()
144 .is_none_or(|c| c.is_connected());
145 clients_connected && default_connected
146 }
147
148 #[must_use]
149 pub fn check_disconnected(&self) -> bool {
151 let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
152 let default_disconnected = self
153 .default_client
154 .as_ref()
155 .is_none_or(|c| !c.is_connected());
156 clients_disconnected && default_disconnected
157 }
158
159 #[must_use]
160 pub fn check_residuals(&self) -> bool {
162 self.cache.borrow().check_residuals()
163 }
164
165 #[must_use]
166 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
168 self.external_order_claims.keys().copied().collect()
169 }
170
171 #[must_use]
172 pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
174 self.external_clients.clone()
175 }
176
177 #[must_use]
178 pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
180 self.external_order_claims.get(instrument_id).copied()
181 }
182
183 pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
191 let client_id = client.client_id();
192 let venue = client.venue();
193
194 if self.clients.contains_key(&client_id) {
195 anyhow::bail!("Client already registered with ID {client_id}");
196 }
197
198 let adapter = ExecutionClientAdapter::new(client);
199
200 self.routing_map.insert(venue, client_id);
201
202 log::info!("Registered client {client_id}");
203 self.clients.insert(client_id, adapter);
204 Ok(())
205 }
206
207 pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
209 let client_id = client.client_id();
210 let adapter = ExecutionClientAdapter::new(client);
211
212 log::info!("Registered default client {client_id}");
213 self.default_client = Some(adapter);
214 }
215
216 #[must_use]
217 pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
219 self.clients.get(client_id).map(|a| a.client.as_ref())
220 }
221
222 #[must_use]
223 pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
225 let mut adapters: Vec<_> = self.clients.values_mut().collect();
226 if let Some(default) = &mut self.default_client {
227 adapters.push(default);
228 }
229 adapters
230 }
231
232 #[must_use]
233 pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
238 let mut client_ids: AHashSet<ClientId> = AHashSet::new();
239 let mut venues: AHashSet<Venue> = AHashSet::new();
240
241 for order in orders {
243 venues.insert(order.instrument_id().venue);
244 if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
245 client_ids.insert(*client_id);
246 }
247 }
248
249 let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
250
251 for client_id in &client_ids {
253 if let Some(adapter) = self.clients.get(client_id)
254 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
255 {
256 clients.push(adapter.client.as_ref());
257 }
258 }
259
260 for venue in &venues {
262 if let Some(client_id) = self.routing_map.get(venue) {
263 if let Some(adapter) = self.clients.get(client_id)
264 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
265 {
266 clients.push(adapter.client.as_ref());
267 }
268 } else if let Some(adapter) = &self.default_client
269 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
270 {
271 clients.push(adapter.client.as_ref());
272 }
273 }
274
275 clients
276 }
277
278 pub fn register_venue_routing(
284 &mut self,
285 client_id: ClientId,
286 venue: Venue,
287 ) -> anyhow::Result<()> {
288 if !self.clients.contains_key(&client_id) {
289 anyhow::bail!("No client registered with ID {client_id}");
290 }
291
292 self.routing_map.insert(venue, client_id);
293 log::info!("Set client {client_id} routing for {venue}");
294 Ok(())
295 }
296
297 pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
301 self.oms_overrides.insert(strategy_id, oms_type);
302 log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
303 }
304
305 pub fn register_external_order_claims(
313 &mut self,
314 strategy_id: StrategyId,
315 instrument_ids: HashSet<InstrumentId>,
316 ) -> anyhow::Result<()> {
317 for instrument_id in &instrument_ids {
319 if let Some(existing) = self.external_order_claims.get(instrument_id) {
320 anyhow::bail!(
321 "External order claim for {instrument_id} already exists for {existing}"
322 );
323 }
324 }
325
326 for instrument_id in &instrument_ids {
328 self.external_order_claims
329 .insert(*instrument_id, strategy_id);
330 }
331
332 if !instrument_ids.is_empty() {
333 log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
334 }
335
336 Ok(())
337 }
338
339 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
343 if self.clients.remove(&client_id).is_some() {
344 self.routing_map
346 .retain(|_, mapped_id| mapped_id != &client_id);
347 log::info!("Deregistered client {client_id}");
348 Ok(())
349 } else {
350 anyhow::bail!("No client registered with ID {client_id}")
351 }
352 }
353
354 pub async fn connect(&mut self) -> anyhow::Result<()> {
360 let futures: Vec<_> = self
361 .get_clients_mut()
362 .into_iter()
363 .map(|client| client.connect())
364 .collect();
365
366 let results = join_all(futures).await;
367 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
368
369 if errors.is_empty() {
370 Ok(())
371 } else {
372 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
373 anyhow::bail!(
374 "Failed to connect execution clients: {}",
375 error_msgs.join("; ")
376 )
377 }
378 }
379
380 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
386 let futures: Vec<_> = self
387 .get_clients_mut()
388 .into_iter()
389 .map(|client| client.disconnect())
390 .collect();
391
392 let results = join_all(futures).await;
393 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
394
395 if errors.is_empty() {
396 Ok(())
397 } else {
398 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
399 anyhow::bail!(
400 "Failed to disconnect execution clients: {}",
401 error_msgs.join("; ")
402 )
403 }
404 }
405
406 pub fn set_manage_own_order_books(&mut self, value: bool) {
408 self.config.manage_own_order_books = value;
409 }
410
411 pub fn set_convert_quote_qty_to_base(&mut self, value: bool) {
413 self.config.convert_quote_qty_to_base = value;
414 }
415
416 pub fn start_snapshot_timer(&mut self) {
420 if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
421 log::info!("Starting position snapshots timer at {interval_secs} second intervals");
422 }
423 }
424
425 pub fn stop_snapshot_timer(&mut self) {
427 if self.config.snapshot_positions_interval_secs.is_some() {
428 log::info!("Canceling position snapshots timer");
429 }
430 }
431
432 pub fn snapshot_open_position_states(&self) {
434 let positions: Vec<Position> = self
435 .cache
436 .borrow()
437 .positions_open(None, None, None, None)
438 .into_iter()
439 .cloned()
440 .collect();
441
442 for position in positions {
443 self.create_position_state_snapshot(&position);
444 }
445 }
446
447 #[allow(clippy::await_holding_refcell_ref)]
450 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
456 let ts = SystemTime::now();
457
458 {
459 let mut cache = self.cache.borrow_mut();
460 cache.clear_index();
461 cache.cache_general()?;
462 self.cache.borrow_mut().cache_all().await?;
463 cache.build_index();
464 let _ = cache.check_integrity();
465
466 if self.config.manage_own_order_books {
467 for order in cache.orders(None, None, None, None) {
468 if order.is_closed() || !should_handle_own_book_order(order) {
469 continue;
470 }
471 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
472 own_book.add(order.to_own_book_order());
473 }
474 }
475 }
476
477 self.set_position_id_counts();
478
479 log::info!(
480 "Loaded cache in {}ms",
481 SystemTime::now()
482 .duration_since(ts)
483 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
484 .as_millis()
485 );
486
487 Ok(())
488 }
489
490 pub fn flush_db(&self) {
492 self.cache.borrow_mut().flush_db();
493 }
494
495 pub fn process(&mut self, event: &OrderEventAny) {
497 self.handle_event(event);
498 }
499
500 pub fn execute(&self, command: &TradingCommand) {
502 self.execute_command(command);
503 }
504
505 fn execute_command(&self, command: &TradingCommand) {
508 if self.config.debug {
509 log::debug!("{RECV}{CMD} {command:?}");
510 }
511
512 if self.external_clients.contains(&command.client_id()) {
513 if self.config.debug {
514 let cid = command.client_id();
515 log::debug!("Skipping execution command for external client {cid}: {command:?}");
516 }
517 return;
518 }
519
520 let client = if let Some(adapter) = self
521 .clients
522 .get(&command.client_id())
523 .or_else(|| {
524 self.routing_map
525 .get(&command.instrument_id().venue)
526 .and_then(|client_id| self.clients.get(client_id))
527 })
528 .or(self.default_client.as_ref())
529 {
530 adapter.client.as_ref()
531 } else {
532 log::error!(
533 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
534 command.client_id(),
535 command.instrument_id().venue,
536 );
537 return;
538 };
539
540 match command {
541 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
542 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
543 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
544 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
545 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
546 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
547 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
548 TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
549 }
550 }
551
552 fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: &SubmitOrder) {
553 let mut order = cmd.order.clone();
554 let client_order_id = order.client_order_id();
555 let instrument_id = order.instrument_id();
556
557 if !self.cache.borrow().order_exists(&client_order_id) {
559 {
561 let mut cache = self.cache.borrow_mut();
562 if let Err(e) =
563 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
564 {
565 log::error!("Error adding order to cache: {e}");
566 return;
567 }
568 }
569
570 if self.config.snapshot_orders {
571 self.create_order_state_snapshot(&order);
572 }
573 }
574
575 let instrument = {
577 let cache = self.cache.borrow();
578 if let Some(instrument) = cache.instrument(&instrument_id) {
579 instrument.clone()
580 } else {
581 log::error!(
582 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
583 );
584 return;
585 }
586 };
587
588 if self.config.convert_quote_qty_to_base
590 && !instrument.is_inverse()
591 && order.is_quote_quantity()
592 {
593 log::warn!(
594 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
595 );
596 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
597
598 if let Some(price) = last_px {
599 let base_qty = instrument.get_base_quantity(order.quantity(), price);
600 self.set_order_base_qty(&mut order, base_qty);
601 } else {
602 self.deny_order(
603 &order,
604 &format!("no-price-to-convert-quote-qty {instrument_id}"),
605 );
606 return;
607 }
608 }
609
610 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
611 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
612 own_book.add(order.to_own_book_order());
613 }
614
615 if let Err(e) = client.submit_order(cmd) {
617 log::error!("Error submitting order to client: {e}");
618 self.deny_order(
619 &cmd.order,
620 &format!("failed-to-submit-order-to-client: {e}"),
621 );
622 }
623 }
624
625 fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: &SubmitOrderList) {
626 let orders = cmd.order_list.orders.clone();
627
628 let mut cache = self.cache.borrow_mut();
629 for order in &orders {
630 if !cache.order_exists(&order.client_order_id()) {
631 if let Err(e) =
632 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
633 {
634 log::error!("Error adding order to cache: {e}");
635 return;
636 }
637
638 if self.config.snapshot_orders {
639 self.create_order_state_snapshot(order);
640 }
641 }
642 }
643 drop(cache);
644
645 let instrument = {
646 let cache = self.cache.borrow();
647 if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
648 instrument.clone()
649 } else {
650 log::error!(
651 "Cannot handle submit order list: no instrument found for {}, {cmd}",
652 cmd.instrument_id,
653 );
654 return;
655 }
656 };
657
658 if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
660 let mut conversions: Vec<(ClientOrderId, Quantity)> =
661 Vec::with_capacity(cmd.order_list.orders.len());
662
663 for order in &cmd.order_list.orders {
664 if !order.is_quote_quantity() {
665 continue; }
667
668 let last_px =
669 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
670
671 if let Some(px) = last_px {
672 let base_qty = instrument.get_base_quantity(order.quantity(), px);
673 conversions.push((order.client_order_id(), base_qty));
674 } else {
675 for order in &cmd.order_list.orders {
676 self.deny_order(
677 order,
678 &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
679 );
680 }
681 return; }
683 }
684
685 if !conversions.is_empty() {
686 log::warn!(
687 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
688 );
689
690 let mut cache = self.cache.borrow_mut();
691 for (client_order_id, base_qty) in conversions {
692 if let Some(mut_order) = cache.mut_order(&client_order_id) {
693 self.set_order_base_qty(mut_order, base_qty);
694 }
695 }
696 }
697 }
698
699 if self.config.manage_own_order_books {
700 let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
701 for order in &cmd.order_list.orders {
702 if should_handle_own_book_order(order) {
703 own_book.add(order.to_own_book_order());
704 }
705 }
706 }
707
708 if let Err(e) = client.submit_order_list(cmd) {
710 log::error!("Error submitting order list to client: {e}");
711 for order in &orders {
712 self.deny_order(
713 order,
714 &format!("failed-to-submit-order-list-to-client: {e}"),
715 );
716 }
717 }
718 }
719
720 fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: &ModifyOrder) {
721 if let Err(e) = client.modify_order(cmd) {
722 log::error!("Error modifying order: {e}");
723 }
724 }
725
726 fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: &CancelOrder) {
727 if let Err(e) = client.cancel_order(cmd) {
728 log::error!("Error canceling order: {e}");
729 }
730 }
731
732 fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: &CancelAllOrders) {
733 if let Err(e) = client.cancel_all_orders(cmd) {
734 log::error!("Error canceling all orders: {e}");
735 }
736 }
737
738 fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: &BatchCancelOrders) {
739 if let Err(e) = client.batch_cancel_orders(cmd) {
740 log::error!("Error batch canceling orders: {e}");
741 }
742 }
743
744 fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: &QueryAccount) {
745 if let Err(e) = client.query_account(cmd) {
746 log::error!("Error querying account: {e}");
747 }
748 }
749
750 fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: &QueryOrder) {
751 if let Err(e) = client.query_order(cmd) {
752 log::error!("Error querying order: {e}");
753 }
754 }
755
756 fn create_order_state_snapshot(&self, order: &OrderAny) {
757 if self.config.debug {
758 log::debug!("Creating order state snapshot for {order}");
759 }
760
761 if self.cache.borrow().has_backing()
762 && let Err(e) = self.cache.borrow().snapshot_order_state(order)
763 {
764 log::error!("Failed to snapshot order state: {e}");
765 return;
766 }
767
768 if get_message_bus().borrow().has_backing {
769 let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
770 msgbus::publish(topic, order);
771 }
772 }
773
774 fn create_position_state_snapshot(&self, position: &Position) {
775 if self.config.debug {
776 log::debug!("Creating position state snapshot for {position}");
777 }
778
779 let topic = switchboard::get_positions_snapshots_topic(position.id);
785 msgbus::publish(topic, position);
786 }
787
788 fn handle_event(&mut self, event: &OrderEventAny) {
791 if self.config.debug {
792 log::debug!("{RECV}{EVT} {event:?}");
793 }
794
795 let client_order_id = event.client_order_id();
796 let cache = self.cache.borrow();
797 let mut order = if let Some(order) = cache.order(&client_order_id) {
798 order.clone()
799 } else {
800 log::warn!(
801 "Order with {} not found in the cache to apply {}",
802 event.client_order_id(),
803 event
804 );
805
806 let venue_order_id = if let Some(id) = event.venue_order_id() {
808 id
809 } else {
810 log::error!(
811 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
812 event.client_order_id()
813 );
814 return;
815 };
816
817 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
819 id
820 } else {
821 log::error!(
822 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
823 event.client_order_id(),
824 );
825 return;
826 };
827
828 if let Some(order) = cache.order(client_order_id) {
830 log::info!("Order with {client_order_id} was found in the cache");
831 order.clone()
832 } else {
833 log::error!(
834 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
835 );
836 return;
837 }
838 };
839
840 drop(cache);
841
842 match event {
843 OrderEventAny::Filled(fill) => {
844 let oms_type = self.determine_oms_type(fill);
845 let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
846
847 let mut fill = *fill;
848 if fill.position_id.is_none() {
849 fill.position_id = Some(position_id);
850 }
851
852 if self.apply_fill_to_order(&mut order, fill).is_ok() {
853 self.handle_order_fill(&order, fill, oms_type);
854 }
855 }
856 _ => {
857 let _ = self.apply_event_to_order(&mut order, event.clone());
858 }
859 }
860 }
861
862 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
863 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
865 return *oms_type;
866 }
867
868 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
870 && let Some(client) = self.clients.get(client_id)
871 {
872 return client.oms_type();
873 }
874
875 if let Some(client) = &self.default_client {
876 return client.oms_type();
877 }
878
879 OmsType::Netting }
881
882 fn determine_position_id(
883 &mut self,
884 fill: OrderFilled,
885 oms_type: OmsType,
886 order: Option<&OrderAny>,
887 ) -> PositionId {
888 match oms_type {
889 OmsType::Hedging => self.determine_hedging_position_id(fill, order),
890 OmsType::Netting => self.determine_netting_position_id(fill),
891 _ => self.determine_netting_position_id(fill), }
893 }
894
895 fn determine_hedging_position_id(
896 &mut self,
897 fill: OrderFilled,
898 order: Option<&OrderAny>,
899 ) -> PositionId {
900 if let Some(position_id) = fill.position_id {
902 if self.config.debug {
903 log::debug!("Already had a position ID of: {position_id}");
904 }
905 return position_id;
906 }
907
908 let cache = self.cache.borrow();
909
910 let order = if let Some(o) = order {
911 o
912 } else {
913 match cache.order(&fill.client_order_id()) {
914 Some(o) => o,
915 None => {
916 panic!(
917 "Order for {} not found to determine position ID",
918 fill.client_order_id()
919 );
920 }
921 }
922 };
923
924 if let Some(spawn_id) = order.exec_spawn_id() {
926 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
927 for spawned_order in spawn_orders {
928 if let Some(pos_id) = spawned_order.position_id() {
929 if self.config.debug {
930 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
931 }
932 return pos_id;
933 }
934 }
935 }
936
937 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
939 if self.config.debug {
940 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
941 }
942 position_id
943 }
944
945 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
946 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
947 }
948
949 fn apply_fill_to_order(&self, order: &mut OrderAny, fill: OrderFilled) -> anyhow::Result<()> {
950 if order.is_duplicate_fill(&fill) {
951 log::warn!(
952 "Duplicate fill: {} trade_id={} already applied, skipping",
953 order.client_order_id(),
954 fill.trade_id
955 );
956 anyhow::bail!("Duplicate fill");
957 }
958
959 self.check_overfill(order, &fill)?;
960 let event = OrderEventAny::Filled(fill);
961 self.apply_order_event(order, event)
962 }
963
964 fn apply_event_to_order(
965 &self,
966 order: &mut OrderAny,
967 event: OrderEventAny,
968 ) -> anyhow::Result<()> {
969 self.apply_order_event(order, event)
970 }
971
972 fn apply_order_event(&self, order: &mut OrderAny, event: OrderEventAny) -> anyhow::Result<()> {
973 if let Err(e) = order.apply(event.clone()) {
974 match e {
975 OrderError::InvalidStateTransition => {
976 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
979 }
980 OrderError::DuplicateFill(trade_id) => {
981 log::warn!(
983 "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
984 );
985 anyhow::bail!("{e}");
986 }
987 _ => {
988 log::error!("Error applying event: {e}, did not apply {event}");
990 if should_handle_own_book_order(order) {
991 self.cache.borrow_mut().update_own_order_book(order);
992 }
993 anyhow::bail!("{e}");
994 }
995 }
996 }
997
998 if let Err(e) = self.cache.borrow_mut().update_order(order) {
999 log::error!("Error updating order in cache: {e}");
1000 }
1001
1002 if self.config.debug {
1003 log::debug!("{SEND}{EVT} {event}");
1004 }
1005
1006 let topic = switchboard::get_event_orders_topic(event.strategy_id());
1007 msgbus::publish(topic, &event);
1008
1009 if self.config.snapshot_orders {
1010 self.create_order_state_snapshot(order);
1011 }
1012
1013 Ok(())
1014 }
1015
1016 fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
1017 let potential_overfill = order.calculate_overfill(fill.last_qty);
1018
1019 if potential_overfill.is_positive() {
1020 if self.config.allow_overfills {
1021 log::warn!(
1022 "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
1023 order.client_order_id(),
1024 potential_overfill,
1025 order.filled_qty(),
1026 fill.last_qty,
1027 order.quantity()
1028 );
1029 } else {
1030 let msg = format!(
1031 "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
1032 Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
1033 order.client_order_id(),
1034 potential_overfill,
1035 order.filled_qty(),
1036 fill.last_qty,
1037 order.quantity()
1038 );
1039 anyhow::bail!("{msg}");
1040 }
1041 }
1042
1043 Ok(())
1044 }
1045
1046 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
1047 let instrument =
1048 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
1049 instrument.clone()
1050 } else {
1051 log::error!(
1052 "Cannot handle order fill: no instrument found for {}, {fill}",
1053 fill.instrument_id,
1054 );
1055 return;
1056 };
1057
1058 if self.cache.borrow().account(&fill.account_id).is_none() {
1059 log::error!(
1060 "Cannot handle order fill: no account found for {}, {fill}",
1061 fill.instrument_id.venue,
1062 );
1063 return;
1064 }
1065
1066 let position = if instrument.is_spread() {
1069 None
1070 } else {
1071 self.handle_position_update(instrument.clone(), fill, oms_type);
1072 let position_id = fill.position_id.unwrap();
1073 self.cache.borrow().position(&position_id).cloned()
1074 };
1075
1076 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1079 if !instrument.is_spread()
1081 && let Some(ref pos) = position
1082 && pos.is_open()
1083 {
1084 let position_id = pos.id;
1085 for client_order_id in order.linked_order_ids().unwrap_or_default() {
1086 let mut cache = self.cache.borrow_mut();
1087 let contingent_order = cache.mut_order(client_order_id);
1088 if let Some(contingent_order) = contingent_order
1089 && contingent_order.position_id().is_none()
1090 {
1091 contingent_order.set_position_id(Some(position_id));
1092
1093 if let Err(e) = self.cache.borrow_mut().add_position_id(
1094 &position_id,
1095 &contingent_order.instrument_id().venue,
1096 &contingent_order.client_order_id(),
1097 &contingent_order.strategy_id(),
1098 ) {
1099 log::error!("Failed to add position ID: {e}");
1100 }
1101 }
1102 }
1103 }
1104 }
1107 }
1108
1109 fn handle_position_update(
1113 &mut self,
1114 instrument: InstrumentAny,
1115 fill: OrderFilled,
1116 oms_type: OmsType,
1117 ) {
1118 let position_id = if let Some(position_id) = fill.position_id {
1119 position_id
1120 } else {
1121 log::error!("Cannot handle position update: no position ID found for fill {fill}");
1122 return;
1123 };
1124
1125 let position_opt = self.cache.borrow().position(&position_id).cloned();
1126
1127 match position_opt {
1128 None => {
1129 if self.open_position(instrument, None, fill, oms_type).is_ok() {
1131 }
1133 }
1134 Some(pos) if pos.is_closed() => {
1135 if self
1137 .open_position(instrument, Some(&pos), fill, oms_type)
1138 .is_ok()
1139 {
1140 }
1142 }
1143 Some(mut pos) => {
1144 if self.will_flip_position(&pos, fill) {
1145 self.flip_position(instrument, &mut pos, fill, oms_type);
1147 } else {
1148 self.update_position(&mut pos, fill);
1150 }
1151 }
1152 }
1153 }
1154
1155 fn open_position(
1156 &self,
1157 instrument: InstrumentAny,
1158 position: Option<&Position>,
1159 fill: OrderFilled,
1160 oms_type: OmsType,
1161 ) -> anyhow::Result<()> {
1162 if let Some(position) = position {
1163 if Self::is_duplicate_closed_fill(position, &fill) {
1164 log::warn!(
1165 "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
1166 fill.trade_id,
1167 position.id,
1168 fill.order_side,
1169 fill.last_qty,
1170 fill.last_px
1171 );
1172 return Ok(());
1173 }
1174 self.reopen_position(position, oms_type)?;
1175 }
1176
1177 let position = Position::new(&instrument, fill);
1178 self.cache
1179 .borrow_mut()
1180 .add_position(position.clone(), oms_type)?; if self.config.snapshot_positions {
1183 self.create_position_state_snapshot(&position);
1184 }
1185
1186 let ts_init = self.clock.borrow().timestamp_ns();
1187 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
1188 let topic = switchboard::get_event_positions_topic(event.strategy_id);
1189 msgbus::publish(topic, &event);
1190
1191 Ok(())
1192 }
1193
1194 fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
1195 position.events.iter().any(|event| {
1196 event.trade_id == fill.trade_id
1197 && event.order_side == fill.order_side
1198 && event.last_px == fill.last_px
1199 && event.last_qty == fill.last_qty
1200 })
1201 }
1202
1203 fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
1204 if oms_type == OmsType::Netting {
1205 if position.is_open() {
1206 anyhow::bail!(
1207 "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
1208 position.id
1209 );
1210 }
1211 self.cache.borrow_mut().snapshot_position(position)?;
1213 } else {
1214 log::warn!(
1216 "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
1217 position.id
1218 );
1219 }
1220 Ok(())
1221 }
1222
1223 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
1224 position.apply(&fill);
1226
1227 let is_closed = position.is_closed();
1229
1230 if let Err(e) = self.cache.borrow_mut().update_position(position) {
1232 log::error!("Failed to update position: {e:?}");
1233 return;
1234 }
1235
1236 let cache = self.cache.borrow();
1238
1239 drop(cache);
1240
1241 if self.config.snapshot_positions {
1243 self.create_position_state_snapshot(position);
1244 }
1245
1246 let topic = switchboard::get_event_positions_topic(position.strategy_id);
1248 let ts_init = self.clock.borrow().timestamp_ns();
1249
1250 if is_closed {
1251 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
1252 msgbus::publish(topic, &event);
1253 } else {
1254 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
1255 msgbus::publish(topic, &event);
1256 }
1257 }
1258
1259 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
1260 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
1261 }
1262
1263 fn flip_position(
1264 &mut self,
1265 instrument: InstrumentAny,
1266 position: &mut Position,
1267 fill: OrderFilled,
1268 oms_type: OmsType,
1269 ) {
1270 let difference = match position.side {
1271 PositionSide::Long => Quantity::from_raw(
1272 fill.last_qty.raw - position.quantity.raw,
1273 position.size_precision,
1274 ),
1275 PositionSide::Short => Quantity::from_raw(
1276 position.quantity.raw.abs_diff(fill.last_qty.raw), position.size_precision,
1278 ),
1279 _ => fill.last_qty,
1280 };
1281
1282 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
1284 let (commission1, commission2) = if let Some(commission) = fill.commission {
1285 let commission_currency = commission.currency;
1286 let commission1 = Money::new(commission * fill_percent, commission_currency);
1287 let commission2 = commission - commission1;
1288 (Some(commission1), Some(commission2))
1289 } else {
1290 log::error!("Commission is not available.");
1291 (None, None)
1292 };
1293
1294 let mut fill_split1: Option<OrderFilled> = None;
1295 if position.is_open() {
1296 fill_split1 = Some(OrderFilled::new(
1297 fill.trader_id,
1298 fill.strategy_id,
1299 fill.instrument_id,
1300 fill.client_order_id,
1301 fill.venue_order_id,
1302 fill.account_id,
1303 fill.trade_id,
1304 fill.order_side,
1305 fill.order_type,
1306 position.quantity,
1307 fill.last_px,
1308 fill.currency,
1309 fill.liquidity_side,
1310 UUID4::new(),
1311 fill.ts_event,
1312 fill.ts_init,
1313 fill.reconciliation,
1314 fill.position_id,
1315 commission1,
1316 ));
1317
1318 self.update_position(position, fill_split1.unwrap());
1319
1320 if oms_type == OmsType::Netting
1322 && let Err(e) = self.cache.borrow_mut().snapshot_position(position)
1323 {
1324 log::error!("Failed to snapshot position during flip: {e:?}");
1325 }
1326 }
1327
1328 if difference.raw == 0 {
1330 log::warn!(
1331 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
1332 );
1333 return;
1334 }
1335
1336 let position_id_flip = if oms_type == OmsType::Hedging
1337 && let Some(position_id) = fill.position_id
1338 && position_id.is_virtual()
1339 {
1340 Some(self.pos_id_generator.generate(fill.strategy_id, true))
1342 } else {
1343 fill.position_id
1345 };
1346
1347 let fill_split2 = OrderFilled::new(
1348 fill.trader_id,
1349 fill.strategy_id,
1350 fill.instrument_id,
1351 fill.client_order_id,
1352 fill.venue_order_id,
1353 fill.account_id,
1354 fill.trade_id,
1355 fill.order_side,
1356 fill.order_type,
1357 difference,
1358 fill.last_px,
1359 fill.currency,
1360 fill.liquidity_side,
1361 UUID4::new(),
1362 fill.ts_event,
1363 fill.ts_init,
1364 fill.reconciliation,
1365 position_id_flip,
1366 commission2,
1367 );
1368
1369 if oms_type == OmsType::Hedging
1370 && let Some(position_id) = fill.position_id
1371 && position_id.is_virtual()
1372 {
1373 log::warn!("Closing position {fill_split1:?}");
1374 log::warn!("Flipping position {fill_split2:?}");
1375 }
1376
1377 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1379 log::error!("Failed to open flipped position: {e:?}");
1380 }
1381 }
1382
1383 fn set_position_id_counts(&mut self) {
1386 let cache = self.cache.borrow();
1388 let positions = cache.positions(None, None, None, None);
1389
1390 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1392
1393 for position in positions {
1394 *counts.entry(position.strategy_id).or_insert(0) += 1;
1395 }
1396
1397 self.pos_id_generator.reset();
1398
1399 for (strategy_id, count) in counts {
1400 self.pos_id_generator.set_count(count, strategy_id);
1401 log::info!("Set PositionId count for {strategy_id} to {count}");
1402 }
1403 }
1404
1405 fn last_px_for_conversion(
1406 &self,
1407 instrument_id: &InstrumentId,
1408 side: OrderSide,
1409 ) -> Option<Price> {
1410 let cache = self.cache.borrow();
1411
1412 if let Some(trade) = cache.trade(instrument_id) {
1414 return Some(trade.price);
1415 }
1416
1417 if let Some(quote) = cache.quote(instrument_id) {
1419 match side {
1420 OrderSide::Buy => Some(quote.ask_price),
1421 OrderSide::Sell => Some(quote.bid_price),
1422 OrderSide::NoOrderSide => None,
1423 }
1424 } else {
1425 None
1426 }
1427 }
1428
1429 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1430 log::info!(
1431 "Setting {} order quote quantity {} to base quantity {}",
1432 order.instrument_id(),
1433 order.quantity(),
1434 base_qty
1435 );
1436
1437 let original_qty = order.quantity();
1438 order.set_quantity(base_qty);
1439 order.set_leaves_qty(base_qty);
1440 order.set_is_quote_quantity(false);
1441
1442 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1443 return;
1444 }
1445
1446 if let Some(linked_order_ids) = order.linked_order_ids() {
1447 for client_order_id in linked_order_ids {
1448 match self.cache.borrow_mut().mut_order(client_order_id) {
1449 Some(contingent_order) => {
1450 if !contingent_order.is_quote_quantity() {
1451 continue; }
1453
1454 if contingent_order.quantity() != original_qty {
1455 log::warn!(
1456 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1457 contingent_order.quantity(),
1458 original_qty,
1459 base_qty
1460 );
1461 }
1462
1463 log::info!(
1464 "Setting {} order quote quantity {} to base quantity {}",
1465 contingent_order.instrument_id(),
1466 contingent_order.quantity(),
1467 base_qty
1468 );
1469
1470 contingent_order.set_quantity(base_qty);
1471 contingent_order.set_leaves_qty(base_qty);
1472 contingent_order.set_is_quote_quantity(false);
1473 }
1474 None => {
1475 log::error!("Contingency order {client_order_id} not found");
1476 }
1477 }
1478 }
1479 } else {
1480 log::warn!(
1481 "No linked order IDs found for order {}",
1482 order.client_order_id()
1483 );
1484 }
1485 }
1486
1487 fn deny_order(&self, order: &OrderAny, reason: &str) {
1488 log::error!(
1489 "Order denied: {reason}, order ID: {}",
1490 order.client_order_id()
1491 );
1492
1493 let denied = OrderDenied::new(
1494 order.trader_id(),
1495 order.strategy_id(),
1496 order.instrument_id(),
1497 order.client_order_id(),
1498 reason.into(),
1499 UUID4::new(),
1500 self.clock.borrow().timestamp_ns(),
1501 self.clock.borrow().timestamp_ns(),
1502 );
1503
1504 let mut order = order.clone();
1505
1506 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1507 log::error!("Failed to apply denied event to order: {e}");
1508 return;
1509 }
1510
1511 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1512 log::error!("Failed to update order in cache: {e}");
1513 return;
1514 }
1515
1516 let topic = switchboard::get_event_orders_topic(order.strategy_id());
1517 msgbus::publish(topic, &OrderEventAny::Denied(denied));
1518
1519 if self.config.snapshot_orders {
1520 self.create_order_state_snapshot(&order);
1521 }
1522 }
1523
1524 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1525 let mut cache = self.cache.borrow_mut();
1526 if cache.own_order_book_mut(instrument_id).is_none() {
1527 let own_book = OwnOrderBook::new(*instrument_id);
1528 cache.add_own_order_book(own_book).unwrap();
1529 }
1530
1531 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1532 }
1533}