nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
18//! The execution engines primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24
25use std::{
26    cell::RefCell,
27    collections::{HashMap, HashSet},
28    rc::Rc,
29    time::SystemTime,
30};
31
32use config::ExecutionEngineConfig;
33use nautilus_common::{
34    cache::Cache,
35    clock::Clock,
36    generators::position_id::PositionIdGenerator,
37    logging::{CMD, EVT, RECV},
38    msgbus::MessageBus,
39};
40use nautilus_core::UUID4;
41use nautilus_model::{
42    enums::{ContingencyType, OmsType, OrderSide, PositionSide},
43    events::{
44        OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
45        PositionOpened,
46    },
47    identifiers::{ClientId, InstrumentId, PositionId, StrategyId, Venue},
48    instruments::InstrumentAny,
49    orders::{OrderAny, OrderError},
50    position::Position,
51    types::{Money, Price, Quantity},
52};
53
54use crate::{
55    client::ExecutionClient,
56    messages::{
57        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder, SubmitOrder,
58        SubmitOrderList, TradingCommand,
59    },
60};
61
62pub struct ExecutionEngine {
63    clock: Rc<RefCell<dyn Clock>>,
64    cache: Rc<RefCell<Cache>>,
65    msgbus: Rc<RefCell<MessageBus>>,
66    clients: HashMap<ClientId, ExecutionClient>,
67    default_client: Option<ExecutionClient>,
68    routing_map: HashMap<Venue, ClientId>,
69    oms_overrides: HashMap<StrategyId, OmsType>,
70    external_order_claims: HashMap<InstrumentId, StrategyId>,
71    pos_id_generator: PositionIdGenerator,
72    config: ExecutionEngineConfig,
73}
74
75impl ExecutionEngine {
76    pub fn new(
77        clock: Rc<RefCell<dyn Clock>>,
78        cache: Rc<RefCell<Cache>>,
79        msgbus: Rc<RefCell<MessageBus>>,
80        config: Option<ExecutionEngineConfig>,
81    ) -> Self {
82        let trader_id = msgbus.borrow().trader_id;
83        Self {
84            clock: clock.clone(),
85            cache,
86            msgbus,
87            clients: HashMap::new(),
88            default_client: None,
89            routing_map: HashMap::new(),
90            oms_overrides: HashMap::new(),
91            external_order_claims: HashMap::new(),
92            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
93            config: config.unwrap_or_default(),
94        }
95    }
96
97    #[must_use]
98    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
99        self.pos_id_generator.count(strategy_id)
100    }
101
102    #[must_use]
103    pub fn check_integrity(&self) -> bool {
104        self.cache.borrow_mut().check_integrity()
105    }
106
107    #[must_use]
108    pub fn check_connected(&self) -> bool {
109        self.clients.values().all(|c| c.is_connected)
110    }
111
112    #[must_use]
113    pub fn check_disconnected(&self) -> bool {
114        self.clients.values().all(|c| !c.is_connected)
115    }
116
117    #[must_use]
118    pub fn check_residuals(&self) -> bool {
119        self.cache.borrow().check_residuals()
120    }
121
122    #[must_use]
123    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
124        self.external_order_claims.keys().copied().collect()
125    }
126
127    // -- REGISTRATION --------------------------------------------------------
128
129    pub fn register_client(&mut self, client: ExecutionClient) -> anyhow::Result<()> {
130        if self.clients.contains_key(&client.client_id) {
131            anyhow::bail!("Client already registered with ID {}", client.client_id);
132        }
133
134        // If client has venue, register routing
135        self.routing_map.insert(client.venue, client.client_id);
136
137        log::info!("Registered client {}", client.client_id);
138        self.clients.insert(client.client_id, client);
139        Ok(())
140    }
141
142    pub fn register_default_client(&mut self, client: ExecutionClient) {
143        log::info!("Registered default client {}", client.client_id);
144        self.default_client = Some(client);
145    }
146
147    pub fn register_venue_routing(
148        &mut self,
149        client_id: ClientId,
150        venue: Venue,
151    ) -> anyhow::Result<()> {
152        if !self.clients.contains_key(&client_id) {
153            anyhow::bail!("No client registered with ID {client_id}");
154        }
155
156        self.routing_map.insert(venue, client_id);
157        log::info!("Set client {client_id} routing for {venue}");
158        Ok(())
159    }
160
161    // TODO: Implement `Strategy`
162    // pub fn register_external_order_claims(&mut self, strategy: Strategy) -> anyhow::Result<()> {
163    //     todo!();
164    // }
165
166    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
167        if self.clients.remove(&client_id).is_some() {
168            // Remove from routing map if present
169            self.routing_map
170                .retain(|_, mapped_id| mapped_id != &client_id);
171            log::info!("Deregistered client {client_id}");
172            Ok(())
173        } else {
174            anyhow::bail!("No client registered with ID {client_id}")
175        }
176    }
177
178    // -- COMMANDS ------------------------------------------------------------
179    #[allow(clippy::await_holding_refcell_ref)]
180    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
181        let ts = SystemTime::now();
182
183        {
184            let mut cache = self.cache.borrow_mut();
185            cache.cache_general()?;
186            self.cache.borrow_mut().cache_all().await?;
187            cache.build_index();
188            let _ = cache.check_integrity();
189        }
190
191        self.set_position_id_counts();
192
193        log::info!(
194            "Loaded cache in {}ms",
195            SystemTime::now()
196                .duration_since(ts)
197                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {}", e))?
198                .as_millis()
199        );
200
201        Ok(())
202    }
203
204    pub fn flush_db(&self) {
205        self.cache.borrow_mut().flush_db();
206    }
207
208    pub fn process(&mut self, event: &OrderEventAny) {
209        self.handle_event(event);
210    }
211
212    pub fn execute(&self, command: TradingCommand) {
213        self.execute_command(command);
214    }
215
216    // -- COMMAND HANDLERS ----------------------------------------------------
217
218    fn execute_command(&self, command: TradingCommand) {
219        if self.config.debug {
220            log::debug!("{RECV}{CMD} {command:?}");
221        }
222
223        let client = if let Some(client) = self
224            .clients
225            .get(&command.client_id())
226            .or_else(|| {
227                self.routing_map
228                    .get(&command.instrument_id().venue)
229                    .and_then(|client_id| self.clients.get(client_id))
230            })
231            .or(self.default_client.as_ref())
232        {
233            client
234        } else {
235            log::error!(
236                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
237                command.client_id(),
238                command.instrument_id().venue,
239            );
240            return;
241        };
242
243        match command {
244            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
245            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
246            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
247            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
248            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
249            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
250            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
251        }
252    }
253
254    fn handle_submit_order(&self, client: &ExecutionClient, command: SubmitOrder) {
255        let mut command = command;
256        let mut order = command.order.clone();
257        let client_order_id = order.client_order_id();
258        let instrument_id = order.instrument_id();
259
260        // Check if the order exists in the cache
261        if !self.cache.borrow().order_exists(&client_order_id) {
262            // Add order to cache in a separate scope to drop the mutable borrow
263            {
264                let mut cache = self.cache.borrow_mut();
265                if let Err(e) = cache.add_order(
266                    order.clone(),
267                    command.position_id,
268                    Some(command.client_id),
269                    true,
270                ) {
271                    log::error!("Error adding order to cache: {e}");
272                    return;
273                }
274            }
275
276            if self.config.snapshot_orders {
277                self.create_order_state_snapshot(&order);
278            }
279        }
280
281        // Get instrument in a separate scope to manage borrows
282        let instrument = {
283            let cache = self.cache.borrow();
284            if let Some(instrument) = cache.instrument(&instrument_id) {
285                instrument.clone()
286            } else {
287                log::error!(
288                    "Cannot handle submit order: no instrument found for {instrument_id}, {command}",
289                );
290                return;
291            }
292        };
293
294        // Handle quote quantity conversion
295        if !instrument.is_inverse() && order.is_quote_quantity() {
296            let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
297
298            if let Some(price) = last_px {
299                let base_qty = instrument.get_base_quantity(order.quantity(), price);
300                self.set_order_base_qty(&mut order, base_qty);
301            } else {
302                self.deny_order(
303                    &order,
304                    &format!("no-price-to-convert-quote-qty {instrument_id}"),
305                );
306                return;
307            }
308        }
309
310        command.order = order;
311
312        // Send the order to the execution client
313        if let Err(e) = client.submit_order(command.clone()) {
314            log::error!("Error submitting order to client: {e}");
315            self.deny_order(
316                &command.order,
317                &format!("failed-to-submit-order-to-client: {e}"),
318            );
319        }
320    }
321
322    fn handle_submit_order_list(&self, client: &ExecutionClient, mut command: SubmitOrderList) {
323        let orders = command.order_list.orders.clone();
324
325        // Cache orders
326        let mut cache = self.cache.borrow_mut();
327        for order in &orders {
328            if !cache.order_exists(&order.client_order_id()) {
329                if let Err(e) = cache.add_order(
330                    order.clone(),
331                    command.position_id,
332                    Some(command.client_id),
333                    true,
334                ) {
335                    log::error!("Error adding order to cache: {e}");
336                    return;
337                }
338
339                if self.config.snapshot_orders {
340                    self.create_order_state_snapshot(order);
341                }
342            }
343        }
344        drop(cache);
345
346        // Get instrument from cache
347        let cache = self.cache.borrow();
348        let instrument = if let Some(instrument) = cache.instrument(&command.instrument_id) {
349            instrument
350        } else {
351            log::error!(
352                "Cannot handle submit order list: no instrument found for {}, {command}",
353                command.instrument_id,
354            );
355            return;
356        };
357
358        // Check if converting quote quantity
359        if !instrument.is_inverse() && command.order_list.orders[0].is_quote_quantity() {
360            let mut quote_qty = None;
361            let mut last_px = None;
362
363            for order in &mut command.order_list.orders {
364                if !order.is_quote_quantity() {
365                    continue; // Base quantity already set
366                }
367
368                if Some(order.quantity()) != quote_qty {
369                    last_px =
370                        self.last_px_for_conversion(&order.instrument_id(), order.order_side());
371                    quote_qty = Some(order.quantity());
372                }
373
374                if let Some(px) = last_px {
375                    let base_qty = instrument.get_base_quantity(order.quantity(), px);
376                    self.set_order_base_qty(order, base_qty);
377                } else {
378                    for order in &command.order_list.orders {
379                        self.deny_order(
380                            order,
381                            &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
382                        );
383                    }
384                    return; // Denied
385                }
386            }
387        }
388
389        // Send to execution client
390        if let Err(e) = client.submit_order_list(command) {
391            log::error!("Error submitting order list to client: {e}");
392            for order in &orders {
393                self.deny_order(
394                    order,
395                    &format!("failed-to-submit-order-list-to-client: {e}"),
396                );
397            }
398        }
399    }
400
401    fn handle_modify_order(&self, client: &ExecutionClient, command: ModifyOrder) {
402        if let Err(e) = client.modify_order(command) {
403            log::error!("Error modifying order: {e}");
404        }
405    }
406
407    fn handle_cancel_order(&self, client: &ExecutionClient, command: CancelOrder) {
408        if let Err(e) = client.cancel_order(command) {
409            log::error!("Error canceling order: {e}");
410        }
411    }
412
413    fn handle_cancel_all_orders(&self, client: &ExecutionClient, command: CancelAllOrders) {
414        if let Err(e) = client.cancel_all_orders(command) {
415            log::error!("Error canceling all orders: {e}");
416        }
417    }
418
419    fn handle_batch_cancel_orders(&self, client: &ExecutionClient, command: BatchCancelOrders) {
420        if let Err(e) = client.batch_cancel_orders(command) {
421            log::error!("Error batch canceling orders: {e}");
422        }
423    }
424
425    fn handle_query_order(&self, client: &ExecutionClient, command: QueryOrder) {
426        if let Err(e) = client.query_order(command) {
427            log::error!("Error querying order: {e}");
428        }
429    }
430
431    fn create_order_state_snapshot(&self, order: &OrderAny) {
432        if self.config.debug {
433            log::debug!("Creating order state snapshot for {order}");
434        }
435
436        if self.cache.borrow().has_backing() {
437            if let Err(e) = self.cache.borrow().snapshot_order_state(order) {
438                log::error!("Failed to snapshot order state: {e}");
439                return;
440            }
441        }
442
443        let mut msgbus = self.msgbus.borrow_mut();
444        if msgbus.has_backing {
445            let topic = msgbus
446                .switchboard
447                .get_order_snapshots_topic(order.client_order_id());
448            msgbus.publish(&topic, order);
449        }
450    }
451
452    fn create_position_state_snapshot(&self, position: &Position) {
453        if self.config.debug {
454            log::debug!("Creating position state snapshot for {position}");
455        }
456
457        // let mut position: Position = position.clone();
458        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
459        //     position.unrealized_pnl(last)
460        // }
461
462        let mut msgbus = self.msgbus.borrow_mut();
463        let topic = msgbus
464            .switchboard
465            .get_positions_snapshots_topic(position.id);
466        msgbus.publish(&topic, position);
467    }
468
469    // -- EVENT HANDLERS ----------------------------------------------------
470
471    fn handle_event(&mut self, event: &OrderEventAny) {
472        if self.config.debug {
473            log::debug!("{RECV}{EVT} {event:?}");
474        }
475
476        let client_order_id = event.client_order_id();
477        let cache = self.cache.borrow();
478        let mut order = if let Some(order) = cache.order(&client_order_id) {
479            order.clone()
480        } else {
481            log::warn!(
482                "Order with {} not found in the cache to apply {}",
483                event.client_order_id(),
484                event
485            );
486
487            // Try to find order by venue order ID if available
488            let venue_order_id = if let Some(id) = event.venue_order_id() {
489                id
490            } else {
491                log::error!(
492                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
493                    event.client_order_id()
494                );
495                return;
496            };
497
498            // Look up client order ID from venue order ID
499            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
500                id
501            } else {
502                log::error!(
503                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
504                    event.client_order_id(),
505                );
506                return;
507            };
508
509            // Get order using found client order ID
510            if let Some(order) = cache.order(client_order_id) {
511                log::info!("Order with {client_order_id} was found in the cache");
512                order.clone()
513            } else {
514                log::error!(
515                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
516                );
517                return;
518            }
519        };
520
521        drop(cache);
522        match event {
523            OrderEventAny::Filled(order_filled) => {
524                let oms_type = self.determine_oms_type(order_filled);
525                let position_id = self.determine_position_id(*order_filled, oms_type);
526
527                // Create a new fill with the determined position ID
528                let mut order_filled = *order_filled;
529                if order_filled.position_id.is_none() {
530                    order_filled.position_id = Some(position_id);
531                }
532
533                self.apply_event_to_order(&mut order, OrderEventAny::Filled(order_filled));
534                self.handle_order_fill(&order, order_filled, oms_type);
535            }
536            _ => {
537                self.apply_event_to_order(&mut order, event.clone());
538            }
539        }
540    }
541
542    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
543        // Check for strategy OMS override
544        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
545            return *oms_type;
546        }
547
548        // Use native venue OMS
549        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue) {
550            if let Some(client) = self.clients.get(client_id) {
551                return client.oms_type;
552            }
553        }
554
555        if let Some(client) = &self.default_client {
556            return client.oms_type;
557        }
558
559        OmsType::Netting // Default fallback
560    }
561
562    fn determine_position_id(&mut self, fill: OrderFilled, oms_type: OmsType) -> PositionId {
563        match oms_type {
564            OmsType::Hedging => self.determine_hedging_position_id(fill),
565            OmsType::Netting => self.determine_netting_position_id(fill),
566            _ => self.determine_netting_position_id(fill), // Default to netting
567        }
568    }
569
570    fn determine_hedging_position_id(&mut self, fill: OrderFilled) -> PositionId {
571        // Check if position ID already exists
572        if let Some(position_id) = fill.position_id {
573            if self.config.debug {
574                log::debug!("Already had a position ID of: {}", position_id);
575            }
576            return position_id;
577        }
578
579        // Check for order
580        let cache = self.cache.borrow();
581        let order = match cache.order(&fill.client_order_id()) {
582            Some(o) => o,
583            None => {
584                panic!(
585                    "Order for {} not found to determine position ID",
586                    fill.client_order_id()
587                );
588            }
589        };
590
591        // Check execution spawn orders
592        if let Some(spawn_id) = order.exec_spawn_id() {
593            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
594            for spawned_order in spawn_orders {
595                if let Some(pos_id) = spawned_order.position_id() {
596                    if self.config.debug {
597                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
598                    }
599                    return pos_id;
600                }
601            }
602        }
603
604        // Generate new position ID
605        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
606        if self.config.debug {
607            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
608        }
609        position_id
610    }
611
612    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
613        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
614    }
615
616    fn apply_event_to_order(&self, order: &mut OrderAny, event: OrderEventAny) {
617        if let Err(e) = order.apply(event.clone()) {
618            match e {
619                OrderError::InvalidStateTransition => {
620                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
621                }
622                _ => {
623                    log::error!("Error applying event: {e}, did not apply {event}");
624                }
625            }
626            return;
627        }
628
629        if let Err(e) = self.cache.borrow_mut().update_order(order) {
630            log::error!("Error updating order in cache: {e}");
631        }
632
633        let mut msgbus = self.msgbus.borrow_mut();
634        let topic = msgbus
635            .switchboard
636            .get_event_orders_topic(event.strategy_id());
637        msgbus.publish(&topic, order);
638
639        if self.config.snapshot_orders {
640            self.create_order_state_snapshot(order);
641        }
642    }
643
644    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
645        let instrument =
646            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
647                instrument.clone()
648            } else {
649                log::error!(
650                    "Cannot handle order fill: no instrument found for {}, {fill}",
651                    fill.instrument_id,
652                );
653                return;
654            };
655
656        if self.cache.borrow().account(&fill.account_id).is_none() {
657            log::error!(
658                "Cannot handle order fill: no account found for {}, {fill}",
659                fill.instrument_id.venue,
660            );
661            return;
662        }
663
664        let position_id = if let Some(position_id) = fill.position_id {
665            position_id
666        } else {
667            log::error!("Cannot handle order fill: no position ID found for fill {fill}",);
668            return;
669        };
670
671        let mut position = match self.cache.borrow().position(&position_id) {
672            Some(pos) if !pos.is_closed() => pos.clone(),
673            _ => self
674                .open_position(instrument.clone(), None, fill, oms_type)
675                .unwrap(),
676        };
677
678        if self.will_flip_position(&position, fill) {
679            self.flip_position(instrument, &mut position, fill, oms_type);
680        } else {
681            self.update_position(&mut position, fill);
682        }
683
684        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) && position.is_open() {
685            for client_order_id in order.linked_order_ids().unwrap_or_default() {
686                let mut cache = self.cache.borrow_mut();
687                let contingent_order = cache.mut_order(&client_order_id);
688                if let Some(contingent_order) = contingent_order {
689                    if contingent_order.position_id().is_none() {
690                        contingent_order.set_position_id(Some(position_id));
691
692                        if let Err(e) = self.cache.borrow_mut().add_position_id(
693                            &position_id,
694                            &contingent_order.instrument_id().venue,
695                            &contingent_order.client_order_id(),
696                            &contingent_order.strategy_id(),
697                        ) {
698                            log::error!("Failed to add position ID: {e}");
699                        }
700                    }
701                }
702            }
703        }
704    }
705
706    fn open_position(
707        &self,
708        instrument: InstrumentAny,
709        position: Option<&Position>,
710        fill: OrderFilled,
711        oms_type: OmsType,
712    ) -> anyhow::Result<Position> {
713        let position = if let Some(position) = position {
714            // Always snapshot opening positions to handle NETTING OMS
715            self.cache.borrow_mut().snapshot_position(position)?;
716            let mut position = position.clone();
717            position.apply(&fill);
718            self.cache.borrow_mut().update_position(&position)?;
719            position
720        } else {
721            let position = Position::new(&instrument, fill);
722            self.cache
723                .borrow_mut()
724                .add_position(position.clone(), oms_type)?;
725            if self.config.snapshot_positions {
726                self.create_position_state_snapshot(&position);
727            }
728            position
729        };
730
731        let ts_init = self.clock.borrow().timestamp_ns();
732        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
733        let mut msgbus = self.msgbus.borrow_mut();
734        let topic = msgbus
735            .switchboard
736            .get_event_positions_topic(event.strategy_id);
737        msgbus.publish(&topic, &event);
738
739        Ok(position)
740    }
741
742    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
743        position.apply(&fill);
744
745        if let Err(e) = self.cache.borrow_mut().update_position(position) {
746            log::error!("Failed to update position: {e:?}");
747            return;
748        }
749
750        if self.config.snapshot_positions {
751            self.create_position_state_snapshot(position);
752        }
753
754        let mut msgbus = self.msgbus.borrow_mut();
755        let topic = msgbus
756            .switchboard
757            .get_event_positions_topic(position.strategy_id);
758        let ts_init = self.clock.borrow().timestamp_ns();
759
760        if position.is_closed() {
761            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
762            msgbus.publish(&topic, &event);
763        } else {
764            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
765            msgbus.publish(&topic, &event);
766        }
767    }
768
769    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
770        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
771    }
772
773    fn flip_position(
774        &mut self,
775        instrument: InstrumentAny,
776        position: &mut Position,
777        fill: OrderFilled,
778        oms_type: OmsType,
779    ) {
780        let difference = match position.side {
781            PositionSide::Long => Quantity::from_raw(
782                fill.last_qty.raw - position.quantity.raw,
783                position.size_precision,
784            ),
785            PositionSide::Short => Quantity::from_raw(
786                position.quantity.raw - fill.last_qty.raw,
787                position.size_precision,
788            ),
789            _ => fill.last_qty,
790        };
791
792        // Split commission between two positions
793        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
794        let (commission1, commission2) = if let Some(commission) = fill.commission {
795            let commission_currency = commission.currency;
796            let commission1 = Money::new(commission * fill_percent, commission_currency);
797            let commission2 = commission - commission1;
798            (Some(commission1), Some(commission2))
799        } else {
800            log::error!("Commission is not available.");
801            (None, None)
802        };
803
804        let mut fill_split1: Option<OrderFilled> = None;
805        if position.is_open() {
806            fill_split1 = Some(OrderFilled::new(
807                fill.trader_id,
808                fill.strategy_id,
809                fill.instrument_id,
810                fill.client_order_id,
811                fill.venue_order_id,
812                fill.account_id,
813                fill.trade_id,
814                fill.order_side,
815                fill.order_type,
816                position.quantity,
817                fill.last_px,
818                fill.currency,
819                fill.liquidity_side,
820                UUID4::new(),
821                fill.ts_event,
822                fill.ts_init,
823                fill.reconciliation,
824                fill.position_id,
825                commission1,
826            ));
827
828            self.update_position(position, fill_split1.unwrap());
829        }
830
831        // Guard against flipping a position with a zero fill size
832        if difference.raw == 0 {
833            log::warn!(
834                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
835            );
836            return;
837        }
838
839        let position_id_flip = if oms_type == OmsType::Hedging {
840            if let Some(position_id) = fill.position_id {
841                if position_id.is_virtual() {
842                    // Generate new position ID for flipped virtual position
843                    Some(self.pos_id_generator.generate(fill.strategy_id, true))
844                } else {
845                    Some(position_id)
846                }
847            } else {
848                None
849            }
850        } else {
851            fill.position_id
852        };
853
854        let fill_split2 = OrderFilled::new(
855            fill.trader_id,
856            fill.strategy_id,
857            fill.instrument_id,
858            fill.client_order_id,
859            fill.venue_order_id,
860            fill.account_id,
861            fill.trade_id,
862            fill.order_side,
863            fill.order_type,
864            difference,
865            fill.last_px,
866            fill.currency,
867            fill.liquidity_side,
868            UUID4::new(),
869            fill.ts_event,
870            fill.ts_init,
871            fill.reconciliation,
872            position_id_flip,
873            commission2,
874        );
875
876        if oms_type == OmsType::Hedging {
877            if let Some(position_id) = fill.position_id {
878                if position_id.is_virtual() {
879                    log::warn!("Closing position {fill_split1:?}");
880                    log::warn!("Flipping position {fill_split2:?}");
881                }
882            }
883        }
884
885        // Open flipped position
886        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
887            log::error!("Failed to open flipped position: {e:?}");
888        }
889    }
890
891    // -- INTERNAL ------------------------------------------------------------
892
893    fn set_position_id_counts(&mut self) {
894        // For the internal position ID generator
895        let cache = self.cache.borrow();
896        let positions = cache.positions(None, None, None, None);
897
898        // Count positions per instrument_id using a HashMap
899        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
900
901        for position in positions {
902            *counts.entry(position.strategy_id).or_insert(0) += 1;
903        }
904
905        self.pos_id_generator.reset();
906
907        for (strategy_id, count) in counts {
908            self.pos_id_generator.set_count(count, strategy_id);
909            log::info!("Set PositionId count for {strategy_id} to {count}");
910        }
911    }
912
913    fn last_px_for_conversion(
914        &self,
915        instrument_id: &InstrumentId,
916        side: OrderSide,
917    ) -> Option<Price> {
918        let cache = self.cache.borrow();
919
920        // Try to get last trade price
921        if let Some(trade) = cache.trade(instrument_id) {
922            return Some(trade.price);
923        }
924
925        // Fall back to quote if available
926        if let Some(quote) = cache.quote(instrument_id) {
927            match side {
928                OrderSide::Buy => Some(quote.ask_price),
929                OrderSide::Sell => Some(quote.bid_price),
930                OrderSide::NoOrderSide => None,
931            }
932        } else {
933            None
934        }
935    }
936
937    fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
938        log::info!(
939            "Setting {} order quote quantity {} to base quantity {}",
940            order.instrument_id(),
941            order.quantity(),
942            base_qty
943        );
944
945        let original_qty = order.quantity();
946        order.set_quantity(base_qty);
947        order.set_leaves_qty(base_qty);
948        order.set_is_quote_quantity(false);
949
950        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
951            return;
952        }
953
954        if let Some(linked_order_ids) = order.linked_order_ids() {
955            for client_order_id in linked_order_ids {
956                match self.cache.borrow_mut().mut_order(&client_order_id) {
957                    Some(contingent_order) => {
958                        if !contingent_order.is_quote_quantity() {
959                            continue; // Already base quantity
960                        }
961
962                        if contingent_order.quantity() != original_qty {
963                            log::warn!(
964                                "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
965                                contingent_order.quantity(),
966                                original_qty,
967                                base_qty
968                            );
969                        }
970
971                        log::info!(
972                            "Setting {} order quote quantity {} to base quantity {}",
973                            contingent_order.instrument_id(),
974                            contingent_order.quantity(),
975                            base_qty
976                        );
977
978                        contingent_order.set_quantity(base_qty);
979                        contingent_order.set_leaves_qty(base_qty);
980                        contingent_order.set_is_quote_quantity(false);
981                    }
982                    None => {
983                        log::error!("Contingency order {client_order_id} not found");
984                    }
985                }
986            }
987        } else {
988            log::warn!(
989                "No linked order IDs found for order {}",
990                order.client_order_id()
991            );
992        }
993    }
994
995    fn deny_order(&self, order: &OrderAny, reason: &str) {
996        log::error!(
997            "Order denied: {reason}, order ID: {}",
998            order.client_order_id()
999        );
1000
1001        let denied = OrderDenied::new(
1002            order.trader_id(),
1003            order.strategy_id(),
1004            order.instrument_id(),
1005            order.client_order_id(),
1006            reason.into(),
1007            UUID4::new(),
1008            self.clock.borrow().timestamp_ns(),
1009            self.clock.borrow().timestamp_ns(),
1010        );
1011
1012        let mut order = order.clone();
1013
1014        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1015            log::error!("Failed to apply denied event to order: {e}");
1016            return;
1017        }
1018
1019        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1020            log::error!("Failed to update order in cache: {e}");
1021            return;
1022        }
1023
1024        let mut msgbus = self.msgbus.borrow_mut();
1025        let topic = msgbus
1026            .switchboard
1027            .get_event_orders_topic(order.strategy_id());
1028        msgbus.publish(&topic, &denied);
1029
1030        if self.config.snapshot_orders {
1031            self.create_order_state_snapshot(&order);
1032        }
1033    }
1034}
1035
1036////////////////////////////////////////////////////////////////////////////////
1037// Tests
1038////////////////////////////////////////////////////////////////////////////////
1039#[cfg(test)]
1040mod tests {
1041    use std::{cell::RefCell, rc::Rc};
1042
1043    use nautilus_common::{cache::Cache, clock::TestClock, msgbus::MessageBus};
1044    use rstest::fixture;
1045
1046    use super::*;
1047
1048    #[fixture]
1049    fn msgbus() -> MessageBus {
1050        MessageBus::default()
1051    }
1052
1053    #[fixture]
1054    fn simple_cache() -> Cache {
1055        Cache::new(None, None)
1056    }
1057
1058    #[fixture]
1059    fn clock() -> TestClock {
1060        TestClock::new()
1061    }
1062
1063    // Helpers
1064    fn _get_exec_engine(
1065        msgbus: Rc<RefCell<MessageBus>>,
1066        cache: Rc<RefCell<Cache>>,
1067        clock: Rc<RefCell<TestClock>>,
1068        config: Option<ExecutionEngineConfig>,
1069    ) -> ExecutionEngine {
1070        ExecutionEngine::new(clock, cache, msgbus, config)
1071    }
1072
1073    // TODO: After Implementing ExecutionClient & Strategy
1074}