nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
18//! The execution engines primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24
25use std::{
26    cell::RefCell,
27    collections::{HashMap, HashSet},
28    rc::Rc,
29    time::SystemTime,
30};
31
32use config::ExecutionEngineConfig;
33use nautilus_common::{
34    cache::Cache,
35    clock::Clock,
36    generators::position_id::PositionIdGenerator,
37    logging::{CMD, EVT, RECV},
38    msgbus::MessageBus,
39};
40use nautilus_core::UUID4;
41use nautilus_model::{
42    enums::{ContingencyType, OmsType, OrderSide, PositionSide},
43    events::{
44        OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
45        PositionOpened,
46    },
47    identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
48    instruments::InstrumentAny,
49    orders::{OrderAny, OrderError},
50    position::Position,
51    types::{Money, Price, Quantity},
52};
53
54use crate::{
55    client::ExecutionClient,
56    messages::{
57        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder, SubmitOrder,
58        SubmitOrderList, TradingCommand,
59    },
60};
61
62pub struct ExecutionEngine {
63    clock: Rc<RefCell<dyn Clock>>,
64    cache: Rc<RefCell<Cache>>,
65    msgbus: Rc<RefCell<MessageBus>>,
66    clients: HashMap<ClientId, ExecutionClient>,
67    default_client: Option<ExecutionClient>,
68    routing_map: HashMap<Venue, ClientId>,
69    oms_overrides: HashMap<StrategyId, OmsType>,
70    external_order_claims: HashMap<InstrumentId, StrategyId>,
71    pos_id_generator: PositionIdGenerator,
72    config: ExecutionEngineConfig,
73}
74
75impl ExecutionEngine {
76    pub fn new(
77        clock: Rc<RefCell<dyn Clock>>,
78        cache: Rc<RefCell<Cache>>,
79        msgbus: Rc<RefCell<MessageBus>>,
80        config: ExecutionEngineConfig,
81    ) -> Self {
82        let trader_id = msgbus.borrow().trader_id;
83        Self {
84            clock: clock.clone(),
85            cache,
86            msgbus,
87            clients: HashMap::new(),
88            default_client: None,
89            routing_map: HashMap::new(),
90            oms_overrides: HashMap::new(),
91            external_order_claims: HashMap::new(),
92            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
93            config,
94        }
95    }
96
97    #[must_use]
98    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
99        self.pos_id_generator.count(strategy_id)
100    }
101
102    #[must_use]
103    pub fn check_integrity(&self) -> bool {
104        self.cache.borrow_mut().check_integrity()
105    }
106
107    #[must_use]
108    pub fn check_connected(&self) -> bool {
109        self.clients.values().all(|c| c.is_connected)
110    }
111
112    #[must_use]
113    pub fn check_disconnected(&self) -> bool {
114        self.clients.values().all(|c| !c.is_connected)
115    }
116
117    #[must_use]
118    pub fn check_residuals(&self) -> bool {
119        self.cache.borrow().check_residuals()
120    }
121
122    #[must_use]
123    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
124        self.external_order_claims.keys().copied().collect()
125    }
126
127    // -- REGISTRATION --------------------------------------------------------
128
129    pub fn register_client(&mut self, client: ExecutionClient) -> anyhow::Result<()> {
130        if self.clients.contains_key(&client.client_id) {
131            anyhow::bail!("Client already registered with ID {}", client.client_id);
132        }
133
134        // If client has venue, register routing
135        self.routing_map.insert(client.venue, client.client_id);
136
137        log::info!("Registered client {}", client.client_id);
138        self.clients.insert(client.client_id, client);
139        Ok(())
140    }
141
142    pub fn register_default_client(&mut self, client: ExecutionClient) {
143        log::info!("Registered default client {}", client.client_id);
144        self.default_client = Some(client);
145    }
146
147    pub fn register_venue_routing(
148        &mut self,
149        client_id: ClientId,
150        venue: Venue,
151    ) -> anyhow::Result<()> {
152        if !self.clients.contains_key(&client_id) {
153            anyhow::bail!("No client registered with ID {client_id}");
154        }
155
156        self.routing_map.insert(venue, client_id);
157        log::info!("Set client {client_id} routing for {venue}");
158        Ok(())
159    }
160
161    // TODO: Implement `Strategy`
162    // pub fn register_external_order_claims(&mut self, strategy: Strategy) -> anyhow::Result<()> {
163    //     todo!();
164    // }
165
166    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
167        if self.clients.remove(&client_id).is_some() {
168            // Remove from routing map if present
169            self.routing_map
170                .retain(|_, mapped_id| mapped_id != &client_id);
171            log::info!("Deregistered client {client_id}");
172            Ok(())
173        } else {
174            anyhow::bail!("No client registered with ID {client_id}")
175        }
176    }
177
178    // -- COMMANDS ------------------------------------------------------------
179
180    pub fn load_cache(&mut self) -> anyhow::Result<()> {
181        let ts = SystemTime::now();
182
183        {
184            let mut cache = self.cache.borrow_mut();
185            cache.cache_general()?;
186            cache.cache_currencies()?;
187            cache.cache_instruments()?;
188            cache.cache_accounts()?;
189            cache.cache_orders()?;
190            cache.cache_positions()?;
191
192            cache.build_index();
193            let _ = cache.check_integrity();
194        }
195
196        self.set_position_id_counts();
197
198        log::info!(
199            "Loaded cache in {}ms",
200            SystemTime::now()
201                .duration_since(ts)
202                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {}", e))?
203                .as_millis()
204        );
205
206        Ok(())
207    }
208
209    pub fn flush_db(&self) {
210        self.cache.borrow_mut().flush_db();
211    }
212
213    pub fn process(&mut self, event: &OrderEventAny) {
214        self.handle_event(event);
215    }
216
217    pub fn execute(&self, command: TradingCommand) {
218        self.execute_command(command);
219    }
220
221    // -- COMMAND HANDLERS ----------------------------------------------------
222
223    fn execute_command(&self, command: TradingCommand) {
224        if self.config.debug {
225            log::debug!("{RECV}{CMD} {command:?}");
226        }
227
228        let client = if let Some(client) = self
229            .clients
230            .get(&command.client_id())
231            .or_else(|| {
232                self.routing_map
233                    .get(&command.instrument_id().venue)
234                    .and_then(|client_id| self.clients.get(client_id))
235            })
236            .or(self.default_client.as_ref())
237        {
238            client
239        } else {
240            log::error!(
241                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
242                command.client_id(),
243                command.instrument_id().venue,
244            );
245            return;
246        };
247
248        match command {
249            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
250            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
251            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
252            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
253            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
254            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
255            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
256        }
257    }
258
259    fn handle_submit_order(&self, client: &ExecutionClient, command: SubmitOrder) {
260        let mut command = command;
261        let mut order = command.order.clone();
262        let client_order_id = order.client_order_id();
263        let instrument_id = order.instrument_id();
264
265        // Check if the order exists in the cache
266        if !self.cache.borrow().order_exists(&client_order_id) {
267            // Add order to cache in a separate scope to drop the mutable borrow
268            {
269                let mut cache = self.cache.borrow_mut();
270                if let Err(e) = cache.add_order(
271                    order.clone(),
272                    command.position_id,
273                    Some(command.client_id),
274                    true,
275                ) {
276                    log::error!("Error adding order to cache: {e}");
277                    return;
278                }
279            }
280
281            if self.config.snapshot_orders {
282                self.create_order_state_snapshot(&order);
283            }
284        }
285
286        // Get instrument in a separate scope to manage borrows
287        let instrument = {
288            let cache = self.cache.borrow();
289            if let Some(instrument) = cache.instrument(&instrument_id) {
290                instrument.clone()
291            } else {
292                log::error!(
293                    "Cannot handle submit order: no instrument found for {instrument_id}, {command}",
294                );
295                return;
296            }
297        };
298
299        // Handle quote quantity conversion
300        if !instrument.is_inverse() && order.is_quote_quantity() {
301            let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
302
303            if let Some(price) = last_px {
304                let base_qty = instrument.get_base_quantity(order.quantity(), price);
305                self.set_order_base_qty(&mut order, base_qty);
306            } else {
307                self.deny_order(
308                    &order,
309                    &format!("no-price-to-convert-quote-qty {instrument_id}"),
310                );
311                return;
312            }
313        }
314
315        command.order = order;
316
317        // Send the order to the execution client
318        if let Err(e) = client.submit_order(command.clone()) {
319            log::error!("Error submitting order to client: {e}");
320            self.deny_order(
321                &command.order,
322                &format!("failed-to-submit-order-to-client: {e}"),
323            );
324        }
325    }
326
327    fn handle_submit_order_list(&self, client: &ExecutionClient, mut command: SubmitOrderList) {
328        let orders = command.order_list.orders.clone();
329
330        // Cache orders
331        let mut cache = self.cache.borrow_mut();
332        for order in &orders {
333            if !cache.order_exists(&order.client_order_id()) {
334                if let Err(e) = cache.add_order(
335                    order.clone(),
336                    command.position_id,
337                    Some(command.client_id),
338                    true,
339                ) {
340                    log::error!("Error adding order to cache: {e}");
341                    return;
342                }
343
344                if self.config.snapshot_orders {
345                    self.create_order_state_snapshot(order);
346                }
347            }
348        }
349        drop(cache);
350
351        // Get instrument from cache
352        let cache = self.cache.borrow();
353        let instrument = if let Some(instrument) = cache.instrument(&command.instrument_id) {
354            instrument
355        } else {
356            log::error!(
357                "Cannot handle submit order list: no instrument found for {}, {command}",
358                command.instrument_id,
359            );
360            return;
361        };
362
363        // Check if converting quote quantity
364        if !instrument.is_inverse() && command.order_list.orders[0].is_quote_quantity() {
365            let mut quote_qty = None;
366            let mut last_px = None;
367
368            for order in &mut command.order_list.orders {
369                if !order.is_quote_quantity() {
370                    continue; // Base quantity already set
371                }
372
373                if Some(order.quantity()) != quote_qty {
374                    last_px =
375                        self.last_px_for_conversion(&order.instrument_id(), order.order_side());
376                    quote_qty = Some(order.quantity());
377                }
378
379                if let Some(px) = last_px {
380                    let base_qty = instrument.get_base_quantity(order.quantity(), px);
381                    self.set_order_base_qty(order, base_qty);
382                } else {
383                    for order in &command.order_list.orders {
384                        self.deny_order(
385                            order,
386                            &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
387                        );
388                    }
389                    return; // Denied
390                }
391            }
392        }
393
394        // Send to execution client
395        if let Err(e) = client.submit_order_list(command) {
396            log::error!("Error submitting order list to client: {e}");
397            for order in &orders {
398                self.deny_order(
399                    order,
400                    &format!("failed-to-submit-order-list-to-client: {e}"),
401                );
402            }
403        }
404    }
405
406    fn handle_modify_order(&self, client: &ExecutionClient, command: ModifyOrder) {
407        if let Err(e) = client.modify_order(command) {
408            log::error!("Error modifying order: {e}");
409        }
410    }
411
412    fn handle_cancel_order(&self, client: &ExecutionClient, command: CancelOrder) {
413        if let Err(e) = client.cancel_order(command) {
414            log::error!("Error canceling order: {e}");
415        }
416    }
417
418    fn handle_cancel_all_orders(&self, client: &ExecutionClient, command: CancelAllOrders) {
419        if let Err(e) = client.cancel_all_orders(command) {
420            log::error!("Error canceling all orders: {e}");
421        }
422    }
423
424    fn handle_batch_cancel_orders(&self, client: &ExecutionClient, command: BatchCancelOrders) {
425        if let Err(e) = client.batch_cancel_orders(command) {
426            log::error!("Error batch canceling orders: {e}");
427        }
428    }
429
430    fn handle_query_order(&self, client: &ExecutionClient, command: QueryOrder) {
431        if let Err(e) = client.query_order(command) {
432            log::error!("Error querying order: {e}");
433        }
434    }
435
436    fn create_order_state_snapshot(&self, order: &OrderAny) {
437        if self.config.debug {
438            log::debug!("Creating order state snapshot for {order}");
439        }
440
441        if self.cache.borrow().has_backing() {
442            if let Err(e) = self.cache.borrow().snapshot_order_state(order) {
443                log::error!("Failed to snapshot order state: {e}");
444                return;
445            }
446        }
447
448        let mut msgbus = self.msgbus.borrow_mut();
449        if msgbus.has_backing {
450            let topic = msgbus
451                .switchboard
452                .get_order_snapshots_topic(order.client_order_id());
453            msgbus.publish(&topic, order);
454        }
455    }
456
457    fn create_position_state_snapshot(&self, position: &Position) {
458        if self.config.debug {
459            log::debug!("Creating position state snapshot for {position}");
460        }
461
462        // let mut position: Position = position.clone();
463        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
464        //     position.unrealized_pnl(last)
465        // }
466
467        let mut msgbus = self.msgbus.borrow_mut();
468        let topic = msgbus
469            .switchboard
470            .get_positions_snapshots_topic(position.id);
471        msgbus.publish(&topic, position);
472    }
473
474    // -- EVENT HANDLERS ----------------------------------------------------
475
476    fn handle_event(&mut self, event: &OrderEventAny) {
477        if self.config.debug {
478            log::debug!("{RECV}{EVT} {event:?}");
479        }
480
481        let client_order_id = event.client_order_id();
482        let borrowed_cache = self.cache.borrow();
483        let mut order = if let Some(order) = borrowed_cache.order(&client_order_id) {
484            order.clone()
485        } else {
486            log::warn!(
487                "Order with {} not found in the cache to apply {}",
488                event.client_order_id(),
489                event
490            );
491
492            // Try to find order by venue order ID if available
493            let venue_order_id = if let Some(id) = event.venue_order_id() {
494                id
495            } else {
496                log::error!(
497                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
498                    event.client_order_id()
499                );
500                return;
501            };
502
503            // Look up client order ID from venue order ID
504            let client_order_id = if let Some(id) = borrowed_cache.client_order_id(&venue_order_id)
505            {
506                id
507            } else {
508                log::error!(
509                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
510                    event.client_order_id(),
511                );
512                return;
513            };
514
515            // Get order using found client order ID
516            if let Some(order) = borrowed_cache.order(client_order_id) {
517                log::info!("Order with {client_order_id} was found in the cache");
518                order.clone()
519            } else {
520                log::error!(
521                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
522                );
523                return;
524            }
525        };
526
527        drop(borrowed_cache);
528        match event {
529            OrderEventAny::Filled(order_filled) => {
530                let oms_type = self.determine_oms_type(order_filled);
531                let position_id = self.determine_position_id(*order_filled, oms_type);
532
533                // Create a new fill with the determined position ID
534                let mut order_filled = *order_filled;
535                if order_filled.position_id.is_none() {
536                    order_filled.position_id = Some(position_id);
537                }
538
539                self.apply_event_to_order(&mut order, OrderEventAny::Filled(order_filled));
540                self.handle_order_fill(&order, order_filled, oms_type);
541            }
542            _ => {
543                self.apply_event_to_order(&mut order, event.clone());
544            }
545        }
546    }
547
548    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
549        // Check for strategy OMS override
550        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
551            return *oms_type;
552        }
553
554        // Use native venue OMS
555        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue) {
556            if let Some(client) = self.clients.get(client_id) {
557                return client.oms_type;
558            }
559        }
560
561        if let Some(client) = &self.default_client {
562            return client.oms_type;
563        }
564
565        OmsType::Netting // Default fallback
566    }
567
568    fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
569        match oms_type {
570            OmsType::Hedging => self.determine_hedging_position_id(fill),
571            OmsType::Netting => self.determine_netting_position_id(fill),
572            _ => self.determine_netting_position_id(fill), // Default to netting
573        }
574    }
575
576    fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
577        // Check if position ID already exists
578        if let Some(position_id) = fill.position_id {
579            if self.config.debug {
580                log::debug!("Already had a position ID of: {}", position_id);
581            }
582            return position_id;
583        }
584
585        // Check for order
586        let cache = self.cache.borrow();
587        let order = match cache.order(&fill.client_order_id()) {
588            Some(o) => o,
589            None => {
590                panic!(
591                    "Order for {} not found to determine position ID",
592                    fill.client_order_id()
593                );
594            }
595        };
596
597        // Check execution spawn orders
598        if let Some(spawn_id) = order.exec_spawn_id() {
599            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
600            for spawned_order in spawn_orders {
601                if let Some(pos_id) = spawned_order.position_id() {
602                    if self.config.debug {
603                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
604                    }
605                    return pos_id;
606                }
607            }
608        }
609
610        // Generate new position ID
611        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
612        if self.config.debug {
613            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
614        }
615        position_id
616    }
617
618    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
619        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
620    }
621
622    fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
623        if let Err(e) = order.apply(event.clone()) {
624            match e {
625                OrderError::InvalidStateTransition => {
626                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
627                }
628                _ => {
629                    log::error!("Error applying event: {e}, did not apply {event}");
630                }
631            }
632            return;
633        }
634
635        if let Err(e) = self.cache.borrow_mut().update_order(order) {
636            log::error!("Error updating order in cache: {e}");
637        }
638
639        let mut msgbus = self.msgbus.borrow_mut();
640        let topic = msgbus
641            .switchboard
642            .get_event_orders_topic(event.strategy_id());
643        msgbus.publish(&topic, order);
644
645        if self.config.snapshot_orders {
646            self.create_order_state_snapshot(order);
647        }
648    }
649
650    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
651        let instrument =
652            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
653                instrument.clone()
654            } else {
655                log::error!(
656                    "Cannot handle order fill: no instrument found for {}, {fill}",
657                    fill.instrument_id,
658                );
659                return;
660            };
661
662        if self.cache.borrow().account(&fill.account_id).is_none() {
663            log::error!(
664                "Cannot handle order fill: no account found for {}, {fill}",
665                fill.instrument_id.venue,
666            );
667            return;
668        }
669
670        let position_id = if let Some(position_id) = fill.position_id {
671            position_id
672        } else {
673            log::error!("Cannot handle order fill: no position ID found for fill {fill}",);
674            return;
675        };
676
677        let mut position = match self.cache.borrow().position(&position_id) {
678            Some(pos) if !pos.is_closed() => pos.clone(),
679            _ => self
680                .open_position(instrument.clone(), None, fill, oms_type)
681                .unwrap(),
682        };
683
684        if self.will_flip_position(&position, fill) {
685            self.flip_position(instrument, &mut position, fill, oms_type);
686        } else {
687            self.update_position(&mut position, fill);
688        }
689
690        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) && position.is_open() {
691            for client_order_id in order.linked_order_ids().unwrap_or_default() {
692                let mut cache = self.cache.borrow_mut();
693                let contingent_order = cache.mut_order(&client_order_id);
694                if let Some(contingent_order) = contingent_order {
695                    if contingent_order.position_id().is_none() {
696                        contingent_order.set_position_id(Some(position_id));
697
698                        if let Err(e) = self.cache.borrow_mut().add_position_id(
699                            &position_id,
700                            &contingent_order.instrument_id().venue,
701                            &contingent_order.client_order_id(),
702                            &contingent_order.strategy_id(),
703                        ) {
704                            log::error!("Failed to add position ID: {e}");
705                        }
706                    }
707                }
708            }
709        }
710    }
711
712    fn open_position(
713        &self,
714        instrument: InstrumentAny,
715        position: Option<&Position>,
716        fill: OrderFilled,
717        oms_type: OmsType,
718    ) -> anyhow::Result<Position> {
719        let position = if let Some(position) = position {
720            // Always snapshot opening positions to handle NETTING OMS
721            self.cache.borrow_mut().snapshot_position(position)?;
722            let mut position = position.clone();
723            position.apply(&fill);
724            self.cache.borrow_mut().update_position(&position)?;
725            position
726        } else {
727            let position = Position::new(&instrument, fill);
728            self.cache
729                .borrow_mut()
730                .add_position(position.clone(), oms_type)?;
731            if self.config.snapshot_positions {
732                self.create_position_state_snapshot(&position);
733            }
734            position
735        };
736
737        let ts_init = self.clock.borrow().timestamp_ns();
738        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
739        let mut msgbus = self.msgbus.borrow_mut();
740        let topic = msgbus
741            .switchboard
742            .get_event_positions_topic(event.strategy_id);
743        msgbus.publish(&topic, &event);
744
745        Ok(position)
746    }
747
748    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
749        position.apply(&fill);
750
751        if let Err(e) = self.cache.borrow_mut().update_position(position) {
752            log::error!("Failed to update position: {e:?}");
753            return;
754        }
755
756        if self.config.snapshot_positions {
757            self.create_position_state_snapshot(position);
758        }
759
760        let mut msgbus = self.msgbus.borrow_mut();
761        let topic = msgbus
762            .switchboard
763            .get_event_positions_topic(position.strategy_id);
764        let ts_init = self.clock.borrow().timestamp_ns();
765
766        if position.is_closed() {
767            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
768            msgbus.publish(&topic, &event);
769        } else {
770            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
771            msgbus.publish(&topic, &event);
772        }
773    }
774
775    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
776        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
777    }
778
779    fn flip_position(
780        &mut self,
781        instrument: InstrumentAny,
782        position: &mut Position,
783        fill: OrderFilled,
784        oms_type: OmsType,
785    ) {
786        let difference = match position.side {
787            PositionSide::Long => Quantity::from_raw(
788                fill.last_qty.raw - position.quantity.raw,
789                position.size_precision,
790            ),
791            PositionSide::Short => Quantity::from_raw(
792                position.quantity.raw - fill.last_qty.raw,
793                position.size_precision,
794            ),
795            _ => fill.last_qty,
796        };
797
798        // Split commission between two positions
799        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
800        let (commission1, commission2) = if let Some(commission) = fill.commission {
801            let commission_currency = commission.currency;
802            let commission1 = Money::new(commission * fill_percent, commission_currency);
803            let commission2 = commission - commission1;
804            (Some(commission1), Some(commission2))
805        } else {
806            log::error!("Commission is not available.");
807            (None, None)
808        };
809
810        let mut fill_split1: Option<OrderFilled> = None;
811        if position.is_open() {
812            fill_split1 = Some(OrderFilled::new(
813                fill.trader_id,
814                fill.strategy_id,
815                fill.instrument_id,
816                fill.client_order_id,
817                fill.venue_order_id,
818                fill.account_id,
819                fill.trade_id,
820                fill.order_side,
821                fill.order_type,
822                position.quantity,
823                fill.last_px,
824                fill.currency,
825                fill.liquidity_side,
826                UUID4::new(),
827                fill.ts_event,
828                fill.ts_init,
829                fill.reconciliation,
830                fill.position_id,
831                commission1,
832            ));
833
834            self.update_position(position, fill_split1.unwrap());
835        }
836
837        // Guard against flipping a position with a zero fill size
838        if difference.raw == 0 {
839            log::warn!(
840                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
841            );
842            return;
843        }
844
845        let position_id_flip = if oms_type == OmsType::Hedging {
846            if let Some(position_id) = fill.position_id {
847                if position_id.is_virtual() {
848                    // Generate new position ID for flipped virtual position
849                    Some(self.pos_id_generator.generate(fill.strategy_id, true))
850                } else {
851                    Some(position_id)
852                }
853            } else {
854                None
855            }
856        } else {
857            fill.position_id
858        };
859
860        let fill_split2 = OrderFilled::new(
861            fill.trader_id,
862            fill.strategy_id,
863            fill.instrument_id,
864            fill.client_order_id,
865            fill.venue_order_id,
866            fill.account_id,
867            fill.trade_id,
868            fill.order_side,
869            fill.order_type,
870            difference,
871            fill.last_px,
872            fill.currency,
873            fill.liquidity_side,
874            UUID4::new(),
875            fill.ts_event,
876            fill.ts_init,
877            fill.reconciliation,
878            position_id_flip,
879            commission2,
880        );
881
882        if oms_type == OmsType::Hedging {
883            if let Some(position_id) = fill.position_id {
884                if position_id.is_virtual() {
885                    log::warn!("Closing position {fill_split1:?}");
886                    log::warn!("Flipping position {fill_split2:?}");
887                }
888            }
889        }
890
891        // Open flipped position
892        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
893            log::error!("Failed to open flipped position: {e:?}");
894        }
895    }
896
897    // -- INTERNAL ------------------------------------------------------------
898
899    fn set_position_id_counts(&mut self) {
900        // For the internal position ID generator
901        let borrowed_cache = self.cache.borrow();
902        let positions = borrowed_cache.positions(None, None, None, None);
903
904        // Count positions per instrument_id using a HashMap
905        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
906
907        for position in positions {
908            *counts.entry(position.strategy_id).or_insert(0) += 1;
909        }
910
911        self.pos_id_generator.reset();
912
913        for (strategy_id, count) in counts {
914            self.pos_id_generator.set_count(count, strategy_id);
915            log::info!("Set PositionId count for {strategy_id} to {count}");
916        }
917    }
918
919    fn last_px_for_conversion(
920        &self,
921        instrument_id: &InstrumentId,
922        side: OrderSide,
923    ) -> Option<Price> {
924        let cache = self.cache.borrow();
925
926        // Try to get last trade price
927        if let Some(trade) = cache.trade(instrument_id) {
928            return Some(trade.price);
929        }
930
931        // Fall back to quote if available
932        if let Some(quote) = cache.quote(instrument_id) {
933            match side {
934                OrderSide::Buy => Some(quote.ask_price),
935                OrderSide::Sell => Some(quote.bid_price),
936                OrderSide::NoOrderSide => None,
937            }
938        } else {
939            None
940        }
941    }
942
943    fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
944        log::info!(
945            "Setting {} order quote quantity {} to base quantity {}",
946            order.instrument_id(),
947            order.quantity(),
948            base_qty
949        );
950
951        let original_qty = order.quantity();
952        order.set_quantity(base_qty);
953        order.set_leaves_qty(base_qty);
954        order.set_is_quote_quantity(false);
955
956        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
957            return;
958        }
959
960        if let Some(linked_order_ids) = order.linked_order_ids() {
961            for client_order_id in linked_order_ids {
962                match self.cache.borrow_mut().mut_order(&client_order_id) {
963                    Some(contingent_order) => {
964                        if !contingent_order.is_quote_quantity() {
965                            continue; // Already base quantity
966                        }
967
968                        if contingent_order.quantity() != original_qty {
969                            log::warn!(
970                                "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
971                                contingent_order.quantity(),
972                                original_qty,
973                                base_qty
974                            );
975                        }
976
977                        log::info!(
978                            "Setting {} order quote quantity {} to base quantity {}",
979                            contingent_order.instrument_id(),
980                            contingent_order.quantity(),
981                            base_qty
982                        );
983
984                        contingent_order.set_quantity(base_qty);
985                        contingent_order.set_leaves_qty(base_qty);
986                        contingent_order.set_is_quote_quantity(false);
987                    }
988                    None => {
989                        log::error!("Contingency order {client_order_id} not found");
990                    }
991                }
992            }
993        } else {
994            log::warn!(
995                "No linked order IDs found for order {}",
996                order.client_order_id()
997            );
998        }
999    }
1000
1001    fn deny_order(&self, order: &OrderAny, reason: &str) {
1002        log::error!(
1003            "Order denied: {reason}, order ID: {}",
1004            order.client_order_id()
1005        );
1006
1007        let denied = OrderDenied::new(
1008            order.trader_id(),
1009            order.strategy_id(),
1010            order.instrument_id(),
1011            order.client_order_id(),
1012            reason.into(),
1013            UUID4::new(),
1014            self.clock.borrow().timestamp_ns(),
1015            self.clock.borrow().timestamp_ns(),
1016        );
1017
1018        let mut order = order.clone();
1019
1020        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1021            log::error!("Failed to apply denied event to order: {e}");
1022            return;
1023        }
1024
1025        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1026            log::error!("Failed to update order in cache: {e}");
1027            return;
1028        }
1029
1030        let mut msgbus = self.msgbus.borrow_mut();
1031        let topic = msgbus
1032            .switchboard
1033            .get_event_orders_topic(order.strategy_id());
1034        msgbus.publish(&topic, &denied);
1035
1036        if self.config.snapshot_orders {
1037            self.create_order_state_snapshot(&order);
1038        }
1039    }
1040}
1041
1042////////////////////////////////////////////////////////////////////////////////
1043// Tests
1044////////////////////////////////////////////////////////////////////////////////
1045#[cfg(test)]
1046mod tests {
1047    use std::{cell::RefCell, rc::Rc};
1048
1049    use nautilus_common::{cache::Cache, clock::TestClock, msgbus::MessageBus};
1050    use rstest::fixture;
1051
1052    use super::*;
1053
1054    #[fixture]
1055    fn msgbus() -> MessageBus {
1056        MessageBus::default()
1057    }
1058
1059    #[fixture]
1060    fn simple_cache() -> Cache {
1061        Cache::new(None, None)
1062    }
1063
1064    #[fixture]
1065    fn clock() -> TestClock {
1066        TestClock::new()
1067    }
1068
1069    // Helpers
1070    fn _get_exec_engine(
1071        msgbus: Rc<RefCell<MessageBus>>,
1072        cache: Rc<RefCell<Cache>>,
1073        clock: Rc<RefCell<TestClock>>,
1074        config: Option<ExecutionEngineConfig>,
1075    ) -> ExecutionEngine {
1076        let config = config.unwrap_or_default();
1077        ExecutionEngine::new(clock, cache, msgbus, config)
1078    }
1079
1080    // TODO: After Implementing ExecutionClient & Strategy
1081}