1pub mod config;
24
25use std::{
26 cell::{RefCell, RefMut},
27 collections::{HashMap, HashSet},
28 fmt::Debug,
29 rc::Rc,
30 time::SystemTime,
31};
32
33use config::ExecutionEngineConfig;
34use nautilus_common::{
35 cache::Cache,
36 clock::Clock,
37 generators::position_id::PositionIdGenerator,
38 logging::{CMD, EVT, RECV},
39 messages::execution::{
40 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
41 SubmitOrder, SubmitOrderList, TradingCommand,
42 },
43 msgbus::{
44 self, get_message_bus,
45 switchboard::{self},
46 },
47};
48use nautilus_core::UUID4;
49use nautilus_model::{
50 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
51 events::{
52 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
53 PositionOpened,
54 },
55 identifiers::{ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
56 instruments::{Instrument, InstrumentAny},
57 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
58 orders::{Order, OrderAny, OrderError},
59 position::Position,
60 types::{Money, Price, Quantity},
61};
62
63use crate::client::ExecutionClient;
64
65pub struct ExecutionEngine {
72 clock: Rc<RefCell<dyn Clock>>,
73 cache: Rc<RefCell<Cache>>,
74 clients: HashMap<ClientId, Rc<dyn ExecutionClient>>,
75 default_client: Option<Rc<dyn ExecutionClient>>,
76 routing_map: HashMap<Venue, ClientId>,
77 oms_overrides: HashMap<StrategyId, OmsType>,
78 external_order_claims: HashMap<InstrumentId, StrategyId>,
79 external_clients: HashSet<ClientId>,
80 pos_id_generator: PositionIdGenerator,
81 config: ExecutionEngineConfig,
82}
83
84impl Debug for ExecutionEngine {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct(stringify!(ExecutionEngine))
87 .field("client_count", &self.clients.len())
88 .finish()
89 }
90}
91
92impl ExecutionEngine {
93 pub fn new(
95 clock: Rc<RefCell<dyn Clock>>,
96 cache: Rc<RefCell<Cache>>,
97 config: Option<ExecutionEngineConfig>,
98 ) -> Self {
99 let trader_id = get_message_bus().borrow().trader_id;
100 Self {
101 clock: clock.clone(),
102 cache,
103 clients: HashMap::new(),
104 default_client: None,
105 routing_map: HashMap::new(),
106 oms_overrides: HashMap::new(),
107 external_order_claims: HashMap::new(),
108 external_clients: config
109 .as_ref()
110 .and_then(|c| c.external_clients.clone())
111 .unwrap_or_default()
112 .into_iter()
113 .collect(),
114 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
115 config: config.unwrap_or_default(),
116 }
117 }
118
119 #[must_use]
120 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
122 self.pos_id_generator.count(strategy_id)
123 }
124
125 #[must_use]
126 pub fn check_integrity(&self) -> bool {
128 self.cache.borrow_mut().check_integrity()
129 }
130
131 #[must_use]
132 pub fn check_connected(&self) -> bool {
134 self.clients.values().all(|c| c.is_connected())
135 }
136
137 #[must_use]
138 pub fn check_disconnected(&self) -> bool {
140 self.clients.values().all(|c| !c.is_connected())
141 }
142
143 #[must_use]
144 pub fn check_residuals(&self) -> bool {
146 self.cache.borrow().check_residuals()
147 }
148
149 #[must_use]
150 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
152 self.external_order_claims.keys().copied().collect()
153 }
154
155 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) -> anyhow::Result<()> {
163 if self.clients.contains_key(&client.client_id()) {
164 anyhow::bail!("Client already registered with ID {}", client.client_id());
165 }
166
167 self.routing_map.insert(client.venue(), client.client_id());
169
170 log::info!("Registered client {}", client.client_id());
171 self.clients.insert(client.client_id(), client);
172 Ok(())
173 }
174
175 pub fn register_default_client(&mut self, client: Rc<dyn ExecutionClient>) {
177 log::info!("Registered default client {}", client.client_id());
178 self.default_client = Some(client);
179 }
180
181 #[must_use]
182 pub fn get_client(&self, client_id: &ClientId) -> Option<Rc<dyn ExecutionClient>> {
184 self.clients.get(client_id).cloned()
185 }
186
187 pub fn register_venue_routing(
193 &mut self,
194 client_id: ClientId,
195 venue: Venue,
196 ) -> anyhow::Result<()> {
197 if !self.clients.contains_key(&client_id) {
198 anyhow::bail!("No client registered with ID {client_id}");
199 }
200
201 self.routing_map.insert(venue, client_id);
202 log::info!("Set client {client_id} routing for {venue}");
203 Ok(())
204 }
205
206 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
215 if self.clients.remove(&client_id).is_some() {
216 self.routing_map
218 .retain(|_, mapped_id| mapped_id != &client_id);
219 log::info!("Deregistered client {client_id}");
220 Ok(())
221 } else {
222 anyhow::bail!("No client registered with ID {client_id}")
223 }
224 }
225
226 #[allow(clippy::await_holding_refcell_ref)]
229 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
235 let ts = SystemTime::now();
236
237 {
238 let mut cache = self.cache.borrow_mut();
239 cache.clear_index();
240 cache.cache_general()?;
241 self.cache.borrow_mut().cache_all().await?;
242 cache.build_index();
243 let _ = cache.check_integrity();
244
245 if self.config.manage_own_order_books {
246 for order in cache.orders(None, None, None, None) {
247 if order.is_closed() || !should_handle_own_book_order(order) {
248 continue;
249 }
250 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
251 own_book.add(order.to_own_book_order());
252 }
253 }
254 }
255
256 self.set_position_id_counts();
257
258 log::info!(
259 "Loaded cache in {}ms",
260 SystemTime::now()
261 .duration_since(ts)
262 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
263 .as_millis()
264 );
265
266 Ok(())
267 }
268
269 pub fn flush_db(&self) {
271 self.cache.borrow_mut().flush_db();
272 }
273
274 pub fn process(&mut self, event: &OrderEventAny) {
276 self.handle_event(event);
277 }
278
279 pub fn execute(&self, command: &TradingCommand) {
281 self.execute_command(command);
282 }
283
284 fn execute_command(&self, command: &TradingCommand) {
287 if self.config.debug {
288 log::debug!("{RECV}{CMD} {command:?}");
289 }
290
291 if self.external_clients.contains(&command.client_id()) {
292 if self.config.debug {
293 let cid = command.client_id();
294 log::debug!("Skipping execution command for external client {cid}: {command:?}");
295 }
296 return;
297 }
298
299 let client: Rc<dyn ExecutionClient> = if let Some(client) = self
300 .clients
301 .get(&command.client_id())
302 .or_else(|| {
303 self.routing_map
304 .get(&command.instrument_id().venue)
305 .and_then(|client_id| self.clients.get(client_id))
306 })
307 .or(self.default_client.as_ref())
308 {
309 client.clone()
310 } else {
311 log::error!(
312 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
313 command.client_id(),
314 command.instrument_id().venue,
315 );
316 return;
317 };
318
319 match command {
320 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
321 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
322 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
323 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
324 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
325 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
326 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
327 TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
328 }
329 }
330
331 fn handle_submit_order(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrder) {
332 let mut order = cmd.order.clone();
333 let client_order_id = order.client_order_id();
334 let instrument_id = order.instrument_id();
335
336 if !self.cache.borrow().order_exists(&client_order_id) {
338 {
340 let mut cache = self.cache.borrow_mut();
341 if let Err(e) =
342 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
343 {
344 log::error!("Error adding order to cache: {e}");
345 return;
346 }
347 }
348
349 if self.config.snapshot_orders {
350 self.create_order_state_snapshot(&order);
351 }
352 }
353
354 let instrument = {
356 let cache = self.cache.borrow();
357 if let Some(instrument) = cache.instrument(&instrument_id) {
358 instrument.clone()
359 } else {
360 log::error!(
361 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
362 );
363 return;
364 }
365 };
366
367 if self.config.convert_quote_qty_to_base
369 && !instrument.is_inverse()
370 && order.is_quote_quantity()
371 {
372 log::warn!(
373 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
374 );
375 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
376
377 if let Some(price) = last_px {
378 let base_qty = instrument.get_base_quantity(order.quantity(), price);
379 self.set_order_base_qty(&mut order, base_qty);
380 } else {
381 self.deny_order(
382 &order,
383 &format!("no-price-to-convert-quote-qty {instrument_id}"),
384 );
385 return;
386 }
387 }
388
389 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
390 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
391 own_book.add(order.to_own_book_order());
392 }
393
394 if let Err(e) = client.submit_order(cmd) {
396 log::error!("Error submitting order to client: {e}");
397 self.deny_order(
398 &cmd.order,
399 &format!("failed-to-submit-order-to-client: {e}"),
400 );
401 }
402 }
403
404 fn handle_submit_order_list(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrderList) {
405 let orders = cmd.order_list.orders.clone();
406
407 let mut cache = self.cache.borrow_mut();
408 for order in &orders {
409 if !cache.order_exists(&order.client_order_id()) {
410 if let Err(e) =
411 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
412 {
413 log::error!("Error adding order to cache: {e}");
414 return;
415 }
416
417 if self.config.snapshot_orders {
418 self.create_order_state_snapshot(order);
419 }
420 }
421 }
422 drop(cache);
423
424 let instrument = {
425 let cache = self.cache.borrow();
426 if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
427 instrument.clone()
428 } else {
429 log::error!(
430 "Cannot handle submit order list: no instrument found for {}, {cmd}",
431 cmd.instrument_id,
432 );
433 return;
434 }
435 };
436
437 if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
439 let mut conversions: Vec<(ClientOrderId, Quantity)> =
440 Vec::with_capacity(cmd.order_list.orders.len());
441
442 for order in &cmd.order_list.orders {
443 if !order.is_quote_quantity() {
444 continue; }
446
447 let last_px =
448 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
449
450 if let Some(px) = last_px {
451 let base_qty = instrument.get_base_quantity(order.quantity(), px);
452 conversions.push((order.client_order_id(), base_qty));
453 } else {
454 for order in &cmd.order_list.orders {
455 self.deny_order(
456 order,
457 &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
458 );
459 }
460 return; }
462 }
463
464 if !conversions.is_empty() {
465 log::warn!(
466 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
467 );
468
469 let mut cache = self.cache.borrow_mut();
470 for (client_order_id, base_qty) in conversions {
471 if let Some(mut_order) = cache.mut_order(&client_order_id) {
472 self.set_order_base_qty(mut_order, base_qty);
473 }
474 }
475 }
476 }
477
478 if self.config.manage_own_order_books {
479 let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
480 for order in &cmd.order_list.orders {
481 if should_handle_own_book_order(order) {
482 own_book.add(order.to_own_book_order());
483 }
484 }
485 }
486
487 if let Err(e) = client.submit_order_list(cmd) {
489 log::error!("Error submitting order list to client: {e}");
490 for order in &orders {
491 self.deny_order(
492 order,
493 &format!("failed-to-submit-order-list-to-client: {e}"),
494 );
495 }
496 }
497 }
498
499 fn handle_modify_order(&self, client: Rc<dyn ExecutionClient>, cmd: &ModifyOrder) {
500 if let Err(e) = client.modify_order(cmd) {
501 log::error!("Error modifying order: {e}");
502 }
503 }
504
505 fn handle_cancel_order(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelOrder) {
506 if let Err(e) = client.cancel_order(cmd) {
507 log::error!("Error canceling order: {e}");
508 }
509 }
510
511 fn handle_cancel_all_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelAllOrders) {
512 if let Err(e) = client.cancel_all_orders(cmd) {
513 log::error!("Error canceling all orders: {e}");
514 }
515 }
516
517 fn handle_batch_cancel_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &BatchCancelOrders) {
518 if let Err(e) = client.batch_cancel_orders(cmd) {
519 log::error!("Error batch canceling orders: {e}");
520 }
521 }
522
523 fn handle_query_account(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryAccount) {
524 if let Err(e) = client.query_account(cmd) {
525 log::error!("Error querying account: {e}");
526 }
527 }
528
529 fn handle_query_order(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryOrder) {
530 if let Err(e) = client.query_order(cmd) {
531 log::error!("Error querying order: {e}");
532 }
533 }
534
535 fn create_order_state_snapshot(&self, order: &OrderAny) {
536 if self.config.debug {
537 log::debug!("Creating order state snapshot for {order}");
538 }
539
540 if self.cache.borrow().has_backing()
541 && let Err(e) = self.cache.borrow().snapshot_order_state(order)
542 {
543 log::error!("Failed to snapshot order state: {e}");
544 return;
545 }
546
547 if get_message_bus().borrow().has_backing {
548 let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
549 msgbus::publish(topic, order);
550 }
551 }
552
553 fn create_position_state_snapshot(&self, position: &Position) {
554 if self.config.debug {
555 log::debug!("Creating position state snapshot for {position}");
556 }
557
558 let topic = switchboard::get_positions_snapshots_topic(position.id);
564 msgbus::publish(topic, position);
565 }
566
567 fn handle_event(&mut self, event: &OrderEventAny) {
570 if self.config.debug {
571 log::debug!("{RECV}{EVT} {event:?}");
572 }
573
574 let client_order_id = event.client_order_id();
575 let cache = self.cache.borrow();
576 let mut order = if let Some(order) = cache.order(&client_order_id) {
577 order.clone()
578 } else {
579 log::warn!(
580 "Order with {} not found in the cache to apply {}",
581 event.client_order_id(),
582 event
583 );
584
585 let venue_order_id = if let Some(id) = event.venue_order_id() {
587 id
588 } else {
589 log::error!(
590 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
591 event.client_order_id()
592 );
593 return;
594 };
595
596 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
598 id
599 } else {
600 log::error!(
601 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
602 event.client_order_id(),
603 );
604 return;
605 };
606
607 if let Some(order) = cache.order(client_order_id) {
609 log::info!("Order with {client_order_id} was found in the cache");
610 order.clone()
611 } else {
612 log::error!(
613 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
614 );
615 return;
616 }
617 };
618
619 drop(cache);
620 match event {
621 OrderEventAny::Filled(fill) => {
622 let oms_type = self.determine_oms_type(fill);
623 let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
624
625 let mut fill = *fill;
627 if fill.position_id.is_none() {
628 fill.position_id = Some(position_id);
629 }
630
631 self.apply_event_to_order(&mut order, OrderEventAny::Filled(fill));
632 self.handle_order_fill(&order, fill, oms_type);
633 }
634 _ => {
635 self.apply_event_to_order(&mut order, event.clone());
636 }
637 }
638 }
639
640 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
641 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
643 return *oms_type;
644 }
645
646 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
648 && let Some(client) = self.clients.get(client_id)
649 {
650 return client.oms_type();
651 }
652
653 if let Some(client) = &self.default_client {
654 return client.oms_type();
655 }
656
657 OmsType::Netting }
659
660 fn determine_position_id(
661 &mut self,
662 fill: OrderFilled,
663 oms_type: OmsType,
664 order: Option<&OrderAny>,
665 ) -> PositionId {
666 match oms_type {
667 OmsType::Hedging => self.determine_hedging_position_id(fill, order),
668 OmsType::Netting => self.determine_netting_position_id(fill),
669 _ => self.determine_netting_position_id(fill), }
671 }
672
673 fn determine_hedging_position_id(
674 &mut self,
675 fill: OrderFilled,
676 order: Option<&OrderAny>,
677 ) -> PositionId {
678 if let Some(position_id) = fill.position_id {
680 if self.config.debug {
681 log::debug!("Already had a position ID of: {position_id}");
682 }
683 return position_id;
684 }
685
686 let cache = self.cache.borrow();
687
688 let order = if let Some(o) = order {
689 o
690 } else {
691 match cache.order(&fill.client_order_id()) {
692 Some(o) => o,
693 None => {
694 panic!(
695 "Order for {} not found to determine position ID",
696 fill.client_order_id()
697 );
698 }
699 }
700 };
701
702 if let Some(spawn_id) = order.exec_spawn_id() {
704 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
705 for spawned_order in spawn_orders {
706 if let Some(pos_id) = spawned_order.position_id() {
707 if self.config.debug {
708 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
709 }
710 return pos_id;
711 }
712 }
713 }
714
715 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
717 if self.config.debug {
718 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
719 }
720 position_id
721 }
722
723 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
724 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
725 }
726
727 fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
728 if let Err(e) = order.apply(event.clone()) {
729 if matches!(e, OrderError::InvalidStateTransition) {
730 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
731 } else {
732 log::error!("Error applying event: {e}, did not apply {event}");
735 if should_handle_own_book_order(order) {
736 self.cache.borrow_mut().update_own_order_book(order);
737 }
738 }
739 return;
740 }
741
742 if let Err(e) = self.cache.borrow_mut().update_order(order) {
743 log::error!("Error updating order in cache: {e}");
744 }
745
746 let topic = switchboard::get_event_orders_topic(event.strategy_id());
747 msgbus::publish(topic, order);
748
749 if self.config.snapshot_orders {
750 self.create_order_state_snapshot(order);
751 }
752 }
753
754 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
755 let instrument =
756 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
757 instrument.clone()
758 } else {
759 log::error!(
760 "Cannot handle order fill: no instrument found for {}, {fill}",
761 fill.instrument_id,
762 );
763 return;
764 };
765
766 if self.cache.borrow().account(&fill.account_id).is_none() {
767 log::error!(
768 "Cannot handle order fill: no account found for {}, {fill}",
769 fill.instrument_id.venue,
770 );
771 return;
772 }
773
774 let position = if instrument.is_spread() {
777 None
778 } else {
779 self.handle_position_update(instrument.clone(), fill, oms_type);
780 let position_id = fill.position_id.unwrap();
781 self.cache.borrow().position(&position_id).cloned()
782 };
783
784 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
787 if !instrument.is_spread()
789 && let Some(ref pos) = position
790 && pos.is_open()
791 {
792 let position_id = pos.id;
793 for client_order_id in order.linked_order_ids().unwrap_or_default() {
794 let mut cache = self.cache.borrow_mut();
795 let contingent_order = cache.mut_order(client_order_id);
796 if let Some(contingent_order) = contingent_order
797 && contingent_order.position_id().is_none()
798 {
799 contingent_order.set_position_id(Some(position_id));
800
801 if let Err(e) = self.cache.borrow_mut().add_position_id(
802 &position_id,
803 &contingent_order.instrument_id().venue,
804 &contingent_order.client_order_id(),
805 &contingent_order.strategy_id(),
806 ) {
807 log::error!("Failed to add position ID: {e}");
808 }
809 }
810 }
811 }
812 }
815 }
816
817 fn handle_position_update(
821 &mut self,
822 instrument: InstrumentAny,
823 fill: OrderFilled,
824 oms_type: OmsType,
825 ) {
826 let position_id = if let Some(position_id) = fill.position_id {
827 position_id
828 } else {
829 log::error!("Cannot handle position update: no position ID found for fill {fill}");
830 return;
831 };
832
833 let position_opt = self.cache.borrow().position(&position_id).cloned();
834
835 match position_opt {
836 None => {
837 if self.open_position(instrument, None, fill, oms_type).is_ok() {
839 }
841 }
842 Some(pos) if pos.is_closed() => {
843 if self
845 .open_position(instrument, Some(&pos), fill, oms_type)
846 .is_ok()
847 {
848 }
850 }
851 Some(mut pos) => {
852 if self.will_flip_position(&pos, fill) {
853 self.flip_position(instrument, &mut pos, fill, oms_type);
855 } else {
856 self.update_position(&mut pos, fill);
858 }
859 }
860 }
861 }
862
863 fn open_position(
864 &self,
865 instrument: InstrumentAny,
866 position: Option<&Position>,
867 fill: OrderFilled,
868 oms_type: OmsType,
869 ) -> anyhow::Result<Position> {
870 let position = if let Some(position) = position {
871 self.cache.borrow_mut().snapshot_position(position)?;
873 let mut position = position.clone();
874 position.apply(&fill);
875 self.cache.borrow_mut().update_position(&position)?;
876 position
877 } else {
878 let position = Position::new(&instrument, fill);
879 self.cache
880 .borrow_mut()
881 .add_position(position.clone(), oms_type)?;
882 if self.config.snapshot_positions {
883 self.create_position_state_snapshot(&position);
884 }
885 position
886 };
887
888 let ts_init = self.clock.borrow().timestamp_ns();
889 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
890 let topic = switchboard::get_event_positions_topic(event.strategy_id);
891 msgbus::publish(topic, &event);
892
893 Ok(position)
894 }
895
896 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
897 position.apply(&fill);
899
900 let is_closed = position.is_closed();
902
903 if let Err(e) = self.cache.borrow_mut().update_position(position) {
905 log::error!("Failed to update position: {e:?}");
906 return;
907 }
908
909 let cache = self.cache.borrow();
911
912 drop(cache);
913
914 if self.config.snapshot_positions {
916 self.create_position_state_snapshot(position);
917 }
918
919 let topic = switchboard::get_event_positions_topic(position.strategy_id);
921 let ts_init = self.clock.borrow().timestamp_ns();
922
923 if is_closed {
924 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
925 msgbus::publish(topic, &event);
926 } else {
927 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
928 msgbus::publish(topic, &event);
929 }
930 }
931
932 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
933 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
934 }
935
936 fn flip_position(
937 &mut self,
938 instrument: InstrumentAny,
939 position: &mut Position,
940 fill: OrderFilled,
941 oms_type: OmsType,
942 ) {
943 let difference = match position.side {
944 PositionSide::Long => Quantity::from_raw(
945 fill.last_qty.raw - position.quantity.raw,
946 position.size_precision,
947 ),
948 PositionSide::Short => Quantity::from_raw(
949 position.quantity.raw.abs_diff(fill.last_qty.raw), position.size_precision,
951 ),
952 _ => fill.last_qty,
953 };
954
955 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
957 let (commission1, commission2) = if let Some(commission) = fill.commission {
958 let commission_currency = commission.currency;
959 let commission1 = Money::new(commission * fill_percent, commission_currency);
960 let commission2 = commission - commission1;
961 (Some(commission1), Some(commission2))
962 } else {
963 log::error!("Commission is not available.");
964 (None, None)
965 };
966
967 let mut fill_split1: Option<OrderFilled> = None;
968 if position.is_open() {
969 fill_split1 = Some(OrderFilled::new(
970 fill.trader_id,
971 fill.strategy_id,
972 fill.instrument_id,
973 fill.client_order_id,
974 fill.venue_order_id,
975 fill.account_id,
976 fill.trade_id,
977 fill.order_side,
978 fill.order_type,
979 position.quantity,
980 fill.last_px,
981 fill.currency,
982 fill.liquidity_side,
983 UUID4::new(),
984 fill.ts_event,
985 fill.ts_init,
986 fill.reconciliation,
987 fill.position_id,
988 commission1,
989 ));
990
991 self.update_position(position, fill_split1.unwrap());
992 }
993
994 if difference.raw == 0 {
996 log::warn!(
997 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
998 );
999 return;
1000 }
1001
1002 let position_id_flip = if oms_type == OmsType::Hedging
1003 && let Some(position_id) = fill.position_id
1004 && position_id.is_virtual()
1005 {
1006 Some(self.pos_id_generator.generate(fill.strategy_id, true))
1008 } else {
1009 fill.position_id
1011 };
1012
1013 let fill_split2 = OrderFilled::new(
1014 fill.trader_id,
1015 fill.strategy_id,
1016 fill.instrument_id,
1017 fill.client_order_id,
1018 fill.venue_order_id,
1019 fill.account_id,
1020 fill.trade_id,
1021 fill.order_side,
1022 fill.order_type,
1023 difference,
1024 fill.last_px,
1025 fill.currency,
1026 fill.liquidity_side,
1027 UUID4::new(),
1028 fill.ts_event,
1029 fill.ts_init,
1030 fill.reconciliation,
1031 position_id_flip,
1032 commission2,
1033 );
1034
1035 if oms_type == OmsType::Hedging
1036 && let Some(position_id) = fill.position_id
1037 && position_id.is_virtual()
1038 {
1039 log::warn!("Closing position {fill_split1:?}");
1040 log::warn!("Flipping position {fill_split2:?}");
1041 }
1042 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1044 log::error!("Failed to open flipped position: {e:?}");
1045 }
1046 }
1047
1048 fn set_position_id_counts(&mut self) {
1051 let cache = self.cache.borrow();
1053 let positions = cache.positions(None, None, None, None);
1054
1055 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1057
1058 for position in positions {
1059 *counts.entry(position.strategy_id).or_insert(0) += 1;
1060 }
1061
1062 self.pos_id_generator.reset();
1063
1064 for (strategy_id, count) in counts {
1065 self.pos_id_generator.set_count(count, strategy_id);
1066 log::info!("Set PositionId count for {strategy_id} to {count}");
1067 }
1068 }
1069
1070 fn last_px_for_conversion(
1071 &self,
1072 instrument_id: &InstrumentId,
1073 side: OrderSide,
1074 ) -> Option<Price> {
1075 let cache = self.cache.borrow();
1076
1077 if let Some(trade) = cache.trade(instrument_id) {
1079 return Some(trade.price);
1080 }
1081
1082 if let Some(quote) = cache.quote(instrument_id) {
1084 match side {
1085 OrderSide::Buy => Some(quote.ask_price),
1086 OrderSide::Sell => Some(quote.bid_price),
1087 OrderSide::NoOrderSide => None,
1088 }
1089 } else {
1090 None
1091 }
1092 }
1093
1094 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1095 log::info!(
1096 "Setting {} order quote quantity {} to base quantity {}",
1097 order.instrument_id(),
1098 order.quantity(),
1099 base_qty
1100 );
1101
1102 let original_qty = order.quantity();
1103 order.set_quantity(base_qty);
1104 order.set_leaves_qty(base_qty);
1105 order.set_is_quote_quantity(false);
1106
1107 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1108 return;
1109 }
1110
1111 if let Some(linked_order_ids) = order.linked_order_ids() {
1112 for client_order_id in linked_order_ids {
1113 match self.cache.borrow_mut().mut_order(client_order_id) {
1114 Some(contingent_order) => {
1115 if !contingent_order.is_quote_quantity() {
1116 continue; }
1118
1119 if contingent_order.quantity() != original_qty {
1120 log::warn!(
1121 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1122 contingent_order.quantity(),
1123 original_qty,
1124 base_qty
1125 );
1126 }
1127
1128 log::info!(
1129 "Setting {} order quote quantity {} to base quantity {}",
1130 contingent_order.instrument_id(),
1131 contingent_order.quantity(),
1132 base_qty
1133 );
1134
1135 contingent_order.set_quantity(base_qty);
1136 contingent_order.set_leaves_qty(base_qty);
1137 contingent_order.set_is_quote_quantity(false);
1138 }
1139 None => {
1140 log::error!("Contingency order {client_order_id} not found");
1141 }
1142 }
1143 }
1144 } else {
1145 log::warn!(
1146 "No linked order IDs found for order {}",
1147 order.client_order_id()
1148 );
1149 }
1150 }
1151
1152 fn deny_order(&self, order: &OrderAny, reason: &str) {
1153 log::error!(
1154 "Order denied: {reason}, order ID: {}",
1155 order.client_order_id()
1156 );
1157
1158 let denied = OrderDenied::new(
1159 order.trader_id(),
1160 order.strategy_id(),
1161 order.instrument_id(),
1162 order.client_order_id(),
1163 reason.into(),
1164 UUID4::new(),
1165 self.clock.borrow().timestamp_ns(),
1166 self.clock.borrow().timestamp_ns(),
1167 );
1168
1169 let mut order = order.clone();
1170
1171 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1172 log::error!("Failed to apply denied event to order: {e}");
1173 return;
1174 }
1175
1176 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1177 log::error!("Failed to update order in cache: {e}");
1178 return;
1179 }
1180
1181 let topic = switchboard::get_event_orders_topic(order.strategy_id());
1182 msgbus::publish(topic, &denied);
1183
1184 if self.config.snapshot_orders {
1185 self.create_order_state_snapshot(&order);
1186 }
1187 }
1188
1189 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1190 let mut cache = self.cache.borrow_mut();
1191 if cache.own_order_book_mut(instrument_id).is_none() {
1192 let own_book = OwnOrderBook::new(*instrument_id);
1193 cache.add_own_order_book(own_book).unwrap();
1194 }
1195
1196 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1197 }
1198}
1199
1200mod stubs;
1204#[cfg(test)]
1205mod tests;