Skip to main content

nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
18//! The execution engine's primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24pub mod stubs;
25
26use std::{
27    cell::{RefCell, RefMut},
28    collections::{HashMap, HashSet},
29    fmt::Debug,
30    rc::Rc,
31    time::SystemTime,
32};
33
34use ahash::{AHashMap, AHashSet};
35use config::ExecutionEngineConfig;
36use futures::future::join_all;
37use nautilus_common::{
38    cache::Cache,
39    clients::ExecutionClient,
40    clock::Clock,
41    generators::position_id::PositionIdGenerator,
42    logging::{CMD, EVT, RECV, SEND},
43    messages::{
44        ExecutionReport,
45        execution::{
46            BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
47            SubmitOrder, SubmitOrderList, TradingCommand,
48        },
49    },
50    msgbus::{
51        self, MessagingSwitchboard, TypedIntoHandler, get_message_bus,
52        switchboard::{self},
53    },
54    runner::try_get_trading_cmd_sender,
55};
56use nautilus_core::{UUID4, UnixNanos, WeakCell};
57use nautilus_model::{
58    enums::{ContingencyType, OmsType, OrderSide, PositionSide},
59    events::{
60        OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
61        PositionEvent, PositionOpened,
62    },
63    identifiers::{
64        ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue, VenueOrderId,
65    },
66    instruments::{Instrument, InstrumentAny},
67    orderbook::own::{OwnOrderBook, should_handle_own_book_order},
68    orders::{Order, OrderAny, OrderError},
69    position::Position,
70    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
71    types::{Money, Price, Quantity},
72};
73use rust_decimal::Decimal;
74
75use crate::{
76    client::ExecutionClientAdapter,
77    reconciliation::{
78        check_position_reconciliation, reconcile_fill_report as reconcile_fill,
79        reconcile_order_report,
80    },
81};
82
/// Central execution engine responsible for orchestrating order routing and execution.
///
/// The execution engine manages the entire order lifecycle from submission to completion,
/// handling routing to appropriate execution clients, position management, and event
/// processing. It supports multiple execution venues through registered clients and
/// provides sophisticated order management capabilities.
pub struct ExecutionEngine {
    // Shared clock; read for timestamps and handed to the position ID generator.
    clock: Rc<RefCell<dyn Clock>>,
    // Shared cache of orders, positions and instruments.
    cache: Rc<RefCell<Cache>>,
    // Registered execution clients keyed by their client ID.
    clients: AHashMap<ClientId, ExecutionClientAdapter>,
    // Optional fallback client, consulted when a venue has no routing entry.
    default_client: Option<ExecutionClientAdapter>,
    // Venue -> client ID routing table.
    routing_map: HashMap<Venue, ClientId>,
    // Per-strategy OMS type registrations.
    oms_overrides: HashMap<StrategyId, OmsType>,
    // Instrument -> strategy that has claimed external orders for it.
    external_order_claims: HashMap<InstrumentId, StrategyId>,
    // Client IDs taken from `config.external_clients` at construction.
    external_clients: HashSet<ClientId>,
    // Generator for position IDs (also tracks per-strategy counts).
    pos_id_generator: PositionIdGenerator,
    // Engine configuration (defaulted when none is supplied).
    config: ExecutionEngineConfig,
}
101
102impl Debug for ExecutionEngine {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct(stringify!(ExecutionEngine))
105            .field("client_count", &self.clients.len())
106            .finish()
107    }
108}
109
110impl ExecutionEngine {
111    /// Creates a new [`ExecutionEngine`] instance.
112    pub fn new(
113        clock: Rc<RefCell<dyn Clock>>,
114        cache: Rc<RefCell<Cache>>,
115        config: Option<ExecutionEngineConfig>,
116    ) -> Self {
117        let trader_id = get_message_bus().borrow().trader_id;
118        Self {
119            clock: clock.clone(),
120            cache,
121            clients: AHashMap::new(),
122            default_client: None,
123            routing_map: HashMap::new(),
124            oms_overrides: HashMap::new(),
125            external_order_claims: HashMap::new(),
126            external_clients: config
127                .as_ref()
128                .and_then(|c| c.external_clients.clone())
129                .unwrap_or_default()
130                .into_iter()
131                .collect(),
132            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
133            config: config.unwrap_or_default(),
134        }
135    }
136
137    /// Registers all message bus handlers for the execution engine.
138    pub fn register_msgbus_handlers(engine: Rc<RefCell<Self>>) {
139        let weak = WeakCell::from(Rc::downgrade(&engine));
140
141        let weak1 = weak.clone();
142        msgbus::register_trading_command_endpoint(
143            MessagingSwitchboard::exec_engine_execute(),
144            TypedIntoHandler::from(move |cmd: TradingCommand| {
145                if let Some(rc) = weak1.upgrade() {
146                    rc.borrow().execute(cmd);
147                }
148            }),
149        );
150
151        // Queued endpoint for deferred command execution (re-entrancy safe),
152        // falls back to direct endpoint if no sender is initialized (e.g., backtest/test).
153        msgbus::register_trading_command_endpoint(
154            MessagingSwitchboard::exec_engine_queue_execute(),
155            TypedIntoHandler::from(move |cmd: TradingCommand| {
156                if let Some(sender) = try_get_trading_cmd_sender() {
157                    sender.execute(cmd);
158                } else {
159                    let endpoint = MessagingSwitchboard::exec_engine_execute();
160                    msgbus::send_trading_command(endpoint, cmd);
161                }
162            }),
163        );
164
165        let weak2 = weak.clone();
166        msgbus::register_order_event_endpoint(
167            MessagingSwitchboard::exec_engine_process(),
168            TypedIntoHandler::from(move |event: OrderEventAny| {
169                if let Some(rc) = weak2.upgrade() {
170                    rc.borrow_mut().process(event);
171                }
172            }),
173        );
174
175        let weak3 = weak;
176        msgbus::register_execution_report_endpoint(
177            MessagingSwitchboard::exec_engine_reconcile_execution_report(),
178            TypedIntoHandler::from(move |report: ExecutionReport| {
179                if let Some(rc) = weak3.upgrade() {
180                    rc.borrow_mut().reconcile_execution_report(report);
181                }
182            }),
183        );
184    }
185
186    #[must_use]
187    /// Returns the position ID count for the specified strategy.
188    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
189        self.pos_id_generator.count(strategy_id)
190    }
191
192    #[must_use]
193    /// Returns a reference to the cache.
194    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
195        &self.cache
196    }
197
198    #[must_use]
199    /// Returns a reference to the configuration.
200    pub const fn config(&self) -> &ExecutionEngineConfig {
201        &self.config
202    }
203
204    #[must_use]
205    /// Checks the integrity of cached execution data.
206    pub fn check_integrity(&self) -> bool {
207        self.cache.borrow_mut().check_integrity()
208    }
209
210    #[must_use]
211    /// Returns true if all registered execution clients are connected.
212    pub fn check_connected(&self) -> bool {
213        let clients_connected = self.clients.values().all(|c| c.is_connected());
214        let default_connected = self
215            .default_client
216            .as_ref()
217            .is_none_or(|c| c.is_connected());
218        clients_connected && default_connected
219    }
220
221    #[must_use]
222    /// Returns true if all registered execution clients are disconnected.
223    pub fn check_disconnected(&self) -> bool {
224        let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
225        let default_disconnected = self
226            .default_client
227            .as_ref()
228            .is_none_or(|c| !c.is_connected());
229        clients_disconnected && default_disconnected
230    }
231
232    /// Returns connection status for each registered client.
233    #[must_use]
234    pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
235        let mut status: Vec<_> = self
236            .clients
237            .values()
238            .map(|c| (c.client_id(), c.is_connected()))
239            .collect();
240
241        if let Some(default) = &self.default_client {
242            status.push((default.client_id(), default.is_connected()));
243        }
244
245        status
246    }
247
248    #[must_use]
249    /// Checks for residual positions and orders in the cache.
250    pub fn check_residuals(&self) -> bool {
251        self.cache.borrow().check_residuals()
252    }
253
254    #[must_use]
255    /// Returns the set of instruments that have external order claims.
256    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
257        self.external_order_claims.keys().copied().collect()
258    }
259
260    #[must_use]
261    /// Returns the configured external client IDs.
262    pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
263        self.external_clients.clone()
264    }
265
266    #[must_use]
267    /// Returns any external order claim for the given instrument ID.
268    pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
269        self.external_order_claims.get(instrument_id).copied()
270    }
271
272    /// Registers a new execution client.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if a client with the same ID is already registered.
277    pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
278        let client_id = client.client_id();
279        let venue = client.venue();
280
281        if self.clients.contains_key(&client_id) {
282            anyhow::bail!("Client already registered with ID {client_id}");
283        }
284
285        let adapter = ExecutionClientAdapter::new(client);
286
287        self.routing_map.insert(venue, client_id);
288
289        log::debug!("Registered client {client_id}");
290        self.clients.insert(client_id, adapter);
291        Ok(())
292    }
293
294    /// Registers a default execution client for fallback routing.
295    pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
296        let client_id = client.client_id();
297        let adapter = ExecutionClientAdapter::new(client);
298
299        log::debug!("Registered default client {client_id}");
300        self.default_client = Some(adapter);
301    }
302
303    #[must_use]
304    /// Returns a reference to the execution client registered with the given ID.
305    pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
306        self.clients.get(client_id).map(|a| a.client.as_ref())
307    }
308
309    #[must_use]
310    /// Returns a mutable reference to the execution client adapter registered with the given ID.
311    pub fn get_client_adapter_mut(
312        &mut self,
313        client_id: &ClientId,
314    ) -> Option<&mut ExecutionClientAdapter> {
315        if let Some(default) = &self.default_client
316            && &default.client_id == client_id
317        {
318            return self.default_client.as_mut();
319        }
320        self.clients.get_mut(client_id)
321    }
322
323    /// Generates mass status for the given client.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if the client is not found or mass status generation fails.
328    pub async fn generate_mass_status(
329        &mut self,
330        client_id: &ClientId,
331        lookback_mins: Option<u64>,
332    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
333        if let Some(client) = self.get_client_adapter_mut(client_id) {
334            client.generate_mass_status(lookback_mins).await
335        } else {
336            anyhow::bail!("Client {client_id} not found")
337        }
338    }
339
340    /// Registers an external order with the execution client for tracking.
341    ///
342    /// This is called after reconciliation creates an external order, allowing the
343    /// execution client to track it for subsequent events (e.g., cancellations).
344    pub fn register_external_order(
345        &self,
346        client_order_id: ClientOrderId,
347        venue_order_id: VenueOrderId,
348        instrument_id: InstrumentId,
349        strategy_id: StrategyId,
350        ts_init: UnixNanos,
351    ) {
352        let venue = instrument_id.venue;
353        if let Some(client_id) = self.routing_map.get(&venue) {
354            if let Some(client) = self.clients.get(client_id) {
355                client.register_external_order(
356                    client_order_id,
357                    venue_order_id,
358                    instrument_id,
359                    strategy_id,
360                    ts_init,
361                );
362            }
363        } else if let Some(default) = &self.default_client {
364            default.register_external_order(
365                client_order_id,
366                venue_order_id,
367                instrument_id,
368                strategy_id,
369                ts_init,
370            );
371        }
372    }
373
374    #[must_use]
375    /// Returns all registered execution client IDs.
376    pub fn client_ids(&self) -> Vec<ClientId> {
377        let mut ids: Vec<_> = self.clients.keys().copied().collect();
378
379        if let Some(default) = &self.default_client {
380            ids.push(default.client_id);
381        }
382        ids
383    }
384
385    #[must_use]
386    /// Returns mutable access to all registered execution clients.
387    pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
388        let mut adapters: Vec<_> = self.clients.values_mut().collect();
389        if let Some(default) = &mut self.default_client {
390            adapters.push(default);
391        }
392        adapters
393    }
394
395    #[must_use]
396    /// Returns execution clients that would handle the given orders.
397    ///
398    /// This method first attempts to resolve each order's originating client from the cache,
399    /// then falls back to venue routing for any orders without a cached client.
400    pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
401        let mut client_ids: AHashSet<ClientId> = AHashSet::new();
402        let mut venues: AHashSet<Venue> = AHashSet::new();
403
404        // Collect client IDs from cache and venues for fallback
405        for order in orders {
406            venues.insert(order.instrument_id().venue);
407            if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
408                client_ids.insert(*client_id);
409            }
410        }
411
412        let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
413
414        // Add clients for cached client IDs (orders go back to originating client)
415        for client_id in &client_ids {
416            if let Some(adapter) = self.clients.get(client_id)
417                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
418            {
419                clients.push(adapter.client.as_ref());
420            }
421        }
422
423        // Add clients for venue routing (for orders not in cache)
424        for venue in &venues {
425            if let Some(client_id) = self.routing_map.get(venue) {
426                if let Some(adapter) = self.clients.get(client_id)
427                    && !clients.iter().any(|c| c.client_id() == adapter.client_id)
428                {
429                    clients.push(adapter.client.as_ref());
430                }
431            } else if let Some(adapter) = &self.default_client
432                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
433            {
434                clients.push(adapter.client.as_ref());
435            }
436        }
437
438        clients
439    }
440
441    /// Sets routing for a specific venue to a given client ID.
442    ///
443    /// # Errors
444    ///
445    /// Returns an error if the client ID is not registered.
446    pub fn register_venue_routing(
447        &mut self,
448        client_id: ClientId,
449        venue: Venue,
450    ) -> anyhow::Result<()> {
451        if !self.clients.contains_key(&client_id) {
452            anyhow::bail!("No client registered with ID {client_id}");
453        }
454
455        self.routing_map.insert(venue, client_id);
456        log::info!("Set client {client_id} routing for {venue}");
457        Ok(())
458    }
459
460    /// Registers the OMS (Order Management System) type for a strategy.
461    ///
462    /// If an OMS type is already registered for this strategy, it will be overridden.
463    pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
464        self.oms_overrides.insert(strategy_id, oms_type);
465        log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
466    }
467
468    /// Registers external order claims for a strategy.
469    ///
470    /// This operation is atomic: either all instruments are registered or none are.
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if any instrument already has a registered claim.
475    pub fn register_external_order_claims(
476        &mut self,
477        strategy_id: StrategyId,
478        instrument_ids: HashSet<InstrumentId>,
479    ) -> anyhow::Result<()> {
480        // Validate all instruments first
481        for instrument_id in &instrument_ids {
482            if let Some(existing) = self.external_order_claims.get(instrument_id) {
483                anyhow::bail!(
484                    "External order claim for {instrument_id} already exists for {existing}"
485                );
486            }
487        }
488
489        // If validation passed, insert all claims
490        for instrument_id in &instrument_ids {
491            self.external_order_claims
492                .insert(*instrument_id, strategy_id);
493        }
494
495        if !instrument_ids.is_empty() {
496            log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
497        }
498
499        Ok(())
500    }
501
502    /// # Errors
503    ///
504    /// Returns an error if no client is registered with the given ID.
505    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
506        if self.clients.remove(&client_id).is_some() {
507            // Remove from routing map if present
508            self.routing_map
509                .retain(|_, mapped_id| mapped_id != &client_id);
510            log::info!("Deregistered client {client_id}");
511            Ok(())
512        } else {
513            anyhow::bail!("No client registered with ID {client_id}")
514        }
515    }
516
517    /// Connects all registered execution clients concurrently.
518    ///
519    /// Connection failures are logged but do not prevent the node from running.
520    pub async fn connect(&mut self) {
521        let futures: Vec<_> = self
522            .get_clients_mut()
523            .into_iter()
524            .map(|client| client.connect())
525            .collect();
526
527        let results = join_all(futures).await;
528
529        for error in results.into_iter().filter_map(Result::err) {
530            log::error!("Failed to connect execution client: {error}");
531        }
532    }
533
534    /// Disconnects all registered execution clients concurrently.
535    ///
536    /// # Errors
537    ///
538    /// Returns an error if any client fails to disconnect.
539    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
540        let futures: Vec<_> = self
541            .get_clients_mut()
542            .into_iter()
543            .map(|client| client.disconnect())
544            .collect();
545
546        let results = join_all(futures).await;
547        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
548
549        if errors.is_empty() {
550            Ok(())
551        } else {
552            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
553            anyhow::bail!(
554                "Failed to disconnect execution clients: {}",
555                error_msgs.join("; ")
556            )
557        }
558    }
559
560    /// Sets the `manage_own_order_books` configuration option.
561    pub fn set_manage_own_order_books(&mut self, value: bool) {
562        self.config.manage_own_order_books = value;
563    }
564
565    /// Sets the `convert_quote_qty_to_base` configuration option.
566    pub fn set_convert_quote_qty_to_base(&mut self, value: bool) {
567        self.config.convert_quote_qty_to_base = value;
568    }
569
570    /// Starts the position snapshot timer if configured.
571    ///
572    /// Timer functionality requires a live execution context with an active clock.
573    pub fn start_snapshot_timer(&mut self) {
574        if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
575            log::info!("Starting position snapshots timer at {interval_secs} second intervals");
576        }
577    }
578
579    /// Stops the position snapshot timer if running.
580    pub fn stop_snapshot_timer(&mut self) {
581        if self.config.snapshot_positions_interval_secs.is_some() {
582            log::info!("Canceling position snapshots timer");
583        }
584    }
585
586    /// Creates snapshots of all open positions.
587    pub fn snapshot_open_position_states(&self) {
588        let positions: Vec<Position> = self
589            .cache
590            .borrow()
591            .positions_open(None, None, None, None, None)
592            .into_iter()
593            .cloned()
594            .collect();
595
596        for position in positions {
597            self.create_position_state_snapshot(&position);
598        }
599    }
600
601    #[allow(clippy::await_holding_refcell_ref)]
602    /// Loads persistent state into cache and rebuilds indices.
603    ///
604    /// # Errors
605    ///
606    /// Returns an error if any cache operation fails.
607    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
608        let ts = SystemTime::now();
609
610        {
611            let mut cache = self.cache.borrow_mut();
612            cache.clear_index();
613            cache.cache_general()?;
614            self.cache.borrow_mut().cache_all().await?;
615            cache.build_index();
616            let _ = cache.check_integrity();
617
618            if self.config.manage_own_order_books {
619                for order in cache.orders(None, None, None, None, None) {
620                    if order.is_closed() || !should_handle_own_book_order(order) {
621                        continue;
622                    }
623                    let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
624                    own_book.add(order.to_own_book_order());
625                }
626            }
627        }
628
629        self.set_position_id_counts();
630
631        log::info!(
632            "Loaded cache in {}ms",
633            SystemTime::now()
634                .duration_since(ts)
635                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
636                .as_millis()
637        );
638
639        Ok(())
640    }
641
642    /// Flushes the database to persist all cached data.
643    pub fn flush_db(&self) {
644        self.cache.borrow_mut().flush_db();
645    }
646
647    /// Reconciles an execution report.
648    pub fn reconcile_execution_report(&mut self, report: ExecutionReport) {
649        match &report {
650            ExecutionReport::Order(order_report) => {
651                self.reconcile_order_status_report(order_report);
652            }
653            ExecutionReport::Fill(fill_report) => {
654                self.reconcile_fill_report(fill_report);
655            }
656            ExecutionReport::Position(position_report) => {
657                self.reconcile_position_report(position_report);
658            }
659            ExecutionReport::MassStatus(mass_status) => {
660                self.reconcile_execution_mass_status(mass_status);
661            }
662        }
663    }
664
665    /// Reconciles an order status report received at runtime.
666    ///
667    /// Handles order status transitions by generating appropriate events when the venue
668    /// reports a different status than our local state. Supports all order states including
669    /// fills with inferred fill generation when instruments are available.
670    pub fn reconcile_order_status_report(&mut self, report: &OrderStatusReport) {
671        let cache = self.cache.borrow();
672
673        let order = report
674            .client_order_id
675            .and_then(|id| cache.order(&id).cloned())
676            .or_else(|| {
677                cache
678                    .client_order_id(&report.venue_order_id)
679                    .and_then(|cid| cache.order(cid).cloned())
680            });
681
682        let Some(order) = order else {
683            log::debug!(
684                "Order not found in cache for reconciliation: client_order_id={:?}, venue_order_id={}",
685                report.client_order_id,
686                report.venue_order_id
687            );
688            return;
689        };
690
691        let instrument = cache.instrument(&report.instrument_id).cloned();
692
693        drop(cache);
694
695        let ts_now = self.clock.borrow().timestamp_ns();
696
697        if let Some(event) = reconcile_order_report(&order, report, instrument.as_ref(), ts_now) {
698            self.handle_event(&event);
699        }
700    }
701
702    /// Reconciles a fill report received at runtime.
703    ///
704    /// Finds the associated order, validates the fill, and generates an OrderFilled event
705    /// if the fill is not a duplicate and won't cause an overfill.
706    pub fn reconcile_fill_report(&mut self, report: &FillReport) {
707        let cache = self.cache.borrow();
708
709        let order = report
710            .client_order_id
711            .and_then(|id| cache.order(&id).cloned())
712            .or_else(|| {
713                cache
714                    .client_order_id(&report.venue_order_id)
715                    .and_then(|cid| cache.order(cid).cloned())
716            });
717
718        let Some(order) = order else {
719            log::warn!(
720                "Cannot reconcile fill report: order not found for venue_order_id={}, client_order_id={:?}",
721                report.venue_order_id,
722                report.client_order_id
723            );
724            return;
725        };
726
727        let instrument = cache.instrument(&report.instrument_id).cloned();
728
729        drop(cache);
730
731        let Some(instrument) = instrument else {
732            log::debug!(
733                "Cannot reconcile fill report for {}: instrument {} not found",
734                order.client_order_id(),
735                report.instrument_id
736            );
737            return;
738        };
739
740        let ts_now = self.clock.borrow().timestamp_ns();
741
742        if let Some(event) = reconcile_fill(
743            &order,
744            report,
745            &instrument,
746            ts_now,
747            self.config.allow_overfills,
748        ) {
749            self.handle_event(&event);
750        }
751    }
752
753    /// Reconciles a position status report received at runtime.
754    ///
755    /// Compares the venue-reported position with cached positions and logs any discrepancies.
756    /// Handles both hedging (with venue_position_id) and netting (without) modes.
757    pub fn reconcile_position_report(&mut self, report: &PositionStatusReport) {
758        let cache = self.cache.borrow();
759
760        let size_precision = cache
761            .instrument(&report.instrument_id)
762            .map(|i| i.size_precision());
763
764        if report.venue_position_id.is_some() {
765            self.reconcile_position_report_hedging(report, &cache);
766        } else {
767            self.reconcile_position_report_netting(report, &cache, size_precision);
768        }
769    }
770
771    fn reconcile_position_report_hedging(&self, report: &PositionStatusReport, cache: &Cache) {
772        let venue_position_id = report.venue_position_id.as_ref().unwrap();
773
774        log::info!(
775            "Reconciling HEDGE position for {}, venue_position_id={}",
776            report.instrument_id,
777            venue_position_id
778        );
779
780        let Some(position) = cache.position(venue_position_id) else {
781            log::error!("Cannot reconcile position: {venue_position_id} not found in cache");
782            return;
783        };
784
785        let cached_signed_qty = match position.side {
786            PositionSide::Long => position.quantity.as_decimal(),
787            PositionSide::Short => -position.quantity.as_decimal(),
788            _ => Decimal::ZERO,
789        };
790        let venue_signed_qty = report.signed_decimal_qty;
791
792        if cached_signed_qty != venue_signed_qty {
793            log::error!(
794                "Position mismatch for {} {}: cached={}, venue={}",
795                report.instrument_id,
796                venue_position_id,
797                cached_signed_qty,
798                venue_signed_qty
799            );
800        }
801    }
802
803    fn reconcile_position_report_netting(
804        &self,
805        report: &PositionStatusReport,
806        cache: &Cache,
807        size_precision: Option<u8>,
808    ) {
809        log::info!("Reconciling NET position for {}", report.instrument_id);
810
811        let positions_open =
812            cache.positions_open(None, Some(&report.instrument_id), None, None, None);
813
814        // Sum up cached position quantities using domain types to avoid f64 precision loss
815        let cached_signed_qty: Decimal = positions_open
816            .iter()
817            .map(|p| match p.side {
818                PositionSide::Long => p.quantity.as_decimal(),
819                PositionSide::Short => -p.quantity.as_decimal(),
820                _ => Decimal::ZERO,
821            })
822            .sum();
823
824        log::info!(
825            "Position report: venue_signed_qty={}, cached_signed_qty={}",
826            report.signed_decimal_qty,
827            cached_signed_qty
828        );
829
830        let _ = check_position_reconciliation(report, cached_signed_qty, size_precision);
831    }
832
833    /// Reconciles an execution mass status report.
834    ///
835    /// Processes all order reports, fill reports, and position reports contained
836    /// in the mass status.
837    pub fn reconcile_execution_mass_status(&mut self, mass_status: &ExecutionMassStatus) {
838        log::info!(
839            "Reconciling mass status for client={}, account={}, venue={}",
840            mass_status.client_id,
841            mass_status.account_id,
842            mass_status.venue
843        );
844
845        for order_report in mass_status.order_reports().values() {
846            self.reconcile_order_status_report(order_report);
847        }
848
849        for fill_reports in mass_status.fill_reports().values() {
850            for fill_report in fill_reports {
851                self.reconcile_fill_report(fill_report);
852            }
853        }
854
855        for position_reports in mass_status.position_reports().values() {
856            for position_report in position_reports {
857                self.reconcile_position_report(position_report);
858            }
859        }
860
861        log::info!(
862            "Mass status reconciliation complete: {} orders, {} fills, {} positions",
863            mass_status.order_reports().len(),
864            mass_status
865                .fill_reports()
866                .values()
867                .map(|v| v.len())
868                .sum::<usize>(),
869            mass_status
870                .position_reports()
871                .values()
872                .map(|v| v.len())
873                .sum::<usize>()
874        );
875    }
876
    /// Executes a trading command by routing it to the appropriate execution client.
    ///
    /// Takes ownership of `command` and delegates to `execute_command`, which
    /// performs client resolution and per-command dispatch.
    pub fn execute(&self, command: TradingCommand) {
        self.execute_command(&command);
    }
