nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
//! The execution engine's primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24
25use std::{
26    cell::{RefCell, RefMut},
27    collections::{HashMap, HashSet},
28    rc::Rc,
29    time::SystemTime,
30};
31
32use config::ExecutionEngineConfig;
33use nautilus_common::{
34    cache::Cache,
35    clock::Clock,
36    generators::position_id::PositionIdGenerator,
37    logging::{CMD, EVT, RECV},
38    msgbus::{
39        self, get_message_bus,
40        switchboard::{self},
41    },
42};
43use nautilus_core::UUID4;
44use nautilus_model::{
45    enums::{ContingencyType, OmsType, OrderSide, PositionSide},
46    events::{
47        OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
48        PositionOpened,
49    },
50    identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
51    instruments::{Instrument, InstrumentAny},
52    orderbook::own::{OwnOrderBook, should_handle_own_book_order},
53    orders::{Order, OrderAny, OrderError},
54    position::Position,
55    types::{Money, Price, Quantity},
56};
57
58use crate::{
59    client::ExecutionClient,
60    messages::{
61        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder, SubmitOrder,
62        SubmitOrderList, TradingCommand,
63    },
64};
65
/// Execution engine which routes trading commands to registered execution
/// clients and applies incoming order events to cached orders and positions.
pub struct ExecutionEngine {
    /// Shared clock, used to timestamp the position events the engine publishes.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache holding orders, positions, instruments and accounts.
    cache: Rc<RefCell<Cache>>,
    /// Registered execution clients keyed by their client ID.
    clients: HashMap<ClientId, Rc<dyn ExecutionClient>>,
    /// Fallback client used when neither the command's client ID nor venue routing resolves.
    default_client: Option<Rc<dyn ExecutionClient>>,
    /// Venue to client ID routing table.
    routing_map: HashMap<Venue, ClientId>,
    /// Per-strategy OMS type overrides, consulted before a client's own OMS type.
    oms_overrides: HashMap<StrategyId, OmsType>,
    /// Instrument to strategy mapping for external order claims.
    external_order_claims: HashMap<InstrumentId, StrategyId>,
    /// Generator for new position IDs.
    pos_id_generator: PositionIdGenerator,
    /// Engine configuration.
    config: ExecutionEngineConfig,
}
77
78impl ExecutionEngine {
79    pub fn new(
80        clock: Rc<RefCell<dyn Clock>>,
81        cache: Rc<RefCell<Cache>>,
82        config: Option<ExecutionEngineConfig>,
83    ) -> Self {
84        let trader_id = get_message_bus().borrow().trader_id;
85        Self {
86            clock: clock.clone(),
87            cache,
88            clients: HashMap::new(),
89            default_client: None,
90            routing_map: HashMap::new(),
91            oms_overrides: HashMap::new(),
92            external_order_claims: HashMap::new(),
93            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
94            config: config.unwrap_or_default(),
95        }
96    }
97
    /// Returns the position ID generator's current count for the given `strategy_id`.
    #[must_use]
    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
        self.pos_id_generator.count(strategy_id)
    }
