1pub mod config;
24
25use std::{
26 cell::{RefCell, RefMut},
27 collections::{HashMap, HashSet},
28 fmt::Debug,
29 rc::Rc,
30 time::SystemTime,
31};
32
33use config::ExecutionEngineConfig;
34use nautilus_common::{
35 cache::Cache,
36 clock::Clock,
37 generators::position_id::PositionIdGenerator,
38 logging::{CMD, EVT, RECV},
39 messages::execution::{
40 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
41 SubmitOrder, SubmitOrderList, TradingCommand,
42 },
43 msgbus::{
44 self, get_message_bus,
45 switchboard::{self},
46 },
47};
48use nautilus_core::UUID4;
49use nautilus_model::{
50 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
51 events::{
52 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
53 PositionOpened,
54 },
55 identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
56 instruments::{Instrument, InstrumentAny},
57 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
58 orders::{Order, OrderAny, OrderError},
59 position::Position,
60 types::{Money, Price, Quantity},
61};
62
63use crate::client::ExecutionClient;
64
65pub struct ExecutionEngine {
72 clock: Rc<RefCell<dyn Clock>>,
73 cache: Rc<RefCell<Cache>>,
74 clients: HashMap<ClientId, Rc<dyn ExecutionClient>>,
75 default_client: Option<Rc<dyn ExecutionClient>>,
76 routing_map: HashMap<Venue, ClientId>,
77 oms_overrides: HashMap<StrategyId, OmsType>,
78 external_order_claims: HashMap<InstrumentId, StrategyId>,
79 external_clients: HashSet<ClientId>,
80 pos_id_generator: PositionIdGenerator,
81 config: ExecutionEngineConfig,
82}
83
84impl Debug for ExecutionEngine {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct(stringify!(ExecutionEngine))
87 .field("client_count", &self.clients.len())
88 .finish()
89 }
90}
91
92impl ExecutionEngine {
93 pub fn new(
95 clock: Rc<RefCell<dyn Clock>>,
96 cache: Rc<RefCell<Cache>>,
97 config: Option<ExecutionEngineConfig>,
98 ) -> Self {
99 let trader_id = get_message_bus().borrow().trader_id;
100 Self {
101 clock: clock.clone(),
102 cache,
103 clients: HashMap::new(),
104 default_client: None,
105 routing_map: HashMap::new(),
106 oms_overrides: HashMap::new(),
107 external_order_claims: HashMap::new(),
108 external_clients: config
109 .as_ref()
110 .and_then(|c| c.external_clients.clone())
111 .unwrap_or_default()
112 .into_iter()
113 .collect(),
114 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
115 config: config.unwrap_or_default(),
116 }
117 }
118
119 #[must_use]
120 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
122 self.pos_id_generator.count(strategy_id)
123 }
124
125 #[must_use]
126 pub fn check_integrity(&self) -> bool {
128 self.cache.borrow_mut().check_integrity()
129 }
130
131 #[must_use]
132 pub fn check_connected(&self) -> bool {
134 self.clients.values().all(|c| c.is_connected())
135 }
136
137 #[must_use]
138 pub fn check_disconnected(&self) -> bool {
140 self.clients.values().all(|c| !c.is_connected())
141 }
142
143 #[must_use]
144 pub fn check_residuals(&self) -> bool {
146 self.cache.borrow().check_residuals()
147 }
148
149 #[must_use]
150 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
152 self.external_order_claims.keys().copied().collect()
153 }
154
155 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) -> anyhow::Result<()> {
163 if self.clients.contains_key(&client.client_id()) {
164 anyhow::bail!("Client already registered with ID {}", client.client_id());
165 }
166
167 self.routing_map.insert(client.venue(), client.client_id());
169
170 log::info!("Registered client {}", client.client_id());
171 self.clients.insert(client.client_id(), client);
172 Ok(())
173 }
174
175 pub fn register_default_client(&mut self, client: Rc<dyn ExecutionClient>) {
177 log::info!("Registered default client {}", client.client_id());
178 self.default_client = Some(client);
179 }
180
181 #[must_use]
182 pub fn get_client(&self, client_id: &ClientId) -> Option<Rc<dyn ExecutionClient>> {
184 self.clients.get(client_id).cloned()
185 }
186
187 pub fn register_venue_routing(
193 &mut self,
194 client_id: ClientId,
195 venue: Venue,
196 ) -> anyhow::Result<()> {
197 if !self.clients.contains_key(&client_id) {
198 anyhow::bail!("No client registered with ID {client_id}");
199 }
200
201 self.routing_map.insert(venue, client_id);
202 log::info!("Set client {client_id} routing for {venue}");
203 Ok(())
204 }
205
206 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
215 if self.clients.remove(&client_id).is_some() {
216 self.routing_map
218 .retain(|_, mapped_id| mapped_id != &client_id);
219 log::info!("Deregistered client {client_id}");
220 Ok(())
221 } else {
222 anyhow::bail!("No client registered with ID {client_id}")
223 }
224 }
225
226 #[allow(clippy::await_holding_refcell_ref)]
229 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
235 let ts = SystemTime::now();
236
237 {
238 let mut cache = self.cache.borrow_mut();
239 cache.clear_index();
240 cache.cache_general()?;
241 self.cache.borrow_mut().cache_all().await?;
242 cache.build_index();
243 let _ = cache.check_integrity();
244
245 if self.config.manage_own_order_books {
246 for order in cache.orders(None, None, None, None) {
247 if order.is_closed() || !should_handle_own_book_order(order) {
248 continue;
249 }
250 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
251 own_book.add(order.to_own_book_order());
252 }
253 }
254 }
255
256 self.set_position_id_counts();
257
258 log::info!(
259 "Loaded cache in {}ms",
260 SystemTime::now()
261 .duration_since(ts)
262 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
263 .as_millis()
264 );
265
266 Ok(())
267 }
268
269 pub fn flush_db(&self) {
271 self.cache.borrow_mut().flush_db();
272 }
273
274 pub fn process(&mut self, event: &OrderEventAny) {
276 self.handle_event(event);
277 }
278
279 pub fn execute(&self, command: &TradingCommand) {
281 self.execute_command(command);
282 }
283
284 fn execute_command(&self, command: &TradingCommand) {
287 if self.config.debug {
288 log::debug!("{RECV}{CMD} {command:?}");
289 }
290
291 if self.external_clients.contains(&command.client_id()) {
292 if self.config.debug {
293 let cid = command.client_id();
294 log::debug!("Skipping execution command for external client {cid}: {command:?}");
295 }
296 return;
297 }
298
299 let client: Rc<dyn ExecutionClient> = if let Some(client) = self
300 .clients
301 .get(&command.client_id())
302 .or_else(|| {
303 self.routing_map
304 .get(&command.instrument_id().venue)
305 .and_then(|client_id| self.clients.get(client_id))
306 })
307 .or(self.default_client.as_ref())
308 {
309 client.clone()
310 } else {
311 log::error!(
312 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
313 command.client_id(),
314 command.instrument_id().venue,
315 );
316 return;
317 };
318
319 match command {
320 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
321 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
322 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
323 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
324 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
325 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
326 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
327 TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
328 }
329 }
330
331 fn handle_submit_order(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrder) {
332 let mut order = cmd.order.clone();
333 let client_order_id = order.client_order_id();
334 let instrument_id = order.instrument_id();
335
336 if !self.cache.borrow().order_exists(&client_order_id) {
338 {
340 let mut cache = self.cache.borrow_mut();
341 if let Err(e) =
342 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
343 {
344 log::error!("Error adding order to cache: {e}");
345 return;
346 }
347 }
348
349 if self.config.snapshot_orders {
350 self.create_order_state_snapshot(&order);
351 }
352 }
353
354 let instrument = {
356 let cache = self.cache.borrow();
357 if let Some(instrument) = cache.instrument(&instrument_id) {
358 instrument.clone()
359 } else {
360 log::error!(
361 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
362 );
363 return;
364 }
365 };
366
367 if !instrument.is_inverse() && order.is_quote_quantity() {
369 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
370
371 if let Some(price) = last_px {
372 let base_qty = instrument.get_base_quantity(order.quantity(), price);
373 self.set_order_base_qty(&mut order, base_qty);
374 } else {
375 self.deny_order(
376 &order,
377 &format!("no-price-to-convert-quote-qty {instrument_id}"),
378 );
379 return;
380 }
381 }
382
383 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
384 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
385 own_book.add(order.to_own_book_order());
386 }
387
388 if let Err(e) = client.submit_order(cmd) {
390 log::error!("Error submitting order to client: {e}");
391 self.deny_order(
392 &cmd.order,
393 &format!("failed-to-submit-order-to-client: {e}"),
394 );
395 }
396 }
397
398 fn handle_submit_order_list(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrderList) {
399 let orders = cmd.order_list.orders.clone();
400
401 let mut cache = self.cache.borrow_mut();
403 for order in &orders {
404 if !cache.order_exists(&order.client_order_id()) {
405 if let Err(e) =
406 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
407 {
408 log::error!("Error adding order to cache: {e}");
409 return;
410 }
411
412 if self.config.snapshot_orders {
413 self.create_order_state_snapshot(order);
414 }
415 }
416 }
417 drop(cache);
418
419 let cache = self.cache.borrow();
421 let instrument = if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
422 instrument
423 } else {
424 log::error!(
425 "Cannot handle submit order list: no instrument found for {}, {cmd}",
426 cmd.instrument_id,
427 );
428 return;
429 };
430
431 if !instrument.is_inverse() && cmd.order_list.orders[0].is_quote_quantity() {
433 let mut quote_qty = None;
434 let mut _last_px = None;
435
436 for order in &cmd.order_list.orders {
437 if !order.is_quote_quantity() {
438 continue; }
440
441 if Some(order.quantity()) != quote_qty {
442 _last_px =
443 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
444 quote_qty = Some(order.quantity());
445 }
446
447 }
461 }
462
463 if self.config.manage_own_order_books {
464 let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
465 for order in &cmd.order_list.orders {
466 if should_handle_own_book_order(order) {
467 own_book.add(order.to_own_book_order());
468 }
469 }
470 }
471
472 if let Err(e) = client.submit_order_list(cmd) {
474 log::error!("Error submitting order list to client: {e}");
475 for order in &orders {
476 self.deny_order(
477 order,
478 &format!("failed-to-submit-order-list-to-client: {e}"),
479 );
480 }
481 }
482 }
483
484 fn handle_modify_order(&self, client: Rc<dyn ExecutionClient>, cmd: &ModifyOrder) {
485 if let Err(e) = client.modify_order(cmd) {
486 log::error!("Error modifying order: {e}");
487 }
488 }
489
490 fn handle_cancel_order(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelOrder) {
491 if let Err(e) = client.cancel_order(cmd) {
492 log::error!("Error canceling order: {e}");
493 }
494 }
495
496 fn handle_cancel_all_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelAllOrders) {
497 if let Err(e) = client.cancel_all_orders(cmd) {
498 log::error!("Error canceling all orders: {e}");
499 }
500 }
501
502 fn handle_batch_cancel_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &BatchCancelOrders) {
503 if let Err(e) = client.batch_cancel_orders(cmd) {
504 log::error!("Error batch canceling orders: {e}");
505 }
506 }
507
508 fn handle_query_account(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryAccount) {
509 if let Err(e) = client.query_account(cmd) {
510 log::error!("Error querying account: {e}");
511 }
512 }
513
514 fn handle_query_order(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryOrder) {
515 if let Err(e) = client.query_order(cmd) {
516 log::error!("Error querying order: {e}");
517 }
518 }
519
520 fn create_order_state_snapshot(&self, order: &OrderAny) {
521 if self.config.debug {
522 log::debug!("Creating order state snapshot for {order}");
523 }
524
525 if self.cache.borrow().has_backing()
526 && let Err(e) = self.cache.borrow().snapshot_order_state(order)
527 {
528 log::error!("Failed to snapshot order state: {e}");
529 return;
530 }
531
532 if get_message_bus().borrow().has_backing {
533 let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
534 msgbus::publish(topic, order);
535 }
536 }
537
538 fn create_position_state_snapshot(&self, position: &Position) {
539 if self.config.debug {
540 log::debug!("Creating position state snapshot for {position}");
541 }
542
543 let topic = switchboard::get_positions_snapshots_topic(position.id);
549 msgbus::publish(topic, position);
550 }
551
552 fn handle_event(&mut self, event: &OrderEventAny) {
555 if self.config.debug {
556 log::debug!("{RECV}{EVT} {event:?}");
557 }
558
559 let client_order_id = event.client_order_id();
560 let cache = self.cache.borrow();
561 let mut order = if let Some(order) = cache.order(&client_order_id) {
562 order.clone()
563 } else {
564 log::warn!(
565 "Order with {} not found in the cache to apply {}",
566 event.client_order_id(),
567 event
568 );
569
570 let venue_order_id = if let Some(id) = event.venue_order_id() {
572 id
573 } else {
574 log::error!(
575 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
576 event.client_order_id()
577 );
578 return;
579 };
580
581 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
583 id
584 } else {
585 log::error!(
586 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
587 event.client_order_id(),
588 );
589 return;
590 };
591
592 if let Some(order) = cache.order(client_order_id) {
594 log::info!("Order with {client_order_id} was found in the cache");
595 order.clone()
596 } else {
597 log::error!(
598 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
599 );
600 return;
601 }
602 };
603
604 drop(cache);
605 match event {
606 OrderEventAny::Filled(fill) => {
607 let oms_type = self.determine_oms_type(fill);
608 let position_id = self.determine_position_id(*fill, oms_type);
609
610 let mut fill = *fill;
612 if fill.position_id.is_none() {
613 fill.position_id = Some(position_id);
614 }
615
616 self.apply_event_to_order(&mut order, OrderEventAny::Filled(fill));
617 self.handle_order_fill(&order, fill, oms_type);
618 }
619 _ => {
620 self.apply_event_to_order(&mut order, event.clone());
621 }
622 }
623 }
624
625 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
626 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
628 return *oms_type;
629 }
630
631 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
633 && let Some(client) = self.clients.get(client_id)
634 {
635 return client.oms_type();
636 }
637
638 if let Some(client) = &self.default_client {
639 return client.oms_type();
640 }
641
642 OmsType::Netting }
644
645 fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
646 match oms_type {
647 OmsType::Hedging => self.determine_hedging_position_id(fill),
648 OmsType::Netting => self.determine_netting_position_id(fill),
649 _ => self.determine_netting_position_id(fill), }
651 }
652
653 fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
654 if let Some(position_id) = fill.position_id {
656 if self.config.debug {
657 log::debug!("Already had a position ID of: {position_id}");
658 }
659 return position_id;
660 }
661
662 let cache = self.cache.borrow();
664 let order = match cache.order(&fill.client_order_id()) {
665 Some(o) => o,
666 None => {
667 panic!(
668 "Order for {} not found to determine position ID",
669 fill.client_order_id()
670 );
671 }
672 };
673
674 if let Some(spawn_id) = order.exec_spawn_id() {
676 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
677 for spawned_order in spawn_orders {
678 if let Some(pos_id) = spawned_order.position_id() {
679 if self.config.debug {
680 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
681 }
682 return pos_id;
683 }
684 }
685 }
686
687 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
689 if self.config.debug {
690 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
691 }
692 position_id
693 }
694
695 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
696 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
697 }
698
699 fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
700 if let Err(e) = order.apply(event.clone()) {
701 if matches!(e, OrderError::InvalidStateTransition) {
702 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
703 } else {
704 log::error!("Error applying event: {e}, did not apply {event}");
707 if should_handle_own_book_order(order) {
708 self.cache.borrow_mut().update_own_order_book(order);
709 }
710 }
711 return;
712 }
713
714 if let Err(e) = self.cache.borrow_mut().update_order(order) {
715 log::error!("Error updating order in cache: {e}");
716 }
717
718 let topic = switchboard::get_event_orders_topic(event.strategy_id());
719 msgbus::publish(topic, order);
720
721 if self.config.snapshot_orders {
722 self.create_order_state_snapshot(order);
723 }
724 }
725
726 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
727 let instrument =
728 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
729 instrument.clone()
730 } else {
731 log::error!(
732 "Cannot handle order fill: no instrument found for {}, {fill}",
733 fill.instrument_id,
734 );
735 return;
736 };
737
738 if self.cache.borrow().account(&fill.account_id).is_none() {
739 log::error!(
740 "Cannot handle order fill: no account found for {}, {fill}",
741 fill.instrument_id.venue,
742 );
743 return;
744 }
745
746 let position = if instrument.is_spread() {
749 None
750 } else {
751 self.handle_position_update(instrument.clone(), fill, oms_type);
752 let position_id = fill.position_id.unwrap();
753 self.cache.borrow().position(&position_id).cloned()
754 };
755
756 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
759 if !instrument.is_spread()
761 && let Some(ref pos) = position
762 && pos.is_open()
763 {
764 let position_id = pos.id;
765 for client_order_id in order.linked_order_ids().unwrap_or_default() {
766 let mut cache = self.cache.borrow_mut();
767 let contingent_order = cache.mut_order(client_order_id);
768 if let Some(contingent_order) = contingent_order
769 && contingent_order.position_id().is_none()
770 {
771 contingent_order.set_position_id(Some(position_id));
772
773 if let Err(e) = self.cache.borrow_mut().add_position_id(
774 &position_id,
775 &contingent_order.instrument_id().venue,
776 &contingent_order.client_order_id(),
777 &contingent_order.strategy_id(),
778 ) {
779 log::error!("Failed to add position ID: {e}");
780 }
781 }
782 }
783 }
784 }
787 }
788
789 fn handle_position_update(
793 &mut self,
794 instrument: InstrumentAny,
795 fill: OrderFilled,
796 oms_type: OmsType,
797 ) {
798 let position_id = if let Some(position_id) = fill.position_id {
799 position_id
800 } else {
801 log::error!("Cannot handle position update: no position ID found for fill {fill}");
802 return;
803 };
804
805 let position_opt = self.cache.borrow().position(&position_id).cloned();
806
807 match position_opt {
808 None => {
809 if self.open_position(instrument, None, fill, oms_type).is_ok() {
811 }
813 }
814 Some(pos) if pos.is_closed() => {
815 if self
817 .open_position(instrument, Some(&pos), fill, oms_type)
818 .is_ok()
819 {
820 }
822 }
823 Some(mut pos) => {
824 if self.will_flip_position(&pos, fill) {
825 self.flip_position(instrument, &mut pos, fill, oms_type);
827 } else {
828 self.update_position(&mut pos, fill);
830 }
831 }
832 }
833 }
834
835 fn open_position(
836 &self,
837 instrument: InstrumentAny,
838 position: Option<&Position>,
839 fill: OrderFilled,
840 oms_type: OmsType,
841 ) -> anyhow::Result<Position> {
842 let position = if let Some(position) = position {
843 self.cache.borrow_mut().snapshot_position(position)?;
845 let mut position = position.clone();
846 position.apply(&fill);
847 self.cache.borrow_mut().update_position(&position)?;
848 position
849 } else {
850 let position = Position::new(&instrument, fill);
851 self.cache
852 .borrow_mut()
853 .add_position(position.clone(), oms_type)?;
854 if self.config.snapshot_positions {
855 self.create_position_state_snapshot(&position);
856 }
857 position
858 };
859
860 let ts_init = self.clock.borrow().timestamp_ns();
861 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
862 let topic = switchboard::get_event_positions_topic(event.strategy_id);
863 msgbus::publish(topic, &event);
864
865 Ok(position)
866 }
867
868 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
869 position.apply(&fill);
871
872 let is_closed = position.is_closed();
874
875 if let Err(e) = self.cache.borrow_mut().update_position(position) {
877 log::error!("Failed to update position: {e:?}");
878 return;
879 }
880
881 let cache = self.cache.borrow();
883
884 drop(cache);
885
886 if self.config.snapshot_positions {
888 self.create_position_state_snapshot(position);
889 }
890
891 let topic = switchboard::get_event_positions_topic(position.strategy_id);
893 let ts_init = self.clock.borrow().timestamp_ns();
894
895 if is_closed {
896 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
897 msgbus::publish(topic, &event);
898 } else {
899 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
900 msgbus::publish(topic, &event);
901 }
902 }
903
904 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
905 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
906 }
907
908 fn flip_position(
909 &mut self,
910 instrument: InstrumentAny,
911 position: &mut Position,
912 fill: OrderFilled,
913 oms_type: OmsType,
914 ) {
915 let difference = match position.side {
916 PositionSide::Long => Quantity::from_raw(
917 fill.last_qty.raw - position.quantity.raw,
918 position.size_precision,
919 ),
920 PositionSide::Short => Quantity::from_raw(
921 position.quantity.raw.abs_diff(fill.last_qty.raw), position.size_precision,
923 ),
924 _ => fill.last_qty,
925 };
926
927 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
929 let (commission1, commission2) = if let Some(commission) = fill.commission {
930 let commission_currency = commission.currency;
931 let commission1 = Money::new(commission * fill_percent, commission_currency);
932 let commission2 = commission - commission1;
933 (Some(commission1), Some(commission2))
934 } else {
935 log::error!("Commission is not available.");
936 (None, None)
937 };
938
939 let mut fill_split1: Option<OrderFilled> = None;
940 if position.is_open() {
941 fill_split1 = Some(OrderFilled::new(
942 fill.trader_id,
943 fill.strategy_id,
944 fill.instrument_id,
945 fill.client_order_id,
946 fill.venue_order_id,
947 fill.account_id,
948 fill.trade_id,
949 fill.order_side,
950 fill.order_type,
951 position.quantity,
952 fill.last_px,
953 fill.currency,
954 fill.liquidity_side,
955 UUID4::new(),
956 fill.ts_event,
957 fill.ts_init,
958 fill.reconciliation,
959 fill.position_id,
960 commission1,
961 ));
962
963 self.update_position(position, fill_split1.unwrap());
964 }
965
966 if difference.raw == 0 {
968 log::warn!(
969 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
970 );
971 return;
972 }
973
974 let position_id_flip = if oms_type == OmsType::Hedging
975 && let Some(position_id) = fill.position_id
976 && position_id.is_virtual()
977 {
978 Some(self.pos_id_generator.generate(fill.strategy_id, true))
980 } else {
981 fill.position_id
983 };
984
985 let fill_split2 = OrderFilled::new(
986 fill.trader_id,
987 fill.strategy_id,
988 fill.instrument_id,
989 fill.client_order_id,
990 fill.venue_order_id,
991 fill.account_id,
992 fill.trade_id,
993 fill.order_side,
994 fill.order_type,
995 difference,
996 fill.last_px,
997 fill.currency,
998 fill.liquidity_side,
999 UUID4::new(),
1000 fill.ts_event,
1001 fill.ts_init,
1002 fill.reconciliation,
1003 position_id_flip,
1004 commission2,
1005 );
1006
1007 if oms_type == OmsType::Hedging
1008 && let Some(position_id) = fill.position_id
1009 && position_id.is_virtual()
1010 {
1011 log::warn!("Closing position {fill_split1:?}");
1012 log::warn!("Flipping position {fill_split2:?}");
1013 }
1014 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1016 log::error!("Failed to open flipped position: {e:?}");
1017 }
1018 }
1019
1020 fn set_position_id_counts(&mut self) {
1023 let cache = self.cache.borrow();
1025 let positions = cache.positions(None, None, None, None);
1026
1027 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1029
1030 for position in positions {
1031 *counts.entry(position.strategy_id).or_insert(0) += 1;
1032 }
1033
1034 self.pos_id_generator.reset();
1035
1036 for (strategy_id, count) in counts {
1037 self.pos_id_generator.set_count(count, strategy_id);
1038 log::info!("Set PositionId count for {strategy_id} to {count}");
1039 }
1040 }
1041
1042 fn last_px_for_conversion(
1043 &self,
1044 instrument_id: &InstrumentId,
1045 side: OrderSide,
1046 ) -> Option<Price> {
1047 let cache = self.cache.borrow();
1048
1049 if let Some(trade) = cache.trade(instrument_id) {
1051 return Some(trade.price);
1052 }
1053
1054 if let Some(quote) = cache.quote(instrument_id) {
1056 match side {
1057 OrderSide::Buy => Some(quote.ask_price),
1058 OrderSide::Sell => Some(quote.bid_price),
1059 OrderSide::NoOrderSide => None,
1060 }
1061 } else {
1062 None
1063 }
1064 }
1065
1066 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1067 log::info!(
1068 "Setting {} order quote quantity {} to base quantity {}",
1069 order.instrument_id(),
1070 order.quantity(),
1071 base_qty
1072 );
1073
1074 let original_qty = order.quantity();
1075 order.set_quantity(base_qty);
1076 order.set_leaves_qty(base_qty);
1077 order.set_is_quote_quantity(false);
1078
1079 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1080 return;
1081 }
1082
1083 if let Some(linked_order_ids) = order.linked_order_ids() {
1084 for client_order_id in linked_order_ids {
1085 match self.cache.borrow_mut().mut_order(client_order_id) {
1086 Some(contingent_order) => {
1087 if !contingent_order.is_quote_quantity() {
1088 continue; }
1090
1091 if contingent_order.quantity() != original_qty {
1092 log::warn!(
1093 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1094 contingent_order.quantity(),
1095 original_qty,
1096 base_qty
1097 );
1098 }
1099
1100 log::info!(
1101 "Setting {} order quote quantity {} to base quantity {}",
1102 contingent_order.instrument_id(),
1103 contingent_order.quantity(),
1104 base_qty
1105 );
1106
1107 contingent_order.set_quantity(base_qty);
1108 contingent_order.set_leaves_qty(base_qty);
1109 contingent_order.set_is_quote_quantity(false);
1110 }
1111 None => {
1112 log::error!("Contingency order {client_order_id} not found");
1113 }
1114 }
1115 }
1116 } else {
1117 log::warn!(
1118 "No linked order IDs found for order {}",
1119 order.client_order_id()
1120 );
1121 }
1122 }
1123
1124 fn deny_order(&self, order: &OrderAny, reason: &str) {
1125 log::error!(
1126 "Order denied: {reason}, order ID: {}",
1127 order.client_order_id()
1128 );
1129
1130 let denied = OrderDenied::new(
1131 order.trader_id(),
1132 order.strategy_id(),
1133 order.instrument_id(),
1134 order.client_order_id(),
1135 reason.into(),
1136 UUID4::new(),
1137 self.clock.borrow().timestamp_ns(),
1138 self.clock.borrow().timestamp_ns(),
1139 );
1140
1141 let mut order = order.clone();
1142
1143 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1144 log::error!("Failed to apply denied event to order: {e}");
1145 return;
1146 }
1147
1148 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1149 log::error!("Failed to update order in cache: {e}");
1150 return;
1151 }
1152
1153 let topic = switchboard::get_event_orders_topic(order.strategy_id());
1154 msgbus::publish(topic, &denied);
1155
1156 if self.config.snapshot_orders {
1157 self.create_order_state_snapshot(&order);
1158 }
1159 }
1160
1161 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1162 let mut cache = self.cache.borrow_mut();
1163 if cache.own_order_book_mut(instrument_id).is_none() {
1164 let own_book = OwnOrderBook::new(*instrument_id);
1165 cache.add_own_order_book(own_book).unwrap();
1166 }
1167
1168 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1169 }
1170}
1171
1172mod stubs;
1176#[cfg(test)]
1177mod tests;