nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
18//! The execution engines primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24pub mod stubs;
25
26#[cfg(test)]
27mod tests;
28
29use std::{
30    cell::{RefCell, RefMut},
31    collections::{HashMap, HashSet},
32    fmt::Debug,
33    rc::Rc,
34    time::SystemTime,
35};
36
37use ahash::{AHashMap, AHashSet};
38use config::ExecutionEngineConfig;
39use futures::future::join_all;
40use nautilus_common::{
41    cache::Cache,
42    clock::Clock,
43    generators::position_id::PositionIdGenerator,
44    logging::{CMD, EVT, RECV, SEND},
45    messages::execution::{
46        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
47        SubmitOrder, SubmitOrderList, TradingCommand,
48    },
49    msgbus::{
50        self, get_message_bus,
51        switchboard::{self},
52    },
53};
54use nautilus_core::UUID4;
55use nautilus_model::{
56    enums::{ContingencyType, OmsType, OrderSide, PositionSide},
57    events::{
58        OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
59        PositionEvent, PositionOpened,
60    },
61    identifiers::{ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
62    instruments::{Instrument, InstrumentAny},
63    orderbook::own::{OwnOrderBook, should_handle_own_book_order},
64    orders::{Order, OrderAny, OrderError},
65    position::Position,
66    reports::ExecutionMassStatus,
67    types::{Money, Price, Quantity},
68};
69
70use crate::client::{ExecutionClient, ExecutionClientAdapter};
71
72/// Central execution engine responsible for orchestrating order routing and execution.
73///
74/// The execution engine manages the entire order lifecycle from submission to completion,
75/// handling routing to appropriate execution clients, position management, and event
76/// processing. It supports multiple execution venues through registered clients and
77/// provides sophisticated order management capabilities.
78pub struct ExecutionEngine {
79    clock: Rc<RefCell<dyn Clock>>,
80    cache: Rc<RefCell<Cache>>,
81    clients: AHashMap<ClientId, ExecutionClientAdapter>,
82    default_client: Option<ExecutionClientAdapter>,
83    routing_map: HashMap<Venue, ClientId>,
84    oms_overrides: HashMap<StrategyId, OmsType>,
85    external_order_claims: HashMap<InstrumentId, StrategyId>,
86    external_clients: HashSet<ClientId>,
87    pos_id_generator: PositionIdGenerator,
88    config: ExecutionEngineConfig,
89}
90
91impl Debug for ExecutionEngine {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct(stringify!(ExecutionEngine))
94            .field("client_count", &self.clients.len())
95            .finish()
96    }
97}
98
99impl ExecutionEngine {
100    /// Creates a new [`ExecutionEngine`] instance.
101    pub fn new(
102        clock: Rc<RefCell<dyn Clock>>,
103        cache: Rc<RefCell<Cache>>,
104        config: Option<ExecutionEngineConfig>,
105    ) -> Self {
106        let trader_id = get_message_bus().borrow().trader_id;
107        Self {
108            clock: clock.clone(),
109            cache,
110            clients: AHashMap::new(),
111            default_client: None,
112            routing_map: HashMap::new(),
113            oms_overrides: HashMap::new(),
114            external_order_claims: HashMap::new(),
115            external_clients: config
116                .as_ref()
117                .and_then(|c| c.external_clients.clone())
118                .unwrap_or_default()
119                .into_iter()
120                .collect(),
121            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
122            config: config.unwrap_or_default(),
123        }
124    }
125
126    #[must_use]
127    /// Returns the position ID count for the specified strategy.
128    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
129        self.pos_id_generator.count(strategy_id)
130    }
131
132    #[must_use]
133    /// Checks the integrity of cached execution data.
134    pub fn check_integrity(&self) -> bool {
135        self.cache.borrow_mut().check_integrity()
136    }
137
138    #[must_use]
139    /// Returns true if all registered execution clients are connected.
140    pub fn check_connected(&self) -> bool {
141        let clients_connected = self.clients.values().all(|c| c.is_connected());
142        let default_connected = self
143            .default_client
144            .as_ref()
145            .is_none_or(|c| c.is_connected());
146        clients_connected && default_connected
147    }
148
149    #[must_use]
150    /// Returns true if all registered execution clients are disconnected.
151    pub fn check_disconnected(&self) -> bool {
152        let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
153        let default_disconnected = self
154            .default_client
155            .as_ref()
156            .is_none_or(|c| !c.is_connected());
157        clients_disconnected && default_disconnected
158    }
159
160    #[must_use]
161    /// Checks for residual positions and orders in the cache.
162    pub fn check_residuals(&self) -> bool {
163        self.cache.borrow().check_residuals()
164    }
165
166    #[must_use]
167    /// Returns the set of instruments that have external order claims.
168    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
169        self.external_order_claims.keys().copied().collect()
170    }
171
172    #[must_use]
173    /// Returns the configured external client IDs.
174    pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
175        self.external_clients.clone()
176    }
177
178    #[must_use]
179    /// Returns any external order claim for the given instrument ID.
180    pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
181        self.external_order_claims.get(instrument_id).copied()
182    }
183
184    // -- REGISTRATION ----------------------------------------------------------------------------
185
186    /// Registers a new execution client.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if a client with the same ID is already registered.
191    pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
192        let client_id = client.client_id();
193        let venue = client.venue();
194
195        if self.clients.contains_key(&client_id) {
196            anyhow::bail!("Client already registered with ID {client_id}");
197        }
198
199        let adapter = ExecutionClientAdapter::new(client);
200
201        self.routing_map.insert(venue, client_id);
202
203        log::debug!("Registered client {client_id}");
204        self.clients.insert(client_id, adapter);
205        Ok(())
206    }
207
208    /// Registers a default execution client for fallback routing.
209    pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
210        let client_id = client.client_id();
211        let adapter = ExecutionClientAdapter::new(client);
212
213        log::debug!("Registered default client {client_id}");
214        self.default_client = Some(adapter);
215    }
216
217    #[must_use]
218    /// Returns a reference to the execution client registered with the given ID.
219    pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
220        self.clients.get(client_id).map(|a| a.client.as_ref())
221    }
222
223    #[must_use]
224    /// Returns a mutable reference to the execution client adapter registered with the given ID.
225    pub fn get_client_adapter_mut(
226        &mut self,
227        client_id: &ClientId,
228    ) -> Option<&mut ExecutionClientAdapter> {
229        if let Some(default) = &self.default_client
230            && &default.client_id == client_id
231        {
232            return self.default_client.as_mut();
233        }
234        self.clients.get_mut(client_id)
235    }
236
237    /// Generates mass status for the given client.
238    ///
239    /// # Errors
240    ///
241    /// Returns an error if the client is not found or mass status generation fails.
242    pub async fn generate_mass_status(
243        &mut self,
244        client_id: &ClientId,
245        lookback_mins: Option<u64>,
246    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
247        if let Some(client) = self.get_client_adapter_mut(client_id) {
248            client.generate_mass_status(lookback_mins).await
249        } else {
250            anyhow::bail!("Client {client_id} not found")
251        }
252    }
253
254    #[must_use]
255    /// Returns all registered execution client IDs.
256    pub fn client_ids(&self) -> Vec<ClientId> {
257        let mut ids: Vec<_> = self.clients.keys().copied().collect();
258
259        if let Some(default) = &self.default_client {
260            ids.push(default.client_id);
261        }
262        ids
263    }
264
265    #[must_use]
266    /// Returns mutable access to all registered execution clients.
267    pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
268        let mut adapters: Vec<_> = self.clients.values_mut().collect();
269        if let Some(default) = &mut self.default_client {
270            adapters.push(default);
271        }
272        adapters
273    }
274
275    #[must_use]
276    /// Returns execution clients that would handle the given orders.
277    ///
278    /// This method first attempts to resolve each order's originating client from the cache,
279    /// then falls back to venue routing for any orders without a cached client.
280    pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
281        let mut client_ids: AHashSet<ClientId> = AHashSet::new();
282        let mut venues: AHashSet<Venue> = AHashSet::new();
283
284        // Collect client IDs from cache and venues for fallback
285        for order in orders {
286            venues.insert(order.instrument_id().venue);
287            if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
288                client_ids.insert(*client_id);
289            }
290        }
291
292        let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
293
294        // Add clients for cached client IDs (orders go back to originating client)
295        for client_id in &client_ids {
296            if let Some(adapter) = self.clients.get(client_id)
297                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
298            {
299                clients.push(adapter.client.as_ref());
300            }
301        }
302
303        // Add clients for venue routing (for orders not in cache)
304        for venue in &venues {
305            if let Some(client_id) = self.routing_map.get(venue) {
306                if let Some(adapter) = self.clients.get(client_id)
307                    && !clients.iter().any(|c| c.client_id() == adapter.client_id)
308                {
309                    clients.push(adapter.client.as_ref());
310                }
311            } else if let Some(adapter) = &self.default_client
312                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
313            {
314                clients.push(adapter.client.as_ref());
315            }
316        }
317
318        clients
319    }
320
321    /// Sets routing for a specific venue to a given client ID.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if the client ID is not registered.
326    pub fn register_venue_routing(
327        &mut self,
328        client_id: ClientId,
329        venue: Venue,
330    ) -> anyhow::Result<()> {
331        if !self.clients.contains_key(&client_id) {
332            anyhow::bail!("No client registered with ID {client_id}");
333        }
334
335        self.routing_map.insert(venue, client_id);
336        log::info!("Set client {client_id} routing for {venue}");
337        Ok(())
338    }
339
340    /// Registers the OMS (Order Management System) type for a strategy.
341    ///
342    /// If an OMS type is already registered for this strategy, it will be overridden.
343    pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
344        self.oms_overrides.insert(strategy_id, oms_type);
345        log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
346    }
347
348    /// Registers external order claims for a strategy.
349    ///
350    /// This operation is atomic: either all instruments are registered or none are.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if any instrument already has a registered claim.
355    pub fn register_external_order_claims(
356        &mut self,
357        strategy_id: StrategyId,
358        instrument_ids: HashSet<InstrumentId>,
359    ) -> anyhow::Result<()> {
360        // Validate all instruments first
361        for instrument_id in &instrument_ids {
362            if let Some(existing) = self.external_order_claims.get(instrument_id) {
363                anyhow::bail!(
364                    "External order claim for {instrument_id} already exists for {existing}"
365                );
366            }
367        }
368
369        // If validation passed, insert all claims
370        for instrument_id in &instrument_ids {
371            self.external_order_claims
372                .insert(*instrument_id, strategy_id);
373        }
374
375        if !instrument_ids.is_empty() {
376            log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
377        }
378
379        Ok(())
380    }
381
382    /// # Errors
383    ///
384    /// Returns an error if no client is registered with the given ID.
385    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
386        if self.clients.remove(&client_id).is_some() {
387            // Remove from routing map if present
388            self.routing_map
389                .retain(|_, mapped_id| mapped_id != &client_id);
390            log::info!("Deregistered client {client_id}");
391            Ok(())
392        } else {
393            anyhow::bail!("No client registered with ID {client_id}")
394        }
395    }
396
397    /// Connects all registered execution clients concurrently.
398    ///
399    /// # Errors
400    ///
401    /// Returns an error if any client fails to connect.
402    pub async fn connect(&mut self) -> anyhow::Result<()> {
403        let futures: Vec<_> = self
404            .get_clients_mut()
405            .into_iter()
406            .map(|client| client.connect())
407            .collect();
408
409        let results = join_all(futures).await;
410        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
411
412        if errors.is_empty() {
413            Ok(())
414        } else {
415            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
416            anyhow::bail!(
417                "Failed to connect execution clients: {}",
418                error_msgs.join("; ")
419            )
420        }
421    }
422
423    /// Disconnects all registered execution clients concurrently.
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if any client fails to disconnect.
428    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
429        let futures: Vec<_> = self
430            .get_clients_mut()
431            .into_iter()
432            .map(|client| client.disconnect())
433            .collect();
434
435        let results = join_all(futures).await;
436        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
437
438        if errors.is_empty() {
439            Ok(())
440        } else {
441            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
442            anyhow::bail!(
443                "Failed to disconnect execution clients: {}",
444                error_msgs.join("; ")
445            )
446        }
447    }
448
449    /// Sets the `manage_own_order_books` configuration option.
450    pub fn set_manage_own_order_books(&mut self, value: bool) {
451        self.config.manage_own_order_books = value;
452    }
453
454    /// Sets the `convert_quote_qty_to_base` configuration option.
455    pub fn set_convert_quote_qty_to_base(&mut self, value: bool) {
456        self.config.convert_quote_qty_to_base = value;
457    }
458
459    /// Starts the position snapshot timer if configured.
460    ///
461    /// Timer functionality requires a live execution context with an active clock.
462    pub fn start_snapshot_timer(&mut self) {
463        if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
464            log::info!("Starting position snapshots timer at {interval_secs} second intervals");
465        }
466    }
467
468    /// Stops the position snapshot timer if running.
469    pub fn stop_snapshot_timer(&mut self) {
470        if self.config.snapshot_positions_interval_secs.is_some() {
471            log::info!("Canceling position snapshots timer");
472        }
473    }
474
475    /// Creates snapshots of all open positions.
476    pub fn snapshot_open_position_states(&self) {
477        let positions: Vec<Position> = self
478            .cache
479            .borrow()
480            .positions_open(None, None, None, None)
481            .into_iter()
482            .cloned()
483            .collect();
484
485        for position in positions {
486            self.create_position_state_snapshot(&position);
487        }
488    }
489
490    // -- COMMANDS --------------------------------------------------------------------------------
491
492    #[allow(clippy::await_holding_refcell_ref)]
493    /// Loads persistent state into cache and rebuilds indices.
494    ///
495    /// # Errors
496    ///
497    /// Returns an error if any cache operation fails.
498    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
499        let ts = SystemTime::now();
500
501        {
502            let mut cache = self.cache.borrow_mut();
503            cache.clear_index();
504            cache.cache_general()?;
505            self.cache.borrow_mut().cache_all().await?;
506            cache.build_index();
507            let _ = cache.check_integrity();
508
509            if self.config.manage_own_order_books {
510                for order in cache.orders(None, None, None, None) {
511                    if order.is_closed() || !should_handle_own_book_order(order) {
512                        continue;
513                    }
514                    let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
515                    own_book.add(order.to_own_book_order());
516                }
517            }
518        }
519
520        self.set_position_id_counts();
521
522        log::info!(
523            "Loaded cache in {}ms",
524            SystemTime::now()
525                .duration_since(ts)
526                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
527                .as_millis()
528        );
529
530        Ok(())
531    }
532
533    /// Flushes the database to persist all cached data.
534    pub fn flush_db(&self) {
535        self.cache.borrow_mut().flush_db();
536    }
537
538    /// Processes an order event, updating internal state and routing as needed.
539    pub fn process(&mut self, event: &OrderEventAny) {
540        self.handle_event(event);
541    }
542
543    /// Executes a trading command by routing it to the appropriate execution client.
544    pub fn execute(&self, command: &TradingCommand) {
545        self.execute_command(command);
546    }
547
548    // -- COMMAND HANDLERS ------------------------------------------------------------------------
549
550    fn execute_command(&self, command: &TradingCommand) {
551        if self.config.debug {
552            log::debug!("{RECV}{CMD} {command:?}");
553        }
554
555        if let Some(cid) = command.client_id()
556            && self.external_clients.contains(&cid)
557        {
558            if self.config.debug {
559                log::debug!("Skipping execution command for external client {cid}: {command:?}");
560            }
561            return;
562        }
563
564        let client = if let Some(adapter) = command
565            .client_id()
566            .and_then(|cid| self.clients.get(&cid))
567            .or_else(|| {
568                self.routing_map
569                    .get(&command.instrument_id().venue)
570                    .and_then(|client_id| self.clients.get(client_id))
571            })
572            .or(self.default_client.as_ref())
573        {
574            adapter.client.as_ref()
575        } else {
576            log::error!(
577                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
578                command.client_id(),
579                command.instrument_id().venue,
580            );
581            return;
582        };
583
584        match command {
585            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
586            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
587            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
588            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
589            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
590            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
591            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
592            TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
593        }
594    }
595
596    fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: &SubmitOrder) {
597        let mut order = cmd.order.clone();
598        let client_order_id = order.client_order_id();
599        let instrument_id = order.instrument_id();
600
601        // Check if the order exists in the cache
602        if !self.cache.borrow().order_exists(&client_order_id) {
603            // Add order to cache in a separate scope to drop the mutable borrow
604            {
605                let mut cache = self.cache.borrow_mut();
606                if let Err(e) = cache.add_order(order.clone(), cmd.position_id, cmd.client_id, true)
607                {
608                    log::error!("Error adding order to cache: {e}");
609                    return;
610                }
611            }
612
613            if self.config.snapshot_orders {
614                self.create_order_state_snapshot(&order);
615            }
616        }
617
618        // Get instrument in a separate scope to manage borrows
619        let instrument = {
620            let cache = self.cache.borrow();
621            if let Some(instrument) = cache.instrument(&instrument_id) {
622                instrument.clone()
623            } else {
624                log::error!(
625                    "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
626                );
627                return;
628            }
629        };
630
631        // Handle quote quantity conversion
632        if self.config.convert_quote_qty_to_base
633            && !instrument.is_inverse()
634            && order.is_quote_quantity()
635        {
636            log::warn!(
637                "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
638            );
639            let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
640
641            if let Some(price) = last_px {
642                let base_qty = instrument.get_base_quantity(order.quantity(), price);
643                self.set_order_base_qty(&mut order, base_qty);
644            } else {
645                self.deny_order(
646                    &order,
647                    &format!("no-price-to-convert-quote-qty {instrument_id}"),
648                );
649                return;
650            }
651        }
652
653        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
654            let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
655            own_book.add(order.to_own_book_order());
656        }
657
658        // Send the order to the execution client
659        if let Err(e) = client.submit_order(cmd) {
660            log::error!("Error submitting order to client: {e}");
661            self.deny_order(
662                &cmd.order,
663                &format!("failed-to-submit-order-to-client: {e}"),
664            );
665        }
666    }
667
668    fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: &SubmitOrderList) {
669        let orders = cmd.order_list.orders.clone();
670
671        let mut cache = self.cache.borrow_mut();
672        for order in &orders {
673            if !cache.order_exists(&order.client_order_id()) {
674                if let Err(e) = cache.add_order(order.clone(), cmd.position_id, cmd.client_id, true)
675                {
676                    log::error!("Error adding order to cache: {e}");
677                    return;
678                }
679
680                if self.config.snapshot_orders {
681                    self.create_order_state_snapshot(order);
682                }
683            }
684        }
685        drop(cache);
686
687        let instrument = {
688            let cache = self.cache.borrow();
689            if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
690                instrument.clone()
691            } else {
692                log::error!(
693                    "Cannot handle submit order list: no instrument found for {}, {cmd}",
694                    cmd.instrument_id,
695                );
696                return;
697            }
698        };
699
700        // Handle quote quantity conversion
701        if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
702            let mut conversions: Vec<(ClientOrderId, Quantity)> =
703                Vec::with_capacity(cmd.order_list.orders.len());
704
705            for order in &cmd.order_list.orders {
706                if !order.is_quote_quantity() {
707                    continue; // Base quantity already set
708                }
709
710                let last_px =
711                    self.last_px_for_conversion(&order.instrument_id(), order.order_side());
712
713                if let Some(px) = last_px {
714                    let base_qty = instrument.get_base_quantity(order.quantity(), px);
715                    conversions.push((order.client_order_id(), base_qty));
716                } else {
717                    for order in &cmd.order_list.orders {
718                        self.deny_order(
719                            order,
720                            &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
721                        );
722                    }
723                    return; // Denied
724                }
725            }
726
727            if !conversions.is_empty() {
728                log::warn!(
729                    "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
730                );
731
732                let mut cache = self.cache.borrow_mut();
733                for (client_order_id, base_qty) in conversions {
734                    if let Some(mut_order) = cache.mut_order(&client_order_id) {
735                        self.set_order_base_qty(mut_order, base_qty);
736                    }
737                }
738            }
739        }
740
741        if self.config.manage_own_order_books {
742            let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
743            for order in &cmd.order_list.orders {
744                if should_handle_own_book_order(order) {
745                    own_book.add(order.to_own_book_order());
746                }
747            }
748        }
749
750        // Send to execution client
751        if let Err(e) = client.submit_order_list(cmd) {
752            log::error!("Error submitting order list to client: {e}");
753            for order in &orders {
754                self.deny_order(
755                    order,
756                    &format!("failed-to-submit-order-list-to-client: {e}"),
757                );
758            }
759        }
760    }
761
762    fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: &ModifyOrder) {
763        if let Err(e) = client.modify_order(cmd) {
764            log::error!("Error modifying order: {e}");
765        }
766    }
767
768    fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: &CancelOrder) {
769        if let Err(e) = client.cancel_order(cmd) {
770            log::error!("Error canceling order: {e}");
771        }
772    }
773
774    fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: &CancelAllOrders) {
775        if let Err(e) = client.cancel_all_orders(cmd) {
776            log::error!("Error canceling all orders: {e}");
777        }
778    }
779
780    fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: &BatchCancelOrders) {
781        if let Err(e) = client.batch_cancel_orders(cmd) {
782            log::error!("Error batch canceling orders: {e}");
783        }
784    }
785
786    fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: &QueryAccount) {
787        if let Err(e) = client.query_account(cmd) {
788            log::error!("Error querying account: {e}");
789        }
790    }
791
792    fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: &QueryOrder) {
793        if let Err(e) = client.query_order(cmd) {
794            log::error!("Error querying order: {e}");
795        }
796    }
797
798    fn create_order_state_snapshot(&self, order: &OrderAny) {
799        if self.config.debug {
800            log::debug!("Creating order state snapshot for {order}");
801        }
802
803        if self.cache.borrow().has_backing()
804            && let Err(e) = self.cache.borrow().snapshot_order_state(order)
805        {
806            log::error!("Failed to snapshot order state: {e}");
807            return;
808        }
809
810        if get_message_bus().borrow().has_backing {
811            let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
812            msgbus::publish(topic, order);
813        }
814    }
815
816    fn create_position_state_snapshot(&self, position: &Position) {
817        if self.config.debug {
818            log::debug!("Creating position state snapshot for {position}");
819        }
820
821        // let mut position: Position = position.clone();
822        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
823        //     position.unrealized_pnl(last)
824        // }
825
826        let topic = switchboard::get_positions_snapshots_topic(position.id);
827        msgbus::publish(topic, position);
828    }
829
830    // -- EVENT HANDLERS --------------------------------------------------------------------------
831
832    fn handle_event(&mut self, event: &OrderEventAny) {
833        if self.config.debug {
834            log::debug!("{RECV}{EVT} {event:?}");
835        }
836
837        let client_order_id = event.client_order_id();
838        let cache = self.cache.borrow();
839        let mut order = if let Some(order) = cache.order(&client_order_id) {
840            order.clone()
841        } else {
842            log::warn!(
843                "Order with {} not found in the cache to apply {}",
844                event.client_order_id(),
845                event
846            );
847
848            // Try to find order by venue order ID if available
849            let venue_order_id = if let Some(id) = event.venue_order_id() {
850                id
851            } else {
852                log::error!(
853                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
854                    event.client_order_id()
855                );
856                return;
857            };
858
859            // Look up client order ID from venue order ID
860            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
861                id
862            } else {
863                log::error!(
864                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
865                    event.client_order_id(),
866                );
867                return;
868            };
869
870            // Get order using found client order ID
871            if let Some(order) = cache.order(client_order_id) {
872                log::info!("Order with {client_order_id} was found in the cache");
873                order.clone()
874            } else {
875                log::error!(
876                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
877                );
878                return;
879            }
880        };
881
882        drop(cache);
883
884        match event {
885            OrderEventAny::Filled(fill) => {
886                let oms_type = self.determine_oms_type(fill);
887                let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
888
889                let mut fill = *fill;
890                if fill.position_id.is_none() {
891                    fill.position_id = Some(position_id);
892                }
893
894                if self.apply_fill_to_order(&mut order, fill).is_ok() {
895                    self.handle_order_fill(&order, fill, oms_type);
896                }
897            }
898            _ => {
899                let _ = self.apply_event_to_order(&mut order, event.clone());
900            }
901        }
902    }
903
904    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
905        // Check for strategy OMS override
906        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
907            return *oms_type;
908        }
909
910        // Use native venue OMS
911        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
912            && let Some(client) = self.clients.get(client_id)
913        {
914            return client.oms_type();
915        }
916
917        if let Some(client) = &self.default_client {
918            return client.oms_type();
919        }
920
921        OmsType::Netting // Default fallback
922    }
923
924    fn determine_position_id(
925        &mut self,
926        fill: OrderFilled,
927        oms_type: OmsType,
928        order: Option<&OrderAny>,
929    ) -> PositionId {
930        match oms_type {
931            OmsType::Hedging => self.determine_hedging_position_id(fill, order),
932            OmsType::Netting => self.determine_netting_position_id(fill),
933            _ => self.determine_netting_position_id(fill), // Default to netting
934        }
935    }
936
937    fn determine_hedging_position_id(
938        &mut self,
939        fill: OrderFilled,
940        order: Option<&OrderAny>,
941    ) -> PositionId {
942        // Check if position ID already exists
943        if let Some(position_id) = fill.position_id {
944            if self.config.debug {
945                log::debug!("Already had a position ID of: {position_id}");
946            }
947            return position_id;
948        }
949
950        let cache = self.cache.borrow();
951
952        let order = if let Some(o) = order {
953            o
954        } else {
955            match cache.order(&fill.client_order_id()) {
956                Some(o) => o,
957                None => {
958                    panic!(
959                        "Order for {} not found to determine position ID",
960                        fill.client_order_id()
961                    );
962                }
963            }
964        };
965
966        // Check execution spawn orders
967        if let Some(spawn_id) = order.exec_spawn_id() {
968            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
969            for spawned_order in spawn_orders {
970                if let Some(pos_id) = spawned_order.position_id() {
971                    if self.config.debug {
972                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
973                    }
974                    return pos_id;
975                }
976            }
977        }
978
979        // Generate new position ID
980        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
981        if self.config.debug {
982            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
983        }
984        position_id
985    }
986
987    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
988        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
989    }
990
991    fn apply_fill_to_order(&self, order: &mut OrderAny, fill: OrderFilled) -> anyhow::Result<()> {
992        if order.is_duplicate_fill(&fill) {
993            log::warn!(
994                "Duplicate fill: {} trade_id={} already applied, skipping",
995                order.client_order_id(),
996                fill.trade_id
997            );
998            anyhow::bail!("Duplicate fill");
999        }
1000
1001        self.check_overfill(order, &fill)?;
1002        let event = OrderEventAny::Filled(fill);
1003        self.apply_order_event(order, event)
1004    }
1005
1006    fn apply_event_to_order(
1007        &self,
1008        order: &mut OrderAny,
1009        event: OrderEventAny,
1010    ) -> anyhow::Result<()> {
1011        self.apply_order_event(order, event)
1012    }
1013
1014    fn apply_order_event(&self, order: &mut OrderAny, event: OrderEventAny) -> anyhow::Result<()> {
1015        if let Err(e) = order.apply(event.clone()) {
1016            match e {
1017                OrderError::InvalidStateTransition => {
1018                    // Event already applied to order (e.g., from reconciliation or duplicate processing)
1019                    // Log warning and continue with downstream processing (cache update, publishing, etc.)
1020                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
1021                }
1022                OrderError::DuplicateFill(trade_id) => {
1023                    // Duplicate fill detected at order level (secondary safety check)
1024                    log::warn!(
1025                        "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
1026                    );
1027                    anyhow::bail!("{e}");
1028                }
1029                _ => {
1030                    // Protection against invalid IDs and other invariants
1031                    log::error!("Error applying event: {e}, did not apply {event}");
1032                    if should_handle_own_book_order(order) {
1033                        self.cache.borrow_mut().update_own_order_book(order);
1034                    }
1035                    anyhow::bail!("{e}");
1036                }
1037            }
1038        }
1039
1040        if let Err(e) = self.cache.borrow_mut().update_order(order) {
1041            log::error!("Error updating order in cache: {e}");
1042        }
1043
1044        if self.config.debug {
1045            log::debug!("{SEND}{EVT} {event}");
1046        }
1047
1048        let topic = switchboard::get_event_orders_topic(event.strategy_id());
1049        msgbus::publish(topic, &event);
1050
1051        if self.config.snapshot_orders {
1052            self.create_order_state_snapshot(order);
1053        }
1054
1055        Ok(())
1056    }
1057
1058    fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
1059        let potential_overfill = order.calculate_overfill(fill.last_qty);
1060
1061        if potential_overfill.is_positive() {
1062            if self.config.allow_overfills {
1063                log::warn!(
1064                    "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
1065                    order.client_order_id(),
1066                    potential_overfill,
1067                    order.filled_qty(),
1068                    fill.last_qty,
1069                    order.quantity()
1070                );
1071            } else {
1072                let msg = format!(
1073                    "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
1074                Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
1075                    order.client_order_id(),
1076                    potential_overfill,
1077                    order.filled_qty(),
1078                    fill.last_qty,
1079                    order.quantity()
1080                );
1081                anyhow::bail!("{msg}");
1082            }
1083        }
1084
1085        Ok(())
1086    }
1087
1088    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
1089        let instrument =
1090            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
1091                instrument.clone()
1092            } else {
1093                log::error!(
1094                    "Cannot handle order fill: no instrument found for {}, {fill}",
1095                    fill.instrument_id,
1096                );
1097                return;
1098            };
1099
1100        if self.cache.borrow().account(&fill.account_id).is_none() {
1101            log::error!(
1102                "Cannot handle order fill: no account found for {}, {fill}",
1103                fill.instrument_id.venue,
1104            );
1105            return;
1106        }
1107
1108        // Skip portfolio position updates for combo fills (spread instruments)
1109        // Combo fills are only used for order management, not portfolio updates
1110        let position = if instrument.is_spread() {
1111            None
1112        } else {
1113            self.handle_position_update(instrument.clone(), fill, oms_type);
1114            let position_id = fill.position_id.unwrap();
1115            self.cache.borrow().position(&position_id).cloned()
1116        };
1117
1118        // Handle contingent orders for both spread and non-spread instruments
1119        // For spread instruments, contingent orders work without position linkage
1120        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1121            // For non-spread instruments, link to position if available
1122            if !instrument.is_spread()
1123                && let Some(ref pos) = position
1124                && pos.is_open()
1125            {
1126                let position_id = pos.id;
1127                for client_order_id in order.linked_order_ids().unwrap_or_default() {
1128                    let mut cache = self.cache.borrow_mut();
1129                    let contingent_order = cache.mut_order(client_order_id);
1130                    if let Some(contingent_order) = contingent_order
1131                        && contingent_order.position_id().is_none()
1132                    {
1133                        contingent_order.set_position_id(Some(position_id));
1134
1135                        if let Err(e) = self.cache.borrow_mut().add_position_id(
1136                            &position_id,
1137                            &contingent_order.instrument_id().venue,
1138                            &contingent_order.client_order_id(),
1139                            &contingent_order.strategy_id(),
1140                        ) {
1141                            log::error!("Failed to add position ID: {e}");
1142                        }
1143                    }
1144                }
1145            }
1146            // For spread instruments, contingent orders can still be triggered
1147            // but without position linkage (since no position is created for spreads)
1148        }
1149    }
1150
1151    /// Handle position creation or update for a fill.
1152    ///
1153    /// This function mirrors the Python `_handle_position_update` method.
1154    fn handle_position_update(
1155        &mut self,
1156        instrument: InstrumentAny,
1157        fill: OrderFilled,
1158        oms_type: OmsType,
1159    ) {
1160        let position_id = if let Some(position_id) = fill.position_id {
1161            position_id
1162        } else {
1163            log::error!("Cannot handle position update: no position ID found for fill {fill}");
1164            return;
1165        };
1166
1167        let position_opt = self.cache.borrow().position(&position_id).cloned();
1168
1169        match position_opt {
1170            None => {
1171                // Position is None - open new position
1172                if self.open_position(instrument, None, fill, oms_type).is_ok() {
1173                    // Position opened successfully
1174                }
1175            }
1176            Some(pos) if pos.is_closed() => {
1177                // Position is closed - open new position
1178                if self
1179                    .open_position(instrument, Some(&pos), fill, oms_type)
1180                    .is_ok()
1181                {
1182                    // Position opened successfully
1183                }
1184            }
1185            Some(mut pos) => {
1186                if self.will_flip_position(&pos, fill) {
1187                    // Position will flip
1188                    self.flip_position(instrument, &mut pos, fill, oms_type);
1189                } else {
1190                    // Update existing position
1191                    self.update_position(&mut pos, fill);
1192                }
1193            }
1194        }
1195    }
1196
1197    fn open_position(
1198        &self,
1199        instrument: InstrumentAny,
1200        position: Option<&Position>,
1201        fill: OrderFilled,
1202        oms_type: OmsType,
1203    ) -> anyhow::Result<()> {
1204        if let Some(position) = position {
1205            if Self::is_duplicate_closed_fill(position, &fill) {
1206                log::warn!(
1207                    "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
1208                    fill.trade_id,
1209                    position.id,
1210                    fill.order_side,
1211                    fill.last_qty,
1212                    fill.last_px
1213                );
1214                return Ok(());
1215            }
1216            self.reopen_position(position, oms_type)?;
1217        }
1218
1219        let position = Position::new(&instrument, fill);
1220        self.cache
1221            .borrow_mut()
1222            .add_position(position.clone(), oms_type)?; // TODO: Remove clone (change method)
1223
1224        if self.config.snapshot_positions {
1225            self.create_position_state_snapshot(&position);
1226        }
1227
1228        let ts_init = self.clock.borrow().timestamp_ns();
1229        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
1230        let topic = switchboard::get_event_positions_topic(event.strategy_id);
1231        msgbus::publish(topic, &PositionEvent::PositionOpened(event));
1232
1233        Ok(())
1234    }
1235
1236    fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
1237        position.events.iter().any(|event| {
1238            event.trade_id == fill.trade_id
1239                && event.order_side == fill.order_side
1240                && event.last_px == fill.last_px
1241                && event.last_qty == fill.last_qty
1242        })
1243    }
1244
1245    fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
1246        if oms_type == OmsType::Netting {
1247            if position.is_open() {
1248                anyhow::bail!(
1249                    "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
1250                    position.id
1251                );
1252            }
1253            // Snapshot closed position if reopening (NETTING mode)
1254            self.cache.borrow_mut().snapshot_position(position)?;
1255        } else {
1256            // HEDGING mode
1257            log::warn!(
1258                "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
1259                position.id
1260            );
1261        }
1262        Ok(())
1263    }
1264
1265    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
1266        // Apply the fill to the position
1267        position.apply(&fill);
1268
1269        // Check if position is closed after applying the fill
1270        let is_closed = position.is_closed();
1271
1272        // Update position in cache - this should handle the closed state tracking
1273        if let Err(e) = self.cache.borrow_mut().update_position(position) {
1274            log::error!("Failed to update position: {e:?}");
1275            return;
1276        }
1277
1278        // Verify cache state after update
1279        let cache = self.cache.borrow();
1280
1281        drop(cache);
1282
1283        // Create position state snapshot if enabled
1284        if self.config.snapshot_positions {
1285            self.create_position_state_snapshot(position);
1286        }
1287
1288        // Create and publish appropriate position event
1289        let topic = switchboard::get_event_positions_topic(position.strategy_id);
1290        let ts_init = self.clock.borrow().timestamp_ns();
1291
1292        if is_closed {
1293            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
1294            msgbus::publish(topic, &PositionEvent::PositionClosed(event));
1295        } else {
1296            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
1297            msgbus::publish(topic, &PositionEvent::PositionChanged(event));
1298        }
1299    }
1300
1301    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
1302        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
1303    }
1304
1305    fn flip_position(
1306        &mut self,
1307        instrument: InstrumentAny,
1308        position: &mut Position,
1309        fill: OrderFilled,
1310        oms_type: OmsType,
1311    ) {
1312        let difference = match position.side {
1313            PositionSide::Long => Quantity::from_raw(
1314                fill.last_qty.raw - position.quantity.raw,
1315                position.size_precision,
1316            ),
1317            PositionSide::Short => Quantity::from_raw(
1318                position.quantity.raw.abs_diff(fill.last_qty.raw), // Equivalent to Python's abs(position.quantity - fill.last_qty)
1319                position.size_precision,
1320            ),
1321            _ => fill.last_qty,
1322        };
1323
1324        // Split commission between two positions
1325        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
1326        let (commission1, commission2) = if let Some(commission) = fill.commission {
1327            let commission_currency = commission.currency;
1328            let commission1 = Money::new(commission * fill_percent, commission_currency);
1329            let commission2 = commission - commission1;
1330            (Some(commission1), Some(commission2))
1331        } else {
1332            log::error!("Commission is not available.");
1333            (None, None)
1334        };
1335
1336        let mut fill_split1: Option<OrderFilled> = None;
1337        if position.is_open() {
1338            fill_split1 = Some(OrderFilled::new(
1339                fill.trader_id,
1340                fill.strategy_id,
1341                fill.instrument_id,
1342                fill.client_order_id,
1343                fill.venue_order_id,
1344                fill.account_id,
1345                fill.trade_id,
1346                fill.order_side,
1347                fill.order_type,
1348                position.quantity,
1349                fill.last_px,
1350                fill.currency,
1351                fill.liquidity_side,
1352                UUID4::new(),
1353                fill.ts_event,
1354                fill.ts_init,
1355                fill.reconciliation,
1356                fill.position_id,
1357                commission1,
1358            ));
1359
1360            self.update_position(position, fill_split1.unwrap());
1361
1362            // Snapshot closed position before reusing ID (NETTING mode)
1363            if oms_type == OmsType::Netting
1364                && let Err(e) = self.cache.borrow_mut().snapshot_position(position)
1365            {
1366                log::error!("Failed to snapshot position during flip: {e:?}");
1367            }
1368        }
1369
1370        // Guard against flipping a position with a zero fill size
1371        if difference.raw == 0 {
1372            log::warn!(
1373                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
1374            );
1375            return;
1376        }
1377
1378        let position_id_flip = if oms_type == OmsType::Hedging
1379            && let Some(position_id) = fill.position_id
1380            && position_id.is_virtual()
1381        {
1382            // Generate new position ID for flipped virtual position (Hedging OMS only)
1383            Some(self.pos_id_generator.generate(fill.strategy_id, true))
1384        } else {
1385            // Default: use the same position ID as the fill (Python behavior)
1386            fill.position_id
1387        };
1388
1389        let fill_split2 = OrderFilled::new(
1390            fill.trader_id,
1391            fill.strategy_id,
1392            fill.instrument_id,
1393            fill.client_order_id,
1394            fill.venue_order_id,
1395            fill.account_id,
1396            fill.trade_id,
1397            fill.order_side,
1398            fill.order_type,
1399            difference,
1400            fill.last_px,
1401            fill.currency,
1402            fill.liquidity_side,
1403            UUID4::new(),
1404            fill.ts_event,
1405            fill.ts_init,
1406            fill.reconciliation,
1407            position_id_flip,
1408            commission2,
1409        );
1410
1411        if oms_type == OmsType::Hedging
1412            && let Some(position_id) = fill.position_id
1413            && position_id.is_virtual()
1414        {
1415            log::warn!("Closing position {fill_split1:?}");
1416            log::warn!("Flipping position {fill_split2:?}");
1417        }
1418
1419        // Open flipped position
1420        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1421            log::error!("Failed to open flipped position: {e:?}");
1422        }
1423    }
1424
1425    // -- INTERNAL --------------------------------------------------------------------------------
1426
1427    fn set_position_id_counts(&mut self) {
1428        // For the internal position ID generator
1429        let cache = self.cache.borrow();
1430        let positions = cache.positions(None, None, None, None);
1431
1432        // Count positions per instrument_id using a HashMap
1433        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1434
1435        for position in positions {
1436            *counts.entry(position.strategy_id).or_insert(0) += 1;
1437        }
1438
1439        self.pos_id_generator.reset();
1440
1441        for (strategy_id, count) in counts {
1442            self.pos_id_generator.set_count(count, strategy_id);
1443            log::info!("Set PositionId count for {strategy_id} to {count}");
1444        }
1445    }
1446
1447    fn last_px_for_conversion(
1448        &self,
1449        instrument_id: &InstrumentId,
1450        side: OrderSide,
1451    ) -> Option<Price> {
1452        let cache = self.cache.borrow();
1453
1454        // Try to get last trade price
1455        if let Some(trade) = cache.trade(instrument_id) {
1456            return Some(trade.price);
1457        }
1458
1459        // Fall back to quote if available
1460        if let Some(quote) = cache.quote(instrument_id) {
1461            match side {
1462                OrderSide::Buy => Some(quote.ask_price),
1463                OrderSide::Sell => Some(quote.bid_price),
1464                OrderSide::NoOrderSide => None,
1465            }
1466        } else {
1467            None
1468        }
1469    }
1470
1471    fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1472        log::info!(
1473            "Setting {} order quote quantity {} to base quantity {}",
1474            order.instrument_id(),
1475            order.quantity(),
1476            base_qty
1477        );
1478
1479        let original_qty = order.quantity();
1480        order.set_quantity(base_qty);
1481        order.set_leaves_qty(base_qty);
1482        order.set_is_quote_quantity(false);
1483
1484        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1485            return;
1486        }
1487
1488        if let Some(linked_order_ids) = order.linked_order_ids() {
1489            for client_order_id in linked_order_ids {
1490                match self.cache.borrow_mut().mut_order(client_order_id) {
1491                    Some(contingent_order) => {
1492                        if !contingent_order.is_quote_quantity() {
1493                            continue; // Already base quantity
1494                        }
1495
1496                        if contingent_order.quantity() != original_qty {
1497                            log::warn!(
1498                                "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1499                                contingent_order.quantity(),
1500                                original_qty,
1501                                base_qty
1502                            );
1503                        }
1504
1505                        log::info!(
1506                            "Setting {} order quote quantity {} to base quantity {}",
1507                            contingent_order.instrument_id(),
1508                            contingent_order.quantity(),
1509                            base_qty
1510                        );
1511
1512                        contingent_order.set_quantity(base_qty);
1513                        contingent_order.set_leaves_qty(base_qty);
1514                        contingent_order.set_is_quote_quantity(false);
1515                    }
1516                    None => {
1517                        log::error!("Contingency order {client_order_id} not found");
1518                    }
1519                }
1520            }
1521        } else {
1522            log::warn!(
1523                "No linked order IDs found for order {}",
1524                order.client_order_id()
1525            );
1526        }
1527    }
1528
1529    fn deny_order(&self, order: &OrderAny, reason: &str) {
1530        log::error!(
1531            "Order denied: {reason}, order ID: {}",
1532            order.client_order_id()
1533        );
1534
1535        let denied = OrderDenied::new(
1536            order.trader_id(),
1537            order.strategy_id(),
1538            order.instrument_id(),
1539            order.client_order_id(),
1540            reason.into(),
1541            UUID4::new(),
1542            self.clock.borrow().timestamp_ns(),
1543            self.clock.borrow().timestamp_ns(),
1544        );
1545
1546        let mut order = order.clone();
1547
1548        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1549            log::error!("Failed to apply denied event to order: {e}");
1550            return;
1551        }
1552
1553        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1554            log::error!("Failed to update order in cache: {e}");
1555            return;
1556        }
1557
1558        let topic = switchboard::get_event_orders_topic(order.strategy_id());
1559        msgbus::publish(topic, &OrderEventAny::Denied(denied));
1560
1561        if self.config.snapshot_orders {
1562            self.create_order_state_snapshot(&order);
1563        }
1564    }
1565
1566    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1567        let mut cache = self.cache.borrow_mut();
1568        if cache.own_order_book_mut(instrument_id).is_none() {
1569            let own_book = OwnOrderBook::new(*instrument_id);
1570            cache.add_own_order_book(own_book).unwrap();
1571        }
1572
1573        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1574    }
1575}