881
    /// Processes an order event, updating internal state and routing as needed.
    ///
    /// Takes ownership of `event` and delegates to `handle_event`.
    pub fn process(&mut self, event: OrderEventAny) {
        self.handle_event(&event);
    }
886
    /// Starts the execution engine.
    ///
    /// Starts the snapshot timer and logs that the engine has started.
    pub fn start(&mut self) {
        self.start_snapshot_timer();

        log::info!("Started");
    }
893
    /// Stops the execution engine.
    ///
    /// Stops the snapshot timer and logs that the engine has stopped.
    pub fn stop(&mut self) {
        self.stop_snapshot_timer();

        log::info!("Stopped");
    }
900
    /// Resets the execution engine to its initial state.
    ///
    /// The only state reset here is the position ID generator.
    pub fn reset(&mut self) {
        self.pos_id_generator.reset();

        log::info!("Reset");
    }
907
    /// Disposes of the execution engine, releasing resources.
    ///
    /// Currently this only logs the disposal; no resources are released here.
    pub fn dispose(&mut self) {
        log::info!("Disposed");
    }
912
913    fn execute_command(&self, command: &TradingCommand) {
914        if self.config.debug {
915            log::debug!("{RECV}{CMD} {command:?}");
916        }
917
918        if let Some(cid) = command.client_id()
919            && self.external_clients.contains(&cid)
920        {
921            if self.config.debug {
922                log::debug!("Skipping execution command for external client {cid}: {command:?}");
923            }
924            return;
925        }
926
927        let client = if let Some(adapter) = command
928            .client_id()
929            .and_then(|cid| self.clients.get(&cid))
930            .or_else(|| {
931                self.routing_map
932                    .get(&command.instrument_id().venue)
933                    .and_then(|client_id| self.clients.get(client_id))
934            })
935            .or(self.default_client.as_ref())
936        {
937            adapter.client.as_ref()
938        } else {
939            log::error!(
940                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
941                command.client_id(),
942                command.instrument_id().venue,
943            );
944            return;
945        };
946
947        match command {
948            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
949            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
950            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
951            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
952            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
953            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
954            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
955            TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
956        }
957    }
958
959    fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: &SubmitOrder) {
960        let client_order_id = cmd.client_order_id;
961
962        let mut order = {
963            let cache = self.cache.borrow();
964            match cache.order(&client_order_id) {
965                Some(order) => order.clone(),
966                None => {
967                    log::error!(
968                        "Cannot handle submit order: order not found in cache for {client_order_id}"
969                    );
970                    return;
971                }
972            }
973        };
974
975        let order_venue = order.instrument_id().venue;
976        let client_venue = client.venue();
977        if order_venue != client_venue {
978            self.deny_order(
979                &order,
980                &format!("Order venue {order_venue} does not match client venue {client_venue}"),
981            );
982            return;
983        }
984
985        let instrument_id = order.instrument_id();
986
987        if self.config.snapshot_orders {
988            self.create_order_state_snapshot(&order);
989        }
990
991        let instrument = {
992            let cache = self.cache.borrow();
993            if let Some(instrument) = cache.instrument(&instrument_id) {
994                instrument.clone()
995            } else {
996                log::error!(
997                    "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
998                );
999                return;
1000            }
1001        };
1002
1003        // Handle quote quantity conversion
1004        if self.config.convert_quote_qty_to_base
1005            && !instrument.is_inverse()
1006            && order.is_quote_quantity()
1007        {
1008            log::warn!(
1009                "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
1010            );
1011            let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
1012
1013            if let Some(price) = last_px {
1014                let base_qty = instrument.get_base_quantity(order.quantity(), price);
1015                self.set_order_base_qty(&mut order, base_qty);
1016            } else {
1017                self.deny_order(
1018                    &order,
1019                    &format!("no-price-to-convert-quote-qty {instrument_id}"),
1020                );
1021                return;
1022            }
1023        }
1024
1025        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
1026            let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
1027            own_book.add(order.to_own_book_order());
1028        }
1029
1030        if let Err(e) = client.submit_order(cmd) {
1031            self.deny_order(&order, &format!("failed-to-submit-order-to-client: {e}"));
1032        }
1033    }
1034
1035    fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: &SubmitOrderList) {
1036        let orders: Vec<OrderAny> = self
1037            .cache
1038            .borrow()
1039            .orders_for_ids(&cmd.order_list.client_order_ids, cmd);
1040
1041        if orders.len() != cmd.order_list.client_order_ids.len() {
1042            for order in &orders {
1043                self.deny_order(
1044                    order,
1045                    &format!("Incomplete order list: missing orders in cache for {cmd}"),
1046                );
1047            }
1048            return;
1049        }
1050
1051        let order_list_venue = cmd.instrument_id.venue;
1052        let client_venue = client.venue();
1053        if order_list_venue != client_venue {
1054            for order in &orders {
1055                self.deny_order(
1056                    order,
1057                    &format!("Order list venue {order_list_venue} does not match client venue {client_venue}"),
1058                );
1059            }
1060            return;
1061        }
1062
1063        if self.config.snapshot_orders {
1064            for order in &orders {
1065                self.create_order_state_snapshot(order);
1066            }
1067        }
1068
1069        let instrument = {
1070            let cache = self.cache.borrow();
1071            if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
1072                instrument.clone()
1073            } else {
1074                log::error!(
1075                    "Cannot handle submit order list: no instrument found for {}, {cmd}",
1076                    cmd.instrument_id,
1077                );
1078                return;
1079            }
1080        };
1081
1082        // Handle quote quantity conversion
1083        if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
1084            let mut conversions: Vec<(ClientOrderId, Quantity)> = Vec::with_capacity(orders.len());
1085
1086            for order in &orders {
1087                if !order.is_quote_quantity() {
1088                    continue; // Base quantity already set
1089                }
1090
1091                let last_px =
1092                    self.last_px_for_conversion(&order.instrument_id(), order.order_side());
1093
1094                if let Some(px) = last_px {
1095                    let base_qty = instrument.get_base_quantity(order.quantity(), px);
1096                    conversions.push((order.client_order_id(), base_qty));
1097                } else {
1098                    for order in &orders {
1099                        self.deny_order(
1100                            order,
1101                            &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
1102                        );
1103                    }
1104                    return; // Denied
1105                }
1106            }
1107
1108            if !conversions.is_empty() {
1109                log::warn!(
1110                    "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
1111                );
1112
1113                let mut cache = self.cache.borrow_mut();
1114                for (client_order_id, base_qty) in conversions {
1115                    if let Some(mut_order) = cache.mut_order(&client_order_id) {
1116                        self.set_order_base_qty(mut_order, base_qty);
1117                    }
1118                }
1119            }
1120        }
1121
1122        if self.config.manage_own_order_books {
1123            let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
1124            for order in &orders {
1125                if should_handle_own_book_order(order) {
1126                    own_book.add(order.to_own_book_order());
1127                }
1128            }
1129        }
1130
1131        if let Err(e) = client.submit_order_list(cmd) {
1132            log::error!("Error submitting order list to client: {e}");
1133            for order in &orders {
1134                self.deny_order(
1135                    order,
1136                    &format!("failed-to-submit-order-list-to-client: {e}"),
1137                );
1138            }
1139        }
1140    }
1141
1142    fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: &ModifyOrder) {
1143        if let Err(e) = client.modify_order(cmd) {
1144            log::error!("Error modifying order: {e}");
1145        }
1146    }
1147
1148    fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: &CancelOrder) {
1149        if let Err(e) = client.cancel_order(cmd) {
1150            log::error!("Error canceling order: {e}");
1151        }
1152    }
1153
1154    fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: &CancelAllOrders) {
1155        if let Err(e) = client.cancel_all_orders(cmd) {
1156            log::error!("Error canceling all orders: {e}");
1157        }
1158    }
1159
1160    fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: &BatchCancelOrders) {
1161        if let Err(e) = client.batch_cancel_orders(cmd) {
1162            log::error!("Error batch canceling orders: {e}");
1163        }
1164    }
1165
1166    fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: &QueryAccount) {
1167        if let Err(e) = client.query_account(cmd) {
1168            log::error!("Error querying account: {e}");
1169        }
1170    }
1171
1172    fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: &QueryOrder) {
1173        if let Err(e) = client.query_order(cmd) {
1174            log::error!("Error querying order: {e}");
1175        }
1176    }
1177
1178    fn create_order_state_snapshot(&self, order: &OrderAny) {
1179        if self.config.debug {
1180            log::debug!("Creating order state snapshot for {order}");
1181        }
1182
1183        if self.cache.borrow().has_backing()
1184            && let Err(e) = self.cache.borrow().snapshot_order_state(order)
1185        {
1186            log::error!("Failed to snapshot order state: {e}");
1187        }
1188    }
1189
    /// Creates a state snapshot for `position`.
    ///
    /// NOTE(review): this is currently a stub. Apart from the optional debug
    /// log, no snapshot is taken or persisted; the intended unrealized PnL
    /// handling is still commented out below.
    fn create_position_state_snapshot(&self, position: &Position) {
        if self.config.debug {
            log::debug!("Creating position state snapshot for {position}");
        }

        // TODO: Not yet implemented, sketch of the intended logic:
        // let mut position: Position = position.clone();
        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
        //     position.unrealized_pnl(last)
        // }
    }
