1pub mod config;
24
25use std::{
26 cell::{RefCell, RefMut},
27 collections::{HashMap, HashSet},
28 rc::Rc,
29 time::SystemTime,
30};
31
32use config::ExecutionEngineConfig;
33use nautilus_common::{
34 cache::Cache,
35 clock::Clock,
36 generators::position_id::PositionIdGenerator,
37 logging::{CMD, EVT, RECV},
38 msgbus::{
39 self, get_message_bus,
40 switchboard::{self},
41 },
42};
43use nautilus_core::UUID4;
44use nautilus_model::{
45 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
46 events::{
47 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
48 PositionOpened,
49 },
50 identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
51 instruments::{Instrument, InstrumentAny},
52 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
53 orders::{Order, OrderAny, OrderError},
54 position::Position,
55 types::{Money, Price, Quantity},
56};
57
58use crate::{
59 client::ExecutionClient,
60 messages::{
61 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder, SubmitOrder,
62 SubmitOrderList, TradingCommand,
63 },
64};
65
/// Execution engine state: shared clock and cache handles, the registered execution
/// clients, and the lookup tables used to route commands and classify fills.
pub struct ExecutionEngine {
    /// Shared clock; read via `timestamp_ns()` when stamping generated events.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache holding orders, positions, instruments and own order books.
    cache: Rc<RefCell<Cache>>,
    /// Registered execution clients keyed by their client ID.
    clients: HashMap<ClientId, Rc<dyn ExecutionClient>>,
    /// Fallback client used when neither the client ID nor the venue routing matches.
    default_client: Option<Rc<dyn ExecutionClient>>,
    /// Venue to client ID routing table.
    routing_map: HashMap<Venue, ClientId>,
    /// Per-strategy OMS type overrides; consulted first when classifying a fill.
    oms_overrides: HashMap<StrategyId, OmsType>,
    /// Instrument to strategy mapping for external order claims.
    external_order_claims: HashMap<InstrumentId, StrategyId>,
    /// Generator for new position IDs.
    pos_id_generator: PositionIdGenerator,
    /// Engine configuration flags (debug logging, snapshots, own order books).
    config: ExecutionEngineConfig,
}
77
78impl ExecutionEngine {
79 pub fn new(
80 clock: Rc<RefCell<dyn Clock>>,
81 cache: Rc<RefCell<Cache>>,
82 config: Option<ExecutionEngineConfig>,
83 ) -> Self {
84 let trader_id = get_message_bus().borrow().trader_id;
85 Self {
86 clock: clock.clone(),
87 cache,
88 clients: HashMap::new(),
89 default_client: None,
90 routing_map: HashMap::new(),
91 oms_overrides: HashMap::new(),
92 external_order_claims: HashMap::new(),
93 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
94 config: config.unwrap_or_default(),
95 }
96 }
97
98 #[must_use]
99 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
100 self.pos_id_generator.count(strategy_id)
101 }
102
103 #[must_use]
104 pub fn check_integrity(&self) -> bool {
105 self.cache.borrow_mut().check_integrity()
106 }
107
108 #[must_use]
109 pub fn check_connected(&self) -> bool {
110 self.clients.values().all(|c| c.is_connected())
111 }
112
113 #[must_use]
114 pub fn check_disconnected(&self) -> bool {
115 self.clients.values().all(|c| !c.is_connected())
116 }
117
118 #[must_use]
119 pub fn check_residuals(&self) -> bool {
120 self.cache.borrow().check_residuals()
121 }
122
123 #[must_use]
124 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
125 self.external_order_claims.keys().copied().collect()
126 }
127
128 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) -> anyhow::Result<()> {
131 if self.clients.contains_key(&client.client_id()) {
132 anyhow::bail!("Client already registered with ID {}", client.client_id());
133 }
134
135 self.routing_map.insert(client.venue(), client.client_id());
137
138 log::info!("Registered client {}", client.client_id());
139 self.clients.insert(client.client_id(), client);
140 Ok(())
141 }
142
    /// Sets the default execution client, replacing any previously registered default.
    ///
    /// The default client is the last fallback when resolving a client for a command.
    pub fn register_default_client(&mut self, client: Rc<dyn ExecutionClient>) {
        log::info!("Registered default client {}", client.client_id());
        self.default_client = Some(client);
    }