102
103    #[must_use]
104    pub fn check_integrity(&self) -> bool {
105        self.cache.borrow_mut().check_integrity()
106    }
107
108    #[must_use]
109    pub fn check_connected(&self) -> bool {
110        self.clients.values().all(|c| c.is_connected())
111    }
112
113    #[must_use]
114    pub fn check_disconnected(&self) -> bool {
115        self.clients.values().all(|c| !c.is_connected())
116    }
117
118    #[must_use]
119    pub fn check_residuals(&self) -> bool {
120        self.cache.borrow().check_residuals()
121    }
122
123    #[must_use]
124    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
125        self.external_order_claims.keys().copied().collect()
126    }
127
128    // -- REGISTRATION ----------------------------------------------------------------------------
129
130    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) -> anyhow::Result<()> {
131        if self.clients.contains_key(&client.client_id()) {
132            anyhow::bail!("Client already registered with ID {}", client.client_id());
133        }
134
135        // If client has venue, register routing
136        self.routing_map.insert(client.venue(), client.client_id());
137
138        log::info!("Registered client {}", client.client_id());
139        self.clients.insert(client.client_id(), client);
140        Ok(())
141    }
142
143    pub fn register_default_client(&mut self, client: Rc<dyn ExecutionClient>) {
144        log::info!("Registered default client {}", client.client_id());
145        self.default_client = Some(client);
146    }
147
148    #[must_use]
149    pub fn get_client(&self, client_id: &ClientId) -> Option<Rc<dyn ExecutionClient>> {
150        self.clients.get(client_id).cloned()
151    }
152
153    pub fn register_venue_routing(
154        &mut self,
155        client_id: ClientId,
156        venue: Venue,
157    ) -> anyhow::Result<()> {
158        if !self.clients.contains_key(&client_id) {
159            anyhow::bail!("No client registered with ID {client_id}");
160        }
161
162        self.routing_map.insert(venue, client_id);
163        log::info!("Set client {client_id} routing for {venue}");
164        Ok(())
165    }
166
167    // TODO: Implement `Strategy`
168    // pub fn register_external_order_claims(&mut self, strategy: Strategy) -> anyhow::Result<()> {
169    //     todo!();
170    // }
171
172    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
173        if self.clients.remove(&client_id).is_some() {
174            // Remove from routing map if present
175            self.routing_map
176                .retain(|_, mapped_id| mapped_id != &client_id);
177            log::info!("Deregistered client {client_id}");
178            Ok(())
179        } else {
180            anyhow::bail!("No client registered with ID {client_id}")
181        }
182    }
183
184    // -- COMMANDS --------------------------------------------------------------------------------
185
186    #[allow(clippy::await_holding_refcell_ref)]
187    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
188        let ts = SystemTime::now();
189
190        {
191            let mut cache = self.cache.borrow_mut();
192            cache.clear_index();
193            cache.cache_general()?;
194            self.cache.borrow_mut().cache_all().await?;
195            cache.build_index();
196            let _ = cache.check_integrity();
197
198            if self.config.manage_own_order_books {
199                for order in cache.orders(None, None, None, None) {
200                    if order.is_closed() || !should_handle_own_book_order(order) {
201                        continue;
202                    }
203                    let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
204                    own_book.add(order.to_own_book_order());
205                }
206            }
207        }
208
209        self.set_position_id_counts();
210
211        log::info!(
212            "Loaded cache in {}ms",
213            SystemTime::now()
214                .duration_since(ts)
215                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
216                .as_millis()
217        );
218
219        Ok(())
220    }
221
222    pub fn flush_db(&self) {
223        self.cache.borrow_mut().flush_db();
224    }
225
    /// Processes the given order event by forwarding it to the engine's event handler.
    pub fn process(&mut self, event: &OrderEventAny) {
        self.handle_event(event);
    }
229
    /// Executes the given trading command by forwarding it to the engine's command handler.
    pub fn execute(&self, command: TradingCommand) {
        self.execute_command(command);
    }
