1pub mod config;
24
25use std::{
26 cell::RefCell,
27 collections::{HashMap, HashSet},
28 rc::Rc,
29 time::SystemTime,
30};
31
32use config::ExecutionEngineConfig;
33use nautilus_common::{
34 cache::Cache,
35 clock::Clock,
36 generators::position_id::PositionIdGenerator,
37 logging::{CMD, EVT, RECV},
38 msgbus::MessageBus,
39};
40use nautilus_core::UUID4;
41use nautilus_model::{
42 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
43 events::{
44 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
45 PositionOpened,
46 },
47 identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
48 instruments::InstrumentAny,
49 orders::{OrderAny, OrderError},
50 position::Position,
51 types::{Money, Price, Quantity},
52};
53
54use crate::{
55 client::ExecutionClient,
56 messages::{
57 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder, SubmitOrder,
58 SubmitOrderList, TradingCommand,
59 },
60};
61
62pub struct ExecutionEngine {
63 clock: Rc<RefCell<dyn Clock>>,
64 cache: Rc<RefCell<Cache>>,
65 msgbus: Rc<RefCell<MessageBus>>,
66 clients: HashMap<ClientId, ExecutionClient>,
67 default_client: Option<ExecutionClient>,
68 routing_map: HashMap<Venue, ClientId>,
69 oms_overrides: HashMap<StrategyId, OmsType>,
70 external_order_claims: HashMap<InstrumentId, StrategyId>,
71 pos_id_generator: PositionIdGenerator,
72 config: ExecutionEngineConfig,
73}
74
75impl ExecutionEngine {
76 pub fn new(
77 clock: Rc<RefCell<dyn Clock>>,
78 cache: Rc<RefCell<Cache>>,
79 msgbus: Rc<RefCell<MessageBus>>,
80 config: ExecutionEngineConfig,
81 ) -> Self {
82 let trader_id = msgbus.borrow().trader_id;
83 Self {
84 clock: clock.clone(),
85 cache,
86 msgbus,
87 clients: HashMap::new(),
88 default_client: None,
89 routing_map: HashMap::new(),
90 oms_overrides: HashMap::new(),
91 external_order_claims: HashMap::new(),
92 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
93 config,
94 }
95 }
96
97 #[must_use]
98 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
99 self.pos_id_generator.count(strategy_id)
100 }
101
102 #[must_use]
103 pub fn check_integrity(&self) -> bool {
104 self.cache.borrow_mut().check_integrity()
105 }
106
107 #[must_use]
108 pub fn check_connected(&self) -> bool {
109 self.clients.values().all(|c| c.is_connected)
110 }
111
112 #[must_use]
113 pub fn check_disconnected(&self) -> bool {
114 self.clients.values().all(|c| !c.is_connected)
115 }
116
117 #[must_use]
118 pub fn check_residuals(&self) -> bool {
119 self.cache.borrow().check_residuals()
120 }
121
122 #[must_use]
123 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
124 self.external_order_claims.keys().copied().collect()
125 }
126
127 pub fn register_client(&mut self, client: ExecutionClient) -> anyhow::Result<()> {
130 if self.clients.contains_key(&client.client_id) {
131 anyhow::bail!("Client already registered with ID {}", client.client_id);
132 }
133
134 self.routing_map.insert(client.venue, client.client_id);
136
137 log::info!("Registered client {}", client.client_id);
138 self.clients.insert(client.client_id, client);
139 Ok(())
140 }
141
142 pub fn register_default_client(&mut self, client: ExecutionClient) {
143 log::info!("Registered default client {}", client.client_id);
144 self.default_client = Some(client);
145 }
146
147 pub fn register_venue_routing(
148 &mut self,
149 client_id: ClientId,
150 venue: Venue,
151 ) -> anyhow::Result<()> {
152 if !self.clients.contains_key(&client_id) {
153 anyhow::bail!("No client registered with ID {client_id}");
154 }
155
156 self.routing_map.insert(venue, client_id);
157 log::info!("Set client {client_id} routing for {venue}");
158 Ok(())
159 }
160
161 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
167 if self.clients.remove(&client_id).is_some() {
168 self.routing_map
170 .retain(|_, mapped_id| mapped_id != &client_id);
171 log::info!("Deregistered client {client_id}");
172 Ok(())
173 } else {
174 anyhow::bail!("No client registered with ID {client_id}")
175 }
176 }
177
178 pub fn load_cache(&mut self) -> anyhow::Result<()> {
181 let ts = SystemTime::now();
182
183 {
184 let mut cache = self.cache.borrow_mut();
185 cache.cache_general()?;
186 cache.cache_currencies()?;
187 cache.cache_instruments()?;
188 cache.cache_accounts()?;
189 cache.cache_orders()?;
190 cache.cache_positions()?;
191
192 cache.build_index();
193 let _ = cache.check_integrity();
194 }
195
196 self.set_position_id_counts();
197
198 log::info!(
199 "Loaded cache in {}ms",
200 SystemTime::now()
201 .duration_since(ts)
202 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {}", e))?
203 .as_millis()
204 );
205
206 Ok(())
207 }
208
209 pub fn flush_db(&self) {
210 self.cache.borrow_mut().flush_db();
211 }
212
213 pub fn process(&mut self, event: &OrderEventAny) {
214 self.handle_event(event);
215 }
216
217 pub fn execute(&self, command: TradingCommand) {
218 self.execute_command(command);
219 }
220
221 fn execute_command(&self, command: TradingCommand) {
224 if self.config.debug {
225 log::debug!("{RECV}{CMD} {command:?}");
226 }
227
228 let client = if let Some(client) = self
229 .clients
230 .get(&command.client_id())
231 .or_else(|| {
232 self.routing_map
233 .get(&command.instrument_id().venue)
234 .and_then(|client_id| self.clients.get(client_id))
235 })
236 .or(self.default_client.as_ref())
237 {
238 client
239 } else {
240 log::error!(
241 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
242 command.client_id(),
243 command.instrument_id().venue,
244 );
245 return;
246 };
247
248 match command {
249 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
250 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
251 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
252 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
253 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
254 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
255 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
256 }
257 }
258
259 fn handle_submit_order(&self, client: &ExecutionClient, command: SubmitOrder) {
260 let mut command = command;
261 let mut order = command.order.clone();
262 let client_order_id = order.client_order_id();
263 let instrument_id = order.instrument_id();
264
265 if !self.cache.borrow().order_exists(&client_order_id) {
267 {
269 let mut cache = self.cache.borrow_mut();
270 if let Err(e) = cache.add_order(
271 order.clone(),
272 command.position_id,
273 Some(command.client_id),
274 true,
275 ) {
276 log::error!("Error adding order to cache: {e}");
277 return;
278 }
279 }
280
281 if self.config.snapshot_orders {
282 self.create_order_state_snapshot(&order);
283 }
284 }
285
286 let instrument = {
288 let cache = self.cache.borrow();
289 if let Some(instrument) = cache.instrument(&instrument_id) {
290 instrument.clone()
291 } else {
292 log::error!(
293 "Cannot handle submit order: no instrument found for {instrument_id}, {command}",
294 );
295 return;
296 }
297 };
298
299 if !instrument.is_inverse() && order.is_quote_quantity() {
301 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
302
303 if let Some(price) = last_px {
304 let base_qty = instrument.get_base_quantity(order.quantity(), price);
305 self.set_order_base_qty(&mut order, base_qty);
306 } else {
307 self.deny_order(
308 &order,
309 &format!("no-price-to-convert-quote-qty {instrument_id}"),
310 );
311 return;
312 }
313 }
314
315 command.order = order;
316
317 if let Err(e) = client.submit_order(command.clone()) {
319 log::error!("Error submitting order to client: {e}");
320 self.deny_order(
321 &command.order,
322 &format!("failed-to-submit-order-to-client: {e}"),
323 );
324 }
325 }
326
327 fn handle_submit_order_list(&self, client: &ExecutionClient, mut command: SubmitOrderList) {
328 let orders = command.order_list.orders.clone();
329
330 let mut cache = self.cache.borrow_mut();
332 for order in &orders {
333 if !cache.order_exists(&order.client_order_id()) {
334 if let Err(e) = cache.add_order(
335 order.clone(),
336 command.position_id,
337 Some(command.client_id),
338 true,
339 ) {
340 log::error!("Error adding order to cache: {e}");
341 return;
342 }
343
344 if self.config.snapshot_orders {
345 self.create_order_state_snapshot(order);
346 }
347 }
348 }
349 drop(cache);
350
351 let cache = self.cache.borrow();
353 let instrument = if let Some(instrument) = cache.instrument(&command.instrument_id) {
354 instrument
355 } else {
356 log::error!(
357 "Cannot handle submit order list: no instrument found for {}, {command}",
358 command.instrument_id,
359 );
360 return;
361 };
362
363 if !instrument.is_inverse() && command.order_list.orders[0].is_quote_quantity() {
365 let mut quote_qty = None;
366 let mut last_px = None;
367
368 for order in &mut command.order_list.orders {
369 if !order.is_quote_quantity() {
370 continue; }
372
373 if Some(order.quantity()) != quote_qty {
374 last_px =
375 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
376 quote_qty = Some(order.quantity());
377 }
378
379 if let Some(px) = last_px {
380 let base_qty = instrument.get_base_quantity(order.quantity(), px);
381 self.set_order_base_qty(order, base_qty);
382 } else {
383 for order in &command.order_list.orders {
384 self.deny_order(
385 order,
386 &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
387 );
388 }
389 return; }
391 }
392 }
393
394 if let Err(e) = client.submit_order_list(command) {
396 log::error!("Error submitting order list to client: {e}");
397 for order in &orders {
398 self.deny_order(
399 order,
400 &format!("failed-to-submit-order-list-to-client: {e}"),
401 );
402 }
403 }
404 }
405
406 fn handle_modify_order(&self, client: &ExecutionClient, command: ModifyOrder) {
407 if let Err(e) = client.modify_order(command) {
408 log::error!("Error modifying order: {e}");
409 }
410 }
411
412 fn handle_cancel_order(&self, client: &ExecutionClient, command: CancelOrder) {
413 if let Err(e) = client.cancel_order(command) {
414 log::error!("Error canceling order: {e}");
415 }
416 }
417
418 fn handle_cancel_all_orders(&self, client: &ExecutionClient, command: CancelAllOrders) {
419 if let Err(e) = client.cancel_all_orders(command) {
420 log::error!("Error canceling all orders: {e}");
421 }
422 }
423
424 fn handle_batch_cancel_orders(&self, client: &ExecutionClient, command: BatchCancelOrders) {
425 if let Err(e) = client.batch_cancel_orders(command) {
426 log::error!("Error batch canceling orders: {e}");
427 }
428 }
429
430 fn handle_query_order(&self, client: &ExecutionClient, command: QueryOrder) {
431 if let Err(e) = client.query_order(command) {
432 log::error!("Error querying order: {e}");
433 }
434 }
435
436 fn create_order_state_snapshot(&self, order: &OrderAny) {
437 if self.config.debug {
438 log::debug!("Creating order state snapshot for {order}");
439 }
440
441 if self.cache.borrow().has_backing() {
442 if let Err(e) = self.cache.borrow().snapshot_order_state(order) {
443 log::error!("Failed to snapshot order state: {e}");
444 return;
445 }
446 }
447
448 let mut msgbus = self.msgbus.borrow_mut();
449 if msgbus.has_backing {
450 let topic = msgbus
451 .switchboard
452 .get_order_snapshots_topic(order.client_order_id());
453 msgbus.publish(&topic, order);
454 }
455 }
456
457 fn create_position_state_snapshot(&self, position: &Position) {
458 if self.config.debug {
459 log::debug!("Creating position state snapshot for {position}");
460 }
461
462 let mut msgbus = self.msgbus.borrow_mut();
468 let topic = msgbus
469 .switchboard
470 .get_positions_snapshots_topic(position.id);
471 msgbus.publish(&topic, position);
472 }
473
474 fn handle_event(&mut self, event: &OrderEventAny) {
477 if self.config.debug {
478 log::debug!("{RECV}{EVT} {event:?}");
479 }
480
481 let client_order_id = event.client_order_id();
482 let borrowed_cache = self.cache.borrow();
483 let mut order = if let Some(order) = borrowed_cache.order(&client_order_id) {
484 order.clone()
485 } else {
486 log::warn!(
487 "Order with {} not found in the cache to apply {}",
488 event.client_order_id(),
489 event
490 );
491
492 let venue_order_id = if let Some(id) = event.venue_order_id() {
494 id
495 } else {
496 log::error!(
497 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
498 event.client_order_id()
499 );
500 return;
501 };
502
503 let client_order_id = if let Some(id) = borrowed_cache.client_order_id(&venue_order_id)
505 {
506 id
507 } else {
508 log::error!(
509 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
510 event.client_order_id(),
511 );
512 return;
513 };
514
515 if let Some(order) = borrowed_cache.order(client_order_id) {
517 log::info!("Order with {client_order_id} was found in the cache");
518 order.clone()
519 } else {
520 log::error!(
521 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
522 );
523 return;
524 }
525 };
526
527 drop(borrowed_cache);
528 match event {
529 OrderEventAny::Filled(order_filled) => {
530 let oms_type = self.determine_oms_type(order_filled);
531 let position_id = self.determine_position_id(*order_filled, oms_type);
532
533 let mut order_filled = *order_filled;
535 if order_filled.position_id.is_none() {
536 order_filled.position_id = Some(position_id);
537 }
538
539 self.apply_event_to_order(&mut order, OrderEventAny::Filled(order_filled));
540 self.handle_order_fill(&order, order_filled, oms_type);
541 }
542 _ => {
543 self.apply_event_to_order(&mut order, event.clone());
544 }
545 }
546 }
547
548 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
549 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
551 return *oms_type;
552 }
553
554 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue) {
556 if let Some(client) = self.clients.get(client_id) {
557 return client.oms_type;
558 }
559 }
560
561 if let Some(client) = &self.default_client {
562 return client.oms_type;
563 }
564
565 OmsType::Netting }
567
568 fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
569 match oms_type {
570 OmsType::Hedging => self.determine_hedging_position_id(fill),
571 OmsType::Netting => self.determine_netting_position_id(fill),
572 _ => self.determine_netting_position_id(fill), }
574 }
575
576 fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
577 if let Some(position_id) = fill.position_id {
579 if self.config.debug {
580 log::debug!("Already had a position ID of: {}", position_id);
581 }
582 return position_id;
583 }
584
585 let cache = self.cache.borrow();
587 let order = match cache.order(&fill.client_order_id()) {
588 Some(o) => o,
589 None => {
590 panic!(
591 "Order for {} not found to determine position ID",
592 fill.client_order_id()
593 );
594 }
595 };
596
597 if let Some(spawn_id) = order.exec_spawn_id() {
599 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
600 for spawned_order in spawn_orders {
601 if let Some(pos_id) = spawned_order.position_id() {
602 if self.config.debug {
603 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
604 }
605 return pos_id;
606 }
607 }
608 }
609
610 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
612 if self.config.debug {
613 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
614 }
615 position_id
616 }
617
618 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
619 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
620 }
621
622 fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
623 if let Err(e) = order.apply(event.clone()) {
624 match e {
625 OrderError::InvalidStateTransition => {
626 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
627 }
628 _ => {
629 log::error!("Error applying event: {e}, did not apply {event}");
630 }
631 }
632 return;
633 }
634
635 if let Err(e) = self.cache.borrow_mut().update_order(order) {
636 log::error!("Error updating order in cache: {e}");
637 }
638
639 let mut msgbus = self.msgbus.borrow_mut();
640 let topic = msgbus
641 .switchboard
642 .get_event_orders_topic(event.strategy_id());
643 msgbus.publish(&topic, order);
644
645 if self.config.snapshot_orders {
646 self.create_order_state_snapshot(order);
647 }
648 }
649
650 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
651 let instrument =
652 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
653 instrument.clone()
654 } else {
655 log::error!(
656 "Cannot handle order fill: no instrument found for {}, {fill}",
657 fill.instrument_id,
658 );
659 return;
660 };
661
662 if self.cache.borrow().account(&fill.account_id).is_none() {
663 log::error!(
664 "Cannot handle order fill: no account found for {}, {fill}",
665 fill.instrument_id.venue,
666 );
667 return;
668 }
669
670 let position_id = if let Some(position_id) = fill.position_id {
671 position_id
672 } else {
673 log::error!("Cannot handle order fill: no position ID found for fill {fill}",);
674 return;
675 };
676
677 let mut position = match self.cache.borrow().position(&position_id) {
678 Some(pos) if !pos.is_closed() => pos.clone(),
679 _ => self
680 .open_position(instrument.clone(), None, fill, oms_type)
681 .unwrap(),
682 };
683
684 if self.will_flip_position(&position, fill) {
685 self.flip_position(instrument, &mut position, fill, oms_type);
686 } else {
687 self.update_position(&mut position, fill);
688 }
689
690 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) && position.is_open() {
691 for client_order_id in order.linked_order_ids().unwrap_or_default() {
692 let mut cache = self.cache.borrow_mut();
693 let contingent_order = cache.mut_order(&client_order_id);
694 if let Some(contingent_order) = contingent_order {
695 if contingent_order.position_id().is_none() {
696 contingent_order.set_position_id(Some(position_id));
697
698 if let Err(e) = self.cache.borrow_mut().add_position_id(
699 &position_id,
700 &contingent_order.instrument_id().venue,
701 &contingent_order.client_order_id(),
702 &contingent_order.strategy_id(),
703 ) {
704 log::error!("Failed to add position ID: {e}");
705 }
706 }
707 }
708 }
709 }
710 }
711
712 fn open_position(
713 &self,
714 instrument: InstrumentAny,
715 position: Option<&Position>,
716 fill: OrderFilled,
717 oms_type: OmsType,
718 ) -> anyhow::Result<Position> {
719 let position = if let Some(position) = position {
720 self.cache.borrow_mut().snapshot_position(position)?;
722 let mut position = position.clone();
723 position.apply(&fill);
724 self.cache.borrow_mut().update_position(&position)?;
725 position
726 } else {
727 let position = Position::new(&instrument, fill);
728 self.cache
729 .borrow_mut()
730 .add_position(position.clone(), oms_type)?;
731 if self.config.snapshot_positions {
732 self.create_position_state_snapshot(&position);
733 }
734 position
735 };
736
737 let ts_init = self.clock.borrow().timestamp_ns();
738 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
739 let mut msgbus = self.msgbus.borrow_mut();
740 let topic = msgbus
741 .switchboard
742 .get_event_positions_topic(event.strategy_id);
743 msgbus.publish(&topic, &event);
744
745 Ok(position)
746 }
747
748 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
749 position.apply(&fill);
750
751 if let Err(e) = self.cache.borrow_mut().update_position(position) {
752 log::error!("Failed to update position: {e:?}");
753 return;
754 }
755
756 if self.config.snapshot_positions {
757 self.create_position_state_snapshot(position);
758 }
759
760 let mut msgbus = self.msgbus.borrow_mut();
761 let topic = msgbus
762 .switchboard
763 .get_event_positions_topic(position.strategy_id);
764 let ts_init = self.clock.borrow().timestamp_ns();
765
766 if position.is_closed() {
767 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
768 msgbus.publish(&topic, &event);
769 } else {
770 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
771 msgbus.publish(&topic, &event);
772 }
773 }
774
775 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
776 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
777 }
778
779 fn flip_position(
780 &mut self,
781 instrument: InstrumentAny,
782 position: &mut Position,
783 fill: OrderFilled,
784 oms_type: OmsType,
785 ) {
786 let difference = match position.side {
787 PositionSide::Long => Quantity::from_raw(
788 fill.last_qty.raw - position.quantity.raw,
789 position.size_precision,
790 ),
791 PositionSide::Short => Quantity::from_raw(
792 position.quantity.raw - fill.last_qty.raw,
793 position.size_precision,
794 ),
795 _ => fill.last_qty,
796 };
797
798 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
800 let (commission1, commission2) = if let Some(commission) = fill.commission {
801 let commission_currency = commission.currency;
802 let commission1 = Money::new(commission * fill_percent, commission_currency);
803 let commission2 = commission - commission1;
804 (Some(commission1), Some(commission2))
805 } else {
806 log::error!("Commission is not available.");
807 (None, None)
808 };
809
810 let mut fill_split1: Option<OrderFilled> = None;
811 if position.is_open() {
812 fill_split1 = Some(OrderFilled::new(
813 fill.trader_id,
814 fill.strategy_id,
815 fill.instrument_id,
816 fill.client_order_id,
817 fill.venue_order_id,
818 fill.account_id,
819 fill.trade_id,
820 fill.order_side,
821 fill.order_type,
822 position.quantity,
823 fill.last_px,
824 fill.currency,
825 fill.liquidity_side,
826 UUID4::new(),
827 fill.ts_event,
828 fill.ts_init,
829 fill.reconciliation,
830 fill.position_id,
831 commission1,
832 ));
833
834 self.update_position(position, fill_split1.unwrap());
835 }
836
837 if difference.raw == 0 {
839 log::warn!(
840 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
841 );
842 return;
843 }
844
845 let position_id_flip = if oms_type == OmsType::Hedging {
846 if let Some(position_id) = fill.position_id {
847 if position_id.is_virtual() {
848 Some(self.pos_id_generator.generate(fill.strategy_id, true))
850 } else {
851 Some(position_id)
852 }
853 } else {
854 None
855 }
856 } else {
857 fill.position_id
858 };
859
860 let fill_split2 = OrderFilled::new(
861 fill.trader_id,
862 fill.strategy_id,
863 fill.instrument_id,
864 fill.client_order_id,
865 fill.venue_order_id,
866 fill.account_id,
867 fill.trade_id,
868 fill.order_side,
869 fill.order_type,
870 difference,
871 fill.last_px,
872 fill.currency,
873 fill.liquidity_side,
874 UUID4::new(),
875 fill.ts_event,
876 fill.ts_init,
877 fill.reconciliation,
878 position_id_flip,
879 commission2,
880 );
881
882 if oms_type == OmsType::Hedging {
883 if let Some(position_id) = fill.position_id {
884 if position_id.is_virtual() {
885 log::warn!("Closing position {fill_split1:?}");
886 log::warn!("Flipping position {fill_split2:?}");
887 }
888 }
889 }
890
891 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
893 log::error!("Failed to open flipped position: {e:?}");
894 }
895 }
896
897 fn set_position_id_counts(&mut self) {
900 let borrowed_cache = self.cache.borrow();
902 let positions = borrowed_cache.positions(None, None, None, None);
903
904 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
906
907 for position in positions {
908 *counts.entry(position.strategy_id).or_insert(0) += 1;
909 }
910
911 self.pos_id_generator.reset();
912
913 for (strategy_id, count) in counts {
914 self.pos_id_generator.set_count(count, strategy_id);
915 log::info!("Set PositionId count for {strategy_id} to {count}");
916 }
917 }
918
919 fn last_px_for_conversion(
920 &self,
921 instrument_id: &InstrumentId,
922 side: OrderSide,
923 ) -> Option<Price> {
924 let cache = self.cache.borrow();
925
926 if let Some(trade) = cache.trade(instrument_id) {
928 return Some(trade.price);
929 }
930
931 if let Some(quote) = cache.quote(instrument_id) {
933 match side {
934 OrderSide::Buy => Some(quote.ask_price),
935 OrderSide::Sell => Some(quote.bid_price),
936 OrderSide::NoOrderSide => None,
937 }
938 } else {
939 None
940 }
941 }
942
943 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
944 log::info!(
945 "Setting {} order quote quantity {} to base quantity {}",
946 order.instrument_id(),
947 order.quantity(),
948 base_qty
949 );
950
951 let original_qty = order.quantity();
952 order.set_quantity(base_qty);
953 order.set_leaves_qty(base_qty);
954 order.set_is_quote_quantity(false);
955
956 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
957 return;
958 }
959
960 if let Some(linked_order_ids) = order.linked_order_ids() {
961 for client_order_id in linked_order_ids {
962 match self.cache.borrow_mut().mut_order(&client_order_id) {
963 Some(contingent_order) => {
964 if !contingent_order.is_quote_quantity() {
965 continue; }
967
968 if contingent_order.quantity() != original_qty {
969 log::warn!(
970 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
971 contingent_order.quantity(),
972 original_qty,
973 base_qty
974 );
975 }
976
977 log::info!(
978 "Setting {} order quote quantity {} to base quantity {}",
979 contingent_order.instrument_id(),
980 contingent_order.quantity(),
981 base_qty
982 );
983
984 contingent_order.set_quantity(base_qty);
985 contingent_order.set_leaves_qty(base_qty);
986 contingent_order.set_is_quote_quantity(false);
987 }
988 None => {
989 log::error!("Contingency order {client_order_id} not found");
990 }
991 }
992 }
993 } else {
994 log::warn!(
995 "No linked order IDs found for order {}",
996 order.client_order_id()
997 );
998 }
999 }
1000
1001 fn deny_order(&self, order: &OrderAny, reason: &str) {
1002 log::error!(
1003 "Order denied: {reason}, order ID: {}",
1004 order.client_order_id()
1005 );
1006
1007 let denied = OrderDenied::new(
1008 order.trader_id(),
1009 order.strategy_id(),
1010 order.instrument_id(),
1011 order.client_order_id(),
1012 reason.into(),
1013 UUID4::new(),
1014 self.clock.borrow().timestamp_ns(),
1015 self.clock.borrow().timestamp_ns(),
1016 );
1017
1018 let mut order = order.clone();
1019
1020 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1021 log::error!("Failed to apply denied event to order: {e}");
1022 return;
1023 }
1024
1025 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1026 log::error!("Failed to update order in cache: {e}");
1027 return;
1028 }
1029
1030 let mut msgbus = self.msgbus.borrow_mut();
1031 let topic = msgbus
1032 .switchboard
1033 .get_event_orders_topic(order.strategy_id());
1034 msgbus.publish(&topic, &denied);
1035
1036 if self.config.snapshot_orders {
1037 self.create_order_state_snapshot(&order);
1038 }
1039 }
1040}
1041
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use nautilus_common::{cache::Cache, clock::TestClock, msgbus::MessageBus};
    use rstest::fixture;

    use super::*;

    /// Message bus built from its `Default` implementation.
    #[fixture]
    fn msgbus() -> MessageBus {
        MessageBus::default()
    }

    /// Cache constructed with `None` for both constructor arguments.
    #[fixture]
    fn simple_cache() -> Cache {
        Cache::new(None, None)
    }

    /// Fresh test clock.
    #[fixture]
    fn clock() -> TestClock {
        TestClock::new()
    }

    /// Builds an [`ExecutionEngine`] from the given shared components, using
    /// the default configuration when `config` is `None`.
    fn _get_exec_engine(
        msgbus: Rc<RefCell<MessageBus>>,
        cache: Rc<RefCell<Cache>>,
        clock: Rc<RefCell<TestClock>>,
        config: Option<ExecutionEngineConfig>,
    ) -> ExecutionEngine {
        ExecutionEngine::new(clock, cache, msgbus, config.unwrap_or_default())
    }
}