147
148 #[must_use]
149 pub fn get_client(&self, client_id: &ClientId) -> Option<Rc<dyn ExecutionClient>> {
150 self.clients.get(client_id).cloned()
151 }
152
153 pub fn register_venue_routing(
154 &mut self,
155 client_id: ClientId,
156 venue: Venue,
157 ) -> anyhow::Result<()> {
158 if !self.clients.contains_key(&client_id) {
159 anyhow::bail!("No client registered with ID {client_id}");
160 }
161
162 self.routing_map.insert(venue, client_id);
163 log::info!("Set client {client_id} routing for {venue}");
164 Ok(())
165 }
166
167 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
173 if self.clients.remove(&client_id).is_some() {
174 self.routing_map
176 .retain(|_, mapped_id| mapped_id != &client_id);
177 log::info!("Deregistered client {client_id}");
178 Ok(())
179 } else {
180 anyhow::bail!("No client registered with ID {client_id}")
181 }
182 }
183
184 #[allow(clippy::await_holding_refcell_ref)]
187 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
188 let ts = SystemTime::now();
189
190 {
191 let mut cache = self.cache.borrow_mut();
192 cache.clear_index();
193 cache.cache_general()?;
194 self.cache.borrow_mut().cache_all().await?;
195 cache.build_index();
196 let _ = cache.check_integrity();
197
198 if self.config.manage_own_order_books {
199 for order in cache.orders(None, None, None, None) {
200 if order.is_closed() || !should_handle_own_book_order(order) {
201 continue;
202 }
203 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
204 own_book.add(order.to_own_book_order());
205 }
206 }
207 }
208
209 self.set_position_id_counts();
210
211 log::info!(
212 "Loaded cache in {}ms",
213 SystemTime::now()
214 .duration_since(ts)
215 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
216 .as_millis()
217 );
218
219 Ok(())
220 }
221
    /// Delegates to the cache's `flush_db`, holding a mutable cache borrow for the call.
    pub fn flush_db(&self) {
        self.cache.borrow_mut().flush_db();
    }
225
    /// Public entry point for order events; forwards `event` to `handle_event`.
    pub fn process(&mut self, event: &OrderEventAny) {
        self.handle_event(event);
    }
229
    /// Public entry point for trading commands; forwards `command` to `execute_command`.
    pub fn execute(&self, command: TradingCommand) {
        self.execute_command(command);
    }
