nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
18//! The execution engines primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24
25use std::{
26    cell::{RefCell, RefMut},
27    collections::{HashMap, HashSet},
28    fmt::Debug,
29    rc::Rc,
30    time::SystemTime,
31};
32
33use config::ExecutionEngineConfig;
34use nautilus_common::{
35    cache::Cache,
36    clock::Clock,
37    generators::position_id::PositionIdGenerator,
38    logging::{CMD, EVT, RECV},
39    messages::execution::{
40        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
41        SubmitOrder, SubmitOrderList, TradingCommand,
42    },
43    msgbus::{
44        self, get_message_bus,
45        switchboard::{self},
46    },
47};
48use nautilus_core::UUID4;
49use nautilus_model::{
50    enums::{ContingencyType, OmsType, OrderSide, PositionSide},
51    events::{
52        OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
53        PositionOpened,
54    },
55    identifiers::{ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
56    instruments::{Instrument, InstrumentAny},
57    orderbook::own::{OwnOrderBook, should_handle_own_book_order},
58    orders::{Order, OrderAny, OrderError},
59    position::Position,
60    types::{Money, Price, Quantity},
61};
62
63use crate::client::ExecutionClient;
64
65/// Central execution engine responsible for orchestrating order routing and execution.
66///
67/// The execution engine manages the entire order lifecycle from submission to completion,
68/// handling routing to appropriate execution clients, position management, and event
69/// processing. It supports multiple execution venues through registered clients and
70/// provides sophisticated order management capabilities.
71pub struct ExecutionEngine {
72    clock: Rc<RefCell<dyn Clock>>,
73    cache: Rc<RefCell<Cache>>,
74    clients: HashMap<ClientId, Rc<dyn ExecutionClient>>,
75    default_client: Option<Rc<dyn ExecutionClient>>,
76    routing_map: HashMap<Venue, ClientId>,
77    oms_overrides: HashMap<StrategyId, OmsType>,
78    external_order_claims: HashMap<InstrumentId, StrategyId>,
79    external_clients: HashSet<ClientId>,
80    pos_id_generator: PositionIdGenerator,
81    config: ExecutionEngineConfig,
82}
83
84impl Debug for ExecutionEngine {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        f.debug_struct(stringify!(ExecutionEngine))
87            .field("client_count", &self.clients.len())
88            .finish()
89    }
90}
91
92impl ExecutionEngine {
93    /// Creates a new [`ExecutionEngine`] instance.
94    pub fn new(
95        clock: Rc<RefCell<dyn Clock>>,
96        cache: Rc<RefCell<Cache>>,
97        config: Option<ExecutionEngineConfig>,
98    ) -> Self {
99        let trader_id = get_message_bus().borrow().trader_id;
100        Self {
101            clock: clock.clone(),
102            cache,
103            clients: HashMap::new(),
104            default_client: None,
105            routing_map: HashMap::new(),
106            oms_overrides: HashMap::new(),
107            external_order_claims: HashMap::new(),
108            external_clients: config
109                .as_ref()
110                .and_then(|c| c.external_clients.clone())
111                .unwrap_or_default()
112                .into_iter()
113                .collect(),
114            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
115            config: config.unwrap_or_default(),
116        }
117    }
118
119    #[must_use]
120    /// Returns the position ID count for the specified strategy.
121    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
122        self.pos_id_generator.count(strategy_id)
123    }
124
125    #[must_use]
126    /// Checks the integrity of cached execution data.
127    pub fn check_integrity(&self) -> bool {
128        self.cache.borrow_mut().check_integrity()
129    }
130
131    #[must_use]
132    /// Returns true if all registered execution clients are connected.
133    pub fn check_connected(&self) -> bool {
134        self.clients.values().all(|c| c.is_connected())
135    }
136
137    #[must_use]
138    /// Returns true if all registered execution clients are disconnected.
139    pub fn check_disconnected(&self) -> bool {
140        self.clients.values().all(|c| !c.is_connected())
141    }
142
143    #[must_use]
144    /// Checks for residual positions and orders in the cache.
145    pub fn check_residuals(&self) -> bool {
146        self.cache.borrow().check_residuals()
147    }
148
149    #[must_use]
150    /// Returns the set of instruments that have external order claims.
151    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
152        self.external_order_claims.keys().copied().collect()
153    }
154
155    // -- REGISTRATION ----------------------------------------------------------------------------
156
157    /// Registers a new execution client.
158    ///
159    /// # Errors
160    ///
161    /// Returns an error if a client with the same ID is already registered.
162    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) -> anyhow::Result<()> {
163        if self.clients.contains_key(&client.client_id()) {
164            anyhow::bail!("Client already registered with ID {}", client.client_id());
165        }
166
167        // If client has venue, register routing
168        self.routing_map.insert(client.venue(), client.client_id());
169
170        log::info!("Registered client {}", client.client_id());
171        self.clients.insert(client.client_id(), client);
172        Ok(())
173    }
174
175    /// Registers a default execution client for fallback routing.
176    pub fn register_default_client(&mut self, client: Rc<dyn ExecutionClient>) {
177        log::info!("Registered default client {}", client.client_id());
178        self.default_client = Some(client);
179    }
180
181    #[must_use]
182    /// Returns the execution client registered with the given ID.
183    pub fn get_client(&self, client_id: &ClientId) -> Option<Rc<dyn ExecutionClient>> {
184        self.clients.get(client_id).cloned()
185    }
186
187    /// Sets routing for a specific venue to a given client ID.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if the client ID is not registered.
192    pub fn register_venue_routing(
193        &mut self,
194        client_id: ClientId,
195        venue: Venue,
196    ) -> anyhow::Result<()> {
197        if !self.clients.contains_key(&client_id) {
198            anyhow::bail!("No client registered with ID {client_id}");
199        }
200
201        self.routing_map.insert(venue, client_id);
202        log::info!("Set client {client_id} routing for {venue}");
203        Ok(())
204    }
205
206    // TODO: Implement `Strategy`
207    // pub fn register_external_order_claims(&mut self, strategy: Strategy) -> anyhow::Result<()> {
208    //     todo!();
209    // }
210
211    /// # Errors
212    ///
213    /// Returns an error if no client is registered with the given ID.
214    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
215        if self.clients.remove(&client_id).is_some() {
216            // Remove from routing map if present
217            self.routing_map
218                .retain(|_, mapped_id| mapped_id != &client_id);
219            log::info!("Deregistered client {client_id}");
220            Ok(())
221        } else {
222            anyhow::bail!("No client registered with ID {client_id}")
223        }
224    }
225
226    // -- COMMANDS --------------------------------------------------------------------------------
227
228    #[allow(clippy::await_holding_refcell_ref)]
229    /// Loads persistent state into cache and rebuilds indices.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if any cache operation fails.
234    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
235        let ts = SystemTime::now();
236
237        {
238            let mut cache = self.cache.borrow_mut();
239            cache.clear_index();
240            cache.cache_general()?;
241            self.cache.borrow_mut().cache_all().await?;
242            cache.build_index();
243            let _ = cache.check_integrity();
244
245            if self.config.manage_own_order_books {
246                for order in cache.orders(None, None, None, None) {
247                    if order.is_closed() || !should_handle_own_book_order(order) {
248                        continue;
249                    }
250                    let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
251                    own_book.add(order.to_own_book_order());
252                }
253            }
254        }
255
256        self.set_position_id_counts();
257
258        log::info!(
259            "Loaded cache in {}ms",
260            SystemTime::now()
261                .duration_since(ts)
262                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
263                .as_millis()
264        );
265
266        Ok(())
267    }
268
269    /// Flushes the database to persist all cached data.
270    pub fn flush_db(&self) {
271        self.cache.borrow_mut().flush_db();
272    }
273
274    /// Processes an order event, updating internal state and routing as needed.
275    pub fn process(&mut self, event: &OrderEventAny) {
276        self.handle_event(event);
277    }
278
279    /// Executes a trading command by routing it to the appropriate execution client.
280    pub fn execute(&self, command: &TradingCommand) {
281        self.execute_command(command);
282    }
283
284    // -- COMMAND HANDLERS ------------------------------------------------------------------------
285
286    fn execute_command(&self, command: &TradingCommand) {
287        if self.config.debug {
288            log::debug!("{RECV}{CMD} {command:?}");
289        }
290
291        if self.external_clients.contains(&command.client_id()) {
292            if self.config.debug {
293                let cid = command.client_id();
294                log::debug!("Skipping execution command for external client {cid}: {command:?}");
295            }
296            return;
297        }
298
299        let client: Rc<dyn ExecutionClient> = if let Some(client) = self
300            .clients
301            .get(&command.client_id())
302            .or_else(|| {
303                self.routing_map
304                    .get(&command.instrument_id().venue)
305                    .and_then(|client_id| self.clients.get(client_id))
306            })
307            .or(self.default_client.as_ref())
308        {
309            client.clone()
310        } else {
311            log::error!(
312                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
313                command.client_id(),
314                command.instrument_id().venue,
315            );
316            return;
317        };
318
319        match command {
320            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
321            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
322            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
323            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
324            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
325            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
326            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
327            TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
328        }
329    }
330
331    fn handle_submit_order(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrder) {
332        let mut order = cmd.order.clone();
333        let client_order_id = order.client_order_id();
334        let instrument_id = order.instrument_id();
335
336        // Check if the order exists in the cache
337        if !self.cache.borrow().order_exists(&client_order_id) {
338            // Add order to cache in a separate scope to drop the mutable borrow
339            {
340                let mut cache = self.cache.borrow_mut();
341                if let Err(e) =
342                    cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
343                {
344                    log::error!("Error adding order to cache: {e}");
345                    return;
346                }
347            }
348
349            if self.config.snapshot_orders {
350                self.create_order_state_snapshot(&order);
351            }
352        }
353
354        // Get instrument in a separate scope to manage borrows
355        let instrument = {
356            let cache = self.cache.borrow();
357            if let Some(instrument) = cache.instrument(&instrument_id) {
358                instrument.clone()
359            } else {
360                log::error!(
361                    "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
362                );
363                return;
364            }
365        };
366
367        // Handle quote quantity conversion
368        if self.config.convert_quote_qty_to_base
369            && !instrument.is_inverse()
370            && order.is_quote_quantity()
371        {
372            log::warn!(
373                "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
374            );
375            let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
376
377            if let Some(price) = last_px {
378                let base_qty = instrument.get_base_quantity(order.quantity(), price);
379                self.set_order_base_qty(&mut order, base_qty);
380            } else {
381                self.deny_order(
382                    &order,
383                    &format!("no-price-to-convert-quote-qty {instrument_id}"),
384                );
385                return;
386            }
387        }
388
389        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
390            let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
391            own_book.add(order.to_own_book_order());
392        }
393
394        // Send the order to the execution client
395        if let Err(e) = client.submit_order(cmd) {
396            log::error!("Error submitting order to client: {e}");
397            self.deny_order(
398                &cmd.order,
399                &format!("failed-to-submit-order-to-client: {e}"),
400            );
401        }
402    }
403
404    fn handle_submit_order_list(&self, client: Rc<dyn ExecutionClient>, cmd: &SubmitOrderList) {
405        let orders = cmd.order_list.orders.clone();
406
407        // Cache orders
408        let mut cache = self.cache.borrow_mut();
409        for order in &orders {
410            if !cache.order_exists(&order.client_order_id()) {
411                if let Err(e) =
412                    cache.add_order(order.clone(), cmd.position_id, Some(cmd.client_id), true)
413                {
414                    log::error!("Error adding order to cache: {e}");
415                    return;
416                }
417
418                if self.config.snapshot_orders {
419                    self.create_order_state_snapshot(order);
420                }
421            }
422        }
423        drop(cache);
424
425        // Get instrument from cache
426        let instrument = {
427            let cache = self.cache.borrow();
428            if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
429                instrument.clone()
430            } else {
431                log::error!(
432                    "Cannot handle submit order list: no instrument found for {}, {cmd}",
433                    cmd.instrument_id,
434                );
435                return;
436            }
437        };
438
439        // Handle quote quantity conversion
440        if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
441            let mut conversions: Vec<(ClientOrderId, Quantity)> =
442                Vec::with_capacity(cmd.order_list.orders.len());
443
444            for order in &cmd.order_list.orders {
445                if !order.is_quote_quantity() {
446                    continue; // Base quantity already set
447                }
448
449                let last_px =
450                    self.last_px_for_conversion(&order.instrument_id(), order.order_side());
451
452                if let Some(px) = last_px {
453                    let base_qty = instrument.get_base_quantity(order.quantity(), px);
454                    conversions.push((order.client_order_id(), base_qty));
455                } else {
456                    for order in &cmd.order_list.orders {
457                        self.deny_order(
458                            order,
459                            &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
460                        );
461                    }
462                    return; // Denied
463                }
464            }
465
466            if !conversions.is_empty() {
467                log::warn!(
468                    "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
469                );
470
471                let mut cache = self.cache.borrow_mut();
472                for (client_order_id, base_qty) in conversions {
473                    if let Some(mut_order) = cache.mut_order(&client_order_id) {
474                        self.set_order_base_qty(mut_order, base_qty);
475                    }
476                }
477            }
478        }
479
480        if self.config.manage_own_order_books {
481            let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
482            for order in &cmd.order_list.orders {
483                if should_handle_own_book_order(order) {
484                    own_book.add(order.to_own_book_order());
485                }
486            }
487        }
488
489        // Send to execution client
490        if let Err(e) = client.submit_order_list(cmd) {
491            log::error!("Error submitting order list to client: {e}");
492            for order in &orders {
493                self.deny_order(
494                    order,
495                    &format!("failed-to-submit-order-list-to-client: {e}"),
496                );
497            }
498        }
499    }
500
501    fn handle_modify_order(&self, client: Rc<dyn ExecutionClient>, cmd: &ModifyOrder) {
502        if let Err(e) = client.modify_order(cmd) {
503            log::error!("Error modifying order: {e}");
504        }
505    }
506
507    fn handle_cancel_order(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelOrder) {
508        if let Err(e) = client.cancel_order(cmd) {
509            log::error!("Error canceling order: {e}");
510        }
511    }
512
513    fn handle_cancel_all_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &CancelAllOrders) {
514        if let Err(e) = client.cancel_all_orders(cmd) {
515            log::error!("Error canceling all orders: {e}");
516        }
517    }
518
519    fn handle_batch_cancel_orders(&self, client: Rc<dyn ExecutionClient>, cmd: &BatchCancelOrders) {
520        if let Err(e) = client.batch_cancel_orders(cmd) {
521            log::error!("Error batch canceling orders: {e}");
522        }
523    }
524
525    fn handle_query_account(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryAccount) {
526        if let Err(e) = client.query_account(cmd) {
527            log::error!("Error querying account: {e}");
528        }
529    }
530
531    fn handle_query_order(&self, client: Rc<dyn ExecutionClient>, cmd: &QueryOrder) {
532        if let Err(e) = client.query_order(cmd) {
533            log::error!("Error querying order: {e}");
534        }
535    }
536
537    fn create_order_state_snapshot(&self, order: &OrderAny) {
538        if self.config.debug {
539            log::debug!("Creating order state snapshot for {order}");
540        }
541
542        if self.cache.borrow().has_backing()
543            && let Err(e) = self.cache.borrow().snapshot_order_state(order)
544        {
545            log::error!("Failed to snapshot order state: {e}");
546            return;
547        }
548
549        if get_message_bus().borrow().has_backing {
550            let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
551            msgbus::publish(topic, order);
552        }
553    }
554
555    fn create_position_state_snapshot(&self, position: &Position) {
556        if self.config.debug {
557            log::debug!("Creating position state snapshot for {position}");
558        }
559
560        // let mut position: Position = position.clone();
561        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
562        //     position.unrealized_pnl(last)
563        // }
564
565        let topic = switchboard::get_positions_snapshots_topic(position.id);
566        msgbus::publish(topic, position);
567    }
568
569    // -- EVENT HANDLERS --------------------------------------------------------------------------
570
571    fn handle_event(&mut self, event: &OrderEventAny) {
572        if self.config.debug {
573            log::debug!("{RECV}{EVT} {event:?}");
574        }
575
576        let client_order_id = event.client_order_id();
577        let cache = self.cache.borrow();
578        let mut order = if let Some(order) = cache.order(&client_order_id) {
579            order.clone()
580        } else {
581            log::warn!(
582                "Order with {} not found in the cache to apply {}",
583                event.client_order_id(),
584                event
585            );
586
587            // Try to find order by venue order ID if available
588            let venue_order_id = if let Some(id) = event.venue_order_id() {
589                id
590            } else {
591                log::error!(
592                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
593                    event.client_order_id()
594                );
595                return;
596            };
597
598            // Look up client order ID from venue order ID
599            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
600                id
601            } else {
602                log::error!(
603                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
604                    event.client_order_id(),
605                );
606                return;
607            };
608
609            // Get order using found client order ID
610            if let Some(order) = cache.order(client_order_id) {
611                log::info!("Order with {client_order_id} was found in the cache");
612                order.clone()
613            } else {
614                log::error!(
615                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
616                );
617                return;
618            }
619        };
620
621        drop(cache);
622        match event {
623            OrderEventAny::Filled(fill) => {
624                let oms_type = self.determine_oms_type(fill);
625                let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
626
627                // Create a new fill with the determined position ID
628                let mut fill = *fill;
629                if fill.position_id.is_none() {
630                    fill.position_id = Some(position_id);
631                }
632
633                self.apply_event_to_order(&mut order, OrderEventAny::Filled(fill));
634                self.handle_order_fill(&order, fill, oms_type);
635            }
636            _ => {
637                self.apply_event_to_order(&mut order, event.clone());
638            }
639        }
640    }
641
642    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
643        // Check for strategy OMS override
644        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
645            return *oms_type;
646        }
647
648        // Use native venue OMS
649        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
650            && let Some(client) = self.clients.get(client_id)
651        {
652            return client.oms_type();
653        }
654
655        if let Some(client) = &self.default_client {
656            return client.oms_type();
657        }
658
659        OmsType::Netting // Default fallback
660    }
661
662    fn determine_position_id(
663        &mut self,
664        fill: OrderFilled,
665        oms_type: OmsType,
666        order: Option<&OrderAny>,
667    ) -> PositionId {
668        match oms_type {
669            OmsType::Hedging => self.determine_hedging_position_id(fill, order),
670            OmsType::Netting => self.determine_netting_position_id(fill),
671            _ => self.determine_netting_position_id(fill), // Default to netting
672        }
673    }
674
675    fn determine_hedging_position_id(
676        &mut self,
677        fill: OrderFilled,
678        order: Option<&OrderAny>,
679    ) -> PositionId {
680        // Check if position ID already exists
681        if let Some(position_id) = fill.position_id {
682            if self.config.debug {
683                log::debug!("Already had a position ID of: {position_id}");
684            }
685            return position_id;
686        }
687
688        let cache = self.cache.borrow();
689
690        let order = if let Some(o) = order {
691            o
692        } else {
693            match cache.order(&fill.client_order_id()) {
694                Some(o) => o,
695                None => {
696                    panic!(
697                        "Order for {} not found to determine position ID",
698                        fill.client_order_id()
699                    );
700                }
701            }
702        };
703
704        // Check execution spawn orders
705        if let Some(spawn_id) = order.exec_spawn_id() {
706            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
707            for spawned_order in spawn_orders {
708                if let Some(pos_id) = spawned_order.position_id() {
709                    if self.config.debug {
710                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
711                    }
712                    return pos_id;
713                }
714            }
715        }
716
717        // Generate new position ID
718        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
719        if self.config.debug {
720            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
721        }
722        position_id
723    }
724
725    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
726        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
727    }
728
729    fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
730        if let Err(e) = order.apply(event.clone()) {
731            if matches!(e, OrderError::InvalidStateTransition) {
732                log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
733            } else {
734                // ValueError: Protection against invalid IDs
735                // KeyError: Protection against duplicate fills
736                log::error!("Error applying event: {e}, did not apply {event}");
737                if should_handle_own_book_order(order) {
738                    self.cache.borrow_mut().update_own_order_book(order);
739                }
740            }
741            return;
742        }
743
744        if let Err(e) = self.cache.borrow_mut().update_order(order) {
745            log::error!("Error updating order in cache: {e}");
746        }
747
748        let topic = switchboard::get_event_orders_topic(event.strategy_id());
749        msgbus::publish(topic, order);
750
751        if self.config.snapshot_orders {
752            self.create_order_state_snapshot(order);
753        }
754    }
755
756    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
757        let instrument =
758            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
759                instrument.clone()
760            } else {
761                log::error!(
762                    "Cannot handle order fill: no instrument found for {}, {fill}",
763                    fill.instrument_id,
764                );
765                return;
766            };
767
768        if self.cache.borrow().account(&fill.account_id).is_none() {
769            log::error!(
770                "Cannot handle order fill: no account found for {}, {fill}",
771                fill.instrument_id.venue,
772            );
773            return;
774        }
775
776        // Skip portfolio position updates for combo fills (spread instruments)
777        // Combo fills are only used for order management, not portfolio updates
778        let position = if instrument.is_spread() {
779            None
780        } else {
781            self.handle_position_update(instrument.clone(), fill, oms_type);
782            let position_id = fill.position_id.unwrap();
783            self.cache.borrow().position(&position_id).cloned()
784        };
785
786        // Handle contingent orders for both spread and non-spread instruments
787        // For spread instruments, contingent orders work without position linkage
788        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
789            // For non-spread instruments, link to position if available
790            if !instrument.is_spread()
791                && let Some(ref pos) = position
792                && pos.is_open()
793            {
794                let position_id = pos.id;
795                for client_order_id in order.linked_order_ids().unwrap_or_default() {
796                    let mut cache = self.cache.borrow_mut();
797                    let contingent_order = cache.mut_order(client_order_id);
798                    if let Some(contingent_order) = contingent_order
799                        && contingent_order.position_id().is_none()
800                    {
801                        contingent_order.set_position_id(Some(position_id));
802
803                        if let Err(e) = self.cache.borrow_mut().add_position_id(
804                            &position_id,
805                            &contingent_order.instrument_id().venue,
806                            &contingent_order.client_order_id(),
807                            &contingent_order.strategy_id(),
808                        ) {
809                            log::error!("Failed to add position ID: {e}");
810                        }
811                    }
812                }
813            }
814            // For spread instruments, contingent orders can still be triggered
815            // but without position linkage (since no position is created for spreads)
816        }
817    }
818
819    /// Handle position creation or update for a fill.
820    ///
821    /// This function mirrors the Python `_handle_position_update` method.
822    fn handle_position_update(
823        &mut self,
824        instrument: InstrumentAny,
825        fill: OrderFilled,
826        oms_type: OmsType,
827    ) {
828        let position_id = if let Some(position_id) = fill.position_id {
829            position_id
830        } else {
831            log::error!("Cannot handle position update: no position ID found for fill {fill}");
832            return;
833        };
834
835        let position_opt = self.cache.borrow().position(&position_id).cloned();
836
837        match position_opt {
838            None => {
839                // Position is None - open new position
840                if self.open_position(instrument, None, fill, oms_type).is_ok() {
841                    // Position opened successfully
842                }
843            }
844            Some(pos) if pos.is_closed() => {
845                // Position is closed - open new position
846                if self
847                    .open_position(instrument, Some(&pos), fill, oms_type)
848                    .is_ok()
849                {
850                    // Position opened successfully
851                }
852            }
853            Some(mut pos) => {
854                if self.will_flip_position(&pos, fill) {
855                    // Position will flip
856                    self.flip_position(instrument, &mut pos, fill, oms_type);
857                } else {
858                    // Update existing position
859                    self.update_position(&mut pos, fill);
860                }
861            }
862        }
863    }
864
865    fn open_position(
866        &self,
867        instrument: InstrumentAny,
868        position: Option<&Position>,
869        fill: OrderFilled,
870        oms_type: OmsType,
871    ) -> anyhow::Result<Position> {
872        let position = if let Some(position) = position {
873            // Always snapshot opening positions to handle NETTING OMS
874            self.cache.borrow_mut().snapshot_position(position)?;
875            let mut position = position.clone();
876            position.apply(&fill);
877            self.cache.borrow_mut().update_position(&position)?;
878            position
879        } else {
880            let position = Position::new(&instrument, fill);
881            self.cache
882                .borrow_mut()
883                .add_position(position.clone(), oms_type)?;
884            if self.config.snapshot_positions {
885                self.create_position_state_snapshot(&position);
886            }
887            position
888        };
889
890        let ts_init = self.clock.borrow().timestamp_ns();
891        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
892        let topic = switchboard::get_event_positions_topic(event.strategy_id);
893        msgbus::publish(topic, &event);
894
895        Ok(position)
896    }
897
898    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
899        // Apply the fill to the position
900        position.apply(&fill);
901
902        // Check if position is closed after applying the fill
903        let is_closed = position.is_closed();
904
905        // Update position in cache - this should handle the closed state tracking
906        if let Err(e) = self.cache.borrow_mut().update_position(position) {
907            log::error!("Failed to update position: {e:?}");
908            return;
909        }
910
911        // Verify cache state after update
912        let cache = self.cache.borrow();
913
914        drop(cache);
915
916        // Create position state snapshot if enabled
917        if self.config.snapshot_positions {
918            self.create_position_state_snapshot(position);
919        }
920
921        // Create and publish appropriate position event
922        let topic = switchboard::get_event_positions_topic(position.strategy_id);
923        let ts_init = self.clock.borrow().timestamp_ns();
924
925        if is_closed {
926            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
927            msgbus::publish(topic, &event);
928        } else {
929            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
930            msgbus::publish(topic, &event);
931        }
932    }
933
934    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
935        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
936    }
937
938    fn flip_position(
939        &mut self,
940        instrument: InstrumentAny,
941        position: &mut Position,
942        fill: OrderFilled,
943        oms_type: OmsType,
944    ) {
945        let difference = match position.side {
946            PositionSide::Long => Quantity::from_raw(
947                fill.last_qty.raw - position.quantity.raw,
948                position.size_precision,
949            ),
950            PositionSide::Short => Quantity::from_raw(
951                position.quantity.raw.abs_diff(fill.last_qty.raw), // Equivalent to Python's abs(position.quantity - fill.last_qty)
952                position.size_precision,
953            ),
954            _ => fill.last_qty,
955        };
956
957        // Split commission between two positions
958        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
959        let (commission1, commission2) = if let Some(commission) = fill.commission {
960            let commission_currency = commission.currency;
961            let commission1 = Money::new(commission * fill_percent, commission_currency);
962            let commission2 = commission - commission1;
963            (Some(commission1), Some(commission2))
964        } else {
965            log::error!("Commission is not available.");
966            (None, None)
967        };
968
969        let mut fill_split1: Option<OrderFilled> = None;
970        if position.is_open() {
971            fill_split1 = Some(OrderFilled::new(
972                fill.trader_id,
973                fill.strategy_id,
974                fill.instrument_id,
975                fill.client_order_id,
976                fill.venue_order_id,
977                fill.account_id,
978                fill.trade_id,
979                fill.order_side,
980                fill.order_type,
981                position.quantity,
982                fill.last_px,
983                fill.currency,
984                fill.liquidity_side,
985                UUID4::new(),
986                fill.ts_event,
987                fill.ts_init,
988                fill.reconciliation,
989                fill.position_id,
990                commission1,
991            ));
992
993            self.update_position(position, fill_split1.unwrap());
994        }
995
996        // Guard against flipping a position with a zero fill size
997        if difference.raw == 0 {
998            log::warn!(
999                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
1000            );
1001            return;
1002        }
1003
1004        let position_id_flip = if oms_type == OmsType::Hedging
1005            && let Some(position_id) = fill.position_id
1006            && position_id.is_virtual()
1007        {
1008            // Generate new position ID for flipped virtual position (Hedging OMS only)
1009            Some(self.pos_id_generator.generate(fill.strategy_id, true))
1010        } else {
1011            // Default: use the same position ID as the fill (Python behavior)
1012            fill.position_id
1013        };
1014
1015        let fill_split2 = OrderFilled::new(
1016            fill.trader_id,
1017            fill.strategy_id,
1018            fill.instrument_id,
1019            fill.client_order_id,
1020            fill.venue_order_id,
1021            fill.account_id,
1022            fill.trade_id,
1023            fill.order_side,
1024            fill.order_type,
1025            difference,
1026            fill.last_px,
1027            fill.currency,
1028            fill.liquidity_side,
1029            UUID4::new(),
1030            fill.ts_event,
1031            fill.ts_init,
1032            fill.reconciliation,
1033            position_id_flip,
1034            commission2,
1035        );
1036
1037        if oms_type == OmsType::Hedging
1038            && let Some(position_id) = fill.position_id
1039            && position_id.is_virtual()
1040        {
1041            log::warn!("Closing position {fill_split1:?}");
1042            log::warn!("Flipping position {fill_split2:?}");
1043        }
1044        // Open flipped position
1045        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1046            log::error!("Failed to open flipped position: {e:?}");
1047        }
1048    }
1049
1050    // -- INTERNAL --------------------------------------------------------------------------------
1051
1052    fn set_position_id_counts(&mut self) {
1053        // For the internal position ID generator
1054        let cache = self.cache.borrow();
1055        let positions = cache.positions(None, None, None, None);
1056
1057        // Count positions per instrument_id using a HashMap
1058        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1059
1060        for position in positions {
1061            *counts.entry(position.strategy_id).or_insert(0) += 1;
1062        }
1063
1064        self.pos_id_generator.reset();
1065
1066        for (strategy_id, count) in counts {
1067            self.pos_id_generator.set_count(count, strategy_id);
1068            log::info!("Set PositionId count for {strategy_id} to {count}");
1069        }
1070    }
1071
1072    fn last_px_for_conversion(
1073        &self,
1074        instrument_id: &InstrumentId,
1075        side: OrderSide,
1076    ) -> Option<Price> {
1077        let cache = self.cache.borrow();
1078
1079        // Try to get last trade price
1080        if let Some(trade) = cache.trade(instrument_id) {
1081            return Some(trade.price);
1082        }
1083
1084        // Fall back to quote if available
1085        if let Some(quote) = cache.quote(instrument_id) {
1086            match side {
1087                OrderSide::Buy => Some(quote.ask_price),
1088                OrderSide::Sell => Some(quote.bid_price),
1089                OrderSide::NoOrderSide => None,
1090            }
1091        } else {
1092            None
1093        }
1094    }
1095
1096    fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1097        log::info!(
1098            "Setting {} order quote quantity {} to base quantity {}",
1099            order.instrument_id(),
1100            order.quantity(),
1101            base_qty
1102        );
1103
1104        let original_qty = order.quantity();
1105        order.set_quantity(base_qty);
1106        order.set_leaves_qty(base_qty);
1107        order.set_is_quote_quantity(false);
1108
1109        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1110            return;
1111        }
1112
1113        if let Some(linked_order_ids) = order.linked_order_ids() {
1114            for client_order_id in linked_order_ids {
1115                match self.cache.borrow_mut().mut_order(client_order_id) {
1116                    Some(contingent_order) => {
1117                        if !contingent_order.is_quote_quantity() {
1118                            continue; // Already base quantity
1119                        }
1120
1121                        if contingent_order.quantity() != original_qty {
1122                            log::warn!(
1123                                "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1124                                contingent_order.quantity(),
1125                                original_qty,
1126                                base_qty
1127                            );
1128                        }
1129
1130                        log::info!(
1131                            "Setting {} order quote quantity {} to base quantity {}",
1132                            contingent_order.instrument_id(),
1133                            contingent_order.quantity(),
1134                            base_qty
1135                        );
1136
1137                        contingent_order.set_quantity(base_qty);
1138                        contingent_order.set_leaves_qty(base_qty);
1139                        contingent_order.set_is_quote_quantity(false);
1140                    }
1141                    None => {
1142                        log::error!("Contingency order {client_order_id} not found");
1143                    }
1144                }
1145            }
1146        } else {
1147            log::warn!(
1148                "No linked order IDs found for order {}",
1149                order.client_order_id()
1150            );
1151        }
1152    }
1153
1154    fn deny_order(&self, order: &OrderAny, reason: &str) {
1155        log::error!(
1156            "Order denied: {reason}, order ID: {}",
1157            order.client_order_id()
1158        );
1159
1160        let denied = OrderDenied::new(
1161            order.trader_id(),
1162            order.strategy_id(),
1163            order.instrument_id(),
1164            order.client_order_id(),
1165            reason.into(),
1166            UUID4::new(),
1167            self.clock.borrow().timestamp_ns(),
1168            self.clock.borrow().timestamp_ns(),
1169        );
1170
1171        let mut order = order.clone();
1172
1173        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1174            log::error!("Failed to apply denied event to order: {e}");
1175            return;
1176        }
1177
1178        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1179            log::error!("Failed to update order in cache: {e}");
1180            return;
1181        }
1182
1183        let topic = switchboard::get_event_orders_topic(order.strategy_id());
1184        msgbus::publish(topic, &denied);
1185
1186        if self.config.snapshot_orders {
1187            self.create_order_state_snapshot(&order);
1188        }
1189    }
1190
1191    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1192        let mut cache = self.cache.borrow_mut();
1193        if cache.own_order_book_mut(instrument_id).is_none() {
1194            let own_book = OwnOrderBook::new(*instrument_id);
1195            cache.add_own_order_book(own_book).unwrap();
1196        }
1197
1198        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1199    }
1200}
1201
1202////////////////////////////////////////////////////////////////////////////////
1203// Tests
1204////////////////////////////////////////////////////////////////////////////////
1205mod stubs;
1206#[cfg(test)]
1207mod tests;