nautilus_risk/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Risk management engine implementation.
17
18pub mod config;
19
20use std::{cell::RefCell, fmt::Debug, rc::Rc};
21
22use ahash::AHashMap;
23use config::RiskEngineConfig;
24use nautilus_common::{
25    cache::Cache,
26    clock::Clock,
27    logging::{CMD, EVT, RECV},
28    messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
29    msgbus,
30    throttler::Throttler,
31};
32use nautilus_core::UUID4;
33use nautilus_execution::trailing::{
34    trailing_stop_calculate_with_bid_ask, trailing_stop_calculate_with_last,
35};
36use nautilus_model::{
37    accounts::{Account, AccountAny},
38    enums::{
39        InstrumentClass, OrderSide, OrderStatus, PositionSide, TimeInForce, TradingState,
40        TrailingOffsetType, TriggerType,
41    },
42    events::{OrderDenied, OrderEventAny, OrderModifyRejected},
43    identifiers::InstrumentId,
44    instruments::{Instrument, InstrumentAny},
45    orders::{Order, OrderAny, OrderList},
46    types::{Currency, Money, Price, Quantity, quantity::QuantityRaw},
47};
48use nautilus_portfolio::Portfolio;
49use rust_decimal::{Decimal, prelude::ToPrimitive};
50use ustr::Ustr;
51
52type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
53type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
54
55/// Central risk management engine that validates and controls trading operations.
56///
57/// The `RiskEngine` provides comprehensive pre-trade risk checks including order validation,
58/// balance verification, position sizing limits, and trading state management. It acts as
59/// a gateway between strategy orders and execution, ensuring all trades comply with
60/// defined risk parameters and regulatory constraints.
61#[allow(dead_code)]
62pub struct RiskEngine {
63    clock: Rc<RefCell<dyn Clock>>,
64    cache: Rc<RefCell<Cache>>,
65    portfolio: Portfolio,
66    pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
67    pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
68    max_notional_per_order: AHashMap<InstrumentId, Decimal>,
69    trading_state: TradingState,
70    config: RiskEngineConfig,
71}
72
73impl Debug for RiskEngine {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct(stringify!(RiskEngine)).finish()
76    }
77}
78
79impl RiskEngine {
80    /// Creates a new [`RiskEngine`] instance.
81    pub fn new(
82        config: RiskEngineConfig,
83        portfolio: Portfolio,
84        clock: Rc<RefCell<dyn Clock>>,
85        cache: Rc<RefCell<Cache>>,
86    ) -> Self {
87        let throttled_submit_order =
88            Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
89
90        let throttled_modify_order =
91            Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
92
93        Self {
94            clock,
95            cache,
96            portfolio,
97            throttled_submit_order,
98            throttled_modify_order,
99            max_notional_per_order: AHashMap::new(),
100            trading_state: TradingState::Active,
101            config,
102        }
103    }
104
105    fn create_submit_order_throttler(
106        config: &RiskEngineConfig,
107        clock: Rc<RefCell<dyn Clock>>,
108        cache: Rc<RefCell<Cache>>,
109    ) -> Throttler<SubmitOrder, SubmitOrderFn> {
110        let success_handler = {
111            Box::new(move |submit_order: SubmitOrder| {
112                msgbus::send_any(
113                    "ExecEngine.execute".into(),
114                    &TradingCommand::SubmitOrder(submit_order),
115                );
116            }) as Box<dyn Fn(SubmitOrder)>
117        };
118
119        let failure_handler = {
120            let cache = cache;
121            let clock = clock.clone();
122            Box::new(move |submit_order: SubmitOrder| {
123                let reason = "REJECTED BY THROTTLER";
124                log::warn!(
125                    "SubmitOrder for {} DENIED: {}",
126                    submit_order.client_order_id(),
127                    reason
128                );
129
130                Self::handle_submit_order_cache(&cache, &submit_order);
131
132                let denied = Self::create_order_denied(&submit_order, reason, &clock);
133
134                msgbus::send_any("ExecEngine.process".into(), &denied);
135            }) as Box<dyn Fn(SubmitOrder)>
136        };
137
138        Throttler::new(
139            config.max_order_submit.limit,
140            config.max_order_submit.interval_ns,
141            clock,
142            "ORDER_SUBMIT_THROTTLER".to_string(),
143            success_handler,
144            Some(failure_handler),
145            Ustr::from(UUID4::new().as_str()),
146        )
147    }
148
149    fn create_modify_order_throttler(
150        config: &RiskEngineConfig,
151        clock: Rc<RefCell<dyn Clock>>,
152        cache: Rc<RefCell<Cache>>,
153    ) -> Throttler<ModifyOrder, ModifyOrderFn> {
154        let success_handler = {
155            Box::new(move |order: ModifyOrder| {
156                msgbus::send_any(
157                    "ExecEngine.execute".into(),
158                    &TradingCommand::ModifyOrder(order),
159                );
160            }) as Box<dyn Fn(ModifyOrder)>
161        };
162
163        let failure_handler = {
164            let cache = cache;
165            let clock = clock.clone();
166            Box::new(move |order: ModifyOrder| {
167                let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
168                log::warn!(
169                    "SubmitOrder for {} DENIED: {}",
170                    order.client_order_id,
171                    reason
172                );
173
174                let order = match Self::get_existing_order(&cache, &order) {
175                    Some(order) => order,
176                    None => return,
177                };
178
179                let rejected = Self::create_modify_rejected(&order, reason, &clock);
180
181                msgbus::send_any("ExecEngine.process".into(), &rejected);
182            }) as Box<dyn Fn(ModifyOrder)>
183        };
184
185        Throttler::new(
186            config.max_order_modify.limit,
187            config.max_order_modify.interval_ns,
188            clock,
189            "ORDER_MODIFY_THROTTLER".to_string(),
190            success_handler,
191            Some(failure_handler),
192            Ustr::from(UUID4::new().as_str()),
193        )
194    }
195
196    fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
197        let mut cache = cache.borrow_mut();
198        if !cache.order_exists(&submit_order.client_order_id()) {
199            cache
200                .add_order(submit_order.order.clone(), None, None, false)
201                .map_err(|e| {
202                    log::error!("Cannot add order to cache: {e}");
203                })
204                .unwrap();
205        }
206    }
207
208    fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
209        let cache = cache.borrow();
210        if let Some(order) = cache.order(&order.client_order_id) {
211            Some(order.clone())
212        } else {
213            log::error!(
214                "Order with command.client_order_id: {} not found",
215                order.client_order_id
216            );
217            None
218        }
219    }
220
221    fn create_order_denied(
222        submit_order: &SubmitOrder,
223        reason: &str,
224        clock: &Rc<RefCell<dyn Clock>>,
225    ) -> OrderEventAny {
226        let timestamp = clock.borrow().timestamp_ns();
227        OrderEventAny::Denied(OrderDenied::new(
228            submit_order.trader_id,
229            submit_order.strategy_id,
230            submit_order.instrument_id,
231            submit_order.client_order_id(),
232            reason.into(),
233            UUID4::new(),
234            timestamp,
235            timestamp,
236        ))
237    }
238
239    fn create_modify_rejected(
240        order: &OrderAny,
241        reason: &str,
242        clock: &Rc<RefCell<dyn Clock>>,
243    ) -> OrderEventAny {
244        let timestamp = clock.borrow().timestamp_ns();
245        OrderEventAny::ModifyRejected(OrderModifyRejected::new(
246            order.trader_id(),
247            order.strategy_id(),
248            order.instrument_id(),
249            order.client_order_id(),
250            reason.into(),
251            UUID4::new(),
252            timestamp,
253            timestamp,
254            false,
255            order.venue_order_id(),
256            None,
257        ))
258    }
259
260    /// Executes a trading command through the risk management pipeline.
261    pub fn execute(&mut self, command: TradingCommand) {
262        // This will extend to other commands such as `RiskCommand`
263        self.handle_command(command);
264    }
265
266    /// Processes an order event for risk monitoring and state updates.
267    pub fn process(&mut self, event: OrderEventAny) {
268        // This will extend to other events such as `RiskEvent`
269        self.handle_event(event);
270    }
271
272    /// Sets the trading state for risk control enforcement.
273    pub fn set_trading_state(&mut self, state: TradingState) {
274        if state == self.trading_state {
275            log::warn!("No change to trading state: already set to {state:?}");
276            return;
277        }
278
279        self.trading_state = state;
280
281        let _ts_now = self.clock.borrow().timestamp_ns();
282
283        // TODO: Create a new Event "TradingStateChanged" in OrderEventAny enum.
284        // let event = OrderEventAny::TradingStateChanged(TradingStateChanged::new(..,self.trading_state,..));
285
286        msgbus::publish("events.risk".into(), &"message"); // TODO: Send the new Event here
287
288        log::info!("Trading state set to {state:?}");
289    }
290
291    /// Sets the maximum notional value per order for the specified instrument.
292    pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
293        self.max_notional_per_order.insert(instrument_id, new_value);
294
295        let new_value_str = new_value.to_string();
296        log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
297    }
298
299    /// Starts the risk engine.
300    pub fn start(&mut self) {
301        log::info!("Started");
302    }
303
304    /// Stops the risk engine.
305    pub fn stop(&mut self) {
306        log::info!("Stopped");
307    }
308
309    /// Resets the risk engine to its initial state.
310    pub fn reset(&mut self) {
311        self.throttled_submit_order.reset();
312        self.throttled_modify_order.reset();
313        self.max_notional_per_order.clear();
314        self.trading_state = TradingState::Active;
315
316        log::info!("Reset");
317    }
318
319    /// Disposes of the risk engine, releasing resources.
320    pub fn dispose(&mut self) {
321        log::info!("Disposed");
322    }
323
324    /// Returns a reference to the clock.
325    #[must_use]
326    pub fn clock(&self) -> &Rc<RefCell<dyn Clock>> {
327        &self.clock
328    }
329
330    /// Returns a reference to the configuration.
331    #[must_use]
332    pub const fn config(&self) -> &RiskEngineConfig {
333        &self.config
334    }
335
336    /// Returns the current trading state.
337    #[must_use]
338    pub const fn trading_state(&self) -> TradingState {
339        self.trading_state
340    }
341
342    /// Returns a reference to the max notional per order settings.
343    #[must_use]
344    pub const fn max_notional_per_order(&self) -> &AHashMap<InstrumentId, Decimal> {
345        &self.max_notional_per_order
346    }
347
348    fn handle_command(&mut self, command: TradingCommand) {
349        if self.config.debug {
350            log::debug!("{CMD}{RECV} {command:?}");
351        }
352
353        match command {
354            TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
355            TradingCommand::SubmitOrderList(submit_order_list) => {
356                self.handle_submit_order_list(submit_order_list);
357            }
358            TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
359            TradingCommand::QueryAccount(query_account) => {
360                self.send_to_execution(TradingCommand::QueryAccount(query_account));
361            }
362            _ => {
363                log::error!("Cannot handle command: {command}");
364            }
365        }
366    }
367
368    fn handle_submit_order(&mut self, command: SubmitOrder) {
369        if self.config.bypass {
370            self.send_to_execution(TradingCommand::SubmitOrder(command));
371            return;
372        }
373
374        let order = &command.order;
375        if let Some(position_id) = command.position_id
376            && order.is_reduce_only()
377        {
378            let position_exists = {
379                let cache = self.cache.borrow();
380                cache
381                    .position(&position_id)
382                    .map(|pos| (pos.side, pos.quantity))
383            };
384
385            if let Some((pos_side, pos_quantity)) = position_exists {
386                if !order.would_reduce_only(pos_side, pos_quantity) {
387                    self.deny_command(
388                        TradingCommand::SubmitOrder(command),
389                        &format!("Reduce only order would increase position {position_id}"),
390                    );
391                    return; // Denied
392                }
393            } else {
394                self.deny_command(
395                    TradingCommand::SubmitOrder(command),
396                    &format!("Position {position_id} not found for reduce-only order"),
397                );
398                return;
399            }
400        }
401
402        let instrument_exists = {
403            let cache = self.cache.borrow();
404            cache.instrument(&command.instrument_id).cloned()
405        };
406
407        let instrument = if let Some(instrument) = instrument_exists {
408            instrument
409        } else {
410            self.deny_command(
411                TradingCommand::SubmitOrder(command.clone()),
412                &format!("Instrument for {} not found", command.instrument_id),
413            );
414            return; // Denied
415        };
416
417        ////////////////////////////////////////////////////////////////////////////////
418        // PRE-TRADE ORDER(S) CHECKS
419        ////////////////////////////////////////////////////////////////////////////////
420        if !self.check_order(instrument.clone(), order.clone()) {
421            return; // Denied
422        }
423
424        if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
425            return; // Denied
426        }
427
428        // Route through execution gateway for TradingState checks & throttling
429        self.execution_gateway(instrument, TradingCommand::SubmitOrder(command));
430    }
431
432    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
433        if self.config.bypass {
434            self.send_to_execution(TradingCommand::SubmitOrderList(command));
435            return;
436        }
437
438        let instrument_exists = {
439            let cache = self.cache.borrow();
440            cache.instrument(&command.instrument_id).cloned()
441        };
442
443        let instrument = if let Some(instrument) = instrument_exists {
444            instrument
445        } else {
446            self.deny_command(
447                TradingCommand::SubmitOrderList(command.clone()),
448                &format!("no instrument found for {}", command.instrument_id),
449            );
450            return; // Denied
451        };
452
453        ////////////////////////////////////////////////////////////////////////////////
454        // PRE-TRADE ORDER(S) CHECKS
455        ////////////////////////////////////////////////////////////////////////////////
456        for order in command.order_list.orders.clone() {
457            if !self.check_order(instrument.clone(), order) {
458                return; // Denied
459            }
460        }
461
462        if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
463            self.deny_order_list(
464                command.order_list.clone(),
465                &format!("OrderList {} DENIED", command.order_list.id),
466            );
467            return; // Denied
468        }
469
470        self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
471    }
472
473    fn handle_modify_order(&mut self, command: ModifyOrder) {
474        ////////////////////////////////////////////////////////////////////////////////
475        // VALIDATE COMMAND
476        ////////////////////////////////////////////////////////////////////////////////
477        let order_exists = {
478            let cache = self.cache.borrow();
479            cache.order(&command.client_order_id).cloned()
480        };
481
482        let order = if let Some(order) = order_exists {
483            order
484        } else {
485            log::error!(
486                "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
487                command.client_order_id
488            );
489            return;
490        };
491
492        if order.is_closed() {
493            self.reject_modify_order(
494                order,
495                &format!(
496                    "Order with command.client_order_id: {} already closed",
497                    command.client_order_id
498                ),
499            );
500            return;
501        } else if order.status() == OrderStatus::PendingCancel {
502            self.reject_modify_order(
503                order,
504                &format!(
505                    "Order with command.client_order_id: {} is already pending cancel",
506                    command.client_order_id
507                ),
508            );
509            return;
510        }
511
512        let maybe_instrument = {
513            let cache = self.cache.borrow();
514            cache.instrument(&command.instrument_id).cloned()
515        };
516
517        let instrument = if let Some(instrument) = maybe_instrument {
518            instrument
519        } else {
520            self.reject_modify_order(
521                order,
522                &format!("no instrument found for {:?}", command.instrument_id),
523            );
524            return; // Denied
525        };
526
527        // Check Price
528        let mut risk_msg = self.check_price(&instrument, command.price);
529        if let Some(risk_msg) = risk_msg {
530            self.reject_modify_order(order, &risk_msg);
531            return; // Denied
532        }
533
534        // Check Trigger
535        risk_msg = self.check_price(&instrument, command.trigger_price);
536        if let Some(risk_msg) = risk_msg {
537            self.reject_modify_order(order, &risk_msg);
538            return; // Denied
539        }
540
541        // Check Quantity
542        risk_msg = self.check_quantity(&instrument, command.quantity, order.is_quote_quantity());
543        if let Some(risk_msg) = risk_msg {
544            self.reject_modify_order(order, &risk_msg);
545            return; // Denied
546        }
547
548        // Check TradingState
549        match self.trading_state {
550            TradingState::Halted => {
551                self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
552            }
553            TradingState::Reducing => {
554                if let Some(quantity) = command.quantity
555                    && quantity > order.quantity()
556                    && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
557                        || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
558                {
559                    self.reject_modify_order(
560                        order,
561                        &format!(
562                            "TradingState is REDUCING and update will increase exposure {}",
563                            instrument.id()
564                        ),
565                    );
566                }
567            }
568            _ => {}
569        }
570
571        self.throttled_modify_order.send(command);
572    }
573
574    fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
575        ////////////////////////////////////////////////////////////////////////////////
576        // VALIDATION CHECKS
577        ////////////////////////////////////////////////////////////////////////////////
578        if order.time_in_force() == TimeInForce::Gtd {
579            // SAFETY: GTD guarantees an expire time
580            let expire_time = order.expire_time().unwrap();
581            if expire_time <= self.clock.borrow().timestamp_ns() {
582                self.deny_order(
583                    order,
584                    &format!("GTD {} already past", expire_time.to_rfc3339()),
585                );
586                return false; // Denied
587            }
588        }
589
590        if !self.check_order_price(instrument.clone(), order.clone())
591            || !self.check_order_quantity(instrument, order)
592        {
593            return false; // Denied
594        }
595
596        true
597    }
598
599    fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
600        ////////////////////////////////////////////////////////////////////////////////
601        // CHECK PRICE
602        ////////////////////////////////////////////////////////////////////////////////
603        if order.price().is_some() {
604            let risk_msg = self.check_price(&instrument, order.price());
605            if let Some(risk_msg) = risk_msg {
606                self.deny_order(order, &risk_msg);
607                return false; // Denied
608            }
609        }
610
611        ////////////////////////////////////////////////////////////////////////////////
612        // CHECK TRIGGER
613        ////////////////////////////////////////////////////////////////////////////////
614        if order.trigger_price().is_some() {
615            let risk_msg = self.check_price(&instrument, order.trigger_price());
616            if let Some(risk_msg) = risk_msg {
617                self.deny_order(order, &risk_msg);
618                return false; // Denied
619            }
620        }
621
622        true
623    }
624
625    fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
626        let risk_msg = self.check_quantity(
627            &instrument,
628            Some(order.quantity()),
629            order.is_quote_quantity(),
630        );
631        if let Some(risk_msg) = risk_msg {
632            self.deny_order(order, &risk_msg);
633            return false; // Denied
634        }
635
636        true
637    }
638
639    fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
640        ////////////////////////////////////////////////////////////////////////////////
641        // CHECK TRIGGER
642        ////////////////////////////////////////////////////////////////////////////////
643        let mut last_px: Option<Price> = None;
644        let mut max_notional: Option<Money> = None;
645
646        // Determine max notional
647        let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
648        if let Some(max_notional_setting_val) = max_notional_setting.copied() {
649            max_notional = Some(Money::new(
650                max_notional_setting_val
651                    .to_f64()
652                    .expect("Invalid decimal conversion"),
653                instrument.quote_currency(),
654            ));
655        }
656
657        // Get account for risk checks
658        let account_exists = {
659            let cache = self.cache.borrow();
660            cache.account_for_venue(&instrument.id().venue).cloned()
661        };
662
663        let account = if let Some(account) = account_exists {
664            account
665        } else {
666            log::debug!("Cannot find account for venue {}", instrument.id().venue);
667            return true; // TODO: Temporary early return until handling routing/multiple venues
668        };
669        let cash_account = match account {
670            AccountAny::Cash(cash_account) => cash_account,
671            AccountAny::Margin(_) => return true, // TODO: Determine risk controls for margin
672        };
673        let free = cash_account.balance_free(Some(instrument.quote_currency()));
674        let allow_borrowing = cash_account.allow_borrowing;
675        if self.config.debug {
676            log::debug!("Free cash: {free:?}");
677        }
678
679        // Get net LONG position quantity for this instrument (for position-reducing sell checks),
680        // accounting for already submitted (but unfilled) SELL orders to prevent overselling.
681        let (net_long_qty_raw, pending_sell_qty_raw) = {
682            let cache = self.cache.borrow();
683            let long_qty: QuantityRaw = cache
684                .positions_open(None, Some(&instrument.id()), None, Some(PositionSide::Long))
685                .iter()
686                .map(|pos| pos.quantity.raw)
687                .sum();
688            let pending_sells: QuantityRaw = cache
689                .orders_open(None, Some(&instrument.id()), None, Some(OrderSide::Sell))
690                .iter()
691                .map(|ord| ord.leaves_qty().raw)
692                .sum();
693            (long_qty, pending_sells)
694        };
695
696        // Available quantity is long position minus pending sells
697        let available_long_qty_raw = net_long_qty_raw.saturating_sub(pending_sell_qty_raw);
698
699        if self.config.debug && net_long_qty_raw > 0 {
700            log::debug!(
701                "Net LONG qty (raw): {net_long_qty_raw}, pending sells: {pending_sell_qty_raw}, available: {available_long_qty_raw}"
702            );
703        }
704
705        // Track cumulative sell quantity to determine position-reducing vs position-opening sells
706        let mut cum_sell_qty_raw: QuantityRaw = 0;
707
708        let mut cum_notional_buy: Option<Money> = None;
709        let mut cum_notional_sell: Option<Money> = None;
710        let mut base_currency: Option<Currency> = None;
711        for order in &orders {
712            // Determine last price based on order type
713            last_px = match order {
714                OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
715                    if last_px.is_none() {
716                        let cache = self.cache.borrow();
717                        if let Some(last_quote) = cache.quote(&instrument.id()) {
718                            match order.order_side() {
719                                OrderSide::Buy => Some(last_quote.ask_price),
720                                OrderSide::Sell => Some(last_quote.bid_price),
721                                _ => panic!("Invalid order side"),
722                            }
723                        } else {
724                            let cache = self.cache.borrow();
725                            let last_trade = cache.trade(&instrument.id());
726
727                            if let Some(last_trade) = last_trade {
728                                Some(last_trade.price)
729                            } else {
730                                log::warn!(
731                                    "Cannot check MARKET order risk: no prices for {}",
732                                    instrument.id()
733                                );
734                                continue;
735                            }
736                        }
737                    } else {
738                        last_px
739                    }
740                }
741                OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
742                OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
743                    if let Some(trigger_price) = order.trigger_price() {
744                        Some(trigger_price)
745                    } else {
746                        // Validate trailing offset type is supported
747                        let offset_type = order.trailing_offset_type().unwrap();
748                        if !matches!(
749                            offset_type,
750                            TrailingOffsetType::Price
751                                | TrailingOffsetType::BasisPoints
752                                | TrailingOffsetType::Ticks
753                        ) {
754                            self.deny_order(
755                                order.clone(),
756                                &format!("UNSUPPORTED_TRAILING_OFFSET_TYPE: {offset_type:?}"),
757                            );
758                            return false;
759                        }
760
761                        let trigger_type = order.trigger_type().unwrap();
762                        let cache = self.cache.borrow();
763
764                        if trigger_type == TriggerType::BidAsk {
765                            if let Some(quote) = cache.quote(&instrument.id()) {
766                                match trailing_stop_calculate_with_bid_ask(
767                                    instrument.price_increment(),
768                                    order.trailing_offset_type().unwrap(),
769                                    order.order_side_specified(),
770                                    order.trailing_offset().unwrap(),
771                                    quote.bid_price,
772                                    quote.ask_price,
773                                ) {
774                                    Ok(calculated_trigger) => Some(calculated_trigger),
775                                    Err(e) => {
776                                        log::warn!(
777                                            "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
778                                            order.order_type()
779                                        );
780                                        continue;
781                                    }
782                                }
783                            } else {
784                                log::warn!(
785                                    "Cannot check {} order risk: no trigger price set and no bid/ask quotes available for {}",
786                                    order.order_type(),
787                                    instrument.id()
788                                );
789                                continue;
790                            }
791                        } else if let Some(last_trade) = cache.trade(&instrument.id()) {
792                            match trailing_stop_calculate_with_last(
793                                instrument.price_increment(),
794                                order.trailing_offset_type().unwrap(),
795                                order.order_side_specified(),
796                                order.trailing_offset().unwrap(),
797                                last_trade.price,
798                            ) {
799                                Ok(calculated_trigger) => Some(calculated_trigger),
800                                Err(e) => {
801                                    log::warn!(
802                                        "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {}",
803                                        order.order_type(),
804                                        e
805                                    );
806                                    continue;
807                                }
808                            }
809                        } else if trigger_type == TriggerType::LastOrBidAsk {
810                            // Fallback to bid/ask when no trade data available
811                            if let Some(quote) = cache.quote(&instrument.id()) {
812                                match trailing_stop_calculate_with_bid_ask(
813                                    instrument.price_increment(),
814                                    order.trailing_offset_type().unwrap(),
815                                    order.order_side_specified(),
816                                    order.trailing_offset().unwrap(),
817                                    quote.bid_price,
818                                    quote.ask_price,
819                                ) {
820                                    Ok(calculated_trigger) => Some(calculated_trigger),
821                                    Err(e) => {
822                                        log::warn!(
823                                            "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
824                                            order.order_type()
825                                        );
826                                        continue;
827                                    }
828                                }
829                            } else {
830                                log::warn!(
831                                    "Cannot check {} order risk: no trigger price set and no market data available for {}",
832                                    order.order_type(),
833                                    instrument.id()
834                                );
835                                continue;
836                            }
837                        } else {
838                            log::warn!(
839                                "Cannot check {} order risk: no trigger price set and no market data available for {}",
840                                order.order_type(),
841                                instrument.id()
842                            );
843                            continue;
844                        }
845                    }
846                }
847                _ => order.price(),
848            };
849
850            let last_px = if let Some(px) = last_px {
851                px
852            } else {
853                log::error!("Cannot check order risk: no price available");
854                continue;
855            };
856
857            // For quote quantity limit orders, use worst-case execution price
858            let effective_price = if order.is_quote_quantity()
859                && !instrument.is_inverse()
860                && matches!(order, OrderAny::Limit(_) | OrderAny::StopLimit(_))
861            {
862                // Get current market price for worst-case execution
863                let cache = self.cache.borrow();
864                if let Some(quote_tick) = cache.quote(&instrument.id()) {
865                    match order.order_side() {
866                        // BUY: could execute at best ask if below limit (more quantity)
867                        OrderSide::Buy => last_px.min(quote_tick.ask_price),
868                        // SELL: could execute at best bid if above limit (but less quantity, so use limit)
869                        OrderSide::Sell => last_px.max(quote_tick.bid_price),
870                        _ => last_px,
871                    }
872                } else {
873                    last_px // No market data, use limit price
874                }
875            } else {
876                last_px
877            };
878
879            let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
880                instrument.calculate_base_quantity(order.quantity(), effective_price)
881            } else {
882                order.quantity()
883            };
884
885            // Check min/max quantity against effective quantity
886            if let Some(max_quantity) = instrument.max_quantity()
887                && effective_quantity > max_quantity
888            {
889                self.deny_order(
890                    order.clone(),
891                    &format!(
892                        "QUANTITY_EXCEEDS_MAXIMUM: effective_quantity={effective_quantity}, max_quantity={max_quantity}"
893                    ),
894                );
895                return false; // Denied
896            }
897
898            if let Some(min_quantity) = instrument.min_quantity()
899                && effective_quantity < min_quantity
900            {
901                self.deny_order(
902                    order.clone(),
903                    &format!(
904                        "QUANTITY_BELOW_MINIMUM: effective_quantity={effective_quantity}, min_quantity={min_quantity}"
905                    ),
906                );
907                return false; // Denied
908            }
909
910            let notional =
911                instrument.calculate_notional_value(effective_quantity, last_px, Some(true));
912
913            if self.config.debug {
914                log::debug!("Notional: {notional:?}");
915            }
916
917            // Check MAX notional per order limit
918            if let Some(max_notional_value) = max_notional
919                && notional > max_notional_value
920            {
921                self.deny_order(
922                        order.clone(),
923                        &format!(
924                            "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
925                        ),
926                    );
927                return false; // Denied
928            }
929
930            // Check MIN notional instrument limit
931            if let Some(min_notional) = instrument.min_notional()
932                && notional.currency == min_notional.currency
933                && notional < min_notional
934            {
935                self.deny_order(
936                        order.clone(),
937                        &format!(
938                            "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
939                        ),
940                    );
941                return false; // Denied
942            }
943
944            // // Check MAX notional instrument limit
945            if let Some(max_notional) = instrument.max_notional()
946                && notional.currency == max_notional.currency
947                && notional > max_notional
948            {
949                self.deny_order(
950                        order.clone(),
951                        &format!(
952                            "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
953                        ),
954                    );
955                return false; // Denied
956            }
957
958            // Calculate OrderBalanceImpact (valid for CashAccount only)
959            let notional = instrument.calculate_notional_value(effective_quantity, last_px, None);
960            let order_balance_impact = match order.order_side() {
961                OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
962                OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
963                OrderSide::NoOrderSide => {
964                    panic!("invalid `OrderSide`, was {}", order.order_side());
965                }
966            };
967
968            if self.config.debug {
969                log::debug!("Balance impact: {order_balance_impact}");
970            }
971
972            // Skip balance check when borrowing is enabled (e.g. spot margin trading)
973            if !allow_borrowing
974                && let Some(free_val) = free
975                && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
976            {
977                self.deny_order(
978                    order.clone(),
979                    &format!(
980                        "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
981                    ),
982                );
983                return false;
984            }
985
986            if base_currency.is_none() {
987                base_currency = instrument.base_currency();
988            }
989            if order.is_buy() {
990                match cum_notional_buy.as_mut() {
991                    Some(cum_notional_buy_val) => {
992                        cum_notional_buy_val.raw += -order_balance_impact.raw;
993                    }
994                    None => {
995                        cum_notional_buy = Some(Money::from_raw(
996                            -order_balance_impact.raw,
997                            order_balance_impact.currency,
998                        ));
999                    }
1000                }
1001
1002                if self.config.debug {
1003                    log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
1004                }
1005
1006                if !allow_borrowing
1007                    && let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
1008                    && cum_notional_buy > free
1009                {
1010                    self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
1011                    return false; // Denied
1012                }
1013            } else if order.is_sell() {
1014                let is_position_reducing_sell = order.is_reduce_only()
1015                    || (cum_sell_qty_raw + effective_quantity.raw) <= available_long_qty_raw;
1016                cum_sell_qty_raw += effective_quantity.raw;
1017
1018                if is_position_reducing_sell {
1019                    if self.config.debug {
1020                        log::debug!("Position-reducing SELL skips balance check");
1021                    }
1022                    continue;
1023                }
1024
1025                if cash_account.base_currency.is_some() {
1026                    match cum_notional_sell.as_mut() {
1027                        Some(cum_notional_buy_val) => {
1028                            cum_notional_buy_val.raw += order_balance_impact.raw;
1029                        }
1030                        None => {
1031                            cum_notional_sell = Some(Money::from_raw(
1032                                order_balance_impact.raw,
1033                                order_balance_impact.currency,
1034                            ));
1035                        }
1036                    }
1037                    if self.config.debug {
1038                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1039                    }
1040
1041                    if !allow_borrowing
1042                        && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1043                        && cum_notional_sell > free
1044                    {
1045                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1046                        return false; // Denied
1047                    }
1048                }
1049                // Account is already of type Cash, so no check
1050                else if let Some(base_currency) = base_currency {
1051                    let cash_value = Money::from_raw(
1052                        effective_quantity
1053                            .raw
1054                            .try_into()
1055                            .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
1056                            .unwrap(),
1057                        base_currency,
1058                    );
1059
1060                    if self.config.debug {
1061                        log::debug!("Cash value: {cash_value:?}");
1062                        log::debug!(
1063                            "Total: {:?}",
1064                            cash_account.balance_total(Some(base_currency))
1065                        );
1066                        log::debug!(
1067                            "Locked: {:?}",
1068                            cash_account.balance_locked(Some(base_currency))
1069                        );
1070                        log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
1071                    }
1072
1073                    match cum_notional_sell {
1074                        Some(mut value) => {
1075                            value.raw += cash_value.raw;
1076                            cum_notional_sell = Some(value);
1077                        }
1078                        None => cum_notional_sell = Some(cash_value),
1079                    }
1080
1081                    if self.config.debug {
1082                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1083                    }
1084                    if !allow_borrowing
1085                        && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1086                        && cum_notional_sell.raw > free.raw
1087                    {
1088                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1089                        return false; // Denied
1090                    }
1091                }
1092            }
1093        }
1094
1095        // Finally
1096        true // Passed
1097    }
1098
1099    fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
1100        let price_val = price?;
1101
1102        if price_val.precision > instrument.price_precision() {
1103            return Some(format!(
1104                "price {} invalid (precision {} > {})",
1105                price_val,
1106                price_val.precision,
1107                instrument.price_precision()
1108            ));
1109        }
1110
1111        if !matches!(
1112            instrument.instrument_class(),
1113            InstrumentClass::Option
1114                | InstrumentClass::FuturesSpread
1115                | InstrumentClass::OptionSpread
1116        ) && price_val.raw <= 0
1117        {
1118            return Some(format!("price {price_val} invalid (<= 0)"));
1119        }
1120
1121        None
1122    }
1123
1124    fn check_quantity(
1125        &self,
1126        instrument: &InstrumentAny,
1127        quantity: Option<Quantity>,
1128        is_quote_quantity: bool,
1129    ) -> Option<String> {
1130        let quantity_val = quantity?;
1131
1132        // Check precision
1133        if quantity_val.precision > instrument.size_precision() {
1134            return Some(format!(
1135                "quantity {} invalid (precision {} > {})",
1136                quantity_val,
1137                quantity_val.precision,
1138                instrument.size_precision()
1139            ));
1140        }
1141
1142        // Skip min/max checks for quote quantities (they will be checked in check_orders_risk using effective_quantity)
1143        if is_quote_quantity {
1144            return None;
1145        }
1146
1147        // Check maximum quantity
1148        if let Some(max_quantity) = instrument.max_quantity()
1149            && quantity_val > max_quantity
1150        {
1151            return Some(format!(
1152                "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
1153            ));
1154        }
1155
1156        // Check minimum quantity
1157        if let Some(min_quantity) = instrument.min_quantity()
1158            && quantity_val < min_quantity
1159        {
1160            return Some(format!(
1161                "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
1162            ));
1163        }
1164
1165        None
1166    }
1167
1168    fn deny_command(&self, command: TradingCommand, reason: &str) {
1169        match command {
1170            TradingCommand::SubmitOrder(command) => {
1171                self.deny_order(command.order, reason);
1172            }
1173            TradingCommand::SubmitOrderList(command) => {
1174                self.deny_order_list(command.order_list, reason);
1175            }
1176            _ => {
1177                panic!("Cannot deny command {command}");
1178            }
1179        }
1180    }
1181
1182    fn deny_order(&self, order: OrderAny, reason: &str) {
1183        log::warn!(
1184            "SubmitOrder for {} DENIED: {}",
1185            order.client_order_id(),
1186            reason
1187        );
1188
1189        if order.status() != OrderStatus::Initialized {
1190            return;
1191        }
1192
1193        // Scope the cache borrow to avoid RefCell conflict when sending to ExecEngine
1194        {
1195            let mut cache = self.cache.borrow_mut();
1196            if !cache.order_exists(&order.client_order_id()) {
1197                cache
1198                    .add_order(order.clone(), None, None, false)
1199                    .map_err(|e| {
1200                        log::error!("Cannot add order to cache: {e}");
1201                    })
1202                    .unwrap();
1203            }
1204        }
1205
1206        let denied = OrderEventAny::Denied(OrderDenied::new(
1207            order.trader_id(),
1208            order.strategy_id(),
1209            order.instrument_id(),
1210            order.client_order_id(),
1211            reason.into(),
1212            UUID4::new(),
1213            self.clock.borrow().timestamp_ns(),
1214            self.clock.borrow().timestamp_ns(),
1215        ));
1216
1217        msgbus::send_any("ExecEngine.process".into(), &denied);
1218    }
1219
1220    fn deny_order_list(&self, order_list: OrderList, reason: &str) {
1221        for order in order_list.orders {
1222            if !order.is_closed() {
1223                self.deny_order(order, reason);
1224            }
1225        }
1226    }
1227
1228    fn reject_modify_order(&self, order: OrderAny, reason: &str) {
1229        let ts_event = self.clock.borrow().timestamp_ns();
1230        let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
1231            order.trader_id(),
1232            order.strategy_id(),
1233            order.instrument_id(),
1234            order.client_order_id(),
1235            reason.into(),
1236            UUID4::new(),
1237            ts_event,
1238            ts_event,
1239            false,
1240            order.venue_order_id(),
1241            order.account_id(),
1242        ));
1243
1244        msgbus::send_any("ExecEngine.process".into(), &denied);
1245    }
1246
1247    fn execution_gateway(&mut self, instrument: InstrumentAny, command: TradingCommand) {
1248        match self.trading_state {
1249            TradingState::Halted => match command {
1250                TradingCommand::SubmitOrder(submit_order) => {
1251                    self.deny_order(submit_order.order, "TradingState::HALTED");
1252                }
1253                TradingCommand::SubmitOrderList(submit_order_list) => {
1254                    self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
1255                }
1256                _ => {}
1257            },
1258            TradingState::Reducing => match command {
1259                TradingCommand::SubmitOrder(submit_order) => {
1260                    let order = submit_order.order;
1261                    if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1262                        self.deny_order(
1263                            order,
1264                            &format!(
1265                                "BUY when TradingState::REDUCING and LONG {}",
1266                                instrument.id()
1267                            ),
1268                        );
1269                    } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1270                        self.deny_order(
1271                            order,
1272                            &format!(
1273                                "SELL when TradingState::REDUCING and SHORT {}",
1274                                instrument.id()
1275                            ),
1276                        );
1277                    }
1278                }
1279                TradingCommand::SubmitOrderList(submit_order_list) => {
1280                    let order_list = submit_order_list.order_list;
1281                    for order in &order_list.orders {
1282                        if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1283                            self.deny_order_list(
1284                                order_list,
1285                                &format!(
1286                                    "BUY when TradingState::REDUCING and LONG {}",
1287                                    instrument.id()
1288                                ),
1289                            );
1290                            return;
1291                        } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1292                            self.deny_order_list(
1293                                order_list,
1294                                &format!(
1295                                    "SELL when TradingState::REDUCING and SHORT {}",
1296                                    instrument.id()
1297                                ),
1298                            );
1299                            return;
1300                        }
1301                    }
1302                }
1303                _ => {}
1304            },
1305            TradingState::Active => match command {
1306                TradingCommand::SubmitOrder(submit_order) => {
1307                    self.throttled_submit_order.send(submit_order);
1308                }
1309                TradingCommand::SubmitOrderList(submit_order_list) => {
1310                    // TODO: implement throttler for order lists
1311                    self.send_to_execution(TradingCommand::SubmitOrderList(submit_order_list));
1312                }
1313                _ => {}
1314            },
1315        }
1316    }
1317
1318    fn send_to_execution(&self, command: TradingCommand) {
1319        msgbus::send_any("ExecEngine.execute".into(), &command);
1320    }
1321
1322    fn handle_event(&mut self, event: OrderEventAny) {
1323        // We intend to extend the risk engine to be able to handle additional events.
1324        // For now we just log.
1325        if self.config.debug {
1326            log::debug!("{RECV}{EVT} {event:?}");
1327        }
1328    }
1329}