233
234 fn execute_command(&self, command: TradingCommand) {
237 if self.config.debug {
238 log::debug!("{RECV}{CMD} {command:?}");
239 }
240
241 let client: Rc<dyn ExecutionClient> = if let Some(client) = self
242 .clients
243 .get(&command.client_id())
244 .or_else(|| {
245 self.routing_map
246 .get(&command.instrument_id().venue)
247 .and_then(|client_id| self.clients.get(client_id))
248 })
249 .or(self.default_client.as_ref())
250 {
251 client.clone()
252 } else {
253 log::error!(
254 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
255 command.client_id(),
256 command.instrument_id().venue,
257 );
258 return;
259 };
260
261 match command {
262 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
263 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
264 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
265 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
266 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
267 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
268 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
269 }
270 }
271
272 fn handle_submit_order(&self, client: Rc<dyn ExecutionClient>, command: SubmitOrder) {
273 let mut order = command.order.clone();
274 let client_order_id = order.client_order_id();
275 let instrument_id = order.instrument_id();
276
277 if !self.cache.borrow().order_exists(&client_order_id) {
279 {
281 let mut cache = self.cache.borrow_mut();
282 if let Err(e) = cache.add_order(
283 order.clone(),
284 command.position_id,
285 Some(command.client_id),
286 true,
287 ) {
288 log::error!("Error adding order to cache: {e}");
289 return;
290 }
291 }
292
293 if self.config.snapshot_orders {
294 self.create_order_state_snapshot(&order);
295 }
296 }
297
298 let instrument = {
300 let cache = self.cache.borrow();
301 if let Some(instrument) = cache.instrument(&instrument_id) {
302 instrument.clone()
303 } else {
304 log::error!(
305 "Cannot handle submit order: no instrument found for {instrument_id}, {command}",
306 );
307 return;
308 }
309 };
310
311 if !instrument.is_inverse() && order.is_quote_quantity() {
313 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
314
315 if let Some(price) = last_px {
316 let base_qty = instrument.get_base_quantity(order.quantity(), price);
317 self.set_order_base_qty(&mut order, base_qty);
318 } else {
319 self.deny_order(
320 &order,
321 &format!("no-price-to-convert-quote-qty {instrument_id}"),
322 );
323 return;
324 }
325 }
326
327 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
328 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
329 own_book.add(order.to_own_book_order());
330 }
331
332 if let Err(e) = client.submit_order(command.clone()) {
335 log::error!("Error submitting order to client: {e}");
336 self.deny_order(
337 &command.order,
338 &format!("failed-to-submit-order-to-client: {e}"),
339 );
340 }
341 }
342
343 fn handle_submit_order_list(
344 &self,
345 client: Rc<dyn ExecutionClient>,
346 mut command: SubmitOrderList,
347 ) {
348 let orders = command.order_list.orders.clone();
349
350 let mut cache = self.cache.borrow_mut();
352 for order in &orders {
353 if !cache.order_exists(&order.client_order_id()) {
354 if let Err(e) = cache.add_order(
355 order.clone(),
356 command.position_id,
357 Some(command.client_id),
358 true,
359 ) {
360 log::error!("Error adding order to cache: {e}");
361 return;
362 }
363
364 if self.config.snapshot_orders {
365 self.create_order_state_snapshot(order);
366 }
367 }
368 }
369 drop(cache);
370
371 let cache = self.cache.borrow();
373 let instrument = if let Some(instrument) = cache.instrument(&command.instrument_id) {
374 instrument
375 } else {
376 log::error!(
377 "Cannot handle submit order list: no instrument found for {}, {command}",
378 command.instrument_id,
379 );
380 return;
381 };
382
383 if !instrument.is_inverse() && command.order_list.orders[0].is_quote_quantity() {
385 let mut quote_qty = None;
386 let mut last_px = None;
387
388 for order in &mut command.order_list.orders {
389 if !order.is_quote_quantity() {
390 continue; }
392
393 if Some(order.quantity()) != quote_qty {
394 last_px =
395 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
396 quote_qty = Some(order.quantity());
397 }
398
399 if let Some(px) = last_px {
400 let base_qty = instrument.get_base_quantity(order.quantity(), px);
401 self.set_order_base_qty(order, base_qty);
402 } else {
403 for order in &command.order_list.orders {
404 self.deny_order(
405 order,
406 &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
407 );
408 }
409 return; }
411 }
412 }
413
414 if self.config.manage_own_order_books {
415 let mut own_book = self.get_or_init_own_order_book(&command.instrument_id);
416 for order in &command.order_list.orders {
417 if should_handle_own_book_order(order) {
418 own_book.add(order.to_own_book_order());
419 }
420 }
421 }
422
423 if let Err(e) = client.submit_order_list(command) {
425 log::error!("Error submitting order list to client: {e}");
426 for order in &orders {
427 self.deny_order(
428 order,
429 &format!("failed-to-submit-order-list-to-client: {e}"),
430 );
431 }
432 }
433 }
434
435 fn handle_modify_order(&self, client: Rc<dyn ExecutionClient>, command: ModifyOrder) {
436 if let Err(e) = client.modify_order(command) {
437 log::error!("Error modifying order: {e}");
438 }
439 }
440
441 fn handle_cancel_order(&self, client: Rc<dyn ExecutionClient>, command: CancelOrder) {
442 if let Err(e) = client.cancel_order(command) {
443 log::error!("Error canceling order: {e}");
444 }
445 }
446
447 fn handle_cancel_all_orders(&self, client: Rc<dyn ExecutionClient>, command: CancelAllOrders) {
448 if let Err(e) = client.cancel_all_orders(command) {
449 log::error!("Error canceling all orders: {e}");
450 }
451 }
452
453 fn handle_batch_cancel_orders(
454 &self,
455 client: Rc<dyn ExecutionClient>,
456 command: BatchCancelOrders,
457 ) {
458 if let Err(e) = client.batch_cancel_orders(command) {
459 log::error!("Error batch canceling orders: {e}");
460 }
461 }
462
463 fn handle_query_order(&self, client: Rc<dyn ExecutionClient>, command: QueryOrder) {
464 if let Err(e) = client.query_order(command) {
465 log::error!("Error querying order: {e}");
466 }
467 }
468
469 fn create_order_state_snapshot(&self, order: &OrderAny) {
470 if self.config.debug {
471 log::debug!("Creating order state snapshot for {order}");
472 }
473
474 if self.cache.borrow().has_backing() {
475 if let Err(e) = self.cache.borrow().snapshot_order_state(order) {
476 log::error!("Failed to snapshot order state: {e}");
477 return;
478 }
479 }
480
481 if get_message_bus().borrow().has_backing {
482 let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
483 msgbus::publish(&topic, order);
484 }
485 }
486
    /// Publishes `position` on the position-snapshots topic derived from its ID.
    ///
    /// Unlike the order snapshot, nothing is written through the cache here; the
    /// position is only published on the message bus.
    fn create_position_state_snapshot(&self, position: &Position) {
        if self.config.debug {
            log::debug!("Creating position state snapshot for {position}");
        }

        let topic = switchboard::get_positions_snapshots_topic(position.id);
        msgbus::publish(&topic, position);
    }