1200
1201    fn handle_event(&mut self, event: &OrderEventAny) {
1202        if self.config.debug {
1203            log::debug!("{RECV}{EVT} {event:?}");
1204        }
1205
1206        let client_order_id = event.client_order_id();
1207        let cache = self.cache.borrow();
1208        let mut order = if let Some(order) = cache.order(&client_order_id) {
1209            order.clone()
1210        } else {
1211            log::warn!(
1212                "Order with {} not found in the cache to apply {}",
1213                event.client_order_id(),
1214                event
1215            );
1216
1217            // Try to find order by venue order ID if available
1218            let venue_order_id = if let Some(id) = event.venue_order_id() {
1219                id
1220            } else {
1221                log::error!(
1222                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
1223                    event.client_order_id()
1224                );
1225                return;
1226            };
1227
1228            // Look up client order ID from venue order ID
1229            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
1230                id
1231            } else {
1232                log::error!(
1233                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
1234                    event.client_order_id(),
1235                );
1236                return;
1237            };
1238
1239            // Get order using found client order ID
1240            if let Some(order) = cache.order(client_order_id) {
1241                log::info!("Order with {client_order_id} was found in the cache");
1242                order.clone()
1243            } else {
1244                log::error!(
1245                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
1246                );
1247                return;
1248            }
1249        };
1250
1251        drop(cache);
1252
1253        match event {
1254            OrderEventAny::Filled(fill) => {
1255                let oms_type = self.determine_oms_type(fill);
1256                let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
1257
1258                let mut fill = *fill;
1259                if fill.position_id.is_none() {
1260                    fill.position_id = Some(position_id);
1261                }
1262
1263                if self.apply_fill_to_order(&mut order, fill).is_ok() {
1264                    self.handle_order_fill(&order, fill, oms_type);
1265                }
1266            }
1267            _ => {
1268                let _ = self.apply_event_to_order(&mut order, event.clone());
1269            }
1270        }
1271    }
1272
1273    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
1274        // Check for strategy OMS override
1275        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
1276            return *oms_type;
1277        }
1278
1279        // Use native venue OMS
1280        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
1281            && let Some(client) = self.clients.get(client_id)
1282        {
1283            return client.oms_type();
1284        }
1285
1286        if let Some(client) = &self.default_client {
1287            return client.oms_type();
1288        }
1289
1290        OmsType::Netting // Default fallback
1291    }
1292
1293    fn determine_position_id(
1294        &mut self,
1295        fill: OrderFilled,
1296        oms_type: OmsType,
1297        order: Option<&OrderAny>,
1298    ) -> PositionId {
1299        match oms_type {
1300            OmsType::Hedging => self.determine_hedging_position_id(fill, order),
1301            OmsType::Netting => self.determine_netting_position_id(fill),
1302            _ => self.determine_netting_position_id(fill), // Default to netting
1303        }
1304    }
1305
1306    fn determine_hedging_position_id(
1307        &mut self,
1308        fill: OrderFilled,
1309        order: Option<&OrderAny>,
1310    ) -> PositionId {
1311        // Check if position ID already exists
1312        if let Some(position_id) = fill.position_id {
1313            if self.config.debug {
1314                log::debug!("Already had a position ID of: {position_id}");
1315            }
1316            return position_id;
1317        }
1318
1319        let cache = self.cache.borrow();
1320
1321        let order = if let Some(o) = order {
1322            o
1323        } else {
1324            match cache.order(&fill.client_order_id()) {
1325                Some(o) => o,
1326                None => {
1327                    panic!(
1328                        "Order for {} not found to determine position ID",
1329                        fill.client_order_id()
1330                    );
1331                }
1332            }
1333        };
1334
1335        // Check execution spawn orders
1336        if let Some(spawn_id) = order.exec_spawn_id() {
1337            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
1338            for spawned_order in spawn_orders {
1339                if let Some(pos_id) = spawned_order.position_id() {
1340                    if self.config.debug {
1341                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
1342                    }
1343                    return pos_id;
1344                }
1345            }
1346        }
1347
1348        // Generate new position ID
1349        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
1350        if self.config.debug {
1351            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
1352        }
1353        position_id
1354    }
1355
1356    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
1357        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
1358    }
1359
1360    fn apply_fill_to_order(&self, order: &mut OrderAny, fill: OrderFilled) -> anyhow::Result<()> {
1361        if order.is_duplicate_fill(&fill) {
1362            log::warn!(
1363                "Duplicate fill: {} trade_id={} already applied, skipping",
1364                order.client_order_id(),
1365                fill.trade_id
1366            );
1367            anyhow::bail!("Duplicate fill");
1368        }
1369
1370        self.check_overfill(order, &fill)?;
1371        let event = OrderEventAny::Filled(fill);
1372        self.apply_order_event(order, event)
1373    }
1374
    /// Applies a non-fill order event to `order`.
    ///
    /// Thin wrapper that delegates to `apply_order_event`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `apply_order_event` returns.
    fn apply_event_to_order(
        &self,
        order: &mut OrderAny,
        event: OrderEventAny,
    ) -> anyhow::Result<()> {
        self.apply_order_event(order, event)
    }
