nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
//! The execution engine's primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24pub mod stubs;
25
26use std::{
27    cell::{RefCell, RefMut},
28    collections::{HashMap, HashSet},
29    fmt::Debug,
30    rc::Rc,
31    time::SystemTime,
32};
33
34use ahash::{AHashMap, AHashSet};
35use config::ExecutionEngineConfig;
36use futures::future::join_all;
37use nautilus_common::{
38    cache::Cache,
39    clients::ExecutionClient,
40    clock::Clock,
41    generators::position_id::PositionIdGenerator,
42    logging::{CMD, EVT, RECV, SEND},
43    messages::{
44        ExecutionReport,
45        execution::{
46            BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
47            SubmitOrder, SubmitOrderList, TradingCommand,
48        },
49    },
50    msgbus::{
51        self, get_message_bus,
52        switchboard::{self},
53    },
54};
55use nautilus_core::UUID4;
56use nautilus_model::{
57    enums::{ContingencyType, OmsType, OrderSide, PositionSide},
58    events::{
59        OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
60        PositionEvent, PositionOpened,
61    },
62    identifiers::{ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
63    instruments::{Instrument, InstrumentAny},
64    orderbook::own::{OwnOrderBook, should_handle_own_book_order},
65    orders::{Order, OrderAny, OrderError},
66    position::Position,
67    reports::{ExecutionMassStatus, OrderStatusReport},
68    types::{Money, Price, Quantity},
69};
70
71use crate::{client::ExecutionClientAdapter, reconciliation::reconcile_order_report};
72
/// Central execution engine responsible for orchestrating order routing and execution.
///
/// The execution engine manages the entire order lifecycle from submission to completion,
/// handling routing to appropriate execution clients, position management, and event
/// processing. It supports multiple execution venues through registered clients and
/// provides sophisticated order management capabilities.
pub struct ExecutionEngine {
    /// Shared clock; read for the current timestamp when reconciling order reports.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache holding orders, positions and instruments.
    cache: Rc<RefCell<Cache>>,
    /// Registered execution clients keyed by their client ID.
    clients: AHashMap<ClientId, ExecutionClientAdapter>,
    /// Optional fallback client, used when neither an explicit client ID nor venue routing
    /// resolves a registered client.
    default_client: Option<ExecutionClientAdapter>,
    /// Venue to client ID routing table.
    routing_map: HashMap<Venue, ClientId>,
    /// Per-strategy OMS types registered via `register_oms_type`.
    oms_overrides: HashMap<StrategyId, OmsType>,
    /// Instruments whose external orders are claimed by a strategy.
    external_order_claims: HashMap<InstrumentId, StrategyId>,
    /// Client IDs treated as external; commands addressed to them are skipped.
    external_clients: HashSet<ClientId>,
    /// Generator for position IDs.
    pos_id_generator: PositionIdGenerator,
    /// Engine configuration.
    config: ExecutionEngineConfig,
}
91
92impl Debug for ExecutionEngine {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        f.debug_struct(stringify!(ExecutionEngine))
95            .field("client_count", &self.clients.len())
96            .finish()
97    }
98}
99
100impl ExecutionEngine {
101    /// Creates a new [`ExecutionEngine`] instance.
102    pub fn new(
103        clock: Rc<RefCell<dyn Clock>>,
104        cache: Rc<RefCell<Cache>>,
105        config: Option<ExecutionEngineConfig>,
106    ) -> Self {
107        let trader_id = get_message_bus().borrow().trader_id;
108        Self {
109            clock: clock.clone(),
110            cache,
111            clients: AHashMap::new(),
112            default_client: None,
113            routing_map: HashMap::new(),
114            oms_overrides: HashMap::new(),
115            external_order_claims: HashMap::new(),
116            external_clients: config
117                .as_ref()
118                .and_then(|c| c.external_clients.clone())
119                .unwrap_or_default()
120                .into_iter()
121                .collect(),
122            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
123            config: config.unwrap_or_default(),
124        }
125    }
126
127    #[must_use]
128    /// Returns the position ID count for the specified strategy.
129    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
130        self.pos_id_generator.count(strategy_id)
131    }
132
133    #[must_use]
134    /// Returns a reference to the cache.
135    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
136        &self.cache
137    }
138
139    #[must_use]
140    /// Returns a reference to the configuration.
141    pub const fn config(&self) -> &ExecutionEngineConfig {
142        &self.config
143    }
144
145    #[must_use]
146    /// Checks the integrity of cached execution data.
147    pub fn check_integrity(&self) -> bool {
148        self.cache.borrow_mut().check_integrity()
149    }
150
151    #[must_use]
152    /// Returns true if all registered execution clients are connected.
153    pub fn check_connected(&self) -> bool {
154        let clients_connected = self.clients.values().all(|c| c.is_connected());
155        let default_connected = self
156            .default_client
157            .as_ref()
158            .is_none_or(|c| c.is_connected());
159        clients_connected && default_connected
160    }
161
162    #[must_use]
163    /// Returns true if all registered execution clients are disconnected.
164    pub fn check_disconnected(&self) -> bool {
165        let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
166        let default_disconnected = self
167            .default_client
168            .as_ref()
169            .is_none_or(|c| !c.is_connected());
170        clients_disconnected && default_disconnected
171    }
172
173    #[must_use]
174    /// Checks for residual positions and orders in the cache.
175    pub fn check_residuals(&self) -> bool {
176        self.cache.borrow().check_residuals()
177    }
178
179    #[must_use]
180    /// Returns the set of instruments that have external order claims.
181    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
182        self.external_order_claims.keys().copied().collect()
183    }
184
185    #[must_use]
186    /// Returns the configured external client IDs.
187    pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
188        self.external_clients.clone()
189    }
190
191    #[must_use]
192    /// Returns any external order claim for the given instrument ID.
193    pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
194        self.external_order_claims.get(instrument_id).copied()
195    }
196
197    /// Registers a new execution client.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if a client with the same ID is already registered.
202    pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
203        let client_id = client.client_id();
204        let venue = client.venue();
205
206        if self.clients.contains_key(&client_id) {
207            anyhow::bail!("Client already registered with ID {client_id}");
208        }
209
210        let adapter = ExecutionClientAdapter::new(client);
211
212        self.routing_map.insert(venue, client_id);
213
214        log::debug!("Registered client {client_id}");
215        self.clients.insert(client_id, adapter);
216        Ok(())
217    }
218
219    /// Registers a default execution client for fallback routing.
220    pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
221        let client_id = client.client_id();
222        let adapter = ExecutionClientAdapter::new(client);
223
224        log::debug!("Registered default client {client_id}");
225        self.default_client = Some(adapter);
226    }
227
228    #[must_use]
229    /// Returns a reference to the execution client registered with the given ID.
230    pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
231        self.clients.get(client_id).map(|a| a.client.as_ref())
232    }
233
234    #[must_use]
235    /// Returns a mutable reference to the execution client adapter registered with the given ID.
236    pub fn get_client_adapter_mut(
237        &mut self,
238        client_id: &ClientId,
239    ) -> Option<&mut ExecutionClientAdapter> {
240        if let Some(default) = &self.default_client
241            && &default.client_id == client_id
242        {
243            return self.default_client.as_mut();
244        }
245        self.clients.get_mut(client_id)
246    }
247
248    /// Generates mass status for the given client.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if the client is not found or mass status generation fails.
253    pub async fn generate_mass_status(
254        &mut self,
255        client_id: &ClientId,
256        lookback_mins: Option<u64>,
257    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
258        if let Some(client) = self.get_client_adapter_mut(client_id) {
259            client.generate_mass_status(lookback_mins).await
260        } else {
261            anyhow::bail!("Client {client_id} not found")
262        }
263    }
264
265    #[must_use]
266    /// Returns all registered execution client IDs.
267    pub fn client_ids(&self) -> Vec<ClientId> {
268        let mut ids: Vec<_> = self.clients.keys().copied().collect();
269
270        if let Some(default) = &self.default_client {
271            ids.push(default.client_id);
272        }
273        ids
274    }
275
276    #[must_use]
277    /// Returns mutable access to all registered execution clients.
278    pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
279        let mut adapters: Vec<_> = self.clients.values_mut().collect();
280        if let Some(default) = &mut self.default_client {
281            adapters.push(default);
282        }
283        adapters
284    }
285
286    #[must_use]
287    /// Returns execution clients that would handle the given orders.
288    ///
289    /// This method first attempts to resolve each order's originating client from the cache,
290    /// then falls back to venue routing for any orders without a cached client.
291    pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
292        let mut client_ids: AHashSet<ClientId> = AHashSet::new();
293        let mut venues: AHashSet<Venue> = AHashSet::new();
294
295        // Collect client IDs from cache and venues for fallback
296        for order in orders {
297            venues.insert(order.instrument_id().venue);
298            if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
299                client_ids.insert(*client_id);
300            }
301        }
302
303        let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
304
305        // Add clients for cached client IDs (orders go back to originating client)
306        for client_id in &client_ids {
307            if let Some(adapter) = self.clients.get(client_id)
308                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
309            {
310                clients.push(adapter.client.as_ref());
311            }
312        }
313
314        // Add clients for venue routing (for orders not in cache)
315        for venue in &venues {
316            if let Some(client_id) = self.routing_map.get(venue) {
317                if let Some(adapter) = self.clients.get(client_id)
318                    && !clients.iter().any(|c| c.client_id() == adapter.client_id)
319                {
320                    clients.push(adapter.client.as_ref());
321                }
322            } else if let Some(adapter) = &self.default_client
323                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
324            {
325                clients.push(adapter.client.as_ref());
326            }
327        }
328
329        clients
330    }
331
332    /// Sets routing for a specific venue to a given client ID.
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if the client ID is not registered.
337    pub fn register_venue_routing(
338        &mut self,
339        client_id: ClientId,
340        venue: Venue,
341    ) -> anyhow::Result<()> {
342        if !self.clients.contains_key(&client_id) {
343            anyhow::bail!("No client registered with ID {client_id}");
344        }
345
346        self.routing_map.insert(venue, client_id);
347        log::info!("Set client {client_id} routing for {venue}");
348        Ok(())
349    }
350
351    /// Registers the OMS (Order Management System) type for a strategy.
352    ///
353    /// If an OMS type is already registered for this strategy, it will be overridden.
354    pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
355        self.oms_overrides.insert(strategy_id, oms_type);
356        log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
357    }
358
359    /// Registers external order claims for a strategy.
360    ///
361    /// This operation is atomic: either all instruments are registered or none are.
362    ///
363    /// # Errors
364    ///
365    /// Returns an error if any instrument already has a registered claim.
366    pub fn register_external_order_claims(
367        &mut self,
368        strategy_id: StrategyId,
369        instrument_ids: HashSet<InstrumentId>,
370    ) -> anyhow::Result<()> {
371        // Validate all instruments first
372        for instrument_id in &instrument_ids {
373            if let Some(existing) = self.external_order_claims.get(instrument_id) {
374                anyhow::bail!(
375                    "External order claim for {instrument_id} already exists for {existing}"
376                );
377            }
378        }
379
380        // If validation passed, insert all claims
381        for instrument_id in &instrument_ids {
382            self.external_order_claims
383                .insert(*instrument_id, strategy_id);
384        }
385
386        if !instrument_ids.is_empty() {
387            log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
388        }
389
390        Ok(())
391    }
392
393    /// # Errors
394    ///
395    /// Returns an error if no client is registered with the given ID.
396    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
397        if self.clients.remove(&client_id).is_some() {
398            // Remove from routing map if present
399            self.routing_map
400                .retain(|_, mapped_id| mapped_id != &client_id);
401            log::info!("Deregistered client {client_id}");
402            Ok(())
403        } else {
404            anyhow::bail!("No client registered with ID {client_id}")
405        }
406    }
407
408    /// Connects all registered execution clients concurrently.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if any client fails to connect.
413    pub async fn connect(&mut self) -> anyhow::Result<()> {
414        let futures: Vec<_> = self
415            .get_clients_mut()
416            .into_iter()
417            .map(|client| client.connect())
418            .collect();
419
420        let results = join_all(futures).await;
421        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
422
423        if errors.is_empty() {
424            Ok(())
425        } else {
426            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
427            anyhow::bail!(
428                "Failed to connect execution clients: {}",
429                error_msgs.join("; ")
430            )
431        }
432    }
433
434    /// Disconnects all registered execution clients concurrently.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if any client fails to disconnect.
439    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
440        let futures: Vec<_> = self
441            .get_clients_mut()
442            .into_iter()
443            .map(|client| client.disconnect())
444            .collect();
445
446        let results = join_all(futures).await;
447        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
448
449        if errors.is_empty() {
450            Ok(())
451        } else {
452            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
453            anyhow::bail!(
454                "Failed to disconnect execution clients: {}",
455                error_msgs.join("; ")
456            )
457        }
458    }
459
460    /// Sets the `manage_own_order_books` configuration option.
461    pub fn set_manage_own_order_books(&mut self, value: bool) {
462        self.config.manage_own_order_books = value;
463    }
464
465    /// Sets the `convert_quote_qty_to_base` configuration option.
466    pub fn set_convert_quote_qty_to_base(&mut self, value: bool) {
467        self.config.convert_quote_qty_to_base = value;
468    }
469
470    /// Starts the position snapshot timer if configured.
471    ///
472    /// Timer functionality requires a live execution context with an active clock.
473    pub fn start_snapshot_timer(&mut self) {
474        if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
475            log::info!("Starting position snapshots timer at {interval_secs} second intervals");
476        }
477    }
478
479    /// Stops the position snapshot timer if running.
480    pub fn stop_snapshot_timer(&mut self) {
481        if self.config.snapshot_positions_interval_secs.is_some() {
482            log::info!("Canceling position snapshots timer");
483        }
484    }
485
486    /// Creates snapshots of all open positions.
487    pub fn snapshot_open_position_states(&self) {
488        let positions: Vec<Position> = self
489            .cache
490            .borrow()
491            .positions_open(None, None, None, None)
492            .into_iter()
493            .cloned()
494            .collect();
495
496        for position in positions {
497            self.create_position_state_snapshot(&position);
498        }
499    }
500
501    #[allow(clippy::await_holding_refcell_ref)]
502    /// Loads persistent state into cache and rebuilds indices.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if any cache operation fails.
507    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
508        let ts = SystemTime::now();
509
510        {
511            let mut cache = self.cache.borrow_mut();
512            cache.clear_index();
513            cache.cache_general()?;
514            self.cache.borrow_mut().cache_all().await?;
515            cache.build_index();
516            let _ = cache.check_integrity();
517
518            if self.config.manage_own_order_books {
519                for order in cache.orders(None, None, None, None) {
520                    if order.is_closed() || !should_handle_own_book_order(order) {
521                        continue;
522                    }
523                    let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
524                    own_book.add(order.to_own_book_order());
525                }
526            }
527        }
528
529        self.set_position_id_counts();
530
531        log::info!(
532            "Loaded cache in {}ms",
533            SystemTime::now()
534                .duration_since(ts)
535                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
536                .as_millis()
537        );
538
539        Ok(())
540    }
541
542    /// Flushes the database to persist all cached data.
543    pub fn flush_db(&self) {
544        self.cache.borrow_mut().flush_db();
545    }
546
547    /// Reconciles an execution report received at runtime.
548    ///
549    /// Dispatches to the appropriate handler based on report type.
550    pub fn reconcile_execution_report(&mut self, report: &ExecutionReport) {
551        match report {
552            ExecutionReport::Order(order_report) => {
553                self.reconcile_order_status_report(order_report);
554            }
555            ExecutionReport::Fill(fill_report) => {
556                // TODO: Implement
557                log::debug!("Received fill report: {fill_report:?}");
558            }
559            ExecutionReport::Position(position_report) => {
560                // TODO: Implement
561                log::debug!("Received position report: {position_report:?}");
562            }
563            ExecutionReport::MassStatus(mass_status) => {
564                // TODO: Implement
565                log::warn!(
566                    "Unexpected mass status in reconcile_report: {:?}",
567                    mass_status.account_id
568                );
569            }
570        }
571    }
572
573    /// Reconciles an order status report received at runtime.
574    ///
575    /// Handles order status transitions by generating appropriate events when the venue
576    /// reports a different status than our local state. Supports all order states including
577    /// fills with inferred fill generation when instruments are available.
578    pub fn reconcile_order_status_report(&mut self, report: &OrderStatusReport) {
579        let cache = self.cache.borrow();
580
581        let order = report
582            .client_order_id
583            .and_then(|id| cache.order(&id).cloned())
584            .or_else(|| {
585                cache
586                    .client_order_id(&report.venue_order_id)
587                    .and_then(|cid| cache.order(cid).cloned())
588            });
589
590        let Some(order) = order else {
591            log::debug!(
592                "Order not found in cache for reconciliation: client_order_id={:?}, venue_order_id={}",
593                report.client_order_id,
594                report.venue_order_id
595            );
596            return;
597        };
598
599        let instrument = cache.instrument(&report.instrument_id).cloned();
600
601        drop(cache);
602
603        let ts_now = self.clock.borrow().timestamp_ns();
604
605        if let Some(event) = reconcile_order_report(&order, report, instrument.as_ref(), ts_now) {
606            self.handle_event(&event);
607        }
608    }
609
610    /// Executes a trading command by routing it to the appropriate execution client.
611    pub fn execute(&self, command: &TradingCommand) {
612        self.execute_command(command);
613    }
614
615    /// Processes an order event, updating internal state and routing as needed.
616    pub fn process(&mut self, event: &OrderEventAny) {
617        self.handle_event(event);
618    }
619
620    /// Starts the execution engine.
621    pub fn start(&mut self) {
622        self.start_snapshot_timer();
623
624        log::info!("Started");
625    }
626
627    /// Stops the execution engine.
628    pub fn stop(&mut self) {
629        self.stop_snapshot_timer();
630
631        log::info!("Stopped");
632    }
633
634    /// Resets the execution engine to its initial state.
635    pub fn reset(&mut self) {
636        self.pos_id_generator.reset();
637
638        log::info!("Reset");
639    }
640
641    /// Disposes of the execution engine, releasing resources.
642    pub fn dispose(&mut self) {
643        log::info!("Disposed");
644    }
645
646    fn execute_command(&self, command: &TradingCommand) {
647        if self.config.debug {
648            log::debug!("{RECV}{CMD} {command:?}");
649        }
650
651        if let Some(cid) = command.client_id()
652            && self.external_clients.contains(&cid)
653        {
654            if self.config.debug {
655                log::debug!("Skipping execution command for external client {cid}: {command:?}");
656            }
657            return;
658        }
659
660        let client = if let Some(adapter) = command
661            .client_id()
662            .and_then(|cid| self.clients.get(&cid))
663            .or_else(|| {
664                self.routing_map
665                    .get(&command.instrument_id().venue)
666                    .and_then(|client_id| self.clients.get(client_id))
667            })
668            .or(self.default_client.as_ref())
669        {
670            adapter.client.as_ref()
671        } else {
672            log::error!(
673                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
674                command.client_id(),
675                command.instrument_id().venue,
676            );
677            return;
678        };
679
680        match command {
681            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
682            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
683            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
684            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
685            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
686            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
687            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
688            TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
689        }
690    }
691
692    fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: &SubmitOrder) {
693        let mut order = cmd.order.clone();
694        let client_order_id = order.client_order_id();
695        let instrument_id = order.instrument_id();
696
697        // Check if the order exists in the cache
698        if !self.cache.borrow().order_exists(&client_order_id) {
699            // Add order to cache in a separate scope to drop the mutable borrow
700            {
701                let mut cache = self.cache.borrow_mut();
702                if let Err(e) = cache.add_order(order.clone(), cmd.position_id, cmd.client_id, true)
703                {
704                    log::error!("Error adding order to cache: {e}");
705                    return;
706                }
707            }
708
709            if self.config.snapshot_orders {
710                self.create_order_state_snapshot(&order);
711            }
712        }
713
714        // Get instrument in a separate scope to manage borrows
715        let instrument = {
716            let cache = self.cache.borrow();
717            if let Some(instrument) = cache.instrument(&instrument_id) {
718                instrument.clone()
719            } else {
720                log::error!(
721                    "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
722                );
723                return;
724            }
725        };
726
727        // Handle quote quantity conversion
728        if self.config.convert_quote_qty_to_base
729            && !instrument.is_inverse()
730            && order.is_quote_quantity()
731        {
732            log::warn!(
733                "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
734            );
735            let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
736
737            if let Some(price) = last_px {
738                let base_qty = instrument.get_base_quantity(order.quantity(), price);
739                self.set_order_base_qty(&mut order, base_qty);
740            } else {
741                self.deny_order(
742                    &order,
743                    &format!("no-price-to-convert-quote-qty {instrument_id}"),
744                );
745                return;
746            }
747        }
748
749        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
750            let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
751            own_book.add(order.to_own_book_order());
752        }
753
754        // Send the order to the execution client
755        if let Err(e) = client.submit_order(cmd) {
756            log::error!("Error submitting order to client: {e}");
757            self.deny_order(
758                &cmd.order,
759                &format!("failed-to-submit-order-to-client: {e}"),
760            );
761        }
762    }
763
764    fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: &SubmitOrderList) {
765        let orders = cmd.order_list.orders.clone();
766
767        let mut cache = self.cache.borrow_mut();
768        for order in &orders {
769            if !cache.order_exists(&order.client_order_id()) {
770                if let Err(e) = cache.add_order(order.clone(), cmd.position_id, cmd.client_id, true)
771                {
772                    log::error!("Error adding order to cache: {e}");
773                    return;
774                }
775
776                if self.config.snapshot_orders {
777                    self.create_order_state_snapshot(order);
778                }
779            }
780        }
781        drop(cache);
782
783        let instrument = {
784            let cache = self.cache.borrow();
785            if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
786                instrument.clone()
787            } else {
788                log::error!(
789                    "Cannot handle submit order list: no instrument found for {}, {cmd}",
790                    cmd.instrument_id,
791                );
792                return;
793            }
794        };
795
796        // Handle quote quantity conversion
797        if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
798            let mut conversions: Vec<(ClientOrderId, Quantity)> =
799                Vec::with_capacity(cmd.order_list.orders.len());
800
801            for order in &cmd.order_list.orders {
802                if !order.is_quote_quantity() {
803                    continue; // Base quantity already set
804                }
805
806                let last_px =
807                    self.last_px_for_conversion(&order.instrument_id(), order.order_side());
808
809                if let Some(px) = last_px {
810                    let base_qty = instrument.get_base_quantity(order.quantity(), px);
811                    conversions.push((order.client_order_id(), base_qty));
812                } else {
813                    for order in &cmd.order_list.orders {
814                        self.deny_order(
815                            order,
816                            &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
817                        );
818                    }
819                    return; // Denied
820                }
821            }
822
823            if !conversions.is_empty() {
824                log::warn!(
825                    "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
826                );
827
828                let mut cache = self.cache.borrow_mut();
829                for (client_order_id, base_qty) in conversions {
830                    if let Some(mut_order) = cache.mut_order(&client_order_id) {
831                        self.set_order_base_qty(mut_order, base_qty);
832                    }
833                }
834            }
835        }
836
837        if self.config.manage_own_order_books {
838            let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
839            for order in &cmd.order_list.orders {
840                if should_handle_own_book_order(order) {
841                    own_book.add(order.to_own_book_order());
842                }
843            }
844        }
845
846        // Send to execution client
847        if let Err(e) = client.submit_order_list(cmd) {
848            log::error!("Error submitting order list to client: {e}");
849            for order in &orders {
850                self.deny_order(
851                    order,
852                    &format!("failed-to-submit-order-list-to-client: {e}"),
853                );
854            }
855        }
856    }
857
    /// Forwards a `ModifyOrder` command to the given execution client.
    ///
    /// An error returned by the client is logged and otherwise ignored.
    fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: &ModifyOrder) {
        if let Err(e) = client.modify_order(cmd) {
            log::error!("Error modifying order: {e}");
        }
    }