500
    /// Applies an incoming order event to the matching cached order.
    ///
    /// The order is looked up by the event's client order ID. When that misses, the
    /// event's venue order ID is used to resolve a client order ID through the cache
    /// and the lookup is retried. If no order can be resolved the event is dropped
    /// with an error log.
    ///
    /// Fill events additionally get an OMS type and position ID determined, and are
    /// then passed on to `handle_order_fill`; all other events are only applied to the
    /// order.
    fn handle_event(&mut self, event: &OrderEventAny) {
        if self.config.debug {
            log::debug!("{RECV}{EVT} {event:?}");
        }

        let client_order_id = event.client_order_id();
        let cache = self.cache.borrow();
        // Work on a clone so the cache borrow can be released before applying the event.
        let mut order = if let Some(order) = cache.order(&client_order_id) {
            order.clone()
        } else {
            log::warn!(
                "Order with {} not found in the cache to apply {}",
                event.client_order_id(),
                event
            );

            // Fall back to resolving the order through the venue order ID.
            let venue_order_id = if let Some(id) = event.venue_order_id() {
                id
            } else {
                log::error!(
                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
                    event.client_order_id()
                );
                return;
            };

            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
                id
            } else {
                log::error!(
                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
                    event.client_order_id(),
                );
                return;
            };

            if let Some(order) = cache.order(client_order_id) {
                log::info!("Order with {client_order_id} was found in the cache");
                order.clone()
            } else {
                log::error!(
                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
                );
                return;
            }
        };

        // Release the shared borrow: the calls below borrow the cache themselves.
        drop(cache);
        match event {
            OrderEventAny::Filled(order_filled) => {
                let oms_type = self.determine_oms_type(order_filled);
                let position_id = self.determine_position_id(*order_filled, oms_type);

                // Only fill in the position ID when the event did not carry one.
                let mut order_filled = *order_filled;
                if order_filled.position_id.is_none() {
                    order_filled.position_id = Some(position_id);
                }

                self.apply_event_to_order(&mut order, OrderEventAny::Filled(order_filled));
                self.handle_order_fill(&order, order_filled, oms_type);
            }
            _ => {
                self.apply_event_to_order(&mut order, event.clone());
            }
        }
    }
