1pub mod config;
24pub mod stubs;
25
26use std::{
27 cell::{RefCell, RefMut},
28 collections::{HashMap, HashSet},
29 fmt::Debug,
30 rc::Rc,
31 time::SystemTime,
32};
33
34use ahash::{AHashMap, AHashSet};
35use config::ExecutionEngineConfig;
36use futures::future::join_all;
37use nautilus_common::{
38 cache::Cache,
39 clients::ExecutionClient,
40 clock::Clock,
41 generators::position_id::PositionIdGenerator,
42 logging::{CMD, EVT, RECV, SEND},
43 messages::{
44 ExecutionReport,
45 execution::{
46 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
47 SubmitOrder, SubmitOrderList, TradingCommand,
48 },
49 },
50 msgbus::{
51 self, MessagingSwitchboard, TypedIntoHandler, get_message_bus,
52 switchboard::{self},
53 },
54 runner::try_get_trading_cmd_sender,
55};
56use nautilus_core::{UUID4, UnixNanos, WeakCell};
57use nautilus_model::{
58 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
59 events::{
60 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
61 PositionEvent, PositionOpened,
62 },
63 identifiers::{
64 ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue, VenueOrderId,
65 },
66 instruments::{Instrument, InstrumentAny},
67 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
68 orders::{Order, OrderAny, OrderError},
69 position::Position,
70 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
71 types::{Money, Price, Quantity},
72};
73use rust_decimal::Decimal;
74
75use crate::{
76 client::ExecutionClientAdapter,
77 reconciliation::{
78 check_position_reconciliation, reconcile_fill_report as reconcile_fill,
79 reconcile_order_report,
80 },
81};
82
83pub struct ExecutionEngine {
90 clock: Rc<RefCell<dyn Clock>>,
91 cache: Rc<RefCell<Cache>>,
92 clients: AHashMap<ClientId, ExecutionClientAdapter>,
93 default_client: Option<ExecutionClientAdapter>,
94 routing_map: HashMap<Venue, ClientId>,
95 oms_overrides: HashMap<StrategyId, OmsType>,
96 external_order_claims: HashMap<InstrumentId, StrategyId>,
97 external_clients: HashSet<ClientId>,
98 pos_id_generator: PositionIdGenerator,
99 config: ExecutionEngineConfig,
100}
101
102impl Debug for ExecutionEngine {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct(stringify!(ExecutionEngine))
105 .field("client_count", &self.clients.len())
106 .finish()
107 }
108}
109
110impl ExecutionEngine {
111 pub fn new(
113 clock: Rc<RefCell<dyn Clock>>,
114 cache: Rc<RefCell<Cache>>,
115 config: Option<ExecutionEngineConfig>,
116 ) -> Self {
117 let trader_id = get_message_bus().borrow().trader_id;
118 Self {
119 clock: clock.clone(),
120 cache,
121 clients: AHashMap::new(),
122 default_client: None,
123 routing_map: HashMap::new(),
124 oms_overrides: HashMap::new(),
125 external_order_claims: HashMap::new(),
126 external_clients: config
127 .as_ref()
128 .and_then(|c| c.external_clients.clone())
129 .unwrap_or_default()
130 .into_iter()
131 .collect(),
132 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
133 config: config.unwrap_or_default(),
134 }
135 }
136
137 pub fn register_msgbus_handlers(engine: Rc<RefCell<Self>>) {
139 let weak = WeakCell::from(Rc::downgrade(&engine));
140
141 let weak1 = weak.clone();
142 msgbus::register_trading_command_endpoint(
143 MessagingSwitchboard::exec_engine_execute(),
144 TypedIntoHandler::from(move |cmd: TradingCommand| {
145 if let Some(rc) = weak1.upgrade() {
146 rc.borrow().execute(cmd);
147 }
148 }),
149 );
150
151 msgbus::register_trading_command_endpoint(
154 MessagingSwitchboard::exec_engine_queue_execute(),
155 TypedIntoHandler::from(move |cmd: TradingCommand| {
156 if let Some(sender) = try_get_trading_cmd_sender() {
157 sender.execute(cmd);
158 } else {
159 let endpoint = MessagingSwitchboard::exec_engine_execute();
160 msgbus::send_trading_command(endpoint, cmd);
161 }
162 }),
163 );
164
165 let weak2 = weak.clone();
166 msgbus::register_order_event_endpoint(
167 MessagingSwitchboard::exec_engine_process(),
168 TypedIntoHandler::from(move |event: OrderEventAny| {
169 if let Some(rc) = weak2.upgrade() {
170 rc.borrow_mut().process(event);
171 }
172 }),
173 );
174
175 let weak3 = weak;
176 msgbus::register_execution_report_endpoint(
177 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
178 TypedIntoHandler::from(move |report: ExecutionReport| {
179 if let Some(rc) = weak3.upgrade() {
180 rc.borrow_mut().reconcile_execution_report(report);
181 }
182 }),
183 );
184 }
185
186 #[must_use]
187 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
189 self.pos_id_generator.count(strategy_id)
190 }
191
192 #[must_use]
193 pub fn cache(&self) -> &Rc<RefCell<Cache>> {
195 &self.cache
196 }
197
198 #[must_use]
199 pub const fn config(&self) -> &ExecutionEngineConfig {
201 &self.config
202 }
203
204 #[must_use]
205 pub fn check_integrity(&self) -> bool {
207 self.cache.borrow_mut().check_integrity()
208 }
209
210 #[must_use]
211 pub fn check_connected(&self) -> bool {
213 let clients_connected = self.clients.values().all(|c| c.is_connected());
214 let default_connected = self
215 .default_client
216 .as_ref()
217 .is_none_or(|c| c.is_connected());
218 clients_connected && default_connected
219 }
220
221 #[must_use]
222 pub fn check_disconnected(&self) -> bool {
224 let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
225 let default_disconnected = self
226 .default_client
227 .as_ref()
228 .is_none_or(|c| !c.is_connected());
229 clients_disconnected && default_disconnected
230 }
231
232 #[must_use]
234 pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
235 let mut status: Vec<_> = self
236 .clients
237 .values()
238 .map(|c| (c.client_id(), c.is_connected()))
239 .collect();
240
241 if let Some(default) = &self.default_client {
242 status.push((default.client_id(), default.is_connected()));
243 }
244
245 status
246 }
247
248 #[must_use]
249 pub fn check_residuals(&self) -> bool {
251 self.cache.borrow().check_residuals()
252 }
253
254 #[must_use]
255 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
257 self.external_order_claims.keys().copied().collect()
258 }
259
260 #[must_use]
261 pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
263 self.external_clients.clone()
264 }
265
266 #[must_use]
267 pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
269 self.external_order_claims.get(instrument_id).copied()
270 }
271
272 pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
278 let client_id = client.client_id();
279 let venue = client.venue();
280
281 if self.clients.contains_key(&client_id) {
282 anyhow::bail!("Client already registered with ID {client_id}");
283 }
284
285 let adapter = ExecutionClientAdapter::new(client);
286
287 self.routing_map.insert(venue, client_id);
288
289 log::debug!("Registered client {client_id}");
290 self.clients.insert(client_id, adapter);
291 Ok(())
292 }
293
294 pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
296 let client_id = client.client_id();
297 let adapter = ExecutionClientAdapter::new(client);
298
299 log::debug!("Registered default client {client_id}");
300 self.default_client = Some(adapter);
301 }
302
303 #[must_use]
304 pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
306 self.clients.get(client_id).map(|a| a.client.as_ref())
307 }
308
309 #[must_use]
310 pub fn get_client_adapter_mut(
312 &mut self,
313 client_id: &ClientId,
314 ) -> Option<&mut ExecutionClientAdapter> {
315 if let Some(default) = &self.default_client
316 && &default.client_id == client_id
317 {
318 return self.default_client.as_mut();
319 }
320 self.clients.get_mut(client_id)
321 }
322
323 pub async fn generate_mass_status(
329 &mut self,
330 client_id: &ClientId,
331 lookback_mins: Option<u64>,
332 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
333 if let Some(client) = self.get_client_adapter_mut(client_id) {
334 client.generate_mass_status(lookback_mins).await
335 } else {
336 anyhow::bail!("Client {client_id} not found")
337 }
338 }
339
340 pub fn register_external_order(
345 &self,
346 client_order_id: ClientOrderId,
347 venue_order_id: VenueOrderId,
348 instrument_id: InstrumentId,
349 strategy_id: StrategyId,
350 ts_init: UnixNanos,
351 ) {
352 let venue = instrument_id.venue;
353 if let Some(client_id) = self.routing_map.get(&venue) {
354 if let Some(client) = self.clients.get(client_id) {
355 client.register_external_order(
356 client_order_id,
357 venue_order_id,
358 instrument_id,
359 strategy_id,
360 ts_init,
361 );
362 }
363 } else if let Some(default) = &self.default_client {
364 default.register_external_order(
365 client_order_id,
366 venue_order_id,
367 instrument_id,
368 strategy_id,
369 ts_init,
370 );
371 }
372 }
373
374 #[must_use]
375 pub fn client_ids(&self) -> Vec<ClientId> {
377 let mut ids: Vec<_> = self.clients.keys().copied().collect();
378
379 if let Some(default) = &self.default_client {
380 ids.push(default.client_id);
381 }
382 ids
383 }
384
385 #[must_use]
386 pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
388 let mut adapters: Vec<_> = self.clients.values_mut().collect();
389 if let Some(default) = &mut self.default_client {
390 adapters.push(default);
391 }
392 adapters
393 }
394
395 #[must_use]
396 pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
401 let mut client_ids: AHashSet<ClientId> = AHashSet::new();
402 let mut venues: AHashSet<Venue> = AHashSet::new();
403
404 for order in orders {
406 venues.insert(order.instrument_id().venue);
407 if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
408 client_ids.insert(*client_id);
409 }
410 }
411
412 let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
413
414 for client_id in &client_ids {
416 if let Some(adapter) = self.clients.get(client_id)
417 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
418 {
419 clients.push(adapter.client.as_ref());
420 }
421 }
422
423 for venue in &venues {
425 if let Some(client_id) = self.routing_map.get(venue) {
426 if let Some(adapter) = self.clients.get(client_id)
427 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
428 {
429 clients.push(adapter.client.as_ref());
430 }
431 } else if let Some(adapter) = &self.default_client
432 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
433 {
434 clients.push(adapter.client.as_ref());
435 }
436 }
437
438 clients
439 }
440
441 pub fn register_venue_routing(
447 &mut self,
448 client_id: ClientId,
449 venue: Venue,
450 ) -> anyhow::Result<()> {
451 if !self.clients.contains_key(&client_id) {
452 anyhow::bail!("No client registered with ID {client_id}");
453 }
454
455 self.routing_map.insert(venue, client_id);
456 log::info!("Set client {client_id} routing for {venue}");
457 Ok(())
458 }
459
460 pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
464 self.oms_overrides.insert(strategy_id, oms_type);
465 log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
466 }
467
468 pub fn register_external_order_claims(
476 &mut self,
477 strategy_id: StrategyId,
478 instrument_ids: HashSet<InstrumentId>,
479 ) -> anyhow::Result<()> {
480 for instrument_id in &instrument_ids {
482 if let Some(existing) = self.external_order_claims.get(instrument_id) {
483 anyhow::bail!(
484 "External order claim for {instrument_id} already exists for {existing}"
485 );
486 }
487 }
488
489 for instrument_id in &instrument_ids {
491 self.external_order_claims
492 .insert(*instrument_id, strategy_id);
493 }
494
495 if !instrument_ids.is_empty() {
496 log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
497 }
498
499 Ok(())
500 }
501
502 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
506 if self.clients.remove(&client_id).is_some() {
507 self.routing_map
509 .retain(|_, mapped_id| mapped_id != &client_id);
510 log::info!("Deregistered client {client_id}");
511 Ok(())
512 } else {
513 anyhow::bail!("No client registered with ID {client_id}")
514 }
515 }
516
517 pub async fn connect(&mut self) {
521 let futures: Vec<_> = self
522 .get_clients_mut()
523 .into_iter()
524 .map(|client| client.connect())
525 .collect();
526
527 let results = join_all(futures).await;
528
529 for error in results.into_iter().filter_map(Result::err) {
530 log::error!("Failed to connect execution client: {error}");
531 }
532 }
533
534 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
540 let futures: Vec<_> = self
541 .get_clients_mut()
542 .into_iter()
543 .map(|client| client.disconnect())
544 .collect();
545
546 let results = join_all(futures).await;
547 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
548
549 if errors.is_empty() {
550 Ok(())
551 } else {
552 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
553 anyhow::bail!(
554 "Failed to disconnect execution clients: {}",
555 error_msgs.join("; ")
556 )
557 }
558 }
559
560 pub fn set_manage_own_order_books(&mut self, value: bool) {
562 self.config.manage_own_order_books = value;
563 }
564
565 pub fn set_convert_quote_qty_to_base(&mut self, value: bool) {
567 self.config.convert_quote_qty_to_base = value;
568 }
569
570 pub fn start_snapshot_timer(&mut self) {
574 if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
575 log::info!("Starting position snapshots timer at {interval_secs} second intervals");
576 }
577 }
578
579 pub fn stop_snapshot_timer(&mut self) {
581 if self.config.snapshot_positions_interval_secs.is_some() {
582 log::info!("Canceling position snapshots timer");
583 }
584 }
585
586 pub fn snapshot_open_position_states(&self) {
588 let positions: Vec<Position> = self
589 .cache
590 .borrow()
591 .positions_open(None, None, None, None, None)
592 .into_iter()
593 .cloned()
594 .collect();
595
596 for position in positions {
597 self.create_position_state_snapshot(&position);
598 }
599 }
600
601 #[allow(clippy::await_holding_refcell_ref)]
602 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
608 let ts = SystemTime::now();
609
610 {
611 let mut cache = self.cache.borrow_mut();
612 cache.clear_index();
613 cache.cache_general()?;
614 self.cache.borrow_mut().cache_all().await?;
615 cache.build_index();
616 let _ = cache.check_integrity();
617
618 if self.config.manage_own_order_books {
619 for order in cache.orders(None, None, None, None, None) {
620 if order.is_closed() || !should_handle_own_book_order(order) {
621 continue;
622 }
623 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
624 own_book.add(order.to_own_book_order());
625 }
626 }
627 }
628
629 self.set_position_id_counts();
630
631 log::info!(
632 "Loaded cache in {}ms",
633 SystemTime::now()
634 .duration_since(ts)
635 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
636 .as_millis()
637 );
638
639 Ok(())
640 }
641
642 pub fn flush_db(&self) {
644 self.cache.borrow_mut().flush_db();
645 }
646
647 pub fn reconcile_execution_report(&mut self, report: ExecutionReport) {
649 match &report {
650 ExecutionReport::Order(order_report) => {
651 self.reconcile_order_status_report(order_report);
652 }
653 ExecutionReport::Fill(fill_report) => {
654 self.reconcile_fill_report(fill_report);
655 }
656 ExecutionReport::Position(position_report) => {
657 self.reconcile_position_report(position_report);
658 }
659 ExecutionReport::MassStatus(mass_status) => {
660 self.reconcile_execution_mass_status(mass_status);
661 }
662 }
663 }
664
665 pub fn reconcile_order_status_report(&mut self, report: &OrderStatusReport) {
671 let cache = self.cache.borrow();
672
673 let order = report
674 .client_order_id
675 .and_then(|id| cache.order(&id).cloned())
676 .or_else(|| {
677 cache
678 .client_order_id(&report.venue_order_id)
679 .and_then(|cid| cache.order(cid).cloned())
680 });
681
682 let Some(order) = order else {
683 log::debug!(
684 "Order not found in cache for reconciliation: client_order_id={:?}, venue_order_id={}",
685 report.client_order_id,
686 report.venue_order_id
687 );
688 return;
689 };
690
691 let instrument = cache.instrument(&report.instrument_id).cloned();
692
693 drop(cache);
694
695 let ts_now = self.clock.borrow().timestamp_ns();
696
697 if let Some(event) = reconcile_order_report(&order, report, instrument.as_ref(), ts_now) {
698 self.handle_event(&event);
699 }
700 }
701
702 pub fn reconcile_fill_report(&mut self, report: &FillReport) {
707 let cache = self.cache.borrow();
708
709 let order = report
710 .client_order_id
711 .and_then(|id| cache.order(&id).cloned())
712 .or_else(|| {
713 cache
714 .client_order_id(&report.venue_order_id)
715 .and_then(|cid| cache.order(cid).cloned())
716 });
717
718 let Some(order) = order else {
719 log::warn!(
720 "Cannot reconcile fill report: order not found for venue_order_id={}, client_order_id={:?}",
721 report.venue_order_id,
722 report.client_order_id
723 );
724 return;
725 };
726
727 let instrument = cache.instrument(&report.instrument_id).cloned();
728
729 drop(cache);
730
731 let Some(instrument) = instrument else {
732 log::debug!(
733 "Cannot reconcile fill report for {}: instrument {} not found",
734 order.client_order_id(),
735 report.instrument_id
736 );
737 return;
738 };
739
740 let ts_now = self.clock.borrow().timestamp_ns();
741
742 if let Some(event) = reconcile_fill(
743 &order,
744 report,
745 &instrument,
746 ts_now,
747 self.config.allow_overfills,
748 ) {
749 self.handle_event(&event);
750 }
751 }
752
753 pub fn reconcile_position_report(&mut self, report: &PositionStatusReport) {
758 let cache = self.cache.borrow();
759
760 let size_precision = cache
761 .instrument(&report.instrument_id)
762 .map(|i| i.size_precision());
763
764 if report.venue_position_id.is_some() {
765 self.reconcile_position_report_hedging(report, &cache);
766 } else {
767 self.reconcile_position_report_netting(report, &cache, size_precision);
768 }
769 }
770
771 fn reconcile_position_report_hedging(&self, report: &PositionStatusReport, cache: &Cache) {
772 let venue_position_id = report.venue_position_id.as_ref().unwrap();
773
774 log::info!(
775 "Reconciling HEDGE position for {}, venue_position_id={}",
776 report.instrument_id,
777 venue_position_id
778 );
779
780 let Some(position) = cache.position(venue_position_id) else {
781 log::error!("Cannot reconcile position: {venue_position_id} not found in cache");
782 return;
783 };
784
785 let cached_signed_qty = match position.side {
786 PositionSide::Long => position.quantity.as_decimal(),
787 PositionSide::Short => -position.quantity.as_decimal(),
788 _ => Decimal::ZERO,
789 };
790 let venue_signed_qty = report.signed_decimal_qty;
791
792 if cached_signed_qty != venue_signed_qty {
793 log::error!(
794 "Position mismatch for {} {}: cached={}, venue={}",
795 report.instrument_id,
796 venue_position_id,
797 cached_signed_qty,
798 venue_signed_qty
799 );
800 }
801 }
802
803 fn reconcile_position_report_netting(
804 &self,
805 report: &PositionStatusReport,
806 cache: &Cache,
807 size_precision: Option<u8>,
808 ) {
809 log::info!("Reconciling NET position for {}", report.instrument_id);
810
811 let positions_open =
812 cache.positions_open(None, Some(&report.instrument_id), None, None, None);
813
814 let cached_signed_qty: Decimal = positions_open
816 .iter()
817 .map(|p| match p.side {
818 PositionSide::Long => p.quantity.as_decimal(),
819 PositionSide::Short => -p.quantity.as_decimal(),
820 _ => Decimal::ZERO,
821 })
822 .sum();
823
824 log::info!(
825 "Position report: venue_signed_qty={}, cached_signed_qty={}",
826 report.signed_decimal_qty,
827 cached_signed_qty
828 );
829
830 let _ = check_position_reconciliation(report, cached_signed_qty, size_precision);
831 }
832
833 pub fn reconcile_execution_mass_status(&mut self, mass_status: &ExecutionMassStatus) {
838 log::info!(
839 "Reconciling mass status for client={}, account={}, venue={}",
840 mass_status.client_id,
841 mass_status.account_id,
842 mass_status.venue
843 );
844
845 for order_report in mass_status.order_reports().values() {
846 self.reconcile_order_status_report(order_report);
847 }
848
849 for fill_reports in mass_status.fill_reports().values() {
850 for fill_report in fill_reports {
851 self.reconcile_fill_report(fill_report);
852 }
853 }
854
855 for position_reports in mass_status.position_reports().values() {
856 for position_report in position_reports {
857 self.reconcile_position_report(position_report);
858 }
859 }
860
861 log::info!(
862 "Mass status reconciliation complete: {} orders, {} fills, {} positions",
863 mass_status.order_reports().len(),
864 mass_status
865 .fill_reports()
866 .values()
867 .map(|v| v.len())
868 .sum::<usize>(),
869 mass_status
870 .position_reports()
871 .values()
872 .map(|v| v.len())
873 .sum::<usize>()
874 );
875 }
876
877 pub fn execute(&self, command: TradingCommand) {
879 self.execute_command(&command);
880 }
881
882 pub fn process(&mut self, event: OrderEventAny) {
884 self.handle_event(&event);
885 }
886
887 pub fn start(&mut self) {
889 self.start_snapshot_timer();
890
891 log::info!("Started");
892 }
893
894 pub fn stop(&mut self) {
896 self.stop_snapshot_timer();
897
898 log::info!("Stopped");
899 }
900
901 pub fn reset(&mut self) {
903 self.pos_id_generator.reset();
904
905 log::info!("Reset");
906 }
907
908 pub fn dispose(&mut self) {
910 log::info!("Disposed");
911 }
912
913 fn execute_command(&self, command: &TradingCommand) {
914 if self.config.debug {
915 log::debug!("{RECV}{CMD} {command:?}");
916 }
917
918 if let Some(cid) = command.client_id()
919 && self.external_clients.contains(&cid)
920 {
921 if self.config.debug {
922 log::debug!("Skipping execution command for external client {cid}: {command:?}");
923 }
924 return;
925 }
926
927 let client = if let Some(adapter) = command
928 .client_id()
929 .and_then(|cid| self.clients.get(&cid))
930 .or_else(|| {
931 self.routing_map
932 .get(&command.instrument_id().venue)
933 .and_then(|client_id| self.clients.get(client_id))
934 })
935 .or(self.default_client.as_ref())
936 {
937 adapter.client.as_ref()
938 } else {
939 log::error!(
940 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
941 command.client_id(),
942 command.instrument_id().venue,
943 );
944 return;
945 };
946
947 match command {
948 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
949 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
950 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
951 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
952 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
953 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
954 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
955 TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
956 }
957 }
958
959 fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: &SubmitOrder) {
960 let client_order_id = cmd.client_order_id;
961
962 let mut order = {
963 let cache = self.cache.borrow();
964 match cache.order(&client_order_id) {
965 Some(order) => order.clone(),
966 None => {
967 log::error!(
968 "Cannot handle submit order: order not found in cache for {client_order_id}"
969 );
970 return;
971 }
972 }
973 };
974
975 let order_venue = order.instrument_id().venue;
976 let client_venue = client.venue();
977 if order_venue != client_venue {
978 self.deny_order(
979 &order,
980 &format!("Order venue {order_venue} does not match client venue {client_venue}"),
981 );
982 return;
983 }
984
985 let instrument_id = order.instrument_id();
986
987 if self.config.snapshot_orders {
988 self.create_order_state_snapshot(&order);
989 }
990
991 let instrument = {
992 let cache = self.cache.borrow();
993 if let Some(instrument) = cache.instrument(&instrument_id) {
994 instrument.clone()
995 } else {
996 log::error!(
997 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
998 );
999 return;
1000 }
1001 };
1002
1003 if self.config.convert_quote_qty_to_base
1005 && !instrument.is_inverse()
1006 && order.is_quote_quantity()
1007 {
1008 log::warn!(
1009 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
1010 );
1011 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
1012
1013 if let Some(price) = last_px {
1014 let base_qty = instrument.get_base_quantity(order.quantity(), price);
1015 self.set_order_base_qty(&mut order, base_qty);
1016 } else {
1017 self.deny_order(
1018 &order,
1019 &format!("no-price-to-convert-quote-qty {instrument_id}"),
1020 );
1021 return;
1022 }
1023 }
1024
1025 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
1026 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
1027 own_book.add(order.to_own_book_order());
1028 }
1029
1030 if let Err(e) = client.submit_order(cmd) {
1031 self.deny_order(&order, &format!("failed-to-submit-order-to-client: {e}"));
1032 }
1033 }
1034
1035 fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: &SubmitOrderList) {
1036 let orders: Vec<OrderAny> = self
1037 .cache
1038 .borrow()
1039 .orders_for_ids(&cmd.order_list.client_order_ids, cmd);
1040
1041 if orders.len() != cmd.order_list.client_order_ids.len() {
1042 for order in &orders {
1043 self.deny_order(
1044 order,
1045 &format!("Incomplete order list: missing orders in cache for {cmd}"),
1046 );
1047 }
1048 return;
1049 }
1050
1051 let order_list_venue = cmd.instrument_id.venue;
1052 let client_venue = client.venue();
1053 if order_list_venue != client_venue {
1054 for order in &orders {
1055 self.deny_order(
1056 order,
1057 &format!("Order list venue {order_list_venue} does not match client venue {client_venue}"),
1058 );
1059 }
1060 return;
1061 }
1062
1063 if self.config.snapshot_orders {
1064 for order in &orders {
1065 self.create_order_state_snapshot(order);
1066 }
1067 }
1068
1069 let instrument = {
1070 let cache = self.cache.borrow();
1071 if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
1072 instrument.clone()
1073 } else {
1074 log::error!(
1075 "Cannot handle submit order list: no instrument found for {}, {cmd}",
1076 cmd.instrument_id,
1077 );
1078 return;
1079 }
1080 };
1081
1082 if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
1084 let mut conversions: Vec<(ClientOrderId, Quantity)> = Vec::with_capacity(orders.len());
1085
1086 for order in &orders {
1087 if !order.is_quote_quantity() {
1088 continue; }
1090
1091 let last_px =
1092 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
1093
1094 if let Some(px) = last_px {
1095 let base_qty = instrument.get_base_quantity(order.quantity(), px);
1096 conversions.push((order.client_order_id(), base_qty));
1097 } else {
1098 for order in &orders {
1099 self.deny_order(
1100 order,
1101 &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
1102 );
1103 }
1104 return; }
1106 }
1107
1108 if !conversions.is_empty() {
1109 log::warn!(
1110 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
1111 );
1112
1113 let mut cache = self.cache.borrow_mut();
1114 for (client_order_id, base_qty) in conversions {
1115 if let Some(mut_order) = cache.mut_order(&client_order_id) {
1116 self.set_order_base_qty(mut_order, base_qty);
1117 }
1118 }
1119 }
1120 }
1121
1122 if self.config.manage_own_order_books {
1123 let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
1124 for order in &orders {
1125 if should_handle_own_book_order(order) {
1126 own_book.add(order.to_own_book_order());
1127 }
1128 }
1129 }
1130
1131 if let Err(e) = client.submit_order_list(cmd) {
1132 log::error!("Error submitting order list to client: {e}");
1133 for order in &orders {
1134 self.deny_order(
1135 order,
1136 &format!("failed-to-submit-order-list-to-client: {e}"),
1137 );
1138 }
1139 }
1140 }
1141
1142 fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: &ModifyOrder) {
1143 if let Err(e) = client.modify_order(cmd) {
1144 log::error!("Error modifying order: {e}");
1145 }
1146 }
1147
1148 fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: &CancelOrder) {
1149 if let Err(e) = client.cancel_order(cmd) {
1150 log::error!("Error canceling order: {e}");
1151 }
1152 }
1153
1154 fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: &CancelAllOrders) {
1155 if let Err(e) = client.cancel_all_orders(cmd) {
1156 log::error!("Error canceling all orders: {e}");
1157 }
1158 }
1159
1160 fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: &BatchCancelOrders) {
1161 if let Err(e) = client.batch_cancel_orders(cmd) {
1162 log::error!("Error batch canceling orders: {e}");
1163 }
1164 }
1165
1166 fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: &QueryAccount) {
1167 if let Err(e) = client.query_account(cmd) {
1168 log::error!("Error querying account: {e}");
1169 }
1170 }
1171
1172 fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: &QueryOrder) {
1173 if let Err(e) = client.query_order(cmd) {
1174 log::error!("Error querying order: {e}");
1175 }
1176 }
1177
1178 fn create_order_state_snapshot(&self, order: &OrderAny) {
1179 if self.config.debug {
1180 log::debug!("Creating order state snapshot for {order}");
1181 }
1182
1183 if self.cache.borrow().has_backing()
1184 && let Err(e) = self.cache.borrow().snapshot_order_state(order)
1185 {
1186 log::error!("Failed to snapshot order state: {e}");
1187 }
1188 }
1189
1190 fn create_position_state_snapshot(&self, position: &Position) {
1191 if self.config.debug {
1192 log::debug!("Creating position state snapshot for {position}");
1193 }
1194
1195 }
1200
1201 fn handle_event(&mut self, event: &OrderEventAny) {
1202 if self.config.debug {
1203 log::debug!("{RECV}{EVT} {event:?}");
1204 }
1205
1206 let client_order_id = event.client_order_id();
1207 let cache = self.cache.borrow();
1208 let mut order = if let Some(order) = cache.order(&client_order_id) {
1209 order.clone()
1210 } else {
1211 log::warn!(
1212 "Order with {} not found in the cache to apply {}",
1213 event.client_order_id(),
1214 event
1215 );
1216
1217 let venue_order_id = if let Some(id) = event.venue_order_id() {
1219 id
1220 } else {
1221 log::error!(
1222 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
1223 event.client_order_id()
1224 );
1225 return;
1226 };
1227
1228 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
1230 id
1231 } else {
1232 log::error!(
1233 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
1234 event.client_order_id(),
1235 );
1236 return;
1237 };
1238
1239 if let Some(order) = cache.order(client_order_id) {
1241 log::info!("Order with {client_order_id} was found in the cache");
1242 order.clone()
1243 } else {
1244 log::error!(
1245 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
1246 );
1247 return;
1248 }
1249 };
1250
1251 drop(cache);
1252
1253 match event {
1254 OrderEventAny::Filled(fill) => {
1255 let oms_type = self.determine_oms_type(fill);
1256 let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
1257
1258 let mut fill = *fill;
1259 if fill.position_id.is_none() {
1260 fill.position_id = Some(position_id);
1261 }
1262
1263 if self.apply_fill_to_order(&mut order, fill).is_ok() {
1264 self.handle_order_fill(&order, fill, oms_type);
1265 }
1266 }
1267 _ => {
1268 let _ = self.apply_event_to_order(&mut order, event.clone());
1269 }
1270 }
1271 }
1272
1273 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
1274 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
1276 return *oms_type;
1277 }
1278
1279 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
1281 && let Some(client) = self.clients.get(client_id)
1282 {
1283 return client.oms_type();
1284 }
1285
1286 if let Some(client) = &self.default_client {
1287 return client.oms_type();
1288 }
1289
1290 OmsType::Netting }
1292
1293 fn determine_position_id(
1294 &mut self,
1295 fill: OrderFilled,
1296 oms_type: OmsType,
1297 order: Option<&OrderAny>,
1298 ) -> PositionId {
1299 match oms_type {
1300 OmsType::Hedging => self.determine_hedging_position_id(fill, order),
1301 OmsType::Netting => self.determine_netting_position_id(fill),
1302 _ => self.determine_netting_position_id(fill), }
1304 }
1305
1306 fn determine_hedging_position_id(
1307 &mut self,
1308 fill: OrderFilled,
1309 order: Option<&OrderAny>,
1310 ) -> PositionId {
1311 if let Some(position_id) = fill.position_id {
1313 if self.config.debug {
1314 log::debug!("Already had a position ID of: {position_id}");
1315 }
1316 return position_id;
1317 }
1318
1319 let cache = self.cache.borrow();
1320
1321 let order = if let Some(o) = order {
1322 o
1323 } else {
1324 match cache.order(&fill.client_order_id()) {
1325 Some(o) => o,
1326 None => {
1327 panic!(
1328 "Order for {} not found to determine position ID",
1329 fill.client_order_id()
1330 );
1331 }
1332 }
1333 };
1334
1335 if let Some(spawn_id) = order.exec_spawn_id() {
1337 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
1338 for spawned_order in spawn_orders {
1339 if let Some(pos_id) = spawned_order.position_id() {
1340 if self.config.debug {
1341 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
1342 }
1343 return pos_id;
1344 }
1345 }
1346 }
1347
1348 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
1350 if self.config.debug {
1351 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
1352 }
1353 position_id
1354 }
1355
1356 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
1357 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
1358 }
1359
1360 fn apply_fill_to_order(&self, order: &mut OrderAny, fill: OrderFilled) -> anyhow::Result<()> {
1361 if order.is_duplicate_fill(&fill) {
1362 log::warn!(
1363 "Duplicate fill: {} trade_id={} already applied, skipping",
1364 order.client_order_id(),
1365 fill.trade_id
1366 );
1367 anyhow::bail!("Duplicate fill");
1368 }
1369
1370 self.check_overfill(order, &fill)?;
1371 let event = OrderEventAny::Filled(fill);
1372 self.apply_order_event(order, event)
1373 }
1374
1375 fn apply_event_to_order(
1376 &self,
1377 order: &mut OrderAny,
1378 event: OrderEventAny,
1379 ) -> anyhow::Result<()> {
1380 self.apply_order_event(order, event)
1381 }
1382
1383 fn apply_order_event(&self, order: &mut OrderAny, event: OrderEventAny) -> anyhow::Result<()> {
1384 if let Err(e) = order.apply(event.clone()) {
1385 match e {
1386 OrderError::InvalidStateTransition => {
1387 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
1390 }
1391 OrderError::DuplicateFill(trade_id) => {
1392 log::warn!(
1394 "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
1395 );
1396 anyhow::bail!("{e}");
1397 }
1398 _ => {
1399 log::error!("Error applying event: {e}, did not apply {event}");
1401 if should_handle_own_book_order(order) {
1402 self.cache.borrow_mut().update_own_order_book(order);
1403 }
1404 anyhow::bail!("{e}");
1405 }
1406 }
1407 }
1408
1409 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1410 log::error!("Error updating order in cache: {e}");
1411 }
1412
1413 if self.config.debug {
1414 log::debug!("{SEND}{EVT} {event}");
1415 }
1416
1417 let topic = switchboard::get_event_orders_topic(event.strategy_id());
1418 msgbus::publish_order_event(topic, &event);
1419
1420 if self.config.snapshot_orders {
1421 self.create_order_state_snapshot(order);
1422 }
1423
1424 Ok(())
1425 }
1426
1427 fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
1428 let potential_overfill = order.calculate_overfill(fill.last_qty);
1429
1430 if potential_overfill.is_positive() {
1431 if self.config.allow_overfills {
1432 log::warn!(
1433 "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
1434 order.client_order_id(),
1435 potential_overfill,
1436 order.filled_qty(),
1437 fill.last_qty,
1438 order.quantity()
1439 );
1440 } else {
1441 let msg = format!(
1442 "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
1443 Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
1444 order.client_order_id(),
1445 potential_overfill,
1446 order.filled_qty(),
1447 fill.last_qty,
1448 order.quantity()
1449 );
1450 anyhow::bail!("{msg}");
1451 }
1452 }
1453
1454 Ok(())
1455 }
1456
1457 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
1458 let instrument =
1459 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
1460 instrument.clone()
1461 } else {
1462 log::error!(
1463 "Cannot handle order fill: no instrument found for {}, {fill}",
1464 fill.instrument_id,
1465 );
1466 return;
1467 };
1468
1469 if self.cache.borrow().account(&fill.account_id).is_none() {
1470 log::error!(
1471 "Cannot handle order fill: no account found for {}, {fill}",
1472 fill.instrument_id.venue,
1473 );
1474 return;
1475 }
1476
1477 let position = if instrument.is_spread() {
1480 None
1481 } else {
1482 self.handle_position_update(instrument.clone(), fill, oms_type);
1483 let position_id = fill.position_id.unwrap();
1484 self.cache.borrow().position(&position_id).cloned()
1485 };
1486
1487 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1490 if !instrument.is_spread()
1492 && let Some(ref pos) = position
1493 && pos.is_open()
1494 {
1495 let position_id = pos.id;
1496 for client_order_id in order.linked_order_ids().unwrap_or_default() {
1497 let mut cache = self.cache.borrow_mut();
1498 let contingent_order = cache.mut_order(client_order_id);
1499 if let Some(contingent_order) = contingent_order
1500 && contingent_order.position_id().is_none()
1501 {
1502 contingent_order.set_position_id(Some(position_id));
1503
1504 if let Err(e) = self.cache.borrow_mut().add_position_id(
1505 &position_id,
1506 &contingent_order.instrument_id().venue,
1507 &contingent_order.client_order_id(),
1508 &contingent_order.strategy_id(),
1509 ) {
1510 log::error!("Failed to add position ID: {e}");
1511 }
1512 }
1513 }
1514 }
1515 }
1518 }
1519
1520 fn handle_position_update(
1524 &mut self,
1525 instrument: InstrumentAny,
1526 fill: OrderFilled,
1527 oms_type: OmsType,
1528 ) {
1529 let position_id = if let Some(position_id) = fill.position_id {
1530 position_id
1531 } else {
1532 log::error!("Cannot handle position update: no position ID found for fill {fill}");
1533 return;
1534 };
1535
1536 let position_opt = self.cache.borrow().position(&position_id).cloned();
1537
1538 match position_opt {
1539 None => {
1540 if self.open_position(instrument, None, fill, oms_type).is_ok() {
1542 }
1544 }
1545 Some(pos) if pos.is_closed() => {
1546 if self
1548 .open_position(instrument, Some(&pos), fill, oms_type)
1549 .is_ok()
1550 {
1551 }
1553 }
1554 Some(mut pos) => {
1555 if self.will_flip_position(&pos, fill) {
1556 self.flip_position(instrument, &mut pos, fill, oms_type);
1558 } else {
1559 self.update_position(&mut pos, fill);
1561 }
1562 }
1563 }
1564 }
1565
1566 fn open_position(
1567 &self,
1568 instrument: InstrumentAny,
1569 position: Option<&Position>,
1570 fill: OrderFilled,
1571 oms_type: OmsType,
1572 ) -> anyhow::Result<()> {
1573 if let Some(position) = position {
1574 if Self::is_duplicate_closed_fill(position, &fill) {
1575 log::warn!(
1576 "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
1577 fill.trade_id,
1578 position.id,
1579 fill.order_side,
1580 fill.last_qty,
1581 fill.last_px
1582 );
1583 return Ok(());
1584 }
1585 self.reopen_position(position, oms_type)?;
1586 }
1587
1588 let position = Position::new(&instrument, fill);
1589 self.cache
1590 .borrow_mut()
1591 .add_position(position.clone(), oms_type)?; if self.config.snapshot_positions {
1594 self.create_position_state_snapshot(&position);
1595 }
1596
1597 let ts_init = self.clock.borrow().timestamp_ns();
1598 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
1599 let topic = switchboard::get_event_positions_topic(event.strategy_id);
1600 msgbus::publish_position_event(topic, &PositionEvent::PositionOpened(event));
1601
1602 Ok(())
1603 }
1604
1605 fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
1606 position.events.iter().any(|event| {
1607 event.trade_id == fill.trade_id
1608 && event.order_side == fill.order_side
1609 && event.last_px == fill.last_px
1610 && event.last_qty == fill.last_qty
1611 })
1612 }
1613
1614 fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
1615 if oms_type == OmsType::Netting {
1616 if position.is_open() {
1617 anyhow::bail!(
1618 "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
1619 position.id
1620 );
1621 }
1622 self.cache.borrow_mut().snapshot_position(position)?;
1624 } else {
1625 log::warn!(
1627 "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
1628 position.id
1629 );
1630 }
1631 Ok(())
1632 }
1633
1634 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
1635 position.apply(&fill);
1637
1638 let is_closed = position.is_closed();
1640
1641 if let Err(e) = self.cache.borrow_mut().update_position(position) {
1643 log::error!("Failed to update position: {e:?}");
1644 return;
1645 }
1646
1647 let cache = self.cache.borrow();
1649
1650 drop(cache);
1651
1652 if self.config.snapshot_positions {
1654 self.create_position_state_snapshot(position);
1655 }
1656
1657 let topic = switchboard::get_event_positions_topic(position.strategy_id);
1659 let ts_init = self.clock.borrow().timestamp_ns();
1660
1661 if is_closed {
1662 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
1663 msgbus::publish_position_event(topic, &PositionEvent::PositionClosed(event));
1664 } else {
1665 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
1666 msgbus::publish_position_event(topic, &PositionEvent::PositionChanged(event));
1667 }
1668 }
1669
1670 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
1671 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
1672 }
1673
1674 fn flip_position(
1675 &mut self,
1676 instrument: InstrumentAny,
1677 position: &mut Position,
1678 fill: OrderFilled,
1679 oms_type: OmsType,
1680 ) {
1681 let difference = match position.side {
1682 PositionSide::Long => Quantity::from_raw(
1683 fill.last_qty.raw - position.quantity.raw,
1684 position.size_precision,
1685 ),
1686 PositionSide::Short => Quantity::from_raw(
1687 position.quantity.raw.abs_diff(fill.last_qty.raw), position.size_precision,
1689 ),
1690 _ => fill.last_qty,
1691 };
1692
1693 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
1695 let (commission1, commission2) = if let Some(commission) = fill.commission {
1696 let commission_currency = commission.currency;
1697 let commission1 = Money::new(commission * fill_percent, commission_currency);
1698 let commission2 = commission - commission1;
1699 (Some(commission1), Some(commission2))
1700 } else {
1701 log::error!("Commission is not available");
1702 (None, None)
1703 };
1704
1705 let mut fill_split1: Option<OrderFilled> = None;
1706 if position.is_open() {
1707 fill_split1 = Some(OrderFilled::new(
1708 fill.trader_id,
1709 fill.strategy_id,
1710 fill.instrument_id,
1711 fill.client_order_id,
1712 fill.venue_order_id,
1713 fill.account_id,
1714 fill.trade_id,
1715 fill.order_side,
1716 fill.order_type,
1717 position.quantity,
1718 fill.last_px,
1719 fill.currency,
1720 fill.liquidity_side,
1721 UUID4::new(),
1722 fill.ts_event,
1723 fill.ts_init,
1724 fill.reconciliation,
1725 fill.position_id,
1726 commission1,
1727 ));
1728
1729 self.update_position(position, fill_split1.unwrap());
1730
1731 if oms_type == OmsType::Netting
1733 && let Err(e) = self.cache.borrow_mut().snapshot_position(position)
1734 {
1735 log::error!("Failed to snapshot position during flip: {e:?}");
1736 }
1737 }
1738
1739 if difference.raw == 0 {
1741 log::warn!(
1742 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
1743 );
1744 return;
1745 }
1746
1747 let position_id_flip = if oms_type == OmsType::Hedging
1748 && let Some(position_id) = fill.position_id
1749 && position_id.is_virtual()
1750 {
1751 Some(self.pos_id_generator.generate(fill.strategy_id, true))
1753 } else {
1754 fill.position_id
1756 };
1757
1758 let fill_split2 = OrderFilled::new(
1759 fill.trader_id,
1760 fill.strategy_id,
1761 fill.instrument_id,
1762 fill.client_order_id,
1763 fill.venue_order_id,
1764 fill.account_id,
1765 fill.trade_id,
1766 fill.order_side,
1767 fill.order_type,
1768 difference,
1769 fill.last_px,
1770 fill.currency,
1771 fill.liquidity_side,
1772 UUID4::new(),
1773 fill.ts_event,
1774 fill.ts_init,
1775 fill.reconciliation,
1776 position_id_flip,
1777 commission2,
1778 );
1779
1780 if oms_type == OmsType::Hedging
1781 && let Some(position_id) = fill.position_id
1782 && position_id.is_virtual()
1783 {
1784 log::warn!("Closing position {fill_split1:?}");
1785 log::warn!("Flipping position {fill_split2:?}");
1786 }
1787
1788 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1790 log::error!("Failed to open flipped position: {e:?}");
1791 }
1792 }
1793
1794 pub fn set_position_id_counts(&mut self) {
1796 let cache = self.cache.borrow();
1797 let positions = cache.positions(None, None, None, None, None);
1798
1799 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1801
1802 for position in positions {
1803 *counts.entry(position.strategy_id).or_insert(0) += 1;
1804 }
1805
1806 self.pos_id_generator.reset();
1807
1808 for (strategy_id, count) in counts {
1809 self.pos_id_generator.set_count(count, strategy_id);
1810 log::info!("Set PositionId count for {strategy_id} to {count}");
1811 }
1812 }
1813
1814 fn last_px_for_conversion(
1815 &self,
1816 instrument_id: &InstrumentId,
1817 side: OrderSide,
1818 ) -> Option<Price> {
1819 let cache = self.cache.borrow();
1820
1821 if let Some(trade) = cache.trade(instrument_id) {
1823 return Some(trade.price);
1824 }
1825
1826 if let Some(quote) = cache.quote(instrument_id) {
1828 match side {
1829 OrderSide::Buy => Some(quote.ask_price),
1830 OrderSide::Sell => Some(quote.bid_price),
1831 OrderSide::NoOrderSide => None,
1832 }
1833 } else {
1834 None
1835 }
1836 }
1837
1838 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1839 log::info!(
1840 "Setting {} order quote quantity {} to base quantity {}",
1841 order.instrument_id(),
1842 order.quantity(),
1843 base_qty
1844 );
1845
1846 let original_qty = order.quantity();
1847 order.set_quantity(base_qty);
1848 order.set_leaves_qty(base_qty);
1849 order.set_is_quote_quantity(false);
1850
1851 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1852 return;
1853 }
1854
1855 if let Some(linked_order_ids) = order.linked_order_ids() {
1856 for client_order_id in linked_order_ids {
1857 match self.cache.borrow_mut().mut_order(client_order_id) {
1858 Some(contingent_order) => {
1859 if !contingent_order.is_quote_quantity() {
1860 continue; }
1862
1863 if contingent_order.quantity() != original_qty {
1864 log::warn!(
1865 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1866 contingent_order.quantity(),
1867 original_qty,
1868 base_qty
1869 );
1870 }
1871
1872 log::info!(
1873 "Setting {} order quote quantity {} to base quantity {}",
1874 contingent_order.instrument_id(),
1875 contingent_order.quantity(),
1876 base_qty
1877 );
1878
1879 contingent_order.set_quantity(base_qty);
1880 contingent_order.set_leaves_qty(base_qty);
1881 contingent_order.set_is_quote_quantity(false);
1882 }
1883 None => {
1884 log::error!("Contingency order {client_order_id} not found");
1885 }
1886 }
1887 }
1888 } else {
1889 log::warn!(
1890 "No linked order IDs found for order {}",
1891 order.client_order_id()
1892 );
1893 }
1894 }
1895
1896 fn deny_order(&self, order: &OrderAny, reason: &str) {
1897 let denied = OrderDenied::new(
1898 order.trader_id(),
1899 order.strategy_id(),
1900 order.instrument_id(),
1901 order.client_order_id(),
1902 reason.into(),
1903 UUID4::new(),
1904 self.clock.borrow().timestamp_ns(),
1905 self.clock.borrow().timestamp_ns(),
1906 );
1907
1908 let mut order = order.clone();
1909
1910 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1911 log::error!("Failed to apply denied event to order: {e}");
1912 return;
1913 }
1914
1915 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1916 log::error!("Failed to update order in cache: {e}");
1917 return;
1918 }
1919
1920 let topic = switchboard::get_event_orders_topic(order.strategy_id());
1921 msgbus::publish_order_event(topic, &OrderEventAny::Denied(denied));
1922
1923 if self.config.snapshot_orders {
1924 self.create_order_state_snapshot(&order);
1925 }
1926 }
1927
1928 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1929 let mut cache = self.cache.borrow_mut();
1930 if cache.own_order_book_mut(instrument_id).is_none() {
1931 let own_book = OwnOrderBook::new(*instrument_id);
1932 cache.add_own_order_book(own_book).unwrap();
1933 }
1934
1935 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1936 }
1937}