863
    /// Forwards a `CancelOrder` command to the given execution client.
    ///
    /// An error returned by the client is logged and otherwise ignored.
    fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: &CancelOrder) {
        if let Err(e) = client.cancel_order(cmd) {
            log::error!("Error canceling order: {e}");
        }
    }
869
    /// Forwards a `CancelAllOrders` command to the given execution client.
    ///
    /// An error returned by the client is logged and otherwise ignored.
    fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: &CancelAllOrders) {
        if let Err(e) = client.cancel_all_orders(cmd) {
            log::error!("Error canceling all orders: {e}");
        }
    }
875
    /// Forwards a `BatchCancelOrders` command to the given execution client.
    ///
    /// An error returned by the client is logged and otherwise ignored.
    fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: &BatchCancelOrders) {
        if let Err(e) = client.batch_cancel_orders(cmd) {
            log::error!("Error batch canceling orders: {e}");
        }
    }
881
    /// Forwards a `QueryAccount` command to the given execution client.
    ///
    /// An error returned by the client is logged and otherwise ignored.
    fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: &QueryAccount) {
        if let Err(e) = client.query_account(cmd) {
            log::error!("Error querying account: {e}");
        }
    }
887
    /// Forwards a `QueryOrder` command to the given execution client.
    ///
    /// An error returned by the client is logged and otherwise ignored.
    fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: &QueryOrder) {
        if let Err(e) = client.query_order(cmd) {
            log::error!("Error querying order: {e}");
        }
    }
