nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
18//! The execution engines primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24pub mod stubs;
25
26#[cfg(test)]
27mod tests;
28
29use std::{
30    cell::{RefCell, RefMut},
31    collections::{HashMap, HashSet},
32    fmt::Debug,
33    rc::Rc,
34    time::SystemTime,
35};
36
37use ahash::{AHashMap, AHashSet};
38use config::ExecutionEngineConfig;
39use futures::future::join_all;
40use nautilus_common::{
41    cache::Cache,
42    clock::Clock,
43    generators::position_id::PositionIdGenerator,
44    logging::{CMD, EVT, RECV, SEND},
45    messages::execution::{
46        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
47        SubmitOrder, SubmitOrderList, TradingCommand,
48    },
49    msgbus::{
50        self, get_message_bus,
51        switchboard::{self},
52    },
53};
54use nautilus_core::UUID4;
55use nautilus_model::{
56    enums::{ContingencyType, OmsType, OrderSide, PositionSide},
57    events::{
58        OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
59        PositionOpened,
60    },
61    identifiers::{ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
62    instruments::{Instrument, InstrumentAny},
63    orderbook::own::{OwnOrderBook, should_handle_own_book_order},
64    orders::{Order, OrderAny, OrderError},
65    position::Position,
66    types::{Money, Price, Quantity},
67};
68
69use crate::client::{ExecutionClient, ExecutionClientAdapter};
70
71/// Central execution engine responsible for orchestrating order routing and execution.
72///
73/// The execution engine manages the entire order lifecycle from submission to completion,
74/// handling routing to appropriate execution clients, position management, and event
75/// processing. It supports multiple execution venues through registered clients and
76/// provides sophisticated order management capabilities.
77pub struct ExecutionEngine {
78    clock: Rc<RefCell<dyn Clock>>,
79    cache: Rc<RefCell<Cache>>,
80    clients: AHashMap<ClientId, ExecutionClientAdapter>,
81    default_client: Option<ExecutionClientAdapter>,
82    routing_map: HashMap<Venue, ClientId>,
83    oms_overrides: HashMap<StrategyId, OmsType>,
84    external_order_claims: HashMap<InstrumentId, StrategyId>,
85    external_clients: HashSet<ClientId>,
86    pos_id_generator: PositionIdGenerator,
87    config: ExecutionEngineConfig,
88}
89
90impl Debug for ExecutionEngine {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct(stringify!(ExecutionEngine))
93            .field("client_count", &self.clients.len())
94            .finish()
95    }
96}
97
98impl ExecutionEngine {
99    /// Creates a new [`ExecutionEngine`] instance.
100    pub fn new(
101        clock: Rc<RefCell<dyn Clock>>,
102        cache: Rc<RefCell<Cache>>,
103        config: Option<ExecutionEngineConfig>,
104    ) -> Self {
105        let trader_id = get_message_bus().borrow().trader_id;
106        Self {
107            clock: clock.clone(),
108            cache,
109            clients: AHashMap::new(),
110            default_client: None,
111            routing_map: HashMap::new(),
112            oms_overrides: HashMap::new(),
113            external_order_claims: HashMap::new(),
114            external_clients: config
115                .as_ref()
116                .and_then(|c| c.external_clients.clone())
117                .unwrap_or_default()
118                .into_iter()
119                .collect(),
120            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
121            config: config.unwrap_or_default(),
122        }
123    }
124
125    #[must_use]
126    /// Returns the position ID count for the specified strategy.
127    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
128        self.pos_id_generator.count(strategy_id)
129    }
130
131    #[must_use]
132    /// Checks the integrity of cached execution data.
133    pub fn check_integrity(&self) -> bool {
134        self.cache.borrow_mut().check_integrity()
135    }
136
137    #[must_use]
138    /// Returns true if all registered execution clients are connected.
139    pub fn check_connected(&self) -> bool {
140        let clients_connected = self.clients.values().all(|c| c.is_connected());
141        let default_connected = self
142            .default_client
143            .as_ref()
144            .is_none_or(|c| c.is_connected());
145        clients_connected && default_connected
146    }
147
148    #[must_use]
149    /// Returns true if all registered execution clients are disconnected.
150    pub fn check_disconnected(&self) -> bool {
151        let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
152        let default_disconnected = self
153            .default_client
154            .as_ref()
155            .is_none_or(|c| !c.is_connected());
156        clients_disconnected && default_disconnected
157    }
158
159    #[must_use]
160    /// Checks for residual positions and orders in the cache.
161    pub fn check_residuals(&self) -> bool {
162        self.cache.borrow().check_residuals()
163    }
164
165    #[must_use]
166    /// Returns the set of instruments that have external order claims.
167    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
168        self.external_order_claims.keys().copied().collect()
169    }
170
171    #[must_use]
172    /// Returns the configured external client IDs.
173    pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
174        self.external_clients.clone()
175    }
176
177    #[must_use]
178    /// Returns any external order claim for the given instrument ID.
179    pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
180        self.external_order_claims.get(instrument_id).copied()
181    }
182
183    // -- REGISTRATION ----------------------------------------------------------------------------
184
185    /// Registers a new execution client.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if a client with the same ID is already registered.
190    pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
191        let client_id = client.client_id();
192        let venue = client.venue();
193
194        if self.clients.contains_key(&client_id) {
195            anyhow::bail!("Client already registered with ID {client_id}");
196        }
197
198        let adapter = ExecutionClientAdapter::new(client);
199
200        self.routing_map.insert(venue, client_id);
201
202        log::info!("Registered client {client_id}");
203        self.clients.insert(client_id, adapter);
204        Ok(())
205    }
206
207    /// Registers a default execution client for fallback routing.
208    pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
209        let client_id = client.client_id();
210        let adapter = ExecutionClientAdapter::new(client);
211
212        log::info!("Registered default client {client_id}");
213        self.default_client = Some(adapter);
214    }
215
216    #[must_use]
217    /// Returns a reference to the execution client registered with the given ID.
218    pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
219        self.clients.get(client_id).map(|a| a.client.as_ref())
220    }
221
222    #[must_use]
223    /// Returns mutable access to all registered execution clients.
224    pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
225        let mut adapters: Vec<_> = self.clients.values_mut().collect();
226        if let Some(default) = &mut self.default_client {
227            adapters.push(default);
228        }
229        adapters
230    }
231
232    #[must_use]
233    /// Returns execution clients that would handle the given orders.
234    ///
235    /// This method first attempts to resolve each order's originating client from the cache,
236    /// then falls back to venue routing for any orders without a cached client.
237    pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
238        let mut client_ids: AHashSet<ClientId> = AHashSet::new();
239        let mut venues: AHashSet<Venue> = AHashSet::new();
240
241        // Collect client IDs from cache and venues for fallback
242        for order in orders {
243            venues.insert(order.instrument_id().venue);
244            if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
245                client_ids.insert(*client_id);
246            }
247        }
248
249        let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
250
251        // Add clients for cached client IDs (orders go back to originating client)
252        for client_id in &client_ids {
253            if let Some(adapter) = self.clients.get(client_id)
254                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
255            {
256                clients.push(adapter.client.as_ref());
257            }
258        }
259
260        // Add clients for venue routing (for orders not in cache)
261        for venue in &venues {
262            if let Some(client_id) = self.routing_map.get(venue) {
263                if let Some(adapter) = self.clients.get(client_id)
264                    && !clients.iter().any(|c| c.client_id() == adapter.client_id)
265                {
266                    clients.push(adapter.client.as_ref());
267                }
268            } else if let Some(adapter) = &self.default_client
269                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
270            {
271                clients.push(adapter.client.as_ref());
272            }
273        }
274
275        clients
276    }
277
278    /// Sets routing for a specific venue to a given client ID.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the client ID is not registered.
283    pub fn register_venue_routing(
284        &mut self,
285        client_id: ClientId,
286        venue: Venue,
287    ) -> anyhow::Result<()> {
288        if !self.clients.contains_key(&client_id) {
289            anyhow::bail!("No client registered with ID {client_id}");
290        }
291
292        self.routing_map.insert(venue, client_id);
293        log::info!("Set client {client_id} routing for {venue}");
294        Ok(())
295    }
296
297    /// Registers the OMS (Order Management System) type for a strategy.
298    ///
299    /// If an OMS type is already registered for this strategy, it will be overridden.
300    pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
301        self.oms_overrides.insert(strategy_id, oms_type);
302        log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
303    }
304
305    /// Registers external order claims for a strategy.
306    ///
307    /// This operation is atomic: either all instruments are registered or none are.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if any instrument already has a registered claim.
312    pub fn register_external_order_claims(
313        &mut self,
314        strategy_id: StrategyId,
315        instrument_ids: HashSet<InstrumentId>,
316    ) -> anyhow::Result<()> {
317        // Validate all instruments first
318        for instrument_id in &instrument_ids {
319            if let Some(existing) = self.external_order_claims.get(instrument_id) {
320                anyhow::bail!(
321                    "External order claim for {instrument_id} already exists for {existing}"
322                );
323            }
324        }
325
326        // If validation passed, insert all claims
327        for instrument_id in &instrument_ids {
328            self.external_order_claims
329                .insert(*instrument_id, strategy_id);
330        }
331
332        if !instrument_ids.is_empty() {
333            log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
334        }
335
336        Ok(())
337    }
338
339    /// # Errors
340    ///
341    /// Returns an error if no client is registered with the given ID.
342    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
343        if self.clients.remove(&client_id).is_some() {
344            // Remove from routing map if present
345            self.routing_map
346                .retain(|_, mapped_id| mapped_id != &client_id);
347            log::info!("Deregistered client {client_id}");
348            Ok(())
349        } else {
350            anyhow::bail!("No client registered with ID {client_id}")
351        }
352    }
353
354    /// Connects all registered execution clients concurrently.
355    ///
356    /// # Errors
357    ///
358    /// Returns an error if any client fails to connect.
359    pub async fn connect(&mut self) -> anyhow::Result<()> {
360        let futures: Vec<_> = self
361            .get_clients_mut()
362            .into_iter()
363            .map(|client| client.connect())
364            .collect();
365
366        let results = join_all(futures).await;
367        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
368
369        if errors.is_empty() {
370            Ok(())
371        } else {
372            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
373            anyhow::bail!(
374                "Failed to connect execution clients: {}",
375                error_msgs.join("; ")
376            )
377        }
378    }
379
380    /// Disconnects all registered execution clients concurrently.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if any client fails to disconnect.
385    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
386        let futures: Vec<_> = self
387            .get_clients_mut()
388            .into_iter()
389            .map(|client| client.disconnect())
390            .collect();
391
392        let results = join_all(futures).await;
393        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
394
395        if errors.is_empty() {
396            Ok(())
397        } else {
398            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
399            anyhow::bail!(
400                "Failed to disconnect execution clients: {}",
401                error_msgs.join("; ")
402            )
403        }
404    }
405
406    /// Sets the `manage_own_order_books` configuration option.
407    pub fn set_manage_own_order_books(&mut self, value: bool) {
408        self.config.manage_own_order_books = value;
409    }
410
411    /// Sets the `convert_quote_qty_to_base` configuration option.
412    pub fn set_convert_quote_qty_to_base(&mut self, value: bool) {
413        self.config.convert_quote_qty_to_base = value;
414    }
415
416    /// Starts the position snapshot timer if configured.
417    ///
418    /// Timer functionality requires a live execution context with an active clock.
419    pub fn start_snapshot_timer(&mut self) {
420        if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
421            log::info!("Starting position snapshots timer at {interval_secs} second intervals");
422        }
423    }
424
425    /// Stops the position snapshot timer if running.
426    pub fn stop_snapshot_timer(&mut self) {
427        if self.config.snapshot_positions_interval_secs.is_some() {
428            log::info!("Canceling position snapshots timer");
429        }
430    }
431
432    /// Creates snapshots of all open positions.
433    pub fn snapshot_open_position_states(&self) {
434        let positions: Vec<Position> = self
435            .cache
436            .borrow()
437            .positions_open(None, None, None, None)
438            .into_iter()
439            .cloned()
440            .collect();
441
442        for position in positions {
443            self.create_position_state_snapshot(&position);
444        }
445    }
446
447    // -- COMMANDS --------------------------------------------------------------------------------
448
449    #[allow(clippy::await_holding_refcell_ref)]
450    /// Loads persistent state into cache and rebuilds indices.
451    ///
452    /// # Errors
453    ///
454    /// Returns an error if any cache operation fails.
455    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
456        let ts = SystemTime::now();
457
458        {
459            let mut cache = self.cache.borrow_mut();
460            cache.clear_index();
461            cache.cache_general()?;
462            self.cache.borrow_mut().cache_all().await?;
463            cache.build_index();
464            let _ = cache.check_integrity();
465
466            if self.config.manage_own_order_books {
467                for order in cache.orders(None, None, None, None) {
468                    if order.is_closed() || !should_handle_own_book_order(order) {
469                        continue;
470                    }
471                    let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
472                    own_book.add(order.to_own_book_order());
473                }
474            }
475        }
476
477        self.set_position_id_counts();
478
479        log::info!(
480            "Loaded cache in {}ms",
481            SystemTime::now()
482                .duration_since(ts)
483                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
484                .as_millis()
485        );
486
487        Ok(())
488    }
489
490    /// Flushes the database to persist all cached data.
491    pub fn flush_db(&self) {
492        self.cache.borrow_mut().flush_db();
493    }
494
495    /// Processes an order event, updating internal state and routing as needed.
496    pub fn process(&mut self, event: &OrderEventAny) {
497        self.handle_event(event);
498    }
499
500    /// Executes a trading command by routing it to the appropriate execution client.
501    pub fn execute(&self, command: &TradingCommand) {
502        self.execute_command(command);
503    }
504
505    // -- COMMAND HANDLERS ------------------------------------------------------------------------
506
507    fn execute_command(&self, command: &TradingCommand) {
508        if self.config.debug {
509            log::debug!("{RECV}{CMD} {command:?}");
510        }
511
512        if self.external_clients.contains(&command.client_id()) {
513            if self.config.debug {
514                let cid = command.client_id();
515                log::debug!("Skipping execution command for external client {cid}: {command:?}");
516            }
517            return;
518        }
519
520        let client = if let Some(adapter) = self
521            .clients
522            .get(&command.client_id())
523            .or_else(|| {
524                self.routing_map
525                    .get(&command.instrument_id().venue)
526                    .and_then(|client_id| self.clients.get(client_id))
527            })
528            .or(self.default_client.as_ref())
529        {
530            adapter.client.as_ref()
531        } else {
532            log::error!(
533                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
534                command.client_id(),
535                command.instrument_id().venue,
536            );
537            return;
538        };
539
540        match command {
541            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
542            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
543            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
544            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
545            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
546            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
547            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
548            TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
549        }
550    }
551
552    fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: &SubmitOrder) {
553        let mut order = cmd.order.clone();
554        let client_order_id = order.client_order_id();
555        let instrument_id = order.instrument_id();
556
557        // Check if the order exists in the cache
558        if !self.cache.borrow().order_exists(&client_order_id) {
559            // Add order to cache in a separate scope to drop the mutable borrow
560            {
561                let mut cache = self.cache.borrow_mut();
562                if let Err(e) =
563                    cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
564                {
565                    log::error!("Error adding order to cache: {e}");
566                    return;
567                }
568            }
569
570            if self.config.snapshot_orders {
571                self.create_order_state_snapshot(&order);
572            }
573        }
574
575        // Get instrument in a separate scope to manage borrows
576        let instrument = {
577            let cache = self.cache.borrow();
578            if let Some(instrument) = cache.instrument(&instrument_id) {
579                instrument.clone()
580            } else {
581                log::error!(
582                    "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
583                );
584                return;
585            }
586        };
587
588        // Handle quote quantity conversion
589        if self.config.convert_quote_qty_to_base
590            && !instrument.is_inverse()
591            && order.is_quote_quantity()
592        {
593            log::warn!(
594                "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
595            );
596            let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
597
598            if let Some(price) = last_px {
599                let base_qty = instrument.get_base_quantity(order.quantity(), price);
600                self.set_order_base_qty(&mut order, base_qty);
601            } else {
602                self.deny_order(
603                    &order,
604                    &format!("no-price-to-convert-quote-qty {instrument_id}"),
605                );
606                return;
607            }
608        }
609
610        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
611            let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
612            own_book.add(order.to_own_book_order());
613        }
614
615        // Send the order to the execution client
616        if let Err(e) = client.submit_order(cmd) {
617            log::error!("Error submitting order to client: {e}");
618            self.deny_order(
619                &cmd.order,
620                &format!("failed-to-submit-order-to-client: {e}"),
621            );
622        }
623    }
624
625    fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: &SubmitOrderList) {
626        let orders = cmd.order_list.orders.clone();
627
628        let mut cache = self.cache.borrow_mut();
629        for order in &orders {
630            if !cache.order_exists(&order.client_order_id()) {
631                if let Err(e) =
632                    cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
633                {
634                    log::error!("Error adding order to cache: {e}");
635                    return;
636                }
637
638                if self.config.snapshot_orders {
639                    self.create_order_state_snapshot(order);
640                }
641            }
642        }
643        drop(cache);
644
645        let instrument = {
646            let cache = self.cache.borrow();
647            if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
648                instrument.clone()
649            } else {
650                log::error!(
651                    "Cannot handle submit order list: no instrument found for {}, {cmd}",
652                    cmd.instrument_id,
653                );
654                return;
655            }
656        };
657
658        // Handle quote quantity conversion
659        if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
660            let mut conversions: Vec<(ClientOrderId, Quantity)> =
661                Vec::with_capacity(cmd.order_list.orders.len());
662
663            for order in &cmd.order_list.orders {
664                if !order.is_quote_quantity() {
665                    continue; // Base quantity already set
666                }
667
668                let last_px =
669                    self.last_px_for_conversion(&order.instrument_id(), order.order_side());
670
671                if let Some(px) = last_px {
672                    let base_qty = instrument.get_base_quantity(order.quantity(), px);
673                    conversions.push((order.client_order_id(), base_qty));
674                } else {
675                    for order in &cmd.order_list.orders {
676                        self.deny_order(
677                            order,
678                            &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
679                        );
680                    }
681                    return; // Denied
682                }
683            }
684
685            if !conversions.is_empty() {
686                log::warn!(
687                    "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
688                );
689
690                let mut cache = self.cache.borrow_mut();
691                for (client_order_id, base_qty) in conversions {
692                    if let Some(mut_order) = cache.mut_order(&client_order_id) {
693                        self.set_order_base_qty(mut_order, base_qty);
694                    }
695                }
696            }
697        }
698
699        if self.config.manage_own_order_books {
700            let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
701            for order in &cmd.order_list.orders {
702                if should_handle_own_book_order(order) {
703                    own_book.add(order.to_own_book_order());
704                }
705            }
706        }
707
708        // Send to execution client
709        if let Err(e) = client.submit_order_list(cmd) {
710            log::error!("Error submitting order list to client: {e}");
711            for order in &orders {
712                self.deny_order(
713                    order,
714                    &format!("failed-to-submit-order-list-to-client: {e}"),
715                );
716            }
717        }
718    }
719
720    fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: &ModifyOrder) {
721        if let Err(e) = client.modify_order(cmd) {
722            log::error!("Error modifying order: {e}");
723        }
724    }
725
726    fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: &CancelOrder) {
727        if let Err(e) = client.cancel_order(cmd) {
728            log::error!("Error canceling order: {e}");
729        }
730    }
731
732    fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: &CancelAllOrders) {
733        if let Err(e) = client.cancel_all_orders(cmd) {
734            log::error!("Error canceling all orders: {e}");
735        }
736    }
737
738    fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: &BatchCancelOrders) {
739        if let Err(e) = client.batch_cancel_orders(cmd) {
740            log::error!("Error batch canceling orders: {e}");
741        }
742    }
743
744    fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: &QueryAccount) {
745        if let Err(e) = client.query_account(cmd) {
746            log::error!("Error querying account: {e}");
747        }
748    }
749
750    fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: &QueryOrder) {
751        if let Err(e) = client.query_order(cmd) {
752            log::error!("Error querying order: {e}");
753        }
754    }
755
756    fn create_order_state_snapshot(&self, order: &OrderAny) {
757        if self.config.debug {
758            log::debug!("Creating order state snapshot for {order}");
759        }
760
761        if self.cache.borrow().has_backing()
762            && let Err(e) = self.cache.borrow().snapshot_order_state(order)
763        {
764            log::error!("Failed to snapshot order state: {e}");
765            return;
766        }
767
768        if get_message_bus().borrow().has_backing {
769            let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
770            msgbus::publish(topic, order);
771        }
772    }
773
774    fn create_position_state_snapshot(&self, position: &Position) {
775        if self.config.debug {
776            log::debug!("Creating position state snapshot for {position}");
777        }
778
779        // let mut position: Position = position.clone();
780        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
781        //     position.unrealized_pnl(last)
782        // }
783
784        let topic = switchboard::get_positions_snapshots_topic(position.id);
785        msgbus::publish(topic, position);
786    }
787
788    // -- EVENT HANDLERS --------------------------------------------------------------------------
789
790    fn handle_event(&mut self, event: &OrderEventAny) {
791        if self.config.debug {
792            log::debug!("{RECV}{EVT} {event:?}");
793        }
794
795        let client_order_id = event.client_order_id();
796        let cache = self.cache.borrow();
797        let mut order = if let Some(order) = cache.order(&client_order_id) {
798            order.clone()
799        } else {
800            log::warn!(
801                "Order with {} not found in the cache to apply {}",
802                event.client_order_id(),
803                event
804            );
805
806            // Try to find order by venue order ID if available
807            let venue_order_id = if let Some(id) = event.venue_order_id() {
808                id
809            } else {
810                log::error!(
811                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
812                    event.client_order_id()
813                );
814                return;
815            };
816
817            // Look up client order ID from venue order ID
818            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
819                id
820            } else {
821                log::error!(
822                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
823                    event.client_order_id(),
824                );
825                return;
826            };
827
828            // Get order using found client order ID
829            if let Some(order) = cache.order(client_order_id) {
830                log::info!("Order with {client_order_id} was found in the cache");
831                order.clone()
832            } else {
833                log::error!(
834                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
835                );
836                return;
837            }
838        };
839
840        drop(cache);
841
842        match event {
843            OrderEventAny::Filled(fill) => {
844                let oms_type = self.determine_oms_type(fill);
845                let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
846
847                let mut fill = *fill;
848                if fill.position_id.is_none() {
849                    fill.position_id = Some(position_id);
850                }
851
852                if self.apply_fill_to_order(&mut order, fill).is_ok() {
853                    self.handle_order_fill(&order, fill, oms_type);
854                }
855            }
856            _ => {
857                let _ = self.apply_event_to_order(&mut order, event.clone());
858            }
859        }
860    }
861
862    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
863        // Check for strategy OMS override
864        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
865            return *oms_type;
866        }
867
868        // Use native venue OMS
869        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
870            && let Some(client) = self.clients.get(client_id)
871        {
872            return client.oms_type();
873        }
874
875        if let Some(client) = &self.default_client {
876            return client.oms_type();
877        }
878
879        OmsType::Netting // Default fallback
880    }
881
882    fn determine_position_id(
883        &mut self,
884        fill: OrderFilled,
885        oms_type: OmsType,
886        order: Option<&OrderAny>,
887    ) -> PositionId {
888        match oms_type {
889            OmsType::Hedging => self.determine_hedging_position_id(fill, order),
890            OmsType::Netting => self.determine_netting_position_id(fill),
891            _ => self.determine_netting_position_id(fill), // Default to netting
892        }
893    }
894
895    fn determine_hedging_position_id(
896        &mut self,
897        fill: OrderFilled,
898        order: Option<&OrderAny>,
899    ) -> PositionId {
900        // Check if position ID already exists
901        if let Some(position_id) = fill.position_id {
902            if self.config.debug {
903                log::debug!("Already had a position ID of: {position_id}");
904            }
905            return position_id;
906        }
907
908        let cache = self.cache.borrow();
909
910        let order = if let Some(o) = order {
911            o
912        } else {
913            match cache.order(&fill.client_order_id()) {
914                Some(o) => o,
915                None => {
916                    panic!(
917                        "Order for {} not found to determine position ID",
918                        fill.client_order_id()
919                    );
920                }
921            }
922        };
923
924        // Check execution spawn orders
925        if let Some(spawn_id) = order.exec_spawn_id() {
926            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
927            for spawned_order in spawn_orders {
928                if let Some(pos_id) = spawned_order.position_id() {
929                    if self.config.debug {
930                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
931                    }
932                    return pos_id;
933                }
934            }
935        }
936
937        // Generate new position ID
938        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
939        if self.config.debug {
940            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
941        }
942        position_id
943    }
944
945    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
946        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
947    }
948
949    fn apply_fill_to_order(&self, order: &mut OrderAny, fill: OrderFilled) -> anyhow::Result<()> {
950        if order.is_duplicate_fill(&fill) {
951            log::warn!(
952                "Duplicate fill: {} trade_id={} already applied, skipping",
953                order.client_order_id(),
954                fill.trade_id
955            );
956            anyhow::bail!("Duplicate fill");
957        }
958
959        self.check_overfill(order, &fill)?;
960        let event = OrderEventAny::Filled(fill);
961        self.apply_order_event(order, event)
962    }
963
964    fn apply_event_to_order(
965        &self,
966        order: &mut OrderAny,
967        event: OrderEventAny,
968    ) -> anyhow::Result<()> {
969        self.apply_order_event(order, event)
970    }
971
972    fn apply_order_event(&self, order: &mut OrderAny, event: OrderEventAny) -> anyhow::Result<()> {
973        if let Err(e) = order.apply(event.clone()) {
974            match e {
975                OrderError::InvalidStateTransition => {
976                    // Event already applied to order (e.g., from reconciliation or duplicate processing)
977                    // Log warning and continue with downstream processing (cache update, publishing, etc.)
978                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
979                }
980                OrderError::DuplicateFill(trade_id) => {
981                    // Duplicate fill detected at order level (secondary safety check)
982                    log::warn!(
983                        "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
984                    );
985                    anyhow::bail!("{e}");
986                }
987                _ => {
988                    // Protection against invalid IDs and other invariants
989                    log::error!("Error applying event: {e}, did not apply {event}");
990                    if should_handle_own_book_order(order) {
991                        self.cache.borrow_mut().update_own_order_book(order);
992                    }
993                    anyhow::bail!("{e}");
994                }
995            }
996        }
997
998        if let Err(e) = self.cache.borrow_mut().update_order(order) {
999            log::error!("Error updating order in cache: {e}");
1000        }
1001
1002        if self.config.debug {
1003            log::debug!("{SEND}{EVT} {event}");
1004        }
1005
1006        let topic = switchboard::get_event_orders_topic(event.strategy_id());
1007        msgbus::publish(topic, &event);
1008
1009        if self.config.snapshot_orders {
1010            self.create_order_state_snapshot(order);
1011        }
1012
1013        Ok(())
1014    }
1015
1016    fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
1017        let potential_overfill = order.calculate_overfill(fill.last_qty);
1018
1019        if potential_overfill.is_positive() {
1020            if self.config.allow_overfills {
1021                log::warn!(
1022                    "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
1023                    order.client_order_id(),
1024                    potential_overfill,
1025                    order.filled_qty(),
1026                    fill.last_qty,
1027                    order.quantity()
1028                );
1029            } else {
1030                let msg = format!(
1031                    "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
1032                Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
1033                    order.client_order_id(),
1034                    potential_overfill,
1035                    order.filled_qty(),
1036                    fill.last_qty,
1037                    order.quantity()
1038                );
1039                anyhow::bail!("{msg}");
1040            }
1041        }
1042
1043        Ok(())
1044    }
1045
1046    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
1047        let instrument =
1048            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
1049                instrument.clone()
1050            } else {
1051                log::error!(
1052                    "Cannot handle order fill: no instrument found for {}, {fill}",
1053                    fill.instrument_id,
1054                );
1055                return;
1056            };
1057
1058        if self.cache.borrow().account(&fill.account_id).is_none() {
1059            log::error!(
1060                "Cannot handle order fill: no account found for {}, {fill}",
1061                fill.instrument_id.venue,
1062            );
1063            return;
1064        }
1065
1066        // Skip portfolio position updates for combo fills (spread instruments)
1067        // Combo fills are only used for order management, not portfolio updates
1068        let position = if instrument.is_spread() {
1069            None
1070        } else {
1071            self.handle_position_update(instrument.clone(), fill, oms_type);
1072            let position_id = fill.position_id.unwrap();
1073            self.cache.borrow().position(&position_id).cloned()
1074        };
1075
1076        // Handle contingent orders for both spread and non-spread instruments
1077        // For spread instruments, contingent orders work without position linkage
1078        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1079            // For non-spread instruments, link to position if available
1080            if !instrument.is_spread()
1081                && let Some(ref pos) = position
1082                && pos.is_open()
1083            {
1084                let position_id = pos.id;
1085                for client_order_id in order.linked_order_ids().unwrap_or_default() {
1086                    let mut cache = self.cache.borrow_mut();
1087                    let contingent_order = cache.mut_order(client_order_id);
1088                    if let Some(contingent_order) = contingent_order
1089                        && contingent_order.position_id().is_none()
1090                    {
1091                        contingent_order.set_position_id(Some(position_id));
1092
1093                        if let Err(e) = self.cache.borrow_mut().add_position_id(
1094                            &position_id,
1095                            &contingent_order.instrument_id().venue,
1096                            &contingent_order.client_order_id(),
1097                            &contingent_order.strategy_id(),
1098                        ) {
1099                            log::error!("Failed to add position ID: {e}");
1100                        }
1101                    }
1102                }
1103            }
1104            // For spread instruments, contingent orders can still be triggered
1105            // but without position linkage (since no position is created for spreads)
1106        }
1107    }
1108
1109    /// Handle position creation or update for a fill.
1110    ///
1111    /// This function mirrors the Python `_handle_position_update` method.
1112    fn handle_position_update(
1113        &mut self,
1114        instrument: InstrumentAny,
1115        fill: OrderFilled,
1116        oms_type: OmsType,
1117    ) {
1118        let position_id = if let Some(position_id) = fill.position_id {
1119            position_id
1120        } else {
1121            log::error!("Cannot handle position update: no position ID found for fill {fill}");
1122            return;
1123        };
1124
1125        let position_opt = self.cache.borrow().position(&position_id).cloned();
1126
1127        match position_opt {
1128            None => {
1129                // Position is None - open new position
1130                if self.open_position(instrument, None, fill, oms_type).is_ok() {
1131                    // Position opened successfully
1132                }
1133            }
1134            Some(pos) if pos.is_closed() => {
1135                // Position is closed - open new position
1136                if self
1137                    .open_position(instrument, Some(&pos), fill, oms_type)
1138                    .is_ok()
1139                {
1140                    // Position opened successfully
1141                }
1142            }
1143            Some(mut pos) => {
1144                if self.will_flip_position(&pos, fill) {
1145                    // Position will flip
1146                    self.flip_position(instrument, &mut pos, fill, oms_type);
1147                } else {
1148                    // Update existing position
1149                    self.update_position(&mut pos, fill);
1150                }
1151            }
1152        }
1153    }
1154
1155    fn open_position(
1156        &self,
1157        instrument: InstrumentAny,
1158        position: Option<&Position>,
1159        fill: OrderFilled,
1160        oms_type: OmsType,
1161    ) -> anyhow::Result<()> {
1162        if let Some(position) = position {
1163            if Self::is_duplicate_closed_fill(position, &fill) {
1164                log::warn!(
1165                    "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
1166                    fill.trade_id,
1167                    position.id,
1168                    fill.order_side,
1169                    fill.last_qty,
1170                    fill.last_px
1171                );
1172                return Ok(());
1173            }
1174            self.reopen_position(position, oms_type)?;
1175        }
1176
1177        let position = Position::new(&instrument, fill);
1178        self.cache
1179            .borrow_mut()
1180            .add_position(position.clone(), oms_type)?; // TODO: Remove clone (change method)
1181
1182        if self.config.snapshot_positions {
1183            self.create_position_state_snapshot(&position);
1184        }
1185
1186        let ts_init = self.clock.borrow().timestamp_ns();
1187        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
1188        let topic = switchboard::get_event_positions_topic(event.strategy_id);
1189        msgbus::publish(topic, &event);
1190
1191        Ok(())
1192    }
1193
1194    fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
1195        position.events.iter().any(|event| {
1196            event.trade_id == fill.trade_id
1197                && event.order_side == fill.order_side
1198                && event.last_px == fill.last_px
1199                && event.last_qty == fill.last_qty
1200        })
1201    }
1202
1203    fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
1204        if oms_type == OmsType::Netting {
1205            if position.is_open() {
1206                anyhow::bail!(
1207                    "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
1208                    position.id
1209                );
1210            }
1211            // Snapshot closed position if reopening (NETTING mode)
1212            self.cache.borrow_mut().snapshot_position(position)?;
1213        } else {
1214            // HEDGING mode
1215            log::warn!(
1216                "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
1217                position.id
1218            );
1219        }
1220        Ok(())
1221    }
1222
1223    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
1224        // Apply the fill to the position
1225        position.apply(&fill);
1226
1227        // Check if position is closed after applying the fill
1228        let is_closed = position.is_closed();
1229
1230        // Update position in cache - this should handle the closed state tracking
1231        if let Err(e) = self.cache.borrow_mut().update_position(position) {
1232            log::error!("Failed to update position: {e:?}");
1233            return;
1234        }
1235
1236        // Verify cache state after update
1237        let cache = self.cache.borrow();
1238
1239        drop(cache);
1240
1241        // Create position state snapshot if enabled
1242        if self.config.snapshot_positions {
1243            self.create_position_state_snapshot(position);
1244        }
1245
1246        // Create and publish appropriate position event
1247        let topic = switchboard::get_event_positions_topic(position.strategy_id);
1248        let ts_init = self.clock.borrow().timestamp_ns();
1249
1250        if is_closed {
1251            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
1252            msgbus::publish(topic, &event);
1253        } else {
1254            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
1255            msgbus::publish(topic, &event);
1256        }
1257    }
1258
1259    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
1260        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
1261    }
1262
1263    fn flip_position(
1264        &mut self,
1265        instrument: InstrumentAny,
1266        position: &mut Position,
1267        fill: OrderFilled,
1268        oms_type: OmsType,
1269    ) {
1270        let difference = match position.side {
1271            PositionSide::Long => Quantity::from_raw(
1272                fill.last_qty.raw - position.quantity.raw,
1273                position.size_precision,
1274            ),
1275            PositionSide::Short => Quantity::from_raw(
1276                position.quantity.raw.abs_diff(fill.last_qty.raw), // Equivalent to Python's abs(position.quantity - fill.last_qty)
1277                position.size_precision,
1278            ),
1279            _ => fill.last_qty,
1280        };
1281
1282        // Split commission between two positions
1283        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
1284        let (commission1, commission2) = if let Some(commission) = fill.commission {
1285            let commission_currency = commission.currency;
1286            let commission1 = Money::new(commission * fill_percent, commission_currency);
1287            let commission2 = commission - commission1;
1288            (Some(commission1), Some(commission2))
1289        } else {
1290            log::error!("Commission is not available.");
1291            (None, None)
1292        };
1293
1294        let mut fill_split1: Option<OrderFilled> = None;
1295        if position.is_open() {
1296            fill_split1 = Some(OrderFilled::new(
1297                fill.trader_id,
1298                fill.strategy_id,
1299                fill.instrument_id,
1300                fill.client_order_id,
1301                fill.venue_order_id,
1302                fill.account_id,
1303                fill.trade_id,
1304                fill.order_side,
1305                fill.order_type,
1306                position.quantity,
1307                fill.last_px,
1308                fill.currency,
1309                fill.liquidity_side,
1310                UUID4::new(),
1311                fill.ts_event,
1312                fill.ts_init,
1313                fill.reconciliation,
1314                fill.position_id,
1315                commission1,
1316            ));
1317
1318            self.update_position(position, fill_split1.unwrap());
1319
1320            // Snapshot closed position before reusing ID (NETTING mode)
1321            if oms_type == OmsType::Netting
1322                && let Err(e) = self.cache.borrow_mut().snapshot_position(position)
1323            {
1324                log::error!("Failed to snapshot position during flip: {e:?}");
1325            }
1326        }
1327
1328        // Guard against flipping a position with a zero fill size
1329        if difference.raw == 0 {
1330            log::warn!(
1331                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
1332            );
1333            return;
1334        }
1335
1336        let position_id_flip = if oms_type == OmsType::Hedging
1337            && let Some(position_id) = fill.position_id
1338            && position_id.is_virtual()
1339        {
1340            // Generate new position ID for flipped virtual position (Hedging OMS only)
1341            Some(self.pos_id_generator.generate(fill.strategy_id, true))
1342        } else {
1343            // Default: use the same position ID as the fill (Python behavior)
1344            fill.position_id
1345        };
1346
1347        let fill_split2 = OrderFilled::new(
1348            fill.trader_id,
1349            fill.strategy_id,
1350            fill.instrument_id,
1351            fill.client_order_id,
1352            fill.venue_order_id,
1353            fill.account_id,
1354            fill.trade_id,
1355            fill.order_side,
1356            fill.order_type,
1357            difference,
1358            fill.last_px,
1359            fill.currency,
1360            fill.liquidity_side,
1361            UUID4::new(),
1362            fill.ts_event,
1363            fill.ts_init,
1364            fill.reconciliation,
1365            position_id_flip,
1366            commission2,
1367        );
1368
1369        if oms_type == OmsType::Hedging
1370            && let Some(position_id) = fill.position_id
1371            && position_id.is_virtual()
1372        {
1373            log::warn!("Closing position {fill_split1:?}");
1374            log::warn!("Flipping position {fill_split2:?}");
1375        }
1376
1377        // Open flipped position
1378        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1379            log::error!("Failed to open flipped position: {e:?}");
1380        }
1381    }
1382
1383    // -- INTERNAL --------------------------------------------------------------------------------
1384
1385    fn set_position_id_counts(&mut self) {
1386        // For the internal position ID generator
1387        let cache = self.cache.borrow();
1388        let positions = cache.positions(None, None, None, None);
1389
1390        // Count positions per instrument_id using a HashMap
1391        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1392
1393        for position in positions {
1394            *counts.entry(position.strategy_id).or_insert(0) += 1;
1395        }
1396
1397        self.pos_id_generator.reset();
1398
1399        for (strategy_id, count) in counts {
1400            self.pos_id_generator.set_count(count, strategy_id);
1401            log::info!("Set PositionId count for {strategy_id} to {count}");
1402        }
1403    }
1404
1405    fn last_px_for_conversion(
1406        &self,
1407        instrument_id: &InstrumentId,
1408        side: OrderSide,
1409    ) -> Option<Price> {
1410        let cache = self.cache.borrow();
1411
1412        // Try to get last trade price
1413        if let Some(trade) = cache.trade(instrument_id) {
1414            return Some(trade.price);
1415        }
1416
1417        // Fall back to quote if available
1418        if let Some(quote) = cache.quote(instrument_id) {
1419            match side {
1420                OrderSide::Buy => Some(quote.ask_price),
1421                OrderSide::Sell => Some(quote.bid_price),
1422                OrderSide::NoOrderSide => None,
1423            }
1424        } else {
1425            None
1426        }
1427    }
1428
1429    fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1430        log::info!(
1431            "Setting {} order quote quantity {} to base quantity {}",
1432            order.instrument_id(),
1433            order.quantity(),
1434            base_qty
1435        );
1436
1437        let original_qty = order.quantity();
1438        order.set_quantity(base_qty);
1439        order.set_leaves_qty(base_qty);
1440        order.set_is_quote_quantity(false);
1441
1442        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1443            return;
1444        }
1445
1446        if let Some(linked_order_ids) = order.linked_order_ids() {
1447            for client_order_id in linked_order_ids {
1448                match self.cache.borrow_mut().mut_order(client_order_id) {
1449                    Some(contingent_order) => {
1450                        if !contingent_order.is_quote_quantity() {
1451                            continue; // Already base quantity
1452                        }
1453
1454                        if contingent_order.quantity() != original_qty {
1455                            log::warn!(
1456                                "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1457                                contingent_order.quantity(),
1458                                original_qty,
1459                                base_qty
1460                            );
1461                        }
1462
1463                        log::info!(
1464                            "Setting {} order quote quantity {} to base quantity {}",
1465                            contingent_order.instrument_id(),
1466                            contingent_order.quantity(),
1467                            base_qty
1468                        );
1469
1470                        contingent_order.set_quantity(base_qty);
1471                        contingent_order.set_leaves_qty(base_qty);
1472                        contingent_order.set_is_quote_quantity(false);
1473                    }
1474                    None => {
1475                        log::error!("Contingency order {client_order_id} not found");
1476                    }
1477                }
1478            }
1479        } else {
1480            log::warn!(
1481                "No linked order IDs found for order {}",
1482                order.client_order_id()
1483            );
1484        }
1485    }
1486
1487    fn deny_order(&self, order: &OrderAny, reason: &str) {
1488        log::error!(
1489            "Order denied: {reason}, order ID: {}",
1490            order.client_order_id()
1491        );
1492
1493        let denied = OrderDenied::new(
1494            order.trader_id(),
1495            order.strategy_id(),
1496            order.instrument_id(),
1497            order.client_order_id(),
1498            reason.into(),
1499            UUID4::new(),
1500            self.clock.borrow().timestamp_ns(),
1501            self.clock.borrow().timestamp_ns(),
1502        );
1503
1504        let mut order = order.clone();
1505
1506        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1507            log::error!("Failed to apply denied event to order: {e}");
1508            return;
1509        }
1510
1511        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1512            log::error!("Failed to update order in cache: {e}");
1513            return;
1514        }
1515
1516        let topic = switchboard::get_event_orders_topic(order.strategy_id());
1517        msgbus::publish(topic, &OrderEventAny::Denied(denied));
1518
1519        if self.config.snapshot_orders {
1520            self.create_order_state_snapshot(&order);
1521        }
1522    }
1523
1524    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1525        let mut cache = self.cache.borrow_mut();
1526        if cache.own_order_book_mut(instrument_id).is_none() {
1527            let own_book = OwnOrderBook::new(*instrument_id);
1528            cache.add_own_order_book(own_book).unwrap();
1529        }
1530
1531        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1532    }
1533}