1pub mod config;
24
25use std::{
26 cell::{RefCell, RefMut},
27 collections::{HashMap, HashSet},
28 fmt::Debug,
29 rc::Rc,
30 time::SystemTime,
31};
32
33use config::ExecutionEngineConfig;
34use nautilus_common::{
35 cache::Cache,
36 clock::Clock,
37 generators::position_id::PositionIdGenerator,
38 logging::{CMD, EVT, RECV},
39 messages::execution::{
40 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
41 SubmitOrder, SubmitOrderList, TradingCommand,
42 },
43 msgbus::{
44 self, get_message_bus,
45 switchboard::{self},
46 },
47};
48use nautilus_core::UUID4;
49use nautilus_model::{
50 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
51 events::{
52 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
53 PositionOpened,
54 },
55 identifiers::{ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
56 instruments::{Instrument, InstrumentAny},
57 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
58 orders::{Order, OrderAny, OrderError},
59 position::Position,
60 types::{Money, Price, Quantity},
61};
62
63use crate::client::ExecutionClient;
64
/// Execution engine which routes trading commands to registered execution clients
/// and applies order events to the orders and positions held in the shared cache.
pub struct ExecutionEngine {
    /// Shared clock; read for event timestamps and handed to the position ID generator.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache of orders, positions, instruments and accounts.
    cache: Rc<RefCell<Cache>>,
    /// Registered execution clients keyed by their client ID.
    clients: HashMap<ClientId, Rc<dyn ExecutionClient>>,
    /// Fallback client used when neither the client ID nor the venue routing resolves.
    default_client: Option<Rc<dyn ExecutionClient>>,
    /// Venue to client ID routing table.
    routing_map: HashMap<Venue, ClientId>,
    /// Per-strategy OMS type overrides, consulted first when determining the OMS type of a fill.
    oms_overrides: HashMap<StrategyId, OmsType>,
    /// External order claims (instrument ID to claiming strategy ID).
    external_order_claims: HashMap<InstrumentId, StrategyId>,
    /// Client IDs whose commands are skipped (not executed) by this engine.
    external_clients: HashSet<ClientId>,
    /// Generator for position IDs.
    pos_id_generator: PositionIdGenerator,
    /// Engine configuration.
    config: ExecutionEngineConfig,
}
83
84impl Debug for ExecutionEngine {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct(stringify!(ExecutionEngine))
87 .field("client_count", &self.clients.len())
88 .finish()
89 }
90}
91
92impl ExecutionEngine {
93 pub fn new(
95 clock: Rc<RefCell<dyn Clock>>,
96 cache: Rc<RefCell<Cache>>,
97 config: Option<ExecutionEngineConfig>,
98 ) -> Self {
99 let trader_id = get_message_bus().borrow().trader_id;
100 Self {
101 clock: clock.clone(),
102 cache,
103 clients: HashMap::new(),
104 default_client: None,
105 routing_map: HashMap::new(),
106 oms_overrides: HashMap::new(),
107 external_order_claims: HashMap::new(),
108 external_clients: config
109 .as_ref()
110 .and_then(|c| c.external_clients.clone())
111 .unwrap_or_default()
112 .into_iter()
113 .collect(),
114 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
115 config: config.unwrap_or_default(),
116 }
117 }
118
119 #[must_use]
120 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
122 self.pos_id_generator.count(strategy_id)
123 }
124
125 #[must_use]
126 pub fn check_integrity(&self) -> bool {
128 self.cache.borrow_mut().check_integrity()
129 }
130
131 #[must_use]
132 pub fn check_connected(&self) -> bool {
134 self.clients.values().all(|c| c.is_connected())
135 }
136
137 #[must_use]
138 pub fn check_disconnected(&self) -> bool {
140 self.clients.values().all(|c| !c.is_connected())
141 }
142
143 #[must_use]
144 pub fn check_residuals(&self) -> bool {
146 self.cache.borrow().check_residuals()
147 }
148
149 #[must_use]
150 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
152 self.external_order_claims.keys().copied().collect()
153 }
154
155 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) -> anyhow::Result<()> {
163 if self.clients.contains_key(&client.client_id()) {
164 anyhow::bail!("Client already registered with ID {}", client.client_id());
165 }
166
167 self.routing_map.insert(client.venue(), client.client_id());
169
170 log::info!("Registered client {}", client.client_id());
171 self.clients.insert(client.client_id(), client);
172 Ok(())
173 }
174
175 pub fn register_default_client(&mut self, client: Rc<dyn ExecutionClient>) {
177 log::info!("Registered default client {}", client.client_id());
178 self.default_client = Some(client);
179 }
180
181 #[must_use]
182 pub fn get_client(&self, client_id: &ClientId) -> Option<Rc<dyn ExecutionClient>> {
184 self.clients.get(client_id).cloned()
185 }
186
187 pub fn register_venue_routing(
193 &mut self,
194 client_id: ClientId,
195 venue: Venue,
196 ) -> anyhow::Result<()> {
197 if !self.clients.contains_key(&client_id) {
198 anyhow::bail!("No client registered with ID {client_id}");
199 }
200
201 self.routing_map.insert(venue, client_id);
202 log::info!("Set client {client_id} routing for {venue}");
203 Ok(())
204 }
205
206 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
215 if self.clients.remove(&client_id).is_some() {
216 self.routing_map
218 .retain(|_, mapped_id| mapped_id != &client_id);
219 log::info!("Deregistered client {client_id}");
220 Ok(())
221 } else {
222 anyhow::bail!("No client registered with ID {client_id}")
223 }
224 }
225
226 #[allow(clippy::await_holding_refcell_ref)]
229 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
235 let ts = SystemTime::now();
236
237 {
238 let mut cache = self.cache.borrow_mut();
239 cache.clear_index();
240 cache.cache_general()?;
241 self.cache.borrow_mut().cache_all().await?;
242 cache.build_index();
243 let _ = cache.check_integrity();
244
245 if self.config.manage_own_order_books {
246 for order in cache.orders(None, None, None, None) {
247 if order.is_closed() || !should_handle_own_book_order(order) {
248 continue;
249 }
250 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
251 own_book.add(order.to_own_book_order());
252 }
253 }
254 }
255
256 self.set_position_id_counts();
257
258 log::info!(
259 "Loaded cache in {}ms",
260 SystemTime::now()
261 .duration_since(ts)
262 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
263 .as_millis()
264 );
265
266 Ok(())
267 }
268
269 pub fn flush_db(&self) {
271 self.cache.borrow_mut().flush_db();
272 }
273
274 pub fn process(&mut self, event: &OrderEventAny) {
276 self.handle_event(event);
277 }
278
279 pub fn execute(&self, command: &TradingCommand) {
281 self.execute_command(command);
282 }
283
284 fn execute_command(&self, command: &TradingCommand) {
287 if self.config.debug {
288 log::debug!("{RECV}{CMD} {command:?}");
289 }
290
291 if self.external_clients.contains(&command.client_id()) {
292 if self.config.debug {
293 let cid = command.client_id();
294 log::debug!("Skipping execution command for external client {cid}: {command:?}");
295 }
296 return;
297 }
298
299 let client: Rc<dyn ExecutionClient> = if let Some(client) = self
300 .clients
301 .get(&command.client_id())
302 .or_else(|| {
303 self.routing_map
304 .get(&command.instrument_id().venue)
305 .and_then(|client_id| self.clients.get(client_id))
306 })
307 .or(self.default_client.as_ref())
308 {
309 client.clone()
310 } else {
311 log::error!(
312 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
313 command.client_id(),
314 command.instrument_id().venue,
315 );
316 return;
317 };
318
319 match command {
320 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
321 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
322 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
323 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
324 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
325 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
326 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
327 TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
328 }
329 }
330
331 fn handle_submit_order(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrder) {
332 let mut order = cmd.order.clone();
333 let client_order_id = order.client_order_id();
334 let instrument_id = order.instrument_id();
335
336 if !self.cache.borrow().order_exists(&client_order_id) {
338 {
340 let mut cache = self.cache.borrow_mut();
341 if let Err(e) =
342 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
343 {
344 log::error!("Error adding order to cache: {e}");
345 return;
346 }
347 }
348
349 if self.config.snapshot_orders {
350 self.create_order_state_snapshot(&order);
351 }
352 }
353
354 let instrument = {
356 let cache = self.cache.borrow();
357 if let Some(instrument) = cache.instrument(&instrument_id) {
358 instrument.clone()
359 } else {
360 log::error!(
361 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
362 );
363 return;
364 }
365 };
366
367 if self.config.convert_quote_qty_to_base
369 && !instrument.is_inverse()
370 && order.is_quote_quantity()
371 {
372 log::warn!(
373 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
374 );
375 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
376
377 if let Some(price) = last_px {
378 let base_qty = instrument.get_base_quantity(order.quantity(), price);
379 self.set_order_base_qty(&mut order, base_qty);
380 } else {
381 self.deny_order(
382 &order,
383 &format!("no-price-to-convert-quote-qty {instrument_id}"),
384 );
385 return;
386 }
387 }
388
389 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
390 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
391 own_book.add(order.to_own_book_order());
392 }
393
394 if let Err(e) = client.submit_order(cmd) {
396 log::error!("Error submitting order to client: {e}");
397 self.deny_order(
398 &cmd.order,
399 &format!("failed-to-submit-order-to-client: {e}"),
400 );
401 }
402 }
403
404 fn handle_submit_order_list(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrderList) {
405 let orders = cmd.order_list.orders.clone();
406
407 let mut cache = self.cache.borrow_mut();
409 for order in &orders {
410 if !cache.order_exists(&order.client_order_id()) {
411 if let Err(e) =
412 cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
413 {
414 log::error!("Error adding order to cache: {e}");
415 return;
416 }
417
418 if self.config.snapshot_orders {
419 self.create_order_state_snapshot(order);
420 }
421 }
422 }
423 drop(cache);
424
425 let instrument = {
427 let cache = self.cache.borrow();
428 if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
429 instrument.clone()
430 } else {
431 log::error!(
432 "Cannot handle submit order list: no instrument found for {}, {cmd}",
433 cmd.instrument_id,
434 );
435 return;
436 }
437 };
438
439 if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
441 let mut conversions: Vec<(ClientOrderId, Quantity)> =
442 Vec::with_capacity(cmd.order_list.orders.len());
443
444 for order in &cmd.order_list.orders {
445 if !order.is_quote_quantity() {
446 continue; }
448
449 let last_px =
450 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
451
452 if let Some(px) = last_px {
453 let base_qty = instrument.get_base_quantity(order.quantity(), px);
454 conversions.push((order.client_order_id(), base_qty));
455 } else {
456 for order in &cmd.order_list.orders {
457 self.deny_order(
458 order,
459 &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
460 );
461 }
462 return; }
464 }
465
466 if !conversions.is_empty() {
467 log::warn!(
468 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
469 );
470
471 let mut cache = self.cache.borrow_mut();
472 for (client_order_id, base_qty) in conversions {
473 if let Some(mut_order) = cache.mut_order(&client_order_id) {
474 self.set_order_base_qty(mut_order, base_qty);
475 }
476 }
477 }
478 }
479
480 if self.config.manage_own_order_books {
481 let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
482 for order in &cmd.order_list.orders {
483 if should_handle_own_book_order(order) {
484 own_book.add(order.to_own_book_order());
485 }
486 }
487 }
488
489 if let Err(e) = client.submit_order_list(cmd) {
491 log::error!("Error submitting order list to client: {e}");
492 for order in &orders {
493 self.deny_order(
494 order,
495 &format!("failed-to-submit-order-list-to-client: {e}"),
496 );
497 }
498 }
499 }
500
501 fn handle_modify_order(&self, client: Rc<dyn ExecutionClient>, cmd: &ModifyOrder) {
502 if let Err(e) = client.modify_order(cmd) {
503 log::error!("Error modifying order: {e}");
504 }
505 }
506
507 fn handle_cancel_order(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelOrder) {
508 if let Err(e) = client.cancel_order(cmd) {
509 log::error!("Error canceling order: {e}");
510 }
511 }
512
513 fn handle_cancel_all_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelAllOrders) {
514 if let Err(e) = client.cancel_all_orders(cmd) {
515 log::error!("Error canceling all orders: {e}");
516 }
517 }
518
519 fn handle_batch_cancel_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &BatchCancelOrders) {
520 if let Err(e) = client.batch_cancel_orders(cmd) {
521 log::error!("Error batch canceling orders: {e}");
522 }
523 }
524
525 fn handle_query_account(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryAccount) {
526 if let Err(e) = client.query_account(cmd) {
527 log::error!("Error querying account: {e}");
528 }
529 }
530
531 fn handle_query_order(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryOrder) {
532 if let Err(e) = client.query_order(cmd) {
533 log::error!("Error querying order: {e}");
534 }
535 }
536
537 fn create_order_state_snapshot(&self, order: &OrderAny) {
538 if self.config.debug {
539 log::debug!("Creating order state snapshot for {order}");
540 }
541
542 if self.cache.borrow().has_backing()
543 && let Err(e) = self.cache.borrow().snapshot_order_state(order)
544 {
545 log::error!("Failed to snapshot order state: {e}");
546 return;
547 }
548
549 if get_message_bus().borrow().has_backing {
550 let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
551 msgbus::publish(topic, order);
552 }
553 }
554
555 fn create_position_state_snapshot(&self, position: &Position) {
556 if self.config.debug {
557 log::debug!("Creating position state snapshot for {position}");
558 }
559
560 let topic = switchboard::get_positions_snapshots_topic(position.id);
566 msgbus::publish(topic, position);
567 }
568
569 fn handle_event(&mut self, event: &OrderEventAny) {
572 if self.config.debug {
573 log::debug!("{RECV}{EVT} {event:?}");
574 }
575
576 let client_order_id = event.client_order_id();
577 let cache = self.cache.borrow();
578 let mut order = if let Some(order) = cache.order(&client_order_id) {
579 order.clone()
580 } else {
581 log::warn!(
582 "Order with {} not found in the cache to apply {}",
583 event.client_order_id(),
584 event
585 );
586
587 let venue_order_id = if let Some(id) = event.venue_order_id() {
589 id
590 } else {
591 log::error!(
592 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
593 event.client_order_id()
594 );
595 return;
596 };
597
598 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
600 id
601 } else {
602 log::error!(
603 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
604 event.client_order_id(),
605 );
606 return;
607 };
608
609 if let Some(order) = cache.order(client_order_id) {
611 log::info!("Order with {client_order_id} was found in the cache");
612 order.clone()
613 } else {
614 log::error!(
615 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
616 );
617 return;
618 }
619 };
620
621 drop(cache);
622 match event {
623 OrderEventAny::Filled(fill) => {
624 let oms_type = self.determine_oms_type(fill);
625 let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
626
627 let mut fill = *fill;
629 if fill.position_id.is_none() {
630 fill.position_id = Some(position_id);
631 }
632
633 self.apply_event_to_order(&mut order, OrderEventAny::Filled(fill));
634 self.handle_order_fill(&order, fill, oms_type);
635 }
636 _ => {
637 self.apply_event_to_order(&mut order, event.clone());
638 }
639 }
640 }
641
642 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
643 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
645 return *oms_type;
646 }
647
648 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
650 && let Some(client) = self.clients.get(client_id)
651 {
652 return client.oms_type();
653 }
654
655 if let Some(client) = &self.default_client {
656 return client.oms_type();
657 }
658
659 OmsType::Netting }
661
662 fn determine_position_id(
663 &mut self,
664 fill: OrderFilled,
665 oms_type: OmsType,
666 order: Option<&OrderAny>,
667 ) -> PositionId {
668 match oms_type {
669 OmsType::Hedging => self.determine_hedging_position_id(fill, order),
670 OmsType::Netting => self.determine_netting_position_id(fill),
671 _ => self.determine_netting_position_id(fill), }
673 }
674
675 fn determine_hedging_position_id(
676 &mut self,
677 fill: OrderFilled,
678 order: Option<&OrderAny>,
679 ) -> PositionId {
680 if let Some(position_id) = fill.position_id {
682 if self.config.debug {
683 log::debug!("Already had a position ID of: {position_id}");
684 }
685 return position_id;
686 }
687
688 let cache = self.cache.borrow();
689
690 let order = if let Some(o) = order {
691 o
692 } else {
693 match cache.order(&fill.client_order_id()) {
694 Some(o) => o,
695 None => {
696 panic!(
697 "Order for {} not found to determine position ID",
698 fill.client_order_id()
699 );
700 }
701 }
702 };
703
704 if let Some(spawn_id) = order.exec_spawn_id() {
706 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
707 for spawned_order in spawn_orders {
708 if let Some(pos_id) = spawned_order.position_id() {
709 if self.config.debug {
710 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
711 }
712 return pos_id;
713 }
714 }
715 }
716
717 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
719 if self.config.debug {
720 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
721 }
722 position_id
723 }
724
725 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
726 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
727 }
728
729 fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
730 if let Err(e) = order.apply(event.clone()) {
731 if matches!(e, OrderError::InvalidStateTransition) {
732 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
733 } else {
734 log::error!("Error applying event: {e}, did not apply {event}");
737 if should_handle_own_book_order(order) {
738 self.cache.borrow_mut().update_own_order_book(order);
739 }
740 }
741 return;
742 }
743
744 if let Err(e) = self.cache.borrow_mut().update_order(order) {
745 log::error!("Error updating order in cache: {e}");
746 }
747
748 let topic = switchboard::get_event_orders_topic(event.strategy_id());
749 msgbus::publish(topic, order);
750
751 if self.config.snapshot_orders {
752 self.create_order_state_snapshot(order);
753 }
754 }
755
756 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
757 let instrument =
758 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
759 instrument.clone()
760 } else {
761 log::error!(
762 "Cannot handle order fill: no instrument found for {}, {fill}",
763 fill.instrument_id,
764 );
765 return;
766 };
767
768 if self.cache.borrow().account(&fill.account_id).is_none() {
769 log::error!(
770 "Cannot handle order fill: no account found for {}, {fill}",
771 fill.instrument_id.venue,
772 );
773 return;
774 }
775
776 let position = if instrument.is_spread() {
779 None
780 } else {
781 self.handle_position_update(instrument.clone(), fill, oms_type);
782 let position_id = fill.position_id.unwrap();
783 self.cache.borrow().position(&position_id).cloned()
784 };
785
786 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
789 if !instrument.is_spread()
791 && let Some(ref pos) = position
792 && pos.is_open()
793 {
794 let position_id = pos.id;
795 for client_order_id in order.linked_order_ids().unwrap_or_default() {
796 let mut cache = self.cache.borrow_mut();
797 let contingent_order = cache.mut_order(client_order_id);
798 if let Some(contingent_order) = contingent_order
799 && contingent_order.position_id().is_none()
800 {
801 contingent_order.set_position_id(Some(position_id));
802
803 if let Err(e) = self.cache.borrow_mut().add_position_id(
804 &position_id,
805 &contingent_order.instrument_id().venue,
806 &contingent_order.client_order_id(),
807 &contingent_order.strategy_id(),
808 ) {
809 log::error!("Failed to add position ID: {e}");
810 }
811 }
812 }
813 }
814 }
817 }
818
819 fn handle_position_update(
823 &mut self,
824 instrument: InstrumentAny,
825 fill: OrderFilled,
826 oms_type: OmsType,
827 ) {
828 let position_id = if let Some(position_id) = fill.position_id {
829 position_id
830 } else {
831 log::error!("Cannot handle position update: no position ID found for fill {fill}");
832 return;
833 };
834
835 let position_opt = self.cache.borrow().position(&position_id).cloned();
836
837 match position_opt {
838 None => {
839 if self.open_position(instrument, None, fill, oms_type).is_ok() {
841 }
843 }
844 Some(pos) if pos.is_closed() => {
845 if self
847 .open_position(instrument, Some(&pos), fill, oms_type)
848 .is_ok()
849 {
850 }
852 }
853 Some(mut pos) => {
854 if self.will_flip_position(&pos, fill) {
855 self.flip_position(instrument, &mut pos, fill, oms_type);
857 } else {
858 self.update_position(&mut pos, fill);
860 }
861 }
862 }
863 }
864
865 fn open_position(
866 &self,
867 instrument: InstrumentAny,
868 position: Option<&Position>,
869 fill: OrderFilled,
870 oms_type: OmsType,
871 ) -> anyhow::Result<Position> {
872 let position = if let Some(position) = position {
873 self.cache.borrow_mut().snapshot_position(position)?;
875 let mut position = position.clone();
876 position.apply(&fill);
877 self.cache.borrow_mut().update_position(&position)?;
878 position
879 } else {
880 let position = Position::new(&instrument, fill);
881 self.cache
882 .borrow_mut()
883 .add_position(position.clone(), oms_type)?;
884 if self.config.snapshot_positions {
885 self.create_position_state_snapshot(&position);
886 }
887 position
888 };
889
890 let ts_init = self.clock.borrow().timestamp_ns();
891 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
892 let topic = switchboard::get_event_positions_topic(event.strategy_id);
893 msgbus::publish(topic, &event);
894
895 Ok(position)
896 }
897
898 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
899 position.apply(&fill);
901
902 let is_closed = position.is_closed();
904
905 if let Err(e) = self.cache.borrow_mut().update_position(position) {
907 log::error!("Failed to update position: {e:?}");
908 return;
909 }
910
911 let cache = self.cache.borrow();
913
914 drop(cache);
915
916 if self.config.snapshot_positions {
918 self.create_position_state_snapshot(position);
919 }
920
921 let topic = switchboard::get_event_positions_topic(position.strategy_id);
923 let ts_init = self.clock.borrow().timestamp_ns();
924
925 if is_closed {
926 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
927 msgbus::publish(topic, &event);
928 } else {
929 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
930 msgbus::publish(topic, &event);
931 }
932 }
933
934 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
935 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
936 }
937
938 fn flip_position(
939 &mut self,
940 instrument: InstrumentAny,
941 position: &mut Position,
942 fill: OrderFilled,
943 oms_type: OmsType,
944 ) {
945 let difference = match position.side {
946 PositionSide::Long => Quantity::from_raw(
947 fill.last_qty.raw - position.quantity.raw,
948 position.size_precision,
949 ),
950 PositionSide::Short => Quantity::from_raw(
951 position.quantity.raw.abs_diff(fill.last_qty.raw), position.size_precision,
953 ),
954 _ => fill.last_qty,
955 };
956
957 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
959 let (commission1, commission2) = if let Some(commission) = fill.commission {
960 let commission_currency = commission.currency;
961 let commission1 = Money::new(commission * fill_percent, commission_currency);
962 let commission2 = commission - commission1;
963 (Some(commission1), Some(commission2))
964 } else {
965 log::error!("Commission is not available.");
966 (None, None)
967 };
968
969 let mut fill_split1: Option<OrderFilled> = None;
970 if position.is_open() {
971 fill_split1 = Some(OrderFilled::new(
972 fill.trader_id,
973 fill.strategy_id,
974 fill.instrument_id,
975 fill.client_order_id,
976 fill.venue_order_id,
977 fill.account_id,
978 fill.trade_id,
979 fill.order_side,
980 fill.order_type,
981 position.quantity,
982 fill.last_px,
983 fill.currency,
984 fill.liquidity_side,
985 UUID4::new(),
986 fill.ts_event,
987 fill.ts_init,
988 fill.reconciliation,
989 fill.position_id,
990 commission1,
991 ));
992
993 self.update_position(position, fill_split1.unwrap());
994 }
995
996 if difference.raw == 0 {
998 log::warn!(
999 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
1000 );
1001 return;
1002 }
1003
1004 let position_id_flip = if oms_type == OmsType::Hedging
1005 && let Some(position_id) = fill.position_id
1006 && position_id.is_virtual()
1007 {
1008 Some(self.pos_id_generator.generate(fill.strategy_id, true))
1010 } else {
1011 fill.position_id
1013 };
1014
1015 let fill_split2 = OrderFilled::new(
1016 fill.trader_id,
1017 fill.strategy_id,
1018 fill.instrument_id,
1019 fill.client_order_id,
1020 fill.venue_order_id,
1021 fill.account_id,
1022 fill.trade_id,
1023 fill.order_side,
1024 fill.order_type,
1025 difference,
1026 fill.last_px,
1027 fill.currency,
1028 fill.liquidity_side,
1029 UUID4::new(),
1030 fill.ts_event,
1031 fill.ts_init,
1032 fill.reconciliation,
1033 position_id_flip,
1034 commission2,
1035 );
1036
1037 if oms_type == OmsType::Hedging
1038 && let Some(position_id) = fill.position_id
1039 && position_id.is_virtual()
1040 {
1041 log::warn!("Closing position {fill_split1:?}");
1042 log::warn!("Flipping position {fill_split2:?}");
1043 }
1044 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1046 log::error!("Failed to open flipped position: {e:?}");
1047 }
1048 }
1049
1050 fn set_position_id_counts(&mut self) {
1053 let cache = self.cache.borrow();
1055 let positions = cache.positions(None, None, None, None);
1056
1057 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1059
1060 for position in positions {
1061 *counts.entry(position.strategy_id).or_insert(0) += 1;
1062 }
1063
1064 self.pos_id_generator.reset();
1065
1066 for (strategy_id, count) in counts {
1067 self.pos_id_generator.set_count(count, strategy_id);
1068 log::info!("Set PositionId count for {strategy_id} to {count}");
1069 }
1070 }
1071
1072 fn last_px_for_conversion(
1073 &self,
1074 instrument_id: &InstrumentId,
1075 side: OrderSide,
1076 ) -> Option<Price> {
1077 let cache = self.cache.borrow();
1078
1079 if let Some(trade) = cache.trade(instrument_id) {
1081 return Some(trade.price);
1082 }
1083
1084 if let Some(quote) = cache.quote(instrument_id) {
1086 match side {
1087 OrderSide::Buy => Some(quote.ask_price),
1088 OrderSide::Sell => Some(quote.bid_price),
1089 OrderSide::NoOrderSide => None,
1090 }
1091 } else {
1092 None
1093 }
1094 }
1095
1096 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1097 log::info!(
1098 "Setting {} order quote quantity {} to base quantity {}",
1099 order.instrument_id(),
1100 order.quantity(),
1101 base_qty
1102 );
1103
1104 let original_qty = order.quantity();
1105 order.set_quantity(base_qty);
1106 order.set_leaves_qty(base_qty);
1107 order.set_is_quote_quantity(false);
1108
1109 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1110 return;
1111 }
1112
1113 if let Some(linked_order_ids) = order.linked_order_ids() {
1114 for client_order_id in linked_order_ids {
1115 match self.cache.borrow_mut().mut_order(client_order_id) {
1116 Some(contingent_order) => {
1117 if !contingent_order.is_quote_quantity() {
1118 continue; }
1120
1121 if contingent_order.quantity() != original_qty {
1122 log::warn!(
1123 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1124 contingent_order.quantity(),
1125 original_qty,
1126 base_qty
1127 );
1128 }
1129
1130 log::info!(
1131 "Setting {} order quote quantity {} to base quantity {}",
1132 contingent_order.instrument_id(),
1133 contingent_order.quantity(),
1134 base_qty
1135 );
1136
1137 contingent_order.set_quantity(base_qty);
1138 contingent_order.set_leaves_qty(base_qty);
1139 contingent_order.set_is_quote_quantity(false);
1140 }
1141 None => {
1142 log::error!("Contingency order {client_order_id} not found");
1143 }
1144 }
1145 }
1146 } else {
1147 log::warn!(
1148 "No linked order IDs found for order {}",
1149 order.client_order_id()
1150 );
1151 }
1152 }
1153
1154 fn deny_order(&self, order: &OrderAny, reason: &str) {
1155 log::error!(
1156 "Order denied: {reason}, order ID: {}",
1157 order.client_order_id()
1158 );
1159
1160 let denied = OrderDenied::new(
1161 order.trader_id(),
1162 order.strategy_id(),
1163 order.instrument_id(),
1164 order.client_order_id(),
1165 reason.into(),
1166 UUID4::new(),
1167 self.clock.borrow().timestamp_ns(),
1168 self.clock.borrow().timestamp_ns(),
1169 );
1170
1171 let mut order = order.clone();
1172
1173 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1174 log::error!("Failed to apply denied event to order: {e}");
1175 return;
1176 }
1177
1178 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1179 log::error!("Failed to update order in cache: {e}");
1180 return;
1181 }
1182
1183 let topic = switchboard::get_event_orders_topic(order.strategy_id());
1184 msgbus::publish(topic, &denied);
1185
1186 if self.config.snapshot_orders {
1187 self.create_order_state_snapshot(&order);
1188 }
1189 }
1190
1191 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1192 let mut cache = self.cache.borrow_mut();
1193 if cache.own_order_book_mut(instrument_id).is_none() {
1194 let own_book = OwnOrderBook::new(*instrument_id);
1195 cache.add_own_order_book(own_book).unwrap();
1196 }
1197
1198 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1199 }
1200}
1201
1202mod stubs;
1206#[cfg(test)]
1207mod tests;