1382
    /// Applies `event` to `order`, then updates the cache and publishes the event.
    ///
    /// An `OrderError::InvalidStateTransition` is only logged: processing
    /// continues with the cache update, publishing and optional snapshot.
    ///
    /// # Errors
    ///
    /// Returns an error for a duplicate fill or any other order error, in
    /// which case the cache is not updated and the event is not published.
    fn apply_order_event(&self, order: &mut OrderAny, event: OrderEventAny) -> anyhow::Result<()> {
        // The event is cloned because it is still needed for logging and publishing below
        if let Err(e) = order.apply(event.clone()) {
            match e {
                OrderError::InvalidStateTransition => {
                    // Event already applied to order (e.g., from reconciliation or duplicate processing)
                    // Log warning and continue with downstream processing (cache update, publishing, etc.)
                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
                }
                OrderError::DuplicateFill(trade_id) => {
                    // Duplicate fill detected at order level (secondary safety check)
                    log::warn!(
                        "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
                    );
                    anyhow::bail!("{e}");
                }
                _ => {
                    // Protection against invalid IDs and other invariants
                    log::error!("Error applying event: {e}, did not apply {event}");
                    // The own order book is still refreshed from the order before bailing
                    if should_handle_own_book_order(order) {
                        self.cache.borrow_mut().update_own_order_book(order);
                    }
                    anyhow::bail!("{e}");
                }
            }
        }

        // A cache update failure is logged but does not stop publishing
        if let Err(e) = self.cache.borrow_mut().update_order(order) {
            log::error!("Error updating order in cache: {e}");
        }

        if self.config.debug {
            log::debug!("{SEND}{EVT} {event}");
        }

        let topic = switchboard::get_event_orders_topic(event.strategy_id());
        msgbus::publish_order_event(topic, &event);

        if self.config.snapshot_orders {
            self.create_order_state_snapshot(order);
        }

        Ok(())
    }