233
234    // -- COMMAND HANDLERS ------------------------------------------------------------------------
235
236    fn execute_command(&self, command: TradingCommand) {
237        if self.config.debug {
238            log::debug!("{RECV}{CMD} {command:?}");
239        }
240
241        let client: Rc<dyn ExecutionClient> = if let Some(client) = self
242            .clients
243            .get(&command.client_id())
244            .or_else(|| {
245                self.routing_map
246                    .get(&command.instrument_id().venue)
247                    .and_then(|client_id| self.clients.get(client_id))
248            })
249            .or(self.default_client.as_ref())
250        {
251            client.clone()
252        } else {
253            log::error!(
254                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
255                command.client_id(),
256                command.instrument_id().venue,
257            );
258            return;
259        };
260
261        match command {
262            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
263            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
264            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
265            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
266            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
267            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
268            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
269        }
270    }
271
272    fn handle_submit_order(&self, client: Rc<dyn ExecutionClient>, command: SubmitOrder) {
273        let mut order = command.order.clone();
274        let client_order_id = order.client_order_id();
275        let instrument_id = order.instrument_id();
276
277        // Check if the order exists in the cache
278        if !self.cache.borrow().order_exists(&client_order_id) {
279            // Add order to cache in a separate scope to drop the mutable borrow
280            {
281                let mut cache = self.cache.borrow_mut();
282                if let Err(e) = cache.add_order(
283                    order.clone(),
284                    command.position_id,
285                    Some(command.client_id),
286                    true,
287                ) {
288                    log::error!("Error adding order to cache: {e}");
289                    return;
290                }
291            }
292
293            if self.config.snapshot_orders {
294                self.create_order_state_snapshot(&order);
295            }
296        }
297
298        // Get instrument in a separate scope to manage borrows
299        let instrument = {
300            let cache = self.cache.borrow();
301            if let Some(instrument) = cache.instrument(&instrument_id) {
302                instrument.clone()
303            } else {
304                log::error!(
305                    "Cannot handle submit order: no instrument found for {instrument_id}, {command}",
306                );
307                return;
308            }
309        };
310
311        // Handle quote quantity conversion
312        if !instrument.is_inverse() && order.is_quote_quantity() {
313            let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
314
315            if let Some(price) = last_px {
316                let base_qty = instrument.get_base_quantity(order.quantity(), price);
317                self.set_order_base_qty(&mut order, base_qty);
318            } else {
319                self.deny_order(
320                    &order,
321                    &format!("no-price-to-convert-quote-qty {instrument_id}"),
322                );
323                return;
324            }
325        }
326
327        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
328            let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
329            own_book.add(order.to_own_book_order());
330        }
331
332        // Send the order to the execution client
333        // TODO: Avoid this clone
334        if let Err(e) = client.submit_order(command.clone()) {
335            log::error!("Error submitting order to client: {e}");
336            self.deny_order(
337                &command.order,
338                &format!("failed-to-submit-order-to-client: {e}"),
339            );
340        }
341    }
342
343    fn handle_submit_order_list(
344        &self,
345        client: Rc<dyn ExecutionClient>,
346        mut command: SubmitOrderList,
347    ) {
348        let orders = command.order_list.orders.clone();
349
350        // Cache orders
351        let mut cache = self.cache.borrow_mut();
352        for order in &orders {
353            if !cache.order_exists(&order.client_order_id()) {
354                if let Err(e) = cache.add_order(
355                    order.clone(),
356                    command.position_id,
357                    Some(command.client_id),
358                    true,
359                ) {
360                    log::error!("Error adding order to cache: {e}");
361                    return;
362                }
363
364                if self.config.snapshot_orders {
365                    self.create_order_state_snapshot(order);
366                }
367            }
368        }
369        drop(cache);
370
371        // Get instrument from cache
372        let cache = self.cache.borrow();
373        let instrument = if let Some(instrument) = cache.instrument(&command.instrument_id) {
374            instrument
375        } else {
376            log::error!(
377                "Cannot handle submit order list: no instrument found for {}, {command}",
378                command.instrument_id,
379            );
380            return;
381        };
382
383        // Check if converting quote quantity
384        if !instrument.is_inverse() && command.order_list.orders[0].is_quote_quantity() {
385            let mut quote_qty = None;
386            let mut last_px = None;
387
388            for order in &mut command.order_list.orders {
389                if !order.is_quote_quantity() {
390                    continue; // Base quantity already set
391                }
392
393                if Some(order.quantity()) != quote_qty {
394                    last_px =
395                        self.last_px_for_conversion(&order.instrument_id(), order.order_side());
396                    quote_qty = Some(order.quantity());
397                }
398
399                if let Some(px) = last_px {
400                    let base_qty = instrument.get_base_quantity(order.quantity(), px);
401                    self.set_order_base_qty(order, base_qty);
402                } else {
403                    for order in &command.order_list.orders {
404                        self.deny_order(
405                            order,
406                            &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
407                        );
408                    }
409                    return; // Denied
410                }
411            }
412        }
413
414        if self.config.manage_own_order_books {
415            let mut own_book = self.get_or_init_own_order_book(&command.instrument_id);
416            for order in &command.order_list.orders {
417                if should_handle_own_book_order(order) {
418                    own_book.add(order.to_own_book_order());
419                }
420            }
421        }
422
423        // Send to execution client
424        if let Err(e) = client.submit_order_list(command) {
425            log::error!("Error submitting order list to client: {e}");
426            for order in &orders {
427                self.deny_order(
428                    order,
429                    &format!("failed-to-submit-order-list-to-client: {e}"),
430                );
431            }
432        }
433    }
434
435    fn handle_modify_order(&self, client: Rc<dyn ExecutionClient>, command: ModifyOrder) {
436        if let Err(e) = client.modify_order(command) {
437            log::error!("Error modifying order: {e}");
438        }
439    }
440
441    fn handle_cancel_order(&self, client: Rc<dyn ExecutionClient>, command: CancelOrder) {
442        if let Err(e) = client.cancel_order(command) {
443            log::error!("Error canceling order: {e}");
444        }
445    }
446
447    fn handle_cancel_all_orders(&self, client: Rc<dyn ExecutionClient>, command: CancelAllOrders) {
448        if let Err(e) = client.cancel_all_orders(command) {
449            log::error!("Error canceling all orders: {e}");
450        }
451    }
452
453    fn handle_batch_cancel_orders(
454        &self,
455        client: Rc<dyn ExecutionClient>,
456        command: BatchCancelOrders,
457    ) {
458        if let Err(e) = client.batch_cancel_orders(command) {
459            log::error!("Error batch canceling orders: {e}");
460        }
461    }
462
463    fn handle_query_order(&self, client: Rc<dyn ExecutionClient>, command: QueryOrder) {
464        if let Err(e) = client.query_order(command) {
465            log::error!("Error querying order: {e}");
466        }
467    }
468
469    fn create_order_state_snapshot(&self, order: &OrderAny) {
470        if self.config.debug {
471            log::debug!("Creating order state snapshot for {order}");
472        }
473
474        if self.cache.borrow().has_backing() {
475            if let Err(e) = self.cache.borrow().snapshot_order_state(order) {
476                log::error!("Failed to snapshot order state: {e}");
477                return;
478            }
479        }
480
481        if get_message_bus().borrow().has_backing {
482            let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
483            msgbus::publish(&topic, order);
484        }
485    }
486
487    fn create_position_state_snapshot(&self, position: &Position) {
488        if self.config.debug {
489            log::debug!("Creating position state snapshot for {position}");
490        }
491
492        // let mut position: Position = position.clone();
493        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
494        //     position.unrealized_pnl(last)
495        // }
496
497        let topic = switchboard::get_positions_snapshots_topic(position.id);
498        msgbus::publish(&topic, position);
499    }
500
501    // -- EVENT HANDLERS --------------------------------------------------------------------------
502
503    fn handle_event(&mut self, event: &OrderEventAny) {
504        if self.config.debug {
505            log::debug!("{RECV}{EVT} {event:?}");
506        }
507
508        let client_order_id = event.client_order_id();
509        let cache = self.cache.borrow();
510        let mut order = if let Some(order) = cache.order(&client_order_id) {
511            order.clone()
512        } else {
513            log::warn!(
514                "Order with {} not found in the cache to apply {}",
515                event.client_order_id(),
516                event
517            );
518
519            // Try to find order by venue order ID if available
520            let venue_order_id = if let Some(id) = event.venue_order_id() {
521                id
522            } else {
523                log::error!(
524                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
525                    event.client_order_id()
526                );
527                return;
528            };
529
530            // Look up client order ID from venue order ID
531            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
532                id
533            } else {
534                log::error!(
535                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
536                    event.client_order_id(),
537                );
538                return;
539            };
540
541            // Get order using found client order ID
542            if let Some(order) = cache.order(client_order_id) {
543                log::info!("Order with {client_order_id} was found in the cache");
544                order.clone()
545            } else {
546                log::error!(
547                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
548                );
549                return;
550            }
551        };
552
553        drop(cache);
554        match event {
555            OrderEventAny::Filled(order_filled) => {
556                let oms_type = self.determine_oms_type(order_filled);
557                let position_id = self.determine_position_id(*order_filled, oms_type);
558
559                // Create a new fill with the determined position ID
560                let mut order_filled = *order_filled;
561                if order_filled.position_id.is_none() {
562                    order_filled.position_id = Some(position_id);
563                }
564
565                self.apply_event_to_order(&mut order, OrderEventAny::Filled(order_filled));
566                self.handle_order_fill(&order, order_filled, oms_type);
567            }
568            _ => {
569                self.apply_event_to_order(&mut order, event.clone());
570            }
571        }
572    }
573
574    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
575        // Check for strategy OMS override
576        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
577            return *oms_type;
578        }
579
580        // Use native venue OMS
581        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue) {
582            if let Some(client) = self.clients.get(client_id) {
583                return client.oms_type();
584            }
585        }
586
587        if let Some(client) = &self.default_client {
588            return client.oms_type();
589        }
590
591        OmsType::Netting // Default fallback
592    }
593
594    fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
595        match oms_type {
596            OmsType::Hedging => self.determine_hedging_position_id(fill),
597            OmsType::Netting => self.determine_netting_position_id(fill),
598            _ => self.determine_netting_position_id(fill), // Default to netting
599        }
600    }
601
602    fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
603        // Check if position ID already exists
604        if let Some(position_id) = fill.position_id {
605            if self.config.debug {
606                log::debug!("Already had a position ID of: {position_id}");
607            }
608            return position_id;
609        }
610
611        // Check for order
612        let cache = self.cache.borrow();
613        let order = match cache.order(&fill.client_order_id()) {
614            Some(o) => o,
615            None => {
616                panic!(
617                    "Order for {} not found to determine position ID",
618                    fill.client_order_id()
619                );
620            }
621        };
622
623        // Check execution spawn orders
624        if let Some(spawn_id) = order.exec_spawn_id() {
625            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
626            for spawned_order in spawn_orders {
627                if let Some(pos_id) = spawned_order.position_id() {
628                    if self.config.debug {
629                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
630                    }
631                    return pos_id;
632                }
633            }
634        }
635
636        // Generate new position ID
637        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
638        if self.config.debug {
639            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
640        }
641        position_id
642    }
643
644    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
645        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
646    }
647
648    fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
649        if let Err(e) = order.apply(event.clone()) {
650            match e {
651                OrderError::InvalidStateTransition => {
652                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
653                }
654                _ => {
655                    log::error!("Error applying event: {e}, did not apply {event}");
656                }
657            }
658            return;
659        }
660
661        if let Err(e) = self.cache.borrow_mut().update_order(order) {
662            log::error!("Error updating order in cache: {e}");
663        }
664
665        let topic = switchboard::get_event_orders_topic(event.strategy_id());
666        msgbus::publish(&topic, order);
667
668        if self.config.snapshot_orders {
669            self.create_order_state_snapshot(order);
670        }
671    }
672
673    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
674        let instrument =
675            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
676                instrument.clone()
677            } else {
678                log::error!(
679                    "Cannot handle order fill: no instrument found for {}, {fill}",
680                    fill.instrument_id,
681                );
682                return;
683            };
684
685        if self.cache.borrow().account(&fill.account_id).is_none() {
686            log::error!(
687                "Cannot handle order fill: no account found for {}, {fill}",
688                fill.instrument_id.venue,
689            );
690            return;
691        }
692
693        let position_id = if let Some(position_id) = fill.position_id {
694            position_id
695        } else {
696            log::error!("Cannot handle order fill: no position ID found for fill {fill}");
697            return;
698        };
699
700        let mut position = match self.cache.borrow().position(&position_id) {
701            Some(pos) if !pos.is_closed() => pos.clone(),
702            _ => self
703                .open_position(instrument.clone(), None, fill, oms_type)
704                .unwrap(),
705        };
706
707        if self.will_flip_position(&position, fill) {
708            self.flip_position(instrument, &mut position, fill, oms_type);
709        } else {
710            self.update_position(&mut position, fill);
711        }
712
713        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) && position.is_open() {
714            for client_order_id in order.linked_order_ids().unwrap_or_default() {
715                let mut cache = self.cache.borrow_mut();
716                let contingent_order = cache.mut_order(client_order_id);
717                if let Some(contingent_order) = contingent_order {
718                    if contingent_order.position_id().is_none() {
719                        contingent_order.set_position_id(Some(position_id));
720
721                        if let Err(e) = self.cache.borrow_mut().add_position_id(
722                            &position_id,
723                            &contingent_order.instrument_id().venue,
724                            &contingent_order.client_order_id(),
725                            &contingent_order.strategy_id(),
726                        ) {
727                            log::error!("Failed to add position ID: {e}");
728                        }
729                    }
730                }
731            }
732        }
733    }
734
735    fn open_position(
736        &self,
737        instrument: InstrumentAny,
738        position: Option<&Position>,
739        fill: OrderFilled,
740        oms_type: OmsType,
741    ) -> anyhow::Result<Position> {
742        let position = if let Some(position) = position {
743            // Always snapshot opening positions to handle NETTING OMS
744            self.cache.borrow_mut().snapshot_position(position)?;
745            let mut position = position.clone();
746            position.apply(&fill);
747            self.cache.borrow_mut().update_position(&position)?;
748            position
749        } else {
750            let position = Position::new(&instrument, fill);
751            self.cache
752                .borrow_mut()
753                .add_position(position.clone(), oms_type)?;
754            if self.config.snapshot_positions {
755                self.create_position_state_snapshot(&position);
756            }
757            position
758        };
759
760        let ts_init = self.clock.borrow().timestamp_ns();
761        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
762        let topic = switchboard::get_event_positions_topic(event.strategy_id);
763        msgbus::publish(&topic, &event);
764
765        Ok(position)
766    }
767
768    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
769        position.apply(&fill);
770
771        if let Err(e) = self.cache.borrow_mut().update_position(position) {
772            log::error!("Failed to update position: {e:?}");
773            return;
774        }
775
776        if self.config.snapshot_positions {
777            self.create_position_state_snapshot(position);
778        }
779
780        let topic = switchboard::get_event_positions_topic(position.strategy_id);
781        let ts_init = self.clock.borrow().timestamp_ns();
782
783        if position.is_closed() {
784            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
785            msgbus::publish(&topic, &event);
786        } else {
787            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
788            msgbus::publish(&topic, &event);
789        }
790    }
791
792    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
793        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
794    }
795
796    fn flip_position(
797        &mut self,
798        instrument: InstrumentAny,
799        position: &mut Position,
800        fill: OrderFilled,
801        oms_type: OmsType,
802    ) {
803        let difference = match position.side {
804            PositionSide::Long => Quantity::from_raw(
805                fill.last_qty.raw - position.quantity.raw,
806                position.size_precision,
807            ),
808            PositionSide::Short => Quantity::from_raw(
809                position.quantity.raw - fill.last_qty.raw,
810                position.size_precision,
811            ),
812            _ => fill.last_qty,
813        };
814
815        // Split commission between two positions
816        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
817        let (commission1, commission2) = if let Some(commission) = fill.commission {
818            let commission_currency = commission.currency;
819            let commission1 = Money::new(commission * fill_percent, commission_currency);
820            let commission2 = commission - commission1;
821            (Some(commission1), Some(commission2))
822        } else {
823            log::error!("Commission is not available.");
824            (None, None)
825        };
826
827        let mut fill_split1: Option<OrderFilled> = None;
828        if position.is_open() {
829            fill_split1 = Some(OrderFilled::new(
830                fill.trader_id,
831                fill.strategy_id,
832                fill.instrument_id,
833                fill.client_order_id,
834                fill.venue_order_id,
835                fill.account_id,
836                fill.trade_id,
837                fill.order_side,
838                fill.order_type,
839                position.quantity,
840                fill.last_px,
841                fill.currency,
842                fill.liquidity_side,
843                UUID4::new(),
844                fill.ts_event,
845                fill.ts_init,
846                fill.reconciliation,
847                fill.position_id,
848                commission1,
849            ));
850
851            self.update_position(position, fill_split1.unwrap());
852        }
853
854        // Guard against flipping a position with a zero fill size
855        if difference.raw == 0 {
856            log::warn!(
857                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
858            );
859            return;
860        }
861
862        let position_id_flip = if oms_type == OmsType::Hedging {
863            if let Some(position_id) = fill.position_id {
864                if position_id.is_virtual() {
865                    // Generate new position ID for flipped virtual position
866                    Some(self.pos_id_generator.generate(fill.strategy_id, true))
867                } else {
868                    Some(position_id)
869                }
870            } else {
871                None
872            }
873        } else {
874            fill.position_id
875        };
876
877        let fill_split2 = OrderFilled::new(
878            fill.trader_id,
879            fill.strategy_id,
880            fill.instrument_id,
881            fill.client_order_id,
882            fill.venue_order_id,
883            fill.account_id,
884            fill.trade_id,
885            fill.order_side,
886            fill.order_type,
887            difference,
888            fill.last_px,
889            fill.currency,
890            fill.liquidity_side,
891            UUID4::new(),
892            fill.ts_event,
893            fill.ts_init,
894            fill.reconciliation,
895            position_id_flip,
896            commission2,
897        );
898
899        if oms_type == OmsType::Hedging {
900            if let Some(position_id) = fill.position_id {
901                if position_id.is_virtual() {
902                    log::warn!("Closing position {fill_split1:?}");
903                    log::warn!("Flipping position {fill_split2:?}");
904                }
905            }
906        }
907
908        // Open flipped position
909        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
910            log::error!("Failed to open flipped position: {e:?}");
911        }
912    }
913
914    // -- INTERNAL --------------------------------------------------------------------------------
915
916    fn set_position_id_counts(&mut self) {
917        // For the internal position ID generator
918        let cache = self.cache.borrow();
919        let positions = cache.positions(None, None, None, None);
920
921        // Count positions per instrument_id using a HashMap
922        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
923
924        for position in positions {
925            *counts.entry(position.strategy_id).or_insert(0) += 1;
926        }
927
928        self.pos_id_generator.reset();
929
930        for (strategy_id, count) in counts {
931            self.pos_id_generator.set_count(count, strategy_id);
932            log::info!("Set PositionId count for {strategy_id} to {count}");
933        }
934    }
935
936    fn last_px_for_conversion(
937        &self,
938        instrument_id: &InstrumentId,
939        side: OrderSide,
940    ) -> Option<Price> {
941        let cache = self.cache.borrow();
942
943        // Try to get last trade price
944        if let Some(trade) = cache.trade(instrument_id) {
945            return Some(trade.price);
946        }
947
948        // Fall back to quote if available
949        if let Some(quote) = cache.quote(instrument_id) {
950            match side {
951                OrderSide::Buy => Some(quote.ask_price),
952                OrderSide::Sell => Some(quote.bid_price),
953                OrderSide::NoOrderSide => None,
954            }
955        } else {
956            None
957        }
958    }
959
960    fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
961        log::info!(
962            "Setting {} order quote quantity {} to base quantity {}",
963            order.instrument_id(),
964            order.quantity(),
965            base_qty
966        );
967
968        let original_qty = order.quantity();
969        order.set_quantity(base_qty);
970        order.set_leaves_qty(base_qty);
971        order.set_is_quote_quantity(false);
972
973        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
974            return;
975        }
976
977        if let Some(linked_order_ids) = order.linked_order_ids() {
978            for client_order_id in linked_order_ids {
979                match self.cache.borrow_mut().mut_order(client_order_id) {
980                    Some(contingent_order) => {
981                        if !contingent_order.is_quote_quantity() {
982                            continue; // Already base quantity
983                        }
984
985                        if contingent_order.quantity() != original_qty {
986                            log::warn!(
987                                "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
988                                contingent_order.quantity(),
989                                original_qty,
990                                base_qty
991                            );
992                        }
993
994                        log::info!(
995                            "Setting {} order quote quantity {} to base quantity {}",
996                            contingent_order.instrument_id(),
997                            contingent_order.quantity(),
998                            base_qty
999                        );
1000
1001                        contingent_order.set_quantity(base_qty);
1002                        contingent_order.set_leaves_qty(base_qty);
1003                        contingent_order.set_is_quote_quantity(false);
1004                    }
1005                    None => {
1006                        log::error!("Contingency order {client_order_id} not found");
1007                    }
1008                }
1009            }
1010        } else {
1011            log::warn!(
1012                "No linked order IDs found for order {}",
1013                order.client_order_id()
1014            );
1015        }
1016    }
1017
1018    fn deny_order(&self, order: &OrderAny, reason: &str) {
1019        log::error!(
1020            "Order denied: {reason}, order ID: {}",
1021            order.client_order_id()
1022        );
1023
1024        let denied = OrderDenied::new(
1025            order.trader_id(),
1026            order.strategy_id(),
1027            order.instrument_id(),
1028            order.client_order_id(),
1029            reason.into(),
1030            UUID4::new(),
1031            self.clock.borrow().timestamp_ns(),
1032            self.clock.borrow().timestamp_ns(),
1033        );
1034
1035        let mut order = order.clone();
1036
1037        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1038            log::error!("Failed to apply denied event to order: {e}");
1039            return;
1040        }
1041
1042        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1043            log::error!("Failed to update order in cache: {e}");
1044            return;
1045        }
1046
1047        let topic = switchboard::get_event_orders_topic(order.strategy_id());
1048        msgbus::publish(&topic, &denied);
1049
1050        if self.config.snapshot_orders {
1051            self.create_order_state_snapshot(&order);
1052        }
1053    }
1054
1055    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<OwnOrderBook> {
1056        let mut cache = self.cache.borrow_mut();
1057        if cache.own_order_book_mut(instrument_id).is_none() {
1058            let own_book = OwnOrderBook::new(*instrument_id);
1059            cache.add_own_order_book(own_book).unwrap();
1060        }
1061
1062        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1063    }
1064}
1065
1066////////////////////////////////////////////////////////////////////////////////
1067// Tests
1068////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use nautilus_common::{cache::Cache, clock::TestClock, msgbus::MessageBus};
    use rstest::fixture;

    use super::*;

    /// Fixture providing a default-constructed message bus.
    #[fixture]
    fn msgbus() -> MessageBus {
        Default::default()
    }

    /// Fixture providing a cache built with both constructor arguments left as `None`.
    #[fixture]
    fn simple_cache() -> Cache {
        Cache::new(None, None)
    }

    /// Fixture providing a fresh test clock.
    #[fixture]
    fn clock() -> TestClock {
        TestClock::new()
    }

    // Helpers

    /// Builds an `ExecutionEngine` from a shared cache and test clock, with an optional config.
    fn _get_exec_engine(
        cache: Rc<RefCell<Cache>>,
        clock: Rc<RefCell<TestClock>>,
        config: Option<ExecutionEngineConfig>,
    ) -> ExecutionEngine {
        let engine = ExecutionEngine::new(clock, cache, config);
        engine
    }

    // TODO: After Implementing ExecutionClient & Strategy
}