893
894    fn create_order_state_snapshot(&self, order: &OrderAny) {
895        if self.config.debug {
896            log::debug!("Creating order state snapshot for {order}");
897        }
898
899        if self.cache.borrow().has_backing()
900            && let Err(e) = self.cache.borrow().snapshot_order_state(order)
901        {
902            log::error!("Failed to snapshot order state: {e}");
903            return;
904        }
905
906        if get_message_bus().borrow().has_backing {
907            let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
908            msgbus::publish(topic, order);
909        }
910    }
911
912    fn create_position_state_snapshot(&self, position: &Position) {
913        if self.config.debug {
914            log::debug!("Creating position state snapshot for {position}");
915        }
916
917        // let mut position: Position = position.clone();
918        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
919        //     position.unrealized_pnl(last)
920        // }
921
922        let topic = switchboard::get_positions_snapshots_topic(position.id);
923        msgbus::publish(topic, position);
924    }
925
926    fn handle_event(&mut self, event: &OrderEventAny) {
927        if self.config.debug {
928            log::debug!("{RECV}{EVT} {event:?}");
929        }
930
931        let client_order_id = event.client_order_id();
932        let cache = self.cache.borrow();
933        let mut order = if let Some(order) = cache.order(&client_order_id) {
934            order.clone()
935        } else {
936            log::warn!(
937                "Order with {} not found in the cache to apply {}",
938                event.client_order_id(),
939                event
940            );
941
942            // Try to find order by venue order ID if available
943            let venue_order_id = if let Some(id) = event.venue_order_id() {
944                id
945            } else {
946                log::error!(
947                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
948                    event.client_order_id()
949                );
950                return;
951            };
952
953            // Look up client order ID from venue order ID
954            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
955                id
956            } else {
957                log::error!(
958                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
959                    event.client_order_id(),
960                );
961                return;
962            };
963
964            // Get order using found client order ID
965            if let Some(order) = cache.order(client_order_id) {
966                log::info!("Order with {client_order_id} was found in the cache");
967                order.clone()
968            } else {
969                log::error!(
970                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
971                );
972                return;
973            }
974        };
975
976        drop(cache);
977
978        match event {
979            OrderEventAny::Filled(fill) => {
980                let oms_type = self.determine_oms_type(fill);
981                let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
982
983                let mut fill = *fill;
984                if fill.position_id.is_none() {
985                    fill.position_id = Some(position_id);
986                }
987
988                if self.apply_fill_to_order(&mut order, fill).is_ok() {
989                    self.handle_order_fill(&order, fill, oms_type);
990                }
991            }
992            _ => {
993                let _ = self.apply_event_to_order(&mut order, event.clone());
994            }
995        }
996    }
997
998    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
999        // Check for strategy OMS override
1000        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
1001            return *oms_type;
1002        }
1003
1004        // Use native venue OMS
1005        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
1006            && let Some(client) = self.clients.get(client_id)
1007        {
1008            return client.oms_type();
1009        }
1010
1011        if let Some(client) = &self.default_client {
1012            return client.oms_type();
1013        }
1014
1015        OmsType::Netting // Default fallback
1016    }
1017
1018    fn determine_position_id(
1019        &mut self,
1020        fill: OrderFilled,
1021        oms_type: OmsType,
1022        order: Option<&OrderAny>,
1023    ) -> PositionId {
1024        match oms_type {
1025            OmsType::Hedging => self.determine_hedging_position_id(fill, order),
1026            OmsType::Netting => self.determine_netting_position_id(fill),
1027            _ => self.determine_netting_position_id(fill), // Default to netting
1028        }
1029    }
1030
1031    fn determine_hedging_position_id(
1032        &mut self,
1033        fill: OrderFilled,
1034        order: Option<&OrderAny>,
1035    ) -> PositionId {
1036        // Check if position ID already exists
1037        if let Some(position_id) = fill.position_id {
1038            if self.config.debug {
1039                log::debug!("Already had a position ID of: {position_id}");
1040            }
1041            return position_id;
1042        }
1043
1044        let cache = self.cache.borrow();
1045
1046        let order = if let Some(o) = order {
1047            o
1048        } else {
1049            match cache.order(&fill.client_order_id()) {
1050                Some(o) => o,
1051                None => {
1052                    panic!(
1053                        "Order for {} not found to determine position ID",
1054                        fill.client_order_id()
1055                    );
1056                }
1057            }
1058        };
1059
1060        // Check execution spawn orders
1061        if let Some(spawn_id) = order.exec_spawn_id() {
1062            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
1063            for spawned_order in spawn_orders {
1064                if let Some(pos_id) = spawned_order.position_id() {
1065                    if self.config.debug {
1066                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
1067                    }
1068                    return pos_id;
1069                }
1070            }
1071        }
1072
1073        // Generate new position ID
1074        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
1075        if self.config.debug {
1076            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
1077        }
1078        position_id
1079    }
1080
1081    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
1082        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
1083    }
1084
1085    fn apply_fill_to_order(&self, order: &mut OrderAny, fill: OrderFilled) -> anyhow::Result<()> {
1086        if order.is_duplicate_fill(&fill) {
1087            log::warn!(
1088                "Duplicate fill: {} trade_id={} already applied, skipping",
1089                order.client_order_id(),
1090                fill.trade_id
1091            );
1092            anyhow::bail!("Duplicate fill");
1093        }
1094
1095        self.check_overfill(order, &fill)?;
1096        let event = OrderEventAny::Filled(fill);
1097        self.apply_order_event(order, event)
1098    }
1099
    /// Applies a non-fill order event to `order` by delegating to `apply_order_event`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `apply_order_event` returns.
    fn apply_event_to_order(
        &self,
        order: &mut OrderAny,
        event: OrderEventAny,
    ) -> anyhow::Result<()> {
        self.apply_order_event(order, event)
    }