1426
1427    fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
1428        let potential_overfill = order.calculate_overfill(fill.last_qty);
1429
1430        if potential_overfill.is_positive() {
1431            if self.config.allow_overfills {
1432                log::warn!(
1433                    "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
1434                    order.client_order_id(),
1435                    potential_overfill,
1436                    order.filled_qty(),
1437                    fill.last_qty,
1438                    order.quantity()
1439                );
1440            } else {
1441                let msg = format!(
1442                    "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
1443                Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
1444                    order.client_order_id(),
1445                    potential_overfill,
1446                    order.filled_qty(),
1447                    fill.last_qty,
1448                    order.quantity()
1449                );
1450                anyhow::bail!("{msg}");
1451            }
1452        }
1453
1454        Ok(())
1455    }
1456
1457    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
1458        let instrument =
1459            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
1460                instrument.clone()
1461            } else {
1462                log::error!(
1463                    "Cannot handle order fill: no instrument found for {}, {fill}",
1464                    fill.instrument_id,
1465                );
1466                return;
1467            };
1468
1469        if self.cache.borrow().account(&fill.account_id).is_none() {
1470            log::error!(
1471                "Cannot handle order fill: no account found for {}, {fill}",
1472                fill.instrument_id.venue,
1473            );
1474            return;
1475        }
1476
1477        // Skip portfolio position updates for combo fills (spread instruments)
1478        // Combo fills are only used for order management, not portfolio updates
1479        let position = if instrument.is_spread() {
1480            None
1481        } else {
1482            self.handle_position_update(instrument.clone(), fill, oms_type);
1483            let position_id = fill.position_id.unwrap();
1484            self.cache.borrow().position(&position_id).cloned()
1485        };
1486
1487        // Handle contingent orders for both spread and non-spread instruments
1488        // For spread instruments, contingent orders work without position linkage
1489        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1490            // For non-spread instruments, link to position if available
1491            if !instrument.is_spread()
1492                && let Some(ref pos) = position
1493                && pos.is_open()
1494            {
1495                let position_id = pos.id;
1496                for client_order_id in order.linked_order_ids().unwrap_or_default() {
1497                    let mut cache = self.cache.borrow_mut();
1498                    let contingent_order = cache.mut_order(client_order_id);
1499                    if let Some(contingent_order) = contingent_order
1500                        && contingent_order.position_id().is_none()
1501                    {
1502                        contingent_order.set_position_id(Some(position_id));
1503
1504                        if let Err(e) = self.cache.borrow_mut().add_position_id(
1505                            &position_id,
1506                            &contingent_order.instrument_id().venue,
1507                            &contingent_order.client_order_id(),
1508                            &contingent_order.strategy_id(),
1509                        ) {
1510                            log::error!("Failed to add position ID: {e}");
1511                        }
1512                    }
1513                }
1514            }
1515            // For spread instruments, contingent orders can still be triggered
1516            // but without position linkage (since no position is created for spreads)
1517        }
1518    }
1519
1520    /// Handle position creation or update for a fill.
1521    ///
1522    /// This function mirrors the Python `_handle_position_update` method.
1523    fn handle_position_update(
1524        &mut self,
1525        instrument: InstrumentAny,
1526        fill: OrderFilled,
1527        oms_type: OmsType,
1528    ) {
1529        let position_id = if let Some(position_id) = fill.position_id {
1530            position_id
1531        } else {
1532            log::error!("Cannot handle position update: no position ID found for fill {fill}");
1533            return;
1534        };
1535
1536        let position_opt = self.cache.borrow().position(&position_id).cloned();
1537
1538        match position_opt {
1539            None => {
1540                // Position is None - open new position
1541                if self.open_position(instrument, None, fill, oms_type).is_ok() {
1542                    // Position opened successfully
1543                }
1544            }
1545            Some(pos) if pos.is_closed() => {
1546                // Position is closed - open new position
1547                if self
1548                    .open_position(instrument, Some(&pos), fill, oms_type)
1549                    .is_ok()
1550                {
1551                    // Position opened successfully
1552                }
1553            }
1554            Some(mut pos) => {
1555                if self.will_flip_position(&pos, fill) {
1556                    // Position will flip
1557                    self.flip_position(instrument, &mut pos, fill, oms_type);
1558                } else {
1559                    // Update existing position
1560                    self.update_position(&mut pos, fill);
1561                }
1562            }
1563        }
1564    }
1565
1566    fn open_position(
1567        &self,
1568        instrument: InstrumentAny,
1569        position: Option<&Position>,
1570        fill: OrderFilled,
1571        oms_type: OmsType,
1572    ) -> anyhow::Result<()> {
1573        if let Some(position) = position {
1574            if Self::is_duplicate_closed_fill(position, &fill) {
1575                log::warn!(
1576                    "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
1577                    fill.trade_id,
1578                    position.id,
1579                    fill.order_side,
1580                    fill.last_qty,
1581                    fill.last_px
1582                );
1583                return Ok(());
1584            }
1585            self.reopen_position(position, oms_type)?;
1586        }
1587
1588        let position = Position::new(&instrument, fill);
1589        self.cache
1590            .borrow_mut()
1591            .add_position(position.clone(), oms_type)?; // TODO: Remove clone (change method)
1592
1593        if self.config.snapshot_positions {
1594            self.create_position_state_snapshot(&position);
1595        }
1596
1597        let ts_init = self.clock.borrow().timestamp_ns();
1598        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
1599        let topic = switchboard::get_event_positions_topic(event.strategy_id);
1600        msgbus::publish_position_event(topic, &PositionEvent::PositionOpened(event));
1601
1602        Ok(())
1603    }
1604
1605    fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
1606        position.events.iter().any(|event| {
1607            event.trade_id == fill.trade_id
1608                && event.order_side == fill.order_side
1609                && event.last_px == fill.last_px
1610                && event.last_qty == fill.last_qty
1611        })
1612    }
1613
1614    fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
1615        if oms_type == OmsType::Netting {
1616            if position.is_open() {
1617                anyhow::bail!(
1618                    "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
1619                    position.id
1620                );
1621            }
1622            // Snapshot closed position if reopening (NETTING mode)
1623            self.cache.borrow_mut().snapshot_position(position)?;
1624        } else {
1625            // HEDGING mode
1626            log::warn!(
1627                "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
1628                position.id
1629            );
1630        }
1631        Ok(())
1632    }
1633
1634    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
1635        // Apply the fill to the position
1636        position.apply(&fill);
1637
1638        // Check if position is closed after applying the fill
1639        let is_closed = position.is_closed();
1640
1641        // Update position in cache - this should handle the closed state tracking
1642        if let Err(e) = self.cache.borrow_mut().update_position(position) {
1643            log::error!("Failed to update position: {e:?}");
1644            return;
1645        }
1646
1647        // Verify cache state after update
1648        let cache = self.cache.borrow();
1649
1650        drop(cache);
1651
1652        // Create position state snapshot if enabled
1653        if self.config.snapshot_positions {
1654            self.create_position_state_snapshot(position);
1655        }
1656
1657        // Create and publish appropriate position event
1658        let topic = switchboard::get_event_positions_topic(position.strategy_id);
1659        let ts_init = self.clock.borrow().timestamp_ns();
1660
1661        if is_closed {
1662            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
1663            msgbus::publish_position_event(topic, &PositionEvent::PositionClosed(event));
1664        } else {
1665            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
1666            msgbus::publish_position_event(topic, &PositionEvent::PositionChanged(event));
1667        }
1668    }
1669
1670    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
1671        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
1672    }
1673
1674    fn flip_position(
1675        &mut self,
1676        instrument: InstrumentAny,
1677        position: &mut Position,
1678        fill: OrderFilled,
1679        oms_type: OmsType,
1680    ) {
1681        let difference = match position.side {
1682            PositionSide::Long => Quantity::from_raw(
1683                fill.last_qty.raw - position.quantity.raw,
1684                position.size_precision,
1685            ),
1686            PositionSide::Short => Quantity::from_raw(
1687                position.quantity.raw.abs_diff(fill.last_qty.raw), // Equivalent to Python's abs(position.quantity - fill.last_qty)
1688                position.size_precision,
1689            ),
1690            _ => fill.last_qty,
1691        };
1692
1693        // Split commission between two positions
1694        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
1695        let (commission1, commission2) = if let Some(commission) = fill.commission {
1696            let commission_currency = commission.currency;
1697            let commission1 = Money::new(commission * fill_percent, commission_currency);
1698            let commission2 = commission - commission1;
1699            (Some(commission1), Some(commission2))
1700        } else {
1701            log::error!("Commission is not available");
1702            (None, None)
1703        };
1704
1705        let mut fill_split1: Option<OrderFilled> = None;
1706        if position.is_open() {
1707            fill_split1 = Some(OrderFilled::new(
1708                fill.trader_id,
1709                fill.strategy_id,
1710                fill.instrument_id,
1711                fill.client_order_id,
1712                fill.venue_order_id,
1713                fill.account_id,
1714                fill.trade_id,
1715                fill.order_side,
1716                fill.order_type,
1717                position.quantity,
1718                fill.last_px,
1719                fill.currency,
1720                fill.liquidity_side,
1721                UUID4::new(),
1722                fill.ts_event,
1723                fill.ts_init,
1724                fill.reconciliation,
1725                fill.position_id,
1726                commission1,
1727            ));
1728
1729            self.update_position(position, fill_split1.unwrap());
1730
1731            // Snapshot closed position before reusing ID (NETTING mode)
1732            if oms_type == OmsType::Netting
1733                && let Err(e) = self.cache.borrow_mut().snapshot_position(position)
1734            {
1735                log::error!("Failed to snapshot position during flip: {e:?}");
1736            }
1737        }
1738
1739        // Guard against flipping a position with a zero fill size
1740        if difference.raw == 0 {
1741            log::warn!(
1742                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
1743            );
1744            return;
1745        }
1746
1747        let position_id_flip = if oms_type == OmsType::Hedging
1748            && let Some(position_id) = fill.position_id
1749            && position_id.is_virtual()
1750        {
1751            // Generate new position ID for flipped virtual position (Hedging OMS only)
1752            Some(self.pos_id_generator.generate(fill.strategy_id, true))
1753        } else {
1754            // Default: use the same position ID as the fill (Python behavior)
1755            fill.position_id
1756        };
1757
1758        let fill_split2 = OrderFilled::new(
1759            fill.trader_id,
1760            fill.strategy_id,
1761            fill.instrument_id,
1762            fill.client_order_id,
1763            fill.venue_order_id,
1764            fill.account_id,
1765            fill.trade_id,
1766            fill.order_side,
1767            fill.order_type,
1768            difference,
1769            fill.last_px,
1770            fill.currency,
1771            fill.liquidity_side,
1772            UUID4::new(),
1773            fill.ts_event,
1774            fill.ts_init,
1775            fill.reconciliation,
1776            position_id_flip,
1777            commission2,
1778        );
1779
1780        if oms_type == OmsType::Hedging
1781            && let Some(position_id) = fill.position_id
1782            && position_id.is_virtual()
1783        {
1784            log::warn!("Closing position {fill_split1:?}");
1785            log::warn!("Flipping position {fill_split2:?}");
1786        }
1787
1788        // Open flipped position
1789        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1790            log::error!("Failed to open flipped position: {e:?}");
1791        }
1792    }
1793
1794    /// Sets the internal position ID generator counts based on existing cached positions.
1795    pub fn set_position_id_counts(&mut self) {
1796        let cache = self.cache.borrow();
1797        let positions = cache.positions(None, None, None, None, None);
1798
1799        // Count positions per instrument_id using a HashMap
1800        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1801
1802        for position in positions {
1803            *counts.entry(position.strategy_id).or_insert(0) += 1;
1804        }
1805
1806        self.pos_id_generator.reset();
1807
1808        for (strategy_id, count) in counts {
1809            self.pos_id_generator.set_count(count, strategy_id);
1810            log::info!("Set PositionId count for {strategy_id} to {count}");
1811        }
1812    }
1813
1814    fn last_px_for_conversion(
1815        &self,
1816        instrument_id: &InstrumentId,
1817        side: OrderSide,
1818    ) -> Option<Price> {
1819        let cache = self.cache.borrow();
1820
1821        // Try to get last trade price
1822        if let Some(trade) = cache.trade(instrument_id) {
1823            return Some(trade.price);
1824        }
1825
1826        // Fall back to quote if available
1827        if let Some(quote) = cache.quote(instrument_id) {
1828            match side {
1829                OrderSide::Buy => Some(quote.ask_price),
1830                OrderSide::Sell => Some(quote.bid_price),
1831                OrderSide::NoOrderSide => None,
1832            }
1833        } else {
1834            None
1835        }
1836    }
1837
1838    fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1839        log::info!(
1840            "Setting {} order quote quantity {} to base quantity {}",
1841            order.instrument_id(),
1842            order.quantity(),
1843            base_qty
1844        );
1845
1846        let original_qty = order.quantity();
1847        order.set_quantity(base_qty);
1848        order.set_leaves_qty(base_qty);
1849        order.set_is_quote_quantity(false);
1850
1851        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1852            return;
1853        }
1854
1855        if let Some(linked_order_ids) = order.linked_order_ids() {
1856            for client_order_id in linked_order_ids {
1857                match self.cache.borrow_mut().mut_order(client_order_id) {
1858                    Some(contingent_order) => {
1859                        if !contingent_order.is_quote_quantity() {
1860                            continue; // Already base quantity
1861                        }
1862
1863                        if contingent_order.quantity() != original_qty {
1864                            log::warn!(
1865                                "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1866                                contingent_order.quantity(),
1867                                original_qty,
1868                                base_qty
1869                            );
1870                        }
1871
1872                        log::info!(
1873                            "Setting {} order quote quantity {} to base quantity {}",
1874                            contingent_order.instrument_id(),
1875                            contingent_order.quantity(),
1876                            base_qty
1877                        );
1878
1879                        contingent_order.set_quantity(base_qty);
1880                        contingent_order.set_leaves_qty(base_qty);
1881                        contingent_order.set_is_quote_quantity(false);
1882                    }
1883                    None => {
1884                        log::error!("Contingency order {client_order_id} not found");
1885                    }
1886                }
1887            }
1888        } else {
1889            log::warn!(
1890                "No linked order IDs found for order {}",
1891                order.client_order_id()
1892            );
1893        }
1894    }
1895
1896    fn deny_order(&self, order: &OrderAny, reason: &str) {
1897        let denied = OrderDenied::new(
1898            order.trader_id(),
1899            order.strategy_id(),
1900            order.instrument_id(),
1901            order.client_order_id(),
1902            reason.into(),
1903            UUID4::new(),
1904            self.clock.borrow().timestamp_ns(),
1905            self.clock.borrow().timestamp_ns(),
1906        );
1907
1908        let mut order = order.clone();
1909
1910        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1911            log::error!("Failed to apply denied event to order: {e}");
1912            return;
1913        }
1914
1915        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1916            log::error!("Failed to update order in cache: {e}");
1917            return;
1918        }
1919
1920        let topic = switchboard::get_event_orders_topic(order.strategy_id());
1921        msgbus::publish_order_event(topic, &OrderEventAny::Denied(denied));
1922
1923        if self.config.snapshot_orders {
1924            self.create_order_state_snapshot(&order);
1925        }
1926    }
1927
1928    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1929        let mut cache = self.cache.borrow_mut();
1930        if cache.own_order_book_mut(instrument_id).is_none() {
1931            let own_book = OwnOrderBook::new(*instrument_id);
1932            cache.add_own_order_book(own_book).unwrap();
1933        }
1934
1935        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1936    }
1937}