573
574 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
575 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
577 return *oms_type;
578 }
579
580 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue) {
582 if let Some(client) = self.clients.get(client_id) {
583 return client.oms_type();
584 }
585 }
586
587 if let Some(client) = &self.default_client {
588 return client.oms_type();
589 }
590
591 OmsType::Netting }
593
594 fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
595 match oms_type {
596 OmsType::Hedging => self.determine_hedging_position_id(fill),
597 OmsType::Netting => self.determine_netting_position_id(fill),
598 _ => self.determine_netting_position_id(fill), }
600 }
601
602 fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
603 if let Some(position_id) = fill.position_id {
605 if self.config.debug {
606 log::debug!("Already had a position ID of: {position_id}");
607 }
608 return position_id;
609 }
610
611 let cache = self.cache.borrow();
613 let order = match cache.order(&fill.client_order_id()) {
614 Some(o) => o,
615 None => {
616 panic!(
617 "Order for {} not found to determine position ID",
618 fill.client_order_id()
619 );
620 }
621 };
622
623 if let Some(spawn_id) = order.exec_spawn_id() {
625 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
626 for spawned_order in spawn_orders {
627 if let Some(pos_id) = spawned_order.position_id() {
628 if self.config.debug {
629 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
630 }
631 return pos_id;
632 }
633 }
634 }
635
636 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
638 if self.config.debug {
639 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
640 }
641 position_id
642 }
643
644 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
645 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
646 }
647
648 fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
649 if let Err(e) = order.apply(event.clone()) {
650 match e {
651 OrderError::InvalidStateTransition => {
652 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
653 }
654 _ => {
655 log::error!("Error applying event: {e}, did not apply {event}");
656 }
657 }
658 return;
659 }
660
661 if let Err(e) = self.cache.borrow_mut().update_order(order) {
662 log::error!("Error updating order in cache: {e}");
663 }
664
665 let topic = switchboard::get_event_orders_topic(event.strategy_id());
666 msgbus::publish(&topic, order);
667
668 if self.config.snapshot_orders {
669 self.create_order_state_snapshot(order);
670 }
671 }
672
673 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
674 let instrument =
675 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
676 instrument.clone()
677 } else {
678 log::error!(
679 "Cannot handle order fill: no instrument found for {}, {fill}",
680 fill.instrument_id,
681 );
682 return;
683 };
684
685 if self.cache.borrow().account(&fill.account_id).is_none() {
686 log::error!(
687 "Cannot handle order fill: no account found for {}, {fill}",
688 fill.instrument_id.venue,
689 );
690 return;
691 }
692
693 let position_id = if let Some(position_id) = fill.position_id {
694 position_id
695 } else {
696 log::error!("Cannot handle order fill: no position ID found for fill {fill}");
697 return;
698 };
699
700 let mut position = match self.cache.borrow().position(&position_id) {
701 Some(pos) if !pos.is_closed() => pos.clone(),
702 _ => self
703 .open_position(instrument.clone(), None, fill, oms_type)
704 .unwrap(),
705 };
706
707 if self.will_flip_position(&position, fill) {
708 self.flip_position(instrument, &mut position, fill, oms_type);
709 } else {
710 self.update_position(&mut position, fill);
711 }
712
713 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) && position.is_open() {
714 for client_order_id in order.linked_order_ids().unwrap_or_default() {
715 let mut cache = self.cache.borrow_mut();
716 let contingent_order = cache.mut_order(client_order_id);
717 if let Some(contingent_order) = contingent_order {
718 if contingent_order.position_id().is_none() {
719 contingent_order.set_position_id(Some(position_id));
720
721 if let Err(e) = self.cache.borrow_mut().add_position_id(
722 &position_id,
723 &contingent_order.instrument_id().venue,
724 &contingent_order.client_order_id(),
725 &contingent_order.strategy_id(),
726 ) {
727 log::error!("Failed to add position ID: {e}");
728 }
729 }
730 }
731 }
732 }
733 }
734
735 fn open_position(
736 &self,
737 instrument: InstrumentAny,
738 position: Option<&Position>,
739 fill: OrderFilled,
740 oms_type: OmsType,
741 ) -> anyhow::Result<Position> {
742 let position = if let Some(position) = position {
743 self.cache.borrow_mut().snapshot_position(position)?;
745 let mut position = position.clone();
746 position.apply(&fill);
747 self.cache.borrow_mut().update_position(&position)?;
748 position
749 } else {
750 let position = Position::new(&instrument, fill);
751 self.cache
752 .borrow_mut()
753 .add_position(position.clone(), oms_type)?;
754 if self.config.snapshot_positions {
755 self.create_position_state_snapshot(&position);
756 }
757 position
758 };
759
760 let ts_init = self.clock.borrow().timestamp_ns();
761 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
762 let topic = switchboard::get_event_positions_topic(event.strategy_id);
763 msgbus::publish(&topic, &event);
764
765 Ok(position)
766 }
767
768 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
769 position.apply(&fill);
770
771 if let Err(e) = self.cache.borrow_mut().update_position(position) {
772 log::error!("Failed to update position: {e:?}");
773 return;
774 }
775
776 if self.config.snapshot_positions {
777 self.create_position_state_snapshot(position);
778 }
779
780 let topic = switchboard::get_event_positions_topic(position.strategy_id);
781 let ts_init = self.clock.borrow().timestamp_ns();
782
783 if position.is_closed() {
784 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
785 msgbus::publish(&topic, &event);
786 } else {
787 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
788 msgbus::publish(&topic, &event);
789 }
790 }
791
792 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
793 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
794 }
795
796 fn flip_position(
797 &mut self,
798 instrument: InstrumentAny,
799 position: &mut Position,
800 fill: OrderFilled,
801 oms_type: OmsType,
802 ) {
803 let difference = match position.side {
804 PositionSide::Long => Quantity::from_raw(
805 fill.last_qty.raw - position.quantity.raw,
806 position.size_precision,
807 ),
808 PositionSide::Short => Quantity::from_raw(
809 position.quantity.raw - fill.last_qty.raw,
810 position.size_precision,
811 ),
812 _ => fill.last_qty,
813 };
814
815 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
817 let (commission1, commission2) = if let Some(commission) = fill.commission {
818 let commission_currency = commission.currency;
819 let commission1 = Money::new(commission * fill_percent, commission_currency);
820 let commission2 = commission - commission1;
821 (Some(commission1), Some(commission2))
822 } else {
823 log::error!("Commission is not available.");
824 (None, None)
825 };
826
827 let mut fill_split1: Option<OrderFilled> = None;
828 if position.is_open() {
829 fill_split1 = Some(OrderFilled::new(
830 fill.trader_id,
831 fill.strategy_id,
832 fill.instrument_id,
833 fill.client_order_id,
834 fill.venue_order_id,
835 fill.account_id,
836 fill.trade_id,
837 fill.order_side,
838 fill.order_type,
839 position.quantity,
840 fill.last_px,
841 fill.currency,
842 fill.liquidity_side,
843 UUID4::new(),
844 fill.ts_event,
845 fill.ts_init,
846 fill.reconciliation,
847 fill.position_id,
848 commission1,
849 ));
850
851 self.update_position(position, fill_split1.unwrap());
852 }
853
854 if difference.raw == 0 {
856 log::warn!(
857 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
858 );
859 return;
860 }
861
862 let position_id_flip = if oms_type == OmsType::Hedging {
863 if let Some(position_id) = fill.position_id {
864 if position_id.is_virtual() {
865 Some(self.pos_id_generator.generate(fill.strategy_id, true))
867 } else {
868 Some(position_id)
869 }
870 } else {
871 None
872 }
873 } else {
874 fill.position_id
875 };
876
877 let fill_split2 = OrderFilled::new(
878 fill.trader_id,
879 fill.strategy_id,
880 fill.instrument_id,
881 fill.client_order_id,
882 fill.venue_order_id,
883 fill.account_id,
884 fill.trade_id,
885 fill.order_side,
886 fill.order_type,
887 difference,
888 fill.last_px,
889 fill.currency,
890 fill.liquidity_side,
891 UUID4::new(),
892 fill.ts_event,
893 fill.ts_init,
894 fill.reconciliation,
895 position_id_flip,
896 commission2,
897 );
898
899 if oms_type == OmsType::Hedging {
900 if let Some(position_id) = fill.position_id {
901 if position_id.is_virtual() {
902 log::warn!("Closing position {fill_split1:?}");
903 log::warn!("Flipping position {fill_split2:?}");
904 }
905 }
906 }
907
908 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
910 log::error!("Failed to open flipped position: {e:?}");
911 }
912 }
913
914 fn set_position_id_counts(&mut self) {
917 let cache = self.cache.borrow();
919 let positions = cache.positions(None, None, None, None);
920
921 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
923
924 for position in positions {
925 *counts.entry(position.strategy_id).or_insert(0) += 1;
926 }
927
928 self.pos_id_generator.reset();
929
930 for (strategy_id, count) in counts {
931 self.pos_id_generator.set_count(count, strategy_id);
932 log::info!("Set PositionId count for {strategy_id} to {count}");
933 }
934 }
935
936 fn last_px_for_conversion(
937 &self,
938 instrument_id: &InstrumentId,
939 side: OrderSide,
940 ) -> Option<Price> {
941 let cache = self.cache.borrow();
942
943 if let Some(trade) = cache.trade(instrument_id) {
945 return Some(trade.price);
946 }
947
948 if let Some(quote) = cache.quote(instrument_id) {
950 match side {
951 OrderSide::Buy => Some(quote.ask_price),
952 OrderSide::Sell => Some(quote.bid_price),
953 OrderSide::NoOrderSide => None,
954 }
955 } else {
956 None
957 }
958 }
959
960 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
961 log::info!(
962 "Setting {} order quote quantity {} to base quantity {}",
963 order.instrument_id(),
964 order.quantity(),
965 base_qty
966 );
967
968 let original_qty = order.quantity();
969 order.set_quantity(base_qty);
970 order.set_leaves_qty(base_qty);
971 order.set_is_quote_quantity(false);
972
973 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
974 return;
975 }
976
977 if let Some(linked_order_ids) = order.linked_order_ids() {
978 for client_order_id in linked_order_ids {
979 match self.cache.borrow_mut().mut_order(client_order_id) {
980 Some(contingent_order) => {
981 if !contingent_order.is_quote_quantity() {
982 continue; }
984
985 if contingent_order.quantity() != original_qty {
986 log::warn!(
987 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
988 contingent_order.quantity(),
989 original_qty,
990 base_qty
991 );
992 }
993
994 log::info!(
995 "Setting {} order quote quantity {} to base quantity {}",
996 contingent_order.instrument_id(),
997 contingent_order.quantity(),
998 base_qty
999 );
1000
1001 contingent_order.set_quantity(base_qty);
1002 contingent_order.set_leaves_qty(base_qty);
1003 contingent_order.set_is_quote_quantity(false);
1004 }
1005 None => {
1006 log::error!("Contingency order {client_order_id} not found");
1007 }
1008 }
1009 }
1010 } else {
1011 log::warn!(
1012 "No linked order IDs found for order {}",
1013 order.client_order_id()
1014 );
1015 }
1016 }
1017
1018 fn deny_order(&self, order: &OrderAny, reason: &str) {
1019 log::error!(
1020 "Order denied: {reason}, order ID: {}",
1021 order.client_order_id()
1022 );
1023
1024 let denied = OrderDenied::new(
1025 order.trader_id(),
1026 order.strategy_id(),
1027 order.instrument_id(),
1028 order.client_order_id(),
1029 reason.into(),
1030 UUID4::new(),
1031 self.clock.borrow().timestamp_ns(),
1032 self.clock.borrow().timestamp_ns(),
1033 );
1034
1035 let mut order = order.clone();
1036
1037 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1038 log::error!("Failed to apply denied event to order: {e}");
1039 return;
1040 }
1041
1042 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1043 log::error!("Failed to update order in cache: {e}");
1044 return;
1045 }
1046
1047 let topic = switchboard::get_event_orders_topic(order.strategy_id());
1048 msgbus::publish(&topic, &denied);
1049
1050 if self.config.snapshot_orders {
1051 self.create_order_state_snapshot(&order);
1052 }
1053 }
1054
    /// Returns a mutable handle to the own order book for `instrument_id`, creating
    /// and adding an empty book to the cache first when none exists.
    ///
    /// The returned `RefMut` keeps the cache mutably borrowed for as long as it is
    /// alive, so no other cache borrow may be taken until it is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the cache is already borrowed, or if adding the new book fails.
    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<OwnOrderBook> {
        let mut cache = self.cache.borrow_mut();
        if cache.own_order_book_mut(instrument_id).is_none() {
            let own_book = OwnOrderBook::new(*instrument_id);
            cache.add_own_order_book(own_book).unwrap();
        }

        // Narrow the cache guard down to the book that is now known to exist.
        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
    }
1064}
1065
// Test-only fixtures and helpers for constructing an `ExecutionEngine`.
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use nautilus_common::{cache::Cache, clock::TestClock, msgbus::MessageBus};
    use rstest::fixture;

    use super::*;

    /// Fixture providing a default message bus.
    #[fixture]
    fn msgbus() -> MessageBus {
        MessageBus::default()
    }

    /// Fixture providing a cache constructed with both optional arguments unset.
    #[fixture]
    fn simple_cache() -> Cache {
        Cache::new(None, None)
    }

    /// Fixture providing a test clock.
    #[fixture]
    fn clock() -> TestClock {
        TestClock::new()
    }

    /// Helper building an engine from the given cache, clock and optional config.
    /// The leading underscore marks it as not yet used by any test in this module.
    fn _get_exec_engine(
        cache: Rc<RefCell<Cache>>,
        clock: Rc<RefCell<TestClock>>,
        config: Option<ExecutionEngineConfig>,
    ) -> ExecutionEngine {
        ExecutionEngine::new(clock, cache, config)
    }

    }