1107
    /// Applies `event` to `order`, then updates the cache and publishes the event.
    ///
    /// After applying, the order is written back to the cache (a failure there is only logged),
    /// the event is published on the strategy's order events topic, and an order state snapshot
    /// is created when `snapshot_orders` is enabled.
    ///
    /// An `OrderError::InvalidStateTransition` from the order is logged as a warning and does
    /// NOT abort: the cache update and publishing still run.
    ///
    /// # Errors
    ///
    /// Returns an error if the order rejects the event with `OrderError::DuplicateFill` or any
    /// other error besides `OrderError::InvalidStateTransition`. In those cases the cache is not
    /// updated and nothing is published.
    fn apply_order_event(&self, order: &mut OrderAny, event: OrderEventAny) -> anyhow::Result<()> {
        if let Err(e) = order.apply(event.clone()) {
            match e {
                OrderError::InvalidStateTransition => {
                    // Event already applied to order (e.g., from reconciliation or duplicate processing)
                    // Log warning and continue with downstream processing (cache update, publishing, etc.)
                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
                }
                OrderError::DuplicateFill(trade_id) => {
                    // Duplicate fill detected at order level (secondary safety check)
                    log::warn!(
                        "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
                    );
                    anyhow::bail!("{e}");
                }
                _ => {
                    // Protection against invalid IDs and other invariants
                    log::error!("Error applying event: {e}, did not apply {event}");
                    // Refresh the own order book entry for the order before bailing
                    if should_handle_own_book_order(order) {
                        self.cache.borrow_mut().update_own_order_book(order);
                    }
                    anyhow::bail!("{e}");
                }
            }
        }

        // A cache update failure is logged but does not stop the event from being published
        if let Err(e) = self.cache.borrow_mut().update_order(order) {
            log::error!("Error updating order in cache: {e}");
        }

        if self.config.debug {
            log::debug!("{SEND}{EVT} {event}");
        }

        let topic = switchboard::get_event_orders_topic(event.strategy_id());
        msgbus::publish(topic, &event);

        if self.config.snapshot_orders {
            self.create_order_state_snapshot(order);
        }

        Ok(())
    }
1151
1152    fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
1153        let potential_overfill = order.calculate_overfill(fill.last_qty);
1154
1155        if potential_overfill.is_positive() {
1156            if self.config.allow_overfills {
1157                log::warn!(
1158                    "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
1159                    order.client_order_id(),
1160                    potential_overfill,
1161                    order.filled_qty(),
1162                    fill.last_qty,
1163                    order.quantity()
1164                );
1165            } else {
1166                let msg = format!(
1167                    "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
1168                Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
1169                    order.client_order_id(),
1170                    potential_overfill,
1171                    order.filled_qty(),
1172                    fill.last_qty,
1173                    order.quantity()
1174                );
1175                anyhow::bail!("{msg}");
1176            }
1177        }
1178
1179        Ok(())
1180    }
1181
1182    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
1183        let instrument =
1184            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
1185                instrument.clone()
1186            } else {
1187                log::error!(
1188                    "Cannot handle order fill: no instrument found for {}, {fill}",
1189                    fill.instrument_id,
1190                );
1191                return;
1192            };
1193
1194        if self.cache.borrow().account(&fill.account_id).is_none() {
1195            log::error!(
1196                "Cannot handle order fill: no account found for {}, {fill}",
1197                fill.instrument_id.venue,
1198            );
1199            return;
1200        }
1201
1202        // Skip portfolio position updates for combo fills (spread instruments)
1203        // Combo fills are only used for order management, not portfolio updates
1204        let position = if instrument.is_spread() {
1205            None
1206        } else {
1207            self.handle_position_update(instrument.clone(), fill, oms_type);
1208            let position_id = fill.position_id.unwrap();
1209            self.cache.borrow().position(&position_id).cloned()
1210        };
1211
1212        // Handle contingent orders for both spread and non-spread instruments
1213        // For spread instruments, contingent orders work without position linkage
1214        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1215            // For non-spread instruments, link to position if available
1216            if !instrument.is_spread()
1217                && let Some(ref pos) = position
1218                && pos.is_open()
1219            {
1220                let position_id = pos.id;
1221                for client_order_id in order.linked_order_ids().unwrap_or_default() {
1222                    let mut cache = self.cache.borrow_mut();
1223                    let contingent_order = cache.mut_order(client_order_id);
1224                    if let Some(contingent_order) = contingent_order
1225                        && contingent_order.position_id().is_none()
1226                    {
1227                        contingent_order.set_position_id(Some(position_id));
1228
1229                        if let Err(e) = self.cache.borrow_mut().add_position_id(
1230                            &position_id,
1231                            &contingent_order.instrument_id().venue,
1232                            &contingent_order.client_order_id(),
1233                            &contingent_order.strategy_id(),
1234                        ) {
1235                            log::error!("Failed to add position ID: {e}");
1236                        }
1237                    }
1238                }
1239            }
1240            // For spread instruments, contingent orders can still be triggered
1241            // but without position linkage (since no position is created for spreads)
1242        }
1243    }
1244
1245    /// Handle position creation or update for a fill.
1246    ///
1247    /// This function mirrors the Python `_handle_position_update` method.
1248    fn handle_position_update(
1249        &mut self,
1250        instrument: InstrumentAny,
1251        fill: OrderFilled,
1252        oms_type: OmsType,
1253    ) {
1254        let position_id = if let Some(position_id) = fill.position_id {
1255            position_id
1256        } else {
1257            log::error!("Cannot handle position update: no position ID found for fill {fill}");
1258            return;
1259        };
1260
1261        let position_opt = self.cache.borrow().position(&position_id).cloned();
1262
1263        match position_opt {
1264            None => {
1265                // Position is None - open new position
1266                if self.open_position(instrument, None, fill, oms_type).is_ok() {
1267                    // Position opened successfully
1268                }
1269            }
1270            Some(pos) if pos.is_closed() => {
1271                // Position is closed - open new position
1272                if self
1273                    .open_position(instrument, Some(&pos), fill, oms_type)
1274                    .is_ok()
1275                {
1276                    // Position opened successfully
1277                }
1278            }
1279            Some(mut pos) => {
1280                if self.will_flip_position(&pos, fill) {
1281                    // Position will flip
1282                    self.flip_position(instrument, &mut pos, fill, oms_type);
1283                } else {
1284                    // Update existing position
1285                    self.update_position(&mut pos, fill);
1286                }
1287            }
1288        }
1289    }
1290
1291    fn open_position(
1292        &self,
1293        instrument: InstrumentAny,
1294        position: Option<&Position>,
1295        fill: OrderFilled,
1296        oms_type: OmsType,
1297    ) -> anyhow::Result<()> {
1298        if let Some(position) = position {
1299            if Self::is_duplicate_closed_fill(position, &fill) {
1300                log::warn!(
1301                    "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
1302                    fill.trade_id,
1303                    position.id,
1304                    fill.order_side,
1305                    fill.last_qty,
1306                    fill.last_px
1307                );
1308                return Ok(());
1309            }
1310            self.reopen_position(position, oms_type)?;
1311        }
1312
1313        let position = Position::new(&instrument, fill);
1314        self.cache
1315            .borrow_mut()
1316            .add_position(position.clone(), oms_type)?; // TODO: Remove clone (change method)
1317
1318        if self.config.snapshot_positions {
1319            self.create_position_state_snapshot(&position);
1320        }
1321
1322        let ts_init = self.clock.borrow().timestamp_ns();
1323        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
1324        let topic = switchboard::get_event_positions_topic(event.strategy_id);
1325        msgbus::publish(topic, &PositionEvent::PositionOpened(event));
1326
1327        Ok(())
1328    }
1329
1330    fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
1331        position.events.iter().any(|event| {
1332            event.trade_id == fill.trade_id
1333                && event.order_side == fill.order_side
1334                && event.last_px == fill.last_px
1335                && event.last_qty == fill.last_qty
1336        })
1337    }
1338
1339    fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
1340        if oms_type == OmsType::Netting {
1341            if position.is_open() {
1342                anyhow::bail!(
1343                    "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
1344                    position.id
1345                );
1346            }
1347            // Snapshot closed position if reopening (NETTING mode)
1348            self.cache.borrow_mut().snapshot_position(position)?;
1349        } else {
1350            // HEDGING mode
1351            log::warn!(
1352                "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
1353                position.id
1354            );
1355        }
1356        Ok(())
1357    }
1358
1359    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
1360        // Apply the fill to the position
1361        position.apply(&fill);
1362
1363        // Check if position is closed after applying the fill
1364        let is_closed = position.is_closed();
1365
1366        // Update position in cache - this should handle the closed state tracking
1367        if let Err(e) = self.cache.borrow_mut().update_position(position) {
1368            log::error!("Failed to update position: {e:?}");
1369            return;
1370        }
1371
1372        // Verify cache state after update
1373        let cache = self.cache.borrow();
1374
1375        drop(cache);
1376
1377        // Create position state snapshot if enabled
1378        if self.config.snapshot_positions {
1379            self.create_position_state_snapshot(position);
1380        }
1381
1382        // Create and publish appropriate position event
1383        let topic = switchboard::get_event_positions_topic(position.strategy_id);
1384        let ts_init = self.clock.borrow().timestamp_ns();
1385
1386        if is_closed {
1387            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
1388            msgbus::publish(topic, &PositionEvent::PositionClosed(event));
1389        } else {
1390            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
1391            msgbus::publish(topic, &PositionEvent::PositionChanged(event));
1392        }
1393    }
1394
1395    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
1396        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
1397    }
1398
    /// Flips `position` by splitting `fill` into a closing part and an opening part.
    ///
    /// The first split fill carries the position's current quantity and is applied through
    /// `update_position`; in NETTING mode the position is then snapshotted in the cache. The
    /// second split fill carries the remaining quantity and opens a new position through
    /// `open_position`. Any commission on the fill is divided between the two split fills in
    /// proportion to the position quantity over the fill quantity.
    ///
    /// In HEDGING mode with a virtual position ID on the fill, the new position gets a freshly
    /// generated position ID; otherwise it reuses the fill's position ID.
    ///
    /// Nothing is opened when the remaining quantity is zero. A failure to open the flipped
    /// position is logged.
    fn flip_position(
        &mut self,
        instrument: InstrumentAny,
        position: &mut Position,
        fill: OrderFilled,
        oms_type: OmsType,
    ) {
        // Quantity left over for the new position once the existing one is closed
        let difference = match position.side {
            PositionSide::Long => Quantity::from_raw(
                fill.last_qty.raw - position.quantity.raw,
                position.size_precision,
            ),
            PositionSide::Short => Quantity::from_raw(
                position.quantity.raw.abs_diff(fill.last_qty.raw), // Equivalent to Python's abs(position.quantity - fill.last_qty)
                position.size_precision,
            ),
            _ => fill.last_qty,
        };

        // Split commission between two positions
        // (computed before the closing fill is applied, so it uses the pre-close quantity)
        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
        let (commission1, commission2) = if let Some(commission) = fill.commission {
            let commission_currency = commission.currency;
            let commission1 = Money::new(commission * fill_percent, commission_currency);
            let commission2 = commission - commission1;
            (Some(commission1), Some(commission2))
        } else {
            log::error!("Commission is not available.");
            (None, None)
        };

        // First split: closes the existing position with exactly its current quantity
        let mut fill_split1: Option<OrderFilled> = None;
        if position.is_open() {
            fill_split1 = Some(OrderFilled::new(
                fill.trader_id,
                fill.strategy_id,
                fill.instrument_id,
                fill.client_order_id,
                fill.venue_order_id,
                fill.account_id,
                fill.trade_id,
                fill.order_side,
                fill.order_type,
                position.quantity,
                fill.last_px,
                fill.currency,
                fill.liquidity_side,
                UUID4::new(),
                fill.ts_event,
                fill.ts_init,
                fill.reconciliation,
                fill.position_id,
                commission1,
            ));

            self.update_position(position, fill_split1.unwrap());

            // Snapshot closed position before reusing ID (NETTING mode)
            if oms_type == OmsType::Netting
                && let Err(e) = self.cache.borrow_mut().snapshot_position(position)
            {
                log::error!("Failed to snapshot position during flip: {e:?}");
            }
        }

        // Guard against flipping a position with a zero fill size
        if difference.raw == 0 {
            log::warn!(
                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
            );
            return;
        }

        let position_id_flip = if oms_type == OmsType::Hedging
            && let Some(position_id) = fill.position_id
            && position_id.is_virtual()
        {
            // Generate new position ID for flipped virtual position (Hedging OMS only)
            Some(self.pos_id_generator.generate(fill.strategy_id, true))
        } else {
            // Default: use the same position ID as the fill (Python behavior)
            fill.position_id
        };

        // Second split: opens the flipped position with the remaining quantity
        let fill_split2 = OrderFilled::new(
            fill.trader_id,
            fill.strategy_id,
            fill.instrument_id,
            fill.client_order_id,
            fill.venue_order_id,
            fill.account_id,
            fill.trade_id,
            fill.order_side,
            fill.order_type,
            difference,
            fill.last_px,
            fill.currency,
            fill.liquidity_side,
            UUID4::new(),
            fill.ts_event,
            fill.ts_init,
            fill.reconciliation,
            position_id_flip,
            commission2,
        );

        // Log both split fills when flipping a virtual position in HEDGING mode
        if oms_type == OmsType::Hedging
            && let Some(position_id) = fill.position_id
            && position_id.is_virtual()
        {
            log::warn!("Closing position {fill_split1:?}");
            log::warn!("Flipping position {fill_split2:?}");
        }

        // Open flipped position
        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
            log::error!("Failed to open flipped position: {e:?}");
        }
    }
1518
1519    /// Sets the internal position ID generator counts based on existing cached positions.
1520    pub fn set_position_id_counts(&mut self) {
1521        let cache = self.cache.borrow();
1522        let positions = cache.positions(None, None, None, None);
1523
1524        // Count positions per instrument_id using a HashMap
1525        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1526
1527        for position in positions {
1528            *counts.entry(position.strategy_id).or_insert(0) += 1;
1529        }
1530
1531        self.pos_id_generator.reset();
1532
1533        for (strategy_id, count) in counts {
1534            self.pos_id_generator.set_count(count, strategy_id);
1535            log::info!("Set PositionId count for {strategy_id} to {count}");
1536        }
1537    }
1538
1539    fn last_px_for_conversion(
1540        &self,
1541        instrument_id: &InstrumentId,
1542        side: OrderSide,
1543    ) -> Option<Price> {
1544        let cache = self.cache.borrow();
1545
1546        // Try to get last trade price
1547        if let Some(trade) = cache.trade(instrument_id) {
1548            return Some(trade.price);
1549        }
1550
1551        // Fall back to quote if available
1552        if let Some(quote) = cache.quote(instrument_id) {
1553            match side {
1554                OrderSide::Buy => Some(quote.ask_price),
1555                OrderSide::Sell => Some(quote.bid_price),
1556                OrderSide::NoOrderSide => None,
1557            }
1558        } else {
1559            None
1560        }
1561    }
1562
1563    fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1564        log::info!(
1565            "Setting {} order quote quantity {} to base quantity {}",
1566            order.instrument_id(),
1567            order.quantity(),
1568            base_qty
1569        );
1570
1571        let original_qty = order.quantity();
1572        order.set_quantity(base_qty);
1573        order.set_leaves_qty(base_qty);
1574        order.set_is_quote_quantity(false);
1575
1576        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1577            return;
1578        }
1579
1580        if let Some(linked_order_ids) = order.linked_order_ids() {
1581            for client_order_id in linked_order_ids {
1582                match self.cache.borrow_mut().mut_order(client_order_id) {
1583                    Some(contingent_order) => {
1584                        if !contingent_order.is_quote_quantity() {
1585                            continue; // Already base quantity
1586                        }
1587
1588                        if contingent_order.quantity() != original_qty {
1589                            log::warn!(
1590                                "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1591                                contingent_order.quantity(),
1592                                original_qty,
1593                                base_qty
1594                            );
1595                        }
1596
1597                        log::info!(
1598                            "Setting {} order quote quantity {} to base quantity {}",
1599                            contingent_order.instrument_id(),
1600                            contingent_order.quantity(),
1601                            base_qty
1602                        );
1603
1604                        contingent_order.set_quantity(base_qty);
1605                        contingent_order.set_leaves_qty(base_qty);
1606                        contingent_order.set_is_quote_quantity(false);
1607                    }
1608                    None => {
1609                        log::error!("Contingency order {client_order_id} not found");
1610                    }
1611                }
1612            }
1613        } else {
1614            log::warn!(
1615                "No linked order IDs found for order {}",
1616                order.client_order_id()
1617            );
1618        }
1619    }
1620
1621    fn deny_order(&self, order: &OrderAny, reason: &str) {
1622        log::error!(
1623            "Order denied: {reason}, order ID: {}",
1624            order.client_order_id()
1625        );
1626
1627        let denied = OrderDenied::new(
1628            order.trader_id(),
1629            order.strategy_id(),
1630            order.instrument_id(),
1631            order.client_order_id(),
1632            reason.into(),
1633            UUID4::new(),
1634            self.clock.borrow().timestamp_ns(),
1635            self.clock.borrow().timestamp_ns(),
1636        );
1637
1638        let mut order = order.clone();
1639
1640        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1641            log::error!("Failed to apply denied event to order: {e}");
1642            return;
1643        }
1644
1645        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1646            log::error!("Failed to update order in cache: {e}");
1647            return;
1648        }
1649
1650        let topic = switchboard::get_event_orders_topic(order.strategy_id());
1651        msgbus::publish(topic, &OrderEventAny::Denied(denied));
1652
1653        if self.config.snapshot_orders {
1654            self.create_order_state_snapshot(&order);
1655        }
1656    }
1657
1658    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1659        let mut cache = self.cache.borrow_mut();
1660        if cache.own_order_book_mut(instrument_id).is_none() {
1661            let own_book = OwnOrderBook::new(*instrument_id);
1662            cache.add_own_order_book(own_book).unwrap();
1663        }
1664
1665        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1666    }
1667}