1pub mod config;
24
25use std::{
26 cell::RefCell,
27 collections::{HashMap, HashSet},
28 rc::Rc,
29 time::SystemTime,
30};
31
32use config::ExecutionEngineConfig;
33use nautilus_common::{
34 cache::Cache,
35 clock::Clock,
36 generators::position_id::PositionIdGenerator,
37 logging::{CMD, EVT, RECV},
38 msgbus::MessageBus,
39};
40use nautilus_core::UUID4;
41use nautilus_model::{
42 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
43 events::{
44 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
45 PositionOpened,
46 },
47 identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
48 instruments::InstrumentAny,
49 orders::{OrderAny, OrderError},
50 position::Position,
51 types::{Money, Price, Quantity},
52};
53
54use crate::{
55 client::ExecutionClient,
56 messages::{
57 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder, SubmitOrder,
58 SubmitOrderList, TradingCommand,
59 },
60};
61
62pub struct ExecutionEngine {
63 clock: Rc<RefCell<dyn Clock>>,
64 cache: Rc<RefCell<Cache>>,
65 msgbus: Rc<RefCell<MessageBus>>,
66 clients: HashMap<ClientId, ExecutionClient>,
67 default_client: Option<ExecutionClient>,
68 routing_map: HashMap<Venue, ClientId>,
69 oms_overrides: HashMap<StrategyId, OmsType>,
70 external_order_claims: HashMap<InstrumentId, StrategyId>,
71 pos_id_generator: PositionIdGenerator,
72 config: ExecutionEngineConfig,
73}
74
75impl ExecutionEngine {
76 pub fn new(
77 clock: Rc<RefCell<dyn Clock>>,
78 cache: Rc<RefCell<Cache>>,
79 msgbus: Rc<RefCell<MessageBus>>,
80 config: Option<ExecutionEngineConfig>,
81 ) -> Self {
82 let trader_id = msgbus.borrow().trader_id;
83 Self {
84 clock: clock.clone(),
85 cache,
86 msgbus,
87 clients: HashMap::new(),
88 default_client: None,
89 routing_map: HashMap::new(),
90 oms_overrides: HashMap::new(),
91 external_order_claims: HashMap::new(),
92 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
93 config: config.unwrap_or_default(),
94 }
95 }
96
97 #[must_use]
98 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
99 self.pos_id_generator.count(strategy_id)
100 }
101
102 #[must_use]
103 pub fn check_integrity(&self) -> bool {
104 self.cache.borrow_mut().check_integrity()
105 }
106
107 #[must_use]
108 pub fn check_connected(&self) -> bool {
109 self.clients.values().all(|c| c.is_connected)
110 }
111
112 #[must_use]
113 pub fn check_disconnected(&self) -> bool {
114 self.clients.values().all(|c| !c.is_connected)
115 }
116
117 #[must_use]
118 pub fn check_residuals(&self) -> bool {
119 self.cache.borrow().check_residuals()
120 }
121
122 #[must_use]
123 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
124 self.external_order_claims.keys().copied().collect()
125 }
126
127 pub fn register_client(&mut self, client: ExecutionClient) -> anyhow::Result<()> {
130 if self.clients.contains_key(&client.client_id) {
131 anyhow::bail!("Client already registered with ID {}", client.client_id);
132 }
133
134 self.routing_map.insert(client.venue, client.client_id);
136
137 log::info!("Registered client {}", client.client_id);
138 self.clients.insert(client.client_id, client);
139 Ok(())
140 }
141
142 pub fn register_default_client(&mut self, client: ExecutionClient) {
143 log::info!("Registered default client {}", client.client_id);
144 self.default_client = Some(client);
145 }
146
147 pub fn register_venue_routing(
148 &mut self,
149 client_id: ClientId,
150 venue: Venue,
151 ) -> anyhow::Result<()> {
152 if !self.clients.contains_key(&client_id) {
153 anyhow::bail!("No client registered with ID {client_id}");
154 }
155
156 self.routing_map.insert(venue, client_id);
157 log::info!("Set client {client_id} routing for {venue}");
158 Ok(())
159 }
160
161 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
167 if self.clients.remove(&client_id).is_some() {
168 self.routing_map
170 .retain(|_, mapped_id| mapped_id != &client_id);
171 log::info!("Deregistered client {client_id}");
172 Ok(())
173 } else {
174 anyhow::bail!("No client registered with ID {client_id}")
175 }
176 }
177
178 #[allow(clippy::await_holding_refcell_ref)]
180 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
181 let ts = SystemTime::now();
182
183 {
184 let mut cache = self.cache.borrow_mut();
185 cache.cache_general()?;
186 self.cache.borrow_mut().cache_all().await?;
187 cache.build_index();
188 let _ = cache.check_integrity();
189 }
190
191 self.set_position_id_counts();
192
193 log::info!(
194 "Loaded cache in {}ms",
195 SystemTime::now()
196 .duration_since(ts)
197 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {}", e))?
198 .as_millis()
199 );
200
201 Ok(())
202 }
203
204 pub fn flush_db(&self) {
205 self.cache.borrow_mut().flush_db();
206 }
207
208 pub fn process(&mut self, event: &OrderEventAny) {
209 self.handle_event(event);
210 }
211
212 pub fn execute(&self, command: TradingCommand) {
213 self.execute_command(command);
214 }
215
216 fn execute_command(&self, command: TradingCommand) {
219 if self.config.debug {
220 log::debug!("{RECV}{CMD} {command:?}");
221 }
222
223 let client = if let Some(client) = self
224 .clients
225 .get(&command.client_id())
226 .or_else(|| {
227 self.routing_map
228 .get(&command.instrument_id().venue)
229 .and_then(|client_id| self.clients.get(client_id))
230 })
231 .or(self.default_client.as_ref())
232 {
233 client
234 } else {
235 log::error!(
236 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
237 command.client_id(),
238 command.instrument_id().venue,
239 );
240 return;
241 };
242
243 match command {
244 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
245 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
246 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
247 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
248 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
249 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
250 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
251 }
252 }
253
254 fn handle_submit_order(&self, client: &ExecutionClient, command: SubmitOrder) {
255 let mut command = command;
256 let mut order = command.order.clone();
257 let client_order_id = order.client_order_id();
258 let instrument_id = order.instrument_id();
259
260 if !self.cache.borrow().order_exists(&client_order_id) {
262 {
264 let mut cache = self.cache.borrow_mut();
265 if let Err(e) = cache.add_order(
266 order.clone(),
267 command.position_id,
268 Some(command.client_id),
269 true,
270 ) {
271 log::error!("Error adding order to cache: {e}");
272 return;
273 }
274 }
275
276 if self.config.snapshot_orders {
277 self.create_order_state_snapshot(&order);
278 }
279 }
280
281 let instrument = {
283 let cache = self.cache.borrow();
284 if let Some(instrument) = cache.instrument(&instrument_id) {
285 instrument.clone()
286 } else {
287 log::error!(
288 "Cannot handle submit order: no instrument found for {instrument_id}, {command}",
289 );
290 return;
291 }
292 };
293
294 if !instrument.is_inverse() && order.is_quote_quantity() {
296 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
297
298 if let Some(price) = last_px {
299 let base_qty = instrument.get_base_quantity(order.quantity(), price);
300 self.set_order_base_qty(&mut order, base_qty);
301 } else {
302 self.deny_order(
303 &order,
304 &format!("no-price-to-convert-quote-qty {instrument_id}"),
305 );
306 return;
307 }
308 }
309
310 command.order = order;
311
312 if let Err(e) = client.submit_order(command.clone()) {
314 log::error!("Error submitting order to client: {e}");
315 self.deny_order(
316 &command.order,
317 &format!("failed-to-submit-order-to-client: {e}"),
318 );
319 }
320 }
321
322 fn handle_submit_order_list(&self, client: &ExecutionClient, mut command: SubmitOrderList) {
323 let orders = command.order_list.orders.clone();
324
325 let mut cache = self.cache.borrow_mut();
327 for order in &orders {
328 if !cache.order_exists(&order.client_order_id()) {
329 if let Err(e) = cache.add_order(
330 order.clone(),
331 command.position_id,
332 Some(command.client_id),
333 true,
334 ) {
335 log::error!("Error adding order to cache: {e}");
336 return;
337 }
338
339 if self.config.snapshot_orders {
340 self.create_order_state_snapshot(order);
341 }
342 }
343 }
344 drop(cache);
345
346 let cache = self.cache.borrow();
348 let instrument = if let Some(instrument) = cache.instrument(&command.instrument_id) {
349 instrument
350 } else {
351 log::error!(
352 "Cannot handle submit order list: no instrument found for {}, {command}",
353 command.instrument_id,
354 );
355 return;
356 };
357
358 if !instrument.is_inverse() && command.order_list.orders[0].is_quote_quantity() {
360 let mut quote_qty = None;
361 let mut last_px = None;
362
363 for order in &mut command.order_list.orders {
364 if !order.is_quote_quantity() {
365 continue; }
367
368 if Some(order.quantity()) != quote_qty {
369 last_px =
370 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
371 quote_qty = Some(order.quantity());
372 }
373
374 if let Some(px) = last_px {
375 let base_qty = instrument.get_base_quantity(order.quantity(), px);
376 self.set_order_base_qty(order, base_qty);
377 } else {
378 for order in &command.order_list.orders {
379 self.deny_order(
380 order,
381 &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
382 );
383 }
384 return; }
386 }
387 }
388
389 if let Err(e) = client.submit_order_list(command) {
391 log::error!("Error submitting order list to client: {e}");
392 for order in &orders {
393 self.deny_order(
394 order,
395 &format!("failed-to-submit-order-list-to-client: {e}"),
396 );
397 }
398 }
399 }
400
401 fn handle_modify_order(&self, client: &ExecutionClient, command: ModifyOrder) {
402 if let Err(e) = client.modify_order(command) {
403 log::error!("Error modifying order: {e}");
404 }
405 }
406
407 fn handle_cancel_order(&self, client: &ExecutionClient, command: CancelOrder) {
408 if let Err(e) = client.cancel_order(command) {
409 log::error!("Error canceling order: {e}");
410 }
411 }
412
413 fn handle_cancel_all_orders(&self, client: &ExecutionClient, command: CancelAllOrders) {
414 if let Err(e) = client.cancel_all_orders(command) {
415 log::error!("Error canceling all orders: {e}");
416 }
417 }
418
419 fn handle_batch_cancel_orders(&self, client: &ExecutionClient, command: BatchCancelOrders) {
420 if let Err(e) = client.batch_cancel_orders(command) {
421 log::error!("Error batch canceling orders: {e}");
422 }
423 }
424
425 fn handle_query_order(&self, client: &ExecutionClient, command: QueryOrder) {
426 if let Err(e) = client.query_order(command) {
427 log::error!("Error querying order: {e}");
428 }
429 }
430
431 fn create_order_state_snapshot(&self, order: &OrderAny) {
432 if self.config.debug {
433 log::debug!("Creating order state snapshot for {order}");
434 }
435
436 if self.cache.borrow().has_backing() {
437 if let Err(e) = self.cache.borrow().snapshot_order_state(order) {
438 log::error!("Failed to snapshot order state: {e}");
439 return;
440 }
441 }
442
443 let mut msgbus = self.msgbus.borrow_mut();
444 if msgbus.has_backing {
445 let topic = msgbus
446 .switchboard
447 .get_order_snapshots_topic(order.client_order_id());
448 msgbus.publish(&topic, order);
449 }
450 }
451
452 fn create_position_state_snapshot(&self, position: &Position) {
453 if self.config.debug {
454 log::debug!("Creating position state snapshot for {position}");
455 }
456
457 let mut msgbus = self.msgbus.borrow_mut();
463 let topic = msgbus
464 .switchboard
465 .get_positions_snapshots_topic(position.id);
466 msgbus.publish(&topic, position);
467 }
468
469 fn handle_event(&mut self, event: &OrderEventAny) {
472 if self.config.debug {
473 log::debug!("{RECV}{EVT} {event:?}");
474 }
475
476 let client_order_id = event.client_order_id();
477 let cache = self.cache.borrow();
478 let mut order = if let Some(order) = cache.order(&client_order_id) {
479 order.clone()
480 } else {
481 log::warn!(
482 "Order with {} not found in the cache to apply {}",
483 event.client_order_id(),
484 event
485 );
486
487 let venue_order_id = if let Some(id) = event.venue_order_id() {
489 id
490 } else {
491 log::error!(
492 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
493 event.client_order_id()
494 );
495 return;
496 };
497
498 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
500 id
501 } else {
502 log::error!(
503 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
504 event.client_order_id(),
505 );
506 return;
507 };
508
509 if let Some(order) = cache.order(client_order_id) {
511 log::info!("Order with {client_order_id} was found in the cache");
512 order.clone()
513 } else {
514 log::error!(
515 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
516 );
517 return;
518 }
519 };
520
521 drop(cache);
522 match event {
523 OrderEventAny::Filled(order_filled) => {
524 let oms_type = self.determine_oms_type(order_filled);
525 let position_id = self.determine_position_id(*order_filled, oms_type);
526
527 let mut order_filled = *order_filled;
529 if order_filled.position_id.is_none() {
530 order_filled.position_id = Some(position_id);
531 }
532
533 self.apply_event_to_order(&mut order, OrderEventAny::Filled(order_filled));
534 self.handle_order_fill(&order, order_filled, oms_type);
535 }
536 _ => {
537 self.apply_event_to_order(&mut order, event.clone());
538 }
539 }
540 }
541
542 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
543 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
545 return *oms_type;
546 }
547
548 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue) {
550 if let Some(client) = self.clients.get(client_id) {
551 return client.oms_type;
552 }
553 }
554
555 if let Some(client) = &self.default_client {
556 return client.oms_type;
557 }
558
559 OmsType::Netting }
561
562 fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
563 match oms_type {
564 OmsType::Hedging => self.determine_hedging_position_id(fill),
565 OmsType::Netting => self.determine_netting_position_id(fill),
566 _ => self.determine_netting_position_id(fill), }
568 }
569
570 fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
571 if let Some(position_id) = fill.position_id {
573 if self.config.debug {
574 log::debug!("Already had a position ID of: {}", position_id);
575 }
576 return position_id;
577 }
578
579 let cache = self.cache.borrow();
581 let order = match cache.order(&fill.client_order_id()) {
582 Some(o) => o,
583 None => {
584 panic!(
585 "Order for {} not found to determine position ID",
586 fill.client_order_id()
587 );
588 }
589 };
590
591 if let Some(spawn_id) = order.exec_spawn_id() {
593 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
594 for spawned_order in spawn_orders {
595 if let Some(pos_id) = spawned_order.position_id() {
596 if self.config.debug {
597 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
598 }
599 return pos_id;
600 }
601 }
602 }
603
604 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
606 if self.config.debug {
607 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
608 }
609 position_id
610 }
611
612 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
613 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
614 }
615
616 fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
617 if let Err(e) = order.apply(event.clone()) {
618 match e {
619 OrderError::InvalidStateTransition => {
620 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
621 }
622 _ => {
623 log::error!("Error applying event: {e}, did not apply {event}");
624 }
625 }
626 return;
627 }
628
629 if let Err(e) = self.cache.borrow_mut().update_order(order) {
630 log::error!("Error updating order in cache: {e}");
631 }
632
633 let mut msgbus = self.msgbus.borrow_mut();
634 let topic = msgbus
635 .switchboard
636 .get_event_orders_topic(event.strategy_id());
637 msgbus.publish(&topic, order);
638
639 if self.config.snapshot_orders {
640 self.create_order_state_snapshot(order);
641 }
642 }
643
644 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
645 let instrument =
646 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
647 instrument.clone()
648 } else {
649 log::error!(
650 "Cannot handle order fill: no instrument found for {}, {fill}",
651 fill.instrument_id,
652 );
653 return;
654 };
655
656 if self.cache.borrow().account(&fill.account_id).is_none() {
657 log::error!(
658 "Cannot handle order fill: no account found for {}, {fill}",
659 fill.instrument_id.venue,
660 );
661 return;
662 }
663
664 let position_id = if let Some(position_id) = fill.position_id {
665 position_id
666 } else {
667 log::error!("Cannot handle order fill: no position ID found for fill {fill}",);
668 return;
669 };
670
671 let mut position = match self.cache.borrow().position(&position_id) {
672 Some(pos) if !pos.is_closed() => pos.clone(),
673 _ => self
674 .open_position(instrument.clone(), None, fill, oms_type)
675 .unwrap(),
676 };
677
678 if self.will_flip_position(&position, fill) {
679 self.flip_position(instrument, &mut position, fill, oms_type);
680 } else {
681 self.update_position(&mut position, fill);
682 }
683
684 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) && position.is_open() {
685 for client_order_id in order.linked_order_ids().unwrap_or_default() {
686 let mut cache = self.cache.borrow_mut();
687 let contingent_order = cache.mut_order(&client_order_id);
688 if let Some(contingent_order) = contingent_order {
689 if contingent_order.position_id().is_none() {
690 contingent_order.set_position_id(Some(position_id));
691
692 if let Err(e) = self.cache.borrow_mut().add_position_id(
693 &position_id,
694 &contingent_order.instrument_id().venue,
695 &contingent_order.client_order_id(),
696 &contingent_order.strategy_id(),
697 ) {
698 log::error!("Failed to add position ID: {e}");
699 }
700 }
701 }
702 }
703 }
704 }
705
706 fn open_position(
707 &self,
708 instrument: InstrumentAny,
709 position: Option<&Position>,
710 fill: OrderFilled,
711 oms_type: OmsType,
712 ) -> anyhow::Result<Position> {
713 let position = if let Some(position) = position {
714 self.cache.borrow_mut().snapshot_position(position)?;
716 let mut position = position.clone();
717 position.apply(&fill);
718 self.cache.borrow_mut().update_position(&position)?;
719 position
720 } else {
721 let position = Position::new(&instrument, fill);
722 self.cache
723 .borrow_mut()
724 .add_position(position.clone(), oms_type)?;
725 if self.config.snapshot_positions {
726 self.create_position_state_snapshot(&position);
727 }
728 position
729 };
730
731 let ts_init = self.clock.borrow().timestamp_ns();
732 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
733 let mut msgbus = self.msgbus.borrow_mut();
734 let topic = msgbus
735 .switchboard
736 .get_event_positions_topic(event.strategy_id);
737 msgbus.publish(&topic, &event);
738
739 Ok(position)
740 }
741
742 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
743 position.apply(&fill);
744
745 if let Err(e) = self.cache.borrow_mut().update_position(position) {
746 log::error!("Failed to update position: {e:?}");
747 return;
748 }
749
750 if self.config.snapshot_positions {
751 self.create_position_state_snapshot(position);
752 }
753
754 let mut msgbus = self.msgbus.borrow_mut();
755 let topic = msgbus
756 .switchboard
757 .get_event_positions_topic(position.strategy_id);
758 let ts_init = self.clock.borrow().timestamp_ns();
759
760 if position.is_closed() {
761 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
762 msgbus.publish(&topic, &event);
763 } else {
764 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
765 msgbus.publish(&topic, &event);
766 }
767 }
768
769 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
770 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
771 }
772
773 fn flip_position(
774 &mut self,
775 instrument: InstrumentAny,
776 position: &mut Position,
777 fill: OrderFilled,
778 oms_type: OmsType,
779 ) {
780 let difference = match position.side {
781 PositionSide::Long => Quantity::from_raw(
782 fill.last_qty.raw - position.quantity.raw,
783 position.size_precision,
784 ),
785 PositionSide::Short => Quantity::from_raw(
786 position.quantity.raw - fill.last_qty.raw,
787 position.size_precision,
788 ),
789 _ => fill.last_qty,
790 };
791
792 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
794 let (commission1, commission2) = if let Some(commission) = fill.commission {
795 let commission_currency = commission.currency;
796 let commission1 = Money::new(commission * fill_percent, commission_currency);
797 let commission2 = commission - commission1;
798 (Some(commission1), Some(commission2))
799 } else {
800 log::error!("Commission is not available.");
801 (None, None)
802 };
803
804 let mut fill_split1: Option<OrderFilled> = None;
805 if position.is_open() {
806 fill_split1 = Some(OrderFilled::new(
807 fill.trader_id,
808 fill.strategy_id,
809 fill.instrument_id,
810 fill.client_order_id,
811 fill.venue_order_id,
812 fill.account_id,
813 fill.trade_id,
814 fill.order_side,
815 fill.order_type,
816 position.quantity,
817 fill.last_px,
818 fill.currency,
819 fill.liquidity_side,
820 UUID4::new(),
821 fill.ts_event,
822 fill.ts_init,
823 fill.reconciliation,
824 fill.position_id,
825 commission1,
826 ));
827
828 self.update_position(position, fill_split1.unwrap());
829 }
830
831 if difference.raw == 0 {
833 log::warn!(
834 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
835 );
836 return;
837 }
838
839 let position_id_flip = if oms_type == OmsType::Hedging {
840 if let Some(position_id) = fill.position_id {
841 if position_id.is_virtual() {
842 Some(self.pos_id_generator.generate(fill.strategy_id, true))
844 } else {
845 Some(position_id)
846 }
847 } else {
848 None
849 }
850 } else {
851 fill.position_id
852 };
853
854 let fill_split2 = OrderFilled::new(
855 fill.trader_id,
856 fill.strategy_id,
857 fill.instrument_id,
858 fill.client_order_id,
859 fill.venue_order_id,
860 fill.account_id,
861 fill.trade_id,
862 fill.order_side,
863 fill.order_type,
864 difference,
865 fill.last_px,
866 fill.currency,
867 fill.liquidity_side,
868 UUID4::new(),
869 fill.ts_event,
870 fill.ts_init,
871 fill.reconciliation,
872 position_id_flip,
873 commission2,
874 );
875
876 if oms_type == OmsType::Hedging {
877 if let Some(position_id) = fill.position_id {
878 if position_id.is_virtual() {
879 log::warn!("Closing position {fill_split1:?}");
880 log::warn!("Flipping position {fill_split2:?}");
881 }
882 }
883 }
884
885 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
887 log::error!("Failed to open flipped position: {e:?}");
888 }
889 }
890
891 fn set_position_id_counts(&mut self) {
894 let cache = self.cache.borrow();
896 let positions = cache.positions(None, None, None, None);
897
898 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
900
901 for position in positions {
902 *counts.entry(position.strategy_id).or_insert(0) += 1;
903 }
904
905 self.pos_id_generator.reset();
906
907 for (strategy_id, count) in counts {
908 self.pos_id_generator.set_count(count, strategy_id);
909 log::info!("Set PositionId count for {strategy_id} to {count}");
910 }
911 }
912
913 fn last_px_for_conversion(
914 &self,
915 instrument_id: &InstrumentId,
916 side: OrderSide,
917 ) -> Option<Price> {
918 let cache = self.cache.borrow();
919
920 if let Some(trade) = cache.trade(instrument_id) {
922 return Some(trade.price);
923 }
924
925 if let Some(quote) = cache.quote(instrument_id) {
927 match side {
928 OrderSide::Buy => Some(quote.ask_price),
929 OrderSide::Sell => Some(quote.bid_price),
930 OrderSide::NoOrderSide => None,
931 }
932 } else {
933 None
934 }
935 }
936
937 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
938 log::info!(
939 "Setting {} order quote quantity {} to base quantity {}",
940 order.instrument_id(),
941 order.quantity(),
942 base_qty
943 );
944
945 let original_qty = order.quantity();
946 order.set_quantity(base_qty);
947 order.set_leaves_qty(base_qty);
948 order.set_is_quote_quantity(false);
949
950 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
951 return;
952 }
953
954 if let Some(linked_order_ids) = order.linked_order_ids() {
955 for client_order_id in linked_order_ids {
956 match self.cache.borrow_mut().mut_order(&client_order_id) {
957 Some(contingent_order) => {
958 if !contingent_order.is_quote_quantity() {
959 continue; }
961
962 if contingent_order.quantity() != original_qty {
963 log::warn!(
964 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
965 contingent_order.quantity(),
966 original_qty,
967 base_qty
968 );
969 }
970
971 log::info!(
972 "Setting {} order quote quantity {} to base quantity {}",
973 contingent_order.instrument_id(),
974 contingent_order.quantity(),
975 base_qty
976 );
977
978 contingent_order.set_quantity(base_qty);
979 contingent_order.set_leaves_qty(base_qty);
980 contingent_order.set_is_quote_quantity(false);
981 }
982 None => {
983 log::error!("Contingency order {client_order_id} not found");
984 }
985 }
986 }
987 } else {
988 log::warn!(
989 "No linked order IDs found for order {}",
990 order.client_order_id()
991 );
992 }
993 }
994
995 fn deny_order(&self, order: &OrderAny, reason: &str) {
996 log::error!(
997 "Order denied: {reason}, order ID: {}",
998 order.client_order_id()
999 );
1000
1001 let denied = OrderDenied::new(
1002 order.trader_id(),
1003 order.strategy_id(),
1004 order.instrument_id(),
1005 order.client_order_id(),
1006 reason.into(),
1007 UUID4::new(),
1008 self.clock.borrow().timestamp_ns(),
1009 self.clock.borrow().timestamp_ns(),
1010 );
1011
1012 let mut order = order.clone();
1013
1014 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1015 log::error!("Failed to apply denied event to order: {e}");
1016 return;
1017 }
1018
1019 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1020 log::error!("Failed to update order in cache: {e}");
1021 return;
1022 }
1023
1024 let mut msgbus = self.msgbus.borrow_mut();
1025 let topic = msgbus
1026 .switchboard
1027 .get_event_orders_topic(order.strategy_id());
1028 msgbus.publish(&topic, &denied);
1029
1030 if self.config.snapshot_orders {
1031 self.create_order_state_snapshot(&order);
1032 }
1033 }
1034}
1035
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use nautilus_common::{cache::Cache, clock::TestClock, msgbus::MessageBus};
    use rstest::fixture;

    use super::*;

    /// Default-constructed message bus fixture.
    #[fixture]
    fn msgbus() -> MessageBus {
        Default::default()
    }

    /// Cache fixture built with both constructor arguments left as `None`.
    #[fixture]
    fn simple_cache() -> Cache {
        Cache::new(None, None)
    }

    /// Test clock fixture.
    #[fixture]
    fn clock() -> TestClock {
        TestClock::new()
    }

    /// Builds an engine from shared handles. Note the parameter order differs
    /// from `ExecutionEngine::new` (bus, cache, clock here).
    fn _get_exec_engine(
        msgbus: Rc<RefCell<MessageBus>>,
        cache: Rc<RefCell<Cache>>,
        clock: Rc<RefCell<TestClock>>,
        config: Option<ExecutionEngineConfig>,
    ) -> ExecutionEngine {
        ExecutionEngine::new(clock, cache, msgbus, config)
    }
}