nautilus_risk/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Risk management engine implementation.
17
18pub mod config;
19
20#[cfg(test)]
21mod tests;
22
23use std::{cell::RefCell, fmt::Debug, rc::Rc};
24
25use ahash::AHashMap;
26use config::RiskEngineConfig;
27use nautilus_common::{
28    cache::Cache,
29    clock::Clock,
30    logging::{CMD, EVT, RECV},
31    messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
32    msgbus,
33    throttler::Throttler,
34};
35use nautilus_core::UUID4;
36use nautilus_execution::trailing::{
37    trailing_stop_calculate_with_bid_ask, trailing_stop_calculate_with_last,
38};
39use nautilus_model::{
40    accounts::{Account, AccountAny},
41    enums::{
42        InstrumentClass, OrderSide, OrderStatus, PositionSide, TimeInForce, TradingState,
43        TrailingOffsetType, TriggerType,
44    },
45    events::{OrderDenied, OrderEventAny, OrderModifyRejected},
46    identifiers::InstrumentId,
47    instruments::{Instrument, InstrumentAny},
48    orders::{Order, OrderAny, OrderList},
49    types::{Currency, Money, Price, Quantity, quantity::QuantityRaw},
50};
51use nautilus_portfolio::Portfolio;
52use rust_decimal::{Decimal, prelude::ToPrimitive};
53use ustr::Ustr;
54
55type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
56type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
57
58/// Central risk management engine that validates and controls trading operations.
59///
60/// The `RiskEngine` provides comprehensive pre-trade risk checks including order validation,
61/// balance verification, position sizing limits, and trading state management. It acts as
62/// a gateway between strategy orders and execution, ensuring all trades comply with
63/// defined risk parameters and regulatory constraints.
64#[allow(dead_code)]
65pub struct RiskEngine {
66    clock: Rc<RefCell<dyn Clock>>,
67    cache: Rc<RefCell<Cache>>,
68    portfolio: Portfolio,
69    pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
70    pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
71    max_notional_per_order: AHashMap<InstrumentId, Decimal>,
72    trading_state: TradingState,
73    config: RiskEngineConfig,
74}
75
76impl Debug for RiskEngine {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct(stringify!(RiskEngine)).finish()
79    }
80}
81
82impl RiskEngine {
83    /// Creates a new [`RiskEngine`] instance.
84    pub fn new(
85        config: RiskEngineConfig,
86        portfolio: Portfolio,
87        clock: Rc<RefCell<dyn Clock>>,
88        cache: Rc<RefCell<Cache>>,
89    ) -> Self {
90        let throttled_submit_order =
91            Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
92
93        let throttled_modify_order =
94            Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
95
96        Self {
97            clock,
98            cache,
99            portfolio,
100            throttled_submit_order,
101            throttled_modify_order,
102            max_notional_per_order: AHashMap::new(),
103            trading_state: TradingState::Active,
104            config,
105        }
106    }
107
108    fn create_submit_order_throttler(
109        config: &RiskEngineConfig,
110        clock: Rc<RefCell<dyn Clock>>,
111        cache: Rc<RefCell<Cache>>,
112    ) -> Throttler<SubmitOrder, SubmitOrderFn> {
113        let success_handler = {
114            Box::new(move |submit_order: SubmitOrder| {
115                msgbus::send_any(
116                    "ExecEngine.execute".into(),
117                    &TradingCommand::SubmitOrder(submit_order),
118                );
119            }) as Box<dyn Fn(SubmitOrder)>
120        };
121
122        let failure_handler = {
123            let cache = cache;
124            let clock = clock.clone();
125            Box::new(move |submit_order: SubmitOrder| {
126                let reason = "REJECTED BY THROTTLER";
127                log::warn!(
128                    "SubmitOrder for {} DENIED: {}",
129                    submit_order.client_order_id,
130                    reason
131                );
132
133                Self::handle_submit_order_cache(&cache, &submit_order);
134
135                let denied = Self::create_order_denied(&submit_order, reason, &clock);
136
137                msgbus::send_any("ExecEngine.process".into(), &denied);
138            }) as Box<dyn Fn(SubmitOrder)>
139        };
140
141        Throttler::new(
142            config.max_order_submit.limit,
143            config.max_order_submit.interval_ns,
144            clock,
145            "ORDER_SUBMIT_THROTTLER".to_string(),
146            success_handler,
147            Some(failure_handler),
148            Ustr::from(UUID4::new().as_str()),
149        )
150    }
151
152    fn create_modify_order_throttler(
153        config: &RiskEngineConfig,
154        clock: Rc<RefCell<dyn Clock>>,
155        cache: Rc<RefCell<Cache>>,
156    ) -> Throttler<ModifyOrder, ModifyOrderFn> {
157        let success_handler = {
158            Box::new(move |order: ModifyOrder| {
159                msgbus::send_any(
160                    "ExecEngine.execute".into(),
161                    &TradingCommand::ModifyOrder(order),
162                );
163            }) as Box<dyn Fn(ModifyOrder)>
164        };
165
166        let failure_handler = {
167            let cache = cache;
168            let clock = clock.clone();
169            Box::new(move |order: ModifyOrder| {
170                let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
171                log::warn!(
172                    "SubmitOrder for {} DENIED: {}",
173                    order.client_order_id,
174                    reason
175                );
176
177                let order = match Self::get_existing_order(&cache, &order) {
178                    Some(order) => order,
179                    None => return,
180                };
181
182                let rejected = Self::create_modify_rejected(&order, reason, &clock);
183
184                msgbus::send_any("ExecEngine.process".into(), &rejected);
185            }) as Box<dyn Fn(ModifyOrder)>
186        };
187
188        Throttler::new(
189            config.max_order_modify.limit,
190            config.max_order_modify.interval_ns,
191            clock,
192            "ORDER_MODIFY_THROTTLER".to_string(),
193            success_handler,
194            Some(failure_handler),
195            Ustr::from(UUID4::new().as_str()),
196        )
197    }
198
199    fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
200        let mut cache = cache.borrow_mut();
201        if !cache.order_exists(&submit_order.client_order_id) {
202            cache
203                .add_order(submit_order.order.clone(), None, None, false)
204                .map_err(|e| {
205                    log::error!("Cannot add order to cache: {e}");
206                })
207                .unwrap();
208        }
209    }
210
211    fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
212        let cache = cache.borrow();
213        if let Some(order) = cache.order(&order.client_order_id) {
214            Some(order.clone())
215        } else {
216            log::error!(
217                "Order with command.client_order_id: {} not found",
218                order.client_order_id
219            );
220            None
221        }
222    }
223
224    fn create_order_denied(
225        submit_order: &SubmitOrder,
226        reason: &str,
227        clock: &Rc<RefCell<dyn Clock>>,
228    ) -> OrderEventAny {
229        let timestamp = clock.borrow().timestamp_ns();
230        OrderEventAny::Denied(OrderDenied::new(
231            submit_order.trader_id,
232            submit_order.strategy_id,
233            submit_order.instrument_id,
234            submit_order.client_order_id,
235            reason.into(),
236            UUID4::new(),
237            timestamp,
238            timestamp,
239        ))
240    }
241
242    fn create_modify_rejected(
243        order: &OrderAny,
244        reason: &str,
245        clock: &Rc<RefCell<dyn Clock>>,
246    ) -> OrderEventAny {
247        let timestamp = clock.borrow().timestamp_ns();
248        OrderEventAny::ModifyRejected(OrderModifyRejected::new(
249            order.trader_id(),
250            order.strategy_id(),
251            order.instrument_id(),
252            order.client_order_id(),
253            reason.into(),
254            UUID4::new(),
255            timestamp,
256            timestamp,
257            false,
258            order.venue_order_id(),
259            None,
260        ))
261    }
262
263    // -- COMMANDS --------------------------------------------------------------------------------
264
265    /// Executes a trading command through the risk management pipeline.
266    pub fn execute(&mut self, command: TradingCommand) {
267        // This will extend to other commands such as `RiskCommand`
268        self.handle_command(command);
269    }
270
271    /// Processes an order event for risk monitoring and state updates.
272    pub fn process(&mut self, event: OrderEventAny) {
273        // This will extend to other events such as `RiskEvent`
274        self.handle_event(event);
275    }
276
277    /// Sets the trading state for risk control enforcement.
278    pub fn set_trading_state(&mut self, state: TradingState) {
279        if state == self.trading_state {
280            log::warn!("No change to trading state: already set to {state:?}");
281            return;
282        }
283
284        self.trading_state = state;
285
286        let _ts_now = self.clock.borrow().timestamp_ns();
287
288        // TODO: Create a new Event "TradingStateChanged" in OrderEventAny enum.
289        // let event = OrderEventAny::TradingStateChanged(TradingStateChanged::new(..,self.trading_state,..));
290
291        msgbus::publish("events.risk".into(), &"message"); // TODO: Send the new Event here
292
293        log::info!("Trading state set to {state:?}");
294    }
295
296    /// Sets the maximum notional value per order for the specified instrument.
297    pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
298        self.max_notional_per_order.insert(instrument_id, new_value);
299
300        let new_value_str = new_value.to_string();
301        log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
302    }
303
304    // -- COMMAND HANDLERS ------------------------------------------------------------------------
305
306    // Renamed from `execute_command`
307    fn handle_command(&mut self, command: TradingCommand) {
308        if self.config.debug {
309            log::debug!("{CMD}{RECV} {command:?}");
310        }
311
312        match command {
313            TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
314            TradingCommand::SubmitOrderList(submit_order_list) => {
315                self.handle_submit_order_list(submit_order_list);
316            }
317            TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
318            TradingCommand::QueryAccount(query_account) => {
319                self.send_to_execution(TradingCommand::QueryAccount(query_account));
320            }
321            _ => {
322                log::error!("Cannot handle command: {command}");
323            }
324        }
325    }
326
327    fn handle_submit_order(&mut self, command: SubmitOrder) {
328        if self.config.bypass {
329            self.send_to_execution(TradingCommand::SubmitOrder(command));
330            return;
331        }
332
333        let order = &command.order;
334        if let Some(position_id) = command.position_id
335            && order.is_reduce_only()
336        {
337            let position_exists = {
338                let cache = self.cache.borrow();
339                cache
340                    .position(&position_id)
341                    .map(|pos| (pos.side, pos.quantity))
342            };
343
344            if let Some((pos_side, pos_quantity)) = position_exists {
345                if !order.would_reduce_only(pos_side, pos_quantity) {
346                    self.deny_command(
347                        TradingCommand::SubmitOrder(command),
348                        &format!("Reduce only order would increase position {position_id}"),
349                    );
350                    return; // Denied
351                }
352            } else {
353                self.deny_command(
354                    TradingCommand::SubmitOrder(command),
355                    &format!("Position {position_id} not found for reduce-only order"),
356                );
357                return;
358            }
359        }
360
361        let instrument_exists = {
362            let cache = self.cache.borrow();
363            cache.instrument(&command.instrument_id).cloned()
364        };
365
366        let instrument = if let Some(instrument) = instrument_exists {
367            instrument
368        } else {
369            self.deny_command(
370                TradingCommand::SubmitOrder(command.clone()),
371                &format!("Instrument for {} not found", command.instrument_id),
372            );
373            return; // Denied
374        };
375
376        ////////////////////////////////////////////////////////////////////////////////
377        // PRE-TRADE ORDER(S) CHECKS
378        ////////////////////////////////////////////////////////////////////////////////
379        if !self.check_order(instrument.clone(), order.clone()) {
380            return; // Denied
381        }
382
383        if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
384            return; // Denied
385        }
386
387        // Route through execution gateway for TradingState checks & throttling
388        self.execution_gateway(instrument, TradingCommand::SubmitOrder(command));
389    }
390
391    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
392        if self.config.bypass {
393            self.send_to_execution(TradingCommand::SubmitOrderList(command));
394            return;
395        }
396
397        let instrument_exists = {
398            let cache = self.cache.borrow();
399            cache.instrument(&command.instrument_id).cloned()
400        };
401
402        let instrument = if let Some(instrument) = instrument_exists {
403            instrument
404        } else {
405            self.deny_command(
406                TradingCommand::SubmitOrderList(command.clone()),
407                &format!("no instrument found for {}", command.instrument_id),
408            );
409            return; // Denied
410        };
411
412        ////////////////////////////////////////////////////////////////////////////////
413        // PRE-TRADE ORDER(S) CHECKS
414        ////////////////////////////////////////////////////////////////////////////////
415        for order in command.order_list.orders.clone() {
416            if !self.check_order(instrument.clone(), order) {
417                return; // Denied
418            }
419        }
420
421        if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
422            self.deny_order_list(
423                command.order_list.clone(),
424                &format!("OrderList {} DENIED", command.order_list.id),
425            );
426            return; // Denied
427        }
428
429        self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
430    }
431
432    fn handle_modify_order(&mut self, command: ModifyOrder) {
433        ////////////////////////////////////////////////////////////////////////////////
434        // VALIDATE COMMAND
435        ////////////////////////////////////////////////////////////////////////////////
436        let order_exists = {
437            let cache = self.cache.borrow();
438            cache.order(&command.client_order_id).cloned()
439        };
440
441        let order = if let Some(order) = order_exists {
442            order
443        } else {
444            log::error!(
445                "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
446                command.client_order_id
447            );
448            return;
449        };
450
451        if order.is_closed() {
452            self.reject_modify_order(
453                order,
454                &format!(
455                    "Order with command.client_order_id: {} already closed",
456                    command.client_order_id
457                ),
458            );
459            return;
460        } else if order.status() == OrderStatus::PendingCancel {
461            self.reject_modify_order(
462                order,
463                &format!(
464                    "Order with command.client_order_id: {} is already pending cancel",
465                    command.client_order_id
466                ),
467            );
468            return;
469        }
470
471        let maybe_instrument = {
472            let cache = self.cache.borrow();
473            cache.instrument(&command.instrument_id).cloned()
474        };
475
476        let instrument = if let Some(instrument) = maybe_instrument {
477            instrument
478        } else {
479            self.reject_modify_order(
480                order,
481                &format!("no instrument found for {:?}", command.instrument_id),
482            );
483            return; // Denied
484        };
485
486        // Check Price
487        let mut risk_msg = self.check_price(&instrument, command.price);
488        if let Some(risk_msg) = risk_msg {
489            self.reject_modify_order(order, &risk_msg);
490            return; // Denied
491        }
492
493        // Check Trigger
494        risk_msg = self.check_price(&instrument, command.trigger_price);
495        if let Some(risk_msg) = risk_msg {
496            self.reject_modify_order(order, &risk_msg);
497            return; // Denied
498        }
499
500        // Check Quantity
501        risk_msg = self.check_quantity(&instrument, command.quantity, order.is_quote_quantity());
502        if let Some(risk_msg) = risk_msg {
503            self.reject_modify_order(order, &risk_msg);
504            return; // Denied
505        }
506
507        // Check TradingState
508        match self.trading_state {
509            TradingState::Halted => {
510                self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
511            }
512            TradingState::Reducing => {
513                if let Some(quantity) = command.quantity
514                    && quantity > order.quantity()
515                    && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
516                        || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
517                {
518                    self.reject_modify_order(
519                        order,
520                        &format!(
521                            "TradingState is REDUCING and update will increase exposure {}",
522                            instrument.id()
523                        ),
524                    );
525                }
526            }
527            _ => {}
528        }
529
530        self.throttled_modify_order.send(command);
531    }
532
533    // -- PRE-TRADE CHECKS ------------------------------------------------------------------------
534
535    fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
536        ////////////////////////////////////////////////////////////////////////////////
537        // VALIDATION CHECKS
538        ////////////////////////////////////////////////////////////////////////////////
539        if order.time_in_force() == TimeInForce::Gtd {
540            // SAFETY: GTD guarantees an expire time
541            let expire_time = order.expire_time().unwrap();
542            if expire_time <= self.clock.borrow().timestamp_ns() {
543                self.deny_order(
544                    order,
545                    &format!("GTD {} already past", expire_time.to_rfc3339()),
546                );
547                return false; // Denied
548            }
549        }
550
551        if !self.check_order_price(instrument.clone(), order.clone())
552            || !self.check_order_quantity(instrument, order)
553        {
554            return false; // Denied
555        }
556
557        true
558    }
559
560    fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
561        ////////////////////////////////////////////////////////////////////////////////
562        // CHECK PRICE
563        ////////////////////////////////////////////////////////////////////////////////
564        if order.price().is_some() {
565            let risk_msg = self.check_price(&instrument, order.price());
566            if let Some(risk_msg) = risk_msg {
567                self.deny_order(order, &risk_msg);
568                return false; // Denied
569            }
570        }
571
572        ////////////////////////////////////////////////////////////////////////////////
573        // CHECK TRIGGER
574        ////////////////////////////////////////////////////////////////////////////////
575        if order.trigger_price().is_some() {
576            let risk_msg = self.check_price(&instrument, order.trigger_price());
577            if let Some(risk_msg) = risk_msg {
578                self.deny_order(order, &risk_msg);
579                return false; // Denied
580            }
581        }
582
583        true
584    }
585
586    fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
587        let risk_msg = self.check_quantity(
588            &instrument,
589            Some(order.quantity()),
590            order.is_quote_quantity(),
591        );
592        if let Some(risk_msg) = risk_msg {
593            self.deny_order(order, &risk_msg);
594            return false; // Denied
595        }
596
597        true
598    }
599
600    fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
601        ////////////////////////////////////////////////////////////////////////////////
602        // CHECK TRIGGER
603        ////////////////////////////////////////////////////////////////////////////////
604        let mut last_px: Option<Price> = None;
605        let mut max_notional: Option<Money> = None;
606
607        // Determine max notional
608        let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
609        if let Some(max_notional_setting_val) = max_notional_setting.copied() {
610            max_notional = Some(Money::new(
611                max_notional_setting_val
612                    .to_f64()
613                    .expect("Invalid decimal conversion"),
614                instrument.quote_currency(),
615            ));
616        }
617
618        // Get account for risk checks
619        let account_exists = {
620            let cache = self.cache.borrow();
621            cache.account_for_venue(&instrument.id().venue).cloned()
622        };
623
624        let account = if let Some(account) = account_exists {
625            account
626        } else {
627            log::debug!("Cannot find account for venue {}", instrument.id().venue);
628            return true; // TODO: Temporary early return until handling routing/multiple venues
629        };
630        let cash_account = match account {
631            AccountAny::Cash(cash_account) => cash_account,
632            AccountAny::Margin(_) => return true, // TODO: Determine risk controls for margin
633        };
634        let free = cash_account.balance_free(Some(instrument.quote_currency()));
635        let allow_borrowing = cash_account.allow_borrowing;
636        if self.config.debug {
637            log::debug!("Free cash: {free:?}");
638        }
639
640        // Get net LONG position quantity for this instrument (for position-reducing sell checks),
641        // accounting for already submitted (but unfilled) SELL orders to prevent overselling.
642        let (net_long_qty_raw, pending_sell_qty_raw) = {
643            let cache = self.cache.borrow();
644            let long_qty: QuantityRaw = cache
645                .positions_open(None, Some(&instrument.id()), None, Some(PositionSide::Long))
646                .iter()
647                .map(|pos| pos.quantity.raw)
648                .sum();
649            let pending_sells: QuantityRaw = cache
650                .orders_open(None, Some(&instrument.id()), None, Some(OrderSide::Sell))
651                .iter()
652                .map(|ord| ord.leaves_qty().raw)
653                .sum();
654            (long_qty, pending_sells)
655        };
656
657        // Available quantity is long position minus pending sells
658        let available_long_qty_raw = net_long_qty_raw.saturating_sub(pending_sell_qty_raw);
659
660        if self.config.debug && net_long_qty_raw > 0 {
661            log::debug!(
662                "Net LONG qty (raw): {net_long_qty_raw}, pending sells: {pending_sell_qty_raw}, available: {available_long_qty_raw}"
663            );
664        }
665
666        // Track cumulative sell quantity to determine position-reducing vs position-opening sells
667        let mut cum_sell_qty_raw: QuantityRaw = 0;
668
669        let mut cum_notional_buy: Option<Money> = None;
670        let mut cum_notional_sell: Option<Money> = None;
671        let mut base_currency: Option<Currency> = None;
672        for order in &orders {
673            // Determine last price based on order type
674            last_px = match order {
675                OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
676                    if last_px.is_none() {
677                        let cache = self.cache.borrow();
678                        if let Some(last_quote) = cache.quote(&instrument.id()) {
679                            match order.order_side() {
680                                OrderSide::Buy => Some(last_quote.ask_price),
681                                OrderSide::Sell => Some(last_quote.bid_price),
682                                _ => panic!("Invalid order side"),
683                            }
684                        } else {
685                            let cache = self.cache.borrow();
686                            let last_trade = cache.trade(&instrument.id());
687
688                            if let Some(last_trade) = last_trade {
689                                Some(last_trade.price)
690                            } else {
691                                log::warn!(
692                                    "Cannot check MARKET order risk: no prices for {}",
693                                    instrument.id()
694                                );
695                                continue;
696                            }
697                        }
698                    } else {
699                        last_px
700                    }
701                }
702                OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
703                OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
704                    if let Some(trigger_price) = order.trigger_price() {
705                        Some(trigger_price)
706                    } else {
707                        // Validate trailing offset type is supported
708                        let offset_type = order.trailing_offset_type().unwrap();
709                        if !matches!(
710                            offset_type,
711                            TrailingOffsetType::Price
712                                | TrailingOffsetType::BasisPoints
713                                | TrailingOffsetType::Ticks
714                        ) {
715                            self.deny_order(
716                                order.clone(),
717                                &format!("UNSUPPORTED_TRAILING_OFFSET_TYPE: {offset_type:?}"),
718                            );
719                            return false;
720                        }
721
722                        let trigger_type = order.trigger_type().unwrap();
723                        let cache = self.cache.borrow();
724
725                        if trigger_type == TriggerType::BidAsk {
726                            if let Some(quote) = cache.quote(&instrument.id()) {
727                                match trailing_stop_calculate_with_bid_ask(
728                                    instrument.price_increment(),
729                                    order.trailing_offset_type().unwrap(),
730                                    order.order_side_specified(),
731                                    order.trailing_offset().unwrap(),
732                                    quote.bid_price,
733                                    quote.ask_price,
734                                ) {
735                                    Ok(calculated_trigger) => Some(calculated_trigger),
736                                    Err(e) => {
737                                        log::warn!(
738                                            "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
739                                            order.order_type()
740                                        );
741                                        continue;
742                                    }
743                                }
744                            } else {
745                                log::warn!(
746                                    "Cannot check {} order risk: no trigger price set and no bid/ask quotes available for {}",
747                                    order.order_type(),
748                                    instrument.id()
749                                );
750                                continue;
751                            }
752                        } else if let Some(last_trade) = cache.trade(&instrument.id()) {
753                            match trailing_stop_calculate_with_last(
754                                instrument.price_increment(),
755                                order.trailing_offset_type().unwrap(),
756                                order.order_side_specified(),
757                                order.trailing_offset().unwrap(),
758                                last_trade.price,
759                            ) {
760                                Ok(calculated_trigger) => Some(calculated_trigger),
761                                Err(e) => {
762                                    log::warn!(
763                                        "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {}",
764                                        order.order_type(),
765                                        e
766                                    );
767                                    continue;
768                                }
769                            }
770                        } else if trigger_type == TriggerType::LastOrBidAsk {
771                            // Fallback to bid/ask when no trade data available
772                            if let Some(quote) = cache.quote(&instrument.id()) {
773                                match trailing_stop_calculate_with_bid_ask(
774                                    instrument.price_increment(),
775                                    order.trailing_offset_type().unwrap(),
776                                    order.order_side_specified(),
777                                    order.trailing_offset().unwrap(),
778                                    quote.bid_price,
779                                    quote.ask_price,
780                                ) {
781                                    Ok(calculated_trigger) => Some(calculated_trigger),
782                                    Err(e) => {
783                                        log::warn!(
784                                            "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
785                                            order.order_type()
786                                        );
787                                        continue;
788                                    }
789                                }
790                            } else {
791                                log::warn!(
792                                    "Cannot check {} order risk: no trigger price set and no market data available for {}",
793                                    order.order_type(),
794                                    instrument.id()
795                                );
796                                continue;
797                            }
798                        } else {
799                            log::warn!(
800                                "Cannot check {} order risk: no trigger price set and no market data available for {}",
801                                order.order_type(),
802                                instrument.id()
803                            );
804                            continue;
805                        }
806                    }
807                }
808                _ => order.price(),
809            };
810
811            let last_px = if let Some(px) = last_px {
812                px
813            } else {
814                log::error!("Cannot check order risk: no price available");
815                continue;
816            };
817
818            // For quote quantity limit orders, use worst-case execution price
819            let effective_price = if order.is_quote_quantity()
820                && !instrument.is_inverse()
821                && matches!(order, OrderAny::Limit(_) | OrderAny::StopLimit(_))
822            {
823                // Get current market price for worst-case execution
824                let cache = self.cache.borrow();
825                if let Some(quote_tick) = cache.quote(&instrument.id()) {
826                    match order.order_side() {
827                        // BUY: could execute at best ask if below limit (more quantity)
828                        OrderSide::Buy => last_px.min(quote_tick.ask_price),
829                        // SELL: could execute at best bid if above limit (but less quantity, so use limit)
830                        OrderSide::Sell => last_px.max(quote_tick.bid_price),
831                        _ => last_px,
832                    }
833                } else {
834                    last_px // No market data, use limit price
835                }
836            } else {
837                last_px
838            };
839
840            let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
841                instrument.calculate_base_quantity(order.quantity(), effective_price)
842            } else {
843                order.quantity()
844            };
845
846            // Check min/max quantity against effective quantity
847            if let Some(max_quantity) = instrument.max_quantity()
848                && effective_quantity > max_quantity
849            {
850                self.deny_order(
851                    order.clone(),
852                    &format!(
853                        "QUANTITY_EXCEEDS_MAXIMUM: effective_quantity={effective_quantity}, max_quantity={max_quantity}"
854                    ),
855                );
856                return false; // Denied
857            }
858
859            if let Some(min_quantity) = instrument.min_quantity()
860                && effective_quantity < min_quantity
861            {
862                self.deny_order(
863                    order.clone(),
864                    &format!(
865                        "QUANTITY_BELOW_MINIMUM: effective_quantity={effective_quantity}, min_quantity={min_quantity}"
866                    ),
867                );
868                return false; // Denied
869            }
870
871            let notional =
872                instrument.calculate_notional_value(effective_quantity, last_px, Some(true));
873
874            if self.config.debug {
875                log::debug!("Notional: {notional:?}");
876            }
877
878            // Check MAX notional per order limit
879            if let Some(max_notional_value) = max_notional
880                && notional > max_notional_value
881            {
882                self.deny_order(
883                        order.clone(),
884                        &format!(
885                            "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
886                        ),
887                    );
888                return false; // Denied
889            }
890
891            // Check MIN notional instrument limit
892            if let Some(min_notional) = instrument.min_notional()
893                && notional.currency == min_notional.currency
894                && notional < min_notional
895            {
896                self.deny_order(
897                        order.clone(),
898                        &format!(
899                            "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
900                        ),
901                    );
902                return false; // Denied
903            }
904
905            // // Check MAX notional instrument limit
906            if let Some(max_notional) = instrument.max_notional()
907                && notional.currency == max_notional.currency
908                && notional > max_notional
909            {
910                self.deny_order(
911                        order.clone(),
912                        &format!(
913                            "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
914                        ),
915                    );
916                return false; // Denied
917            }
918
919            // Calculate OrderBalanceImpact (valid for CashAccount only)
920            let notional = instrument.calculate_notional_value(effective_quantity, last_px, None);
921            let order_balance_impact = match order.order_side() {
922                OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
923                OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
924                OrderSide::NoOrderSide => {
925                    panic!("invalid `OrderSide`, was {}", order.order_side());
926                }
927            };
928
929            if self.config.debug {
930                log::debug!("Balance impact: {order_balance_impact}");
931            }
932
933            // Skip balance check when borrowing is enabled (e.g. spot margin trading)
934            if !allow_borrowing
935                && let Some(free_val) = free
936                && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
937            {
938                self.deny_order(
939                    order.clone(),
940                    &format!(
941                        "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
942                    ),
943                );
944                return false;
945            }
946
947            if base_currency.is_none() {
948                base_currency = instrument.base_currency();
949            }
950            if order.is_buy() {
951                match cum_notional_buy.as_mut() {
952                    Some(cum_notional_buy_val) => {
953                        cum_notional_buy_val.raw += -order_balance_impact.raw;
954                    }
955                    None => {
956                        cum_notional_buy = Some(Money::from_raw(
957                            -order_balance_impact.raw,
958                            order_balance_impact.currency,
959                        ));
960                    }
961                }
962
963                if self.config.debug {
964                    log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
965                }
966
967                if !allow_borrowing
968                    && let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
969                    && cum_notional_buy > free
970                {
971                    self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
972                    return false; // Denied
973                }
974            } else if order.is_sell() {
975                let is_position_reducing_sell = order.is_reduce_only()
976                    || (cum_sell_qty_raw + effective_quantity.raw) <= available_long_qty_raw;
977                cum_sell_qty_raw += effective_quantity.raw;
978
979                if is_position_reducing_sell {
980                    if self.config.debug {
981                        log::debug!("Position-reducing SELL skips balance check");
982                    }
983                    continue;
984                }
985
986                if cash_account.base_currency.is_some() {
987                    match cum_notional_sell.as_mut() {
988                        Some(cum_notional_buy_val) => {
989                            cum_notional_buy_val.raw += order_balance_impact.raw;
990                        }
991                        None => {
992                            cum_notional_sell = Some(Money::from_raw(
993                                order_balance_impact.raw,
994                                order_balance_impact.currency,
995                            ));
996                        }
997                    }
998                    if self.config.debug {
999                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1000                    }
1001
1002                    if !allow_borrowing
1003                        && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1004                        && cum_notional_sell > free
1005                    {
1006                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1007                        return false; // Denied
1008                    }
1009                }
1010                // Account is already of type Cash, so no check
1011                else if let Some(base_currency) = base_currency {
1012                    let cash_value = Money::from_raw(
1013                        effective_quantity
1014                            .raw
1015                            .try_into()
1016                            .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
1017                            .unwrap(),
1018                        base_currency,
1019                    );
1020
1021                    if self.config.debug {
1022                        log::debug!("Cash value: {cash_value:?}");
1023                        log::debug!(
1024                            "Total: {:?}",
1025                            cash_account.balance_total(Some(base_currency))
1026                        );
1027                        log::debug!(
1028                            "Locked: {:?}",
1029                            cash_account.balance_locked(Some(base_currency))
1030                        );
1031                        log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
1032                    }
1033
1034                    match cum_notional_sell {
1035                        Some(mut value) => {
1036                            value.raw += cash_value.raw;
1037                            cum_notional_sell = Some(value);
1038                        }
1039                        None => cum_notional_sell = Some(cash_value),
1040                    }
1041
1042                    if self.config.debug {
1043                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1044                    }
1045                    if !allow_borrowing
1046                        && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1047                        && cum_notional_sell.raw > free.raw
1048                    {
1049                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1050                        return false; // Denied
1051                    }
1052                }
1053            }
1054        }
1055
1056        // Finally
1057        true // Passed
1058    }
1059
1060    fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
1061        let price_val = price?;
1062
1063        if price_val.precision > instrument.price_precision() {
1064            return Some(format!(
1065                "price {} invalid (precision {} > {})",
1066                price_val,
1067                price_val.precision,
1068                instrument.price_precision()
1069            ));
1070        }
1071
1072        if !matches!(
1073            instrument.instrument_class(),
1074            InstrumentClass::Option
1075                | InstrumentClass::FuturesSpread
1076                | InstrumentClass::OptionSpread
1077        ) && price_val.raw <= 0
1078        {
1079            return Some(format!("price {price_val} invalid (<= 0)"));
1080        }
1081
1082        None
1083    }
1084
1085    fn check_quantity(
1086        &self,
1087        instrument: &InstrumentAny,
1088        quantity: Option<Quantity>,
1089        is_quote_quantity: bool,
1090    ) -> Option<String> {
1091        let quantity_val = quantity?;
1092
1093        // Check precision
1094        if quantity_val.precision > instrument.size_precision() {
1095            return Some(format!(
1096                "quantity {} invalid (precision {} > {})",
1097                quantity_val,
1098                quantity_val.precision,
1099                instrument.size_precision()
1100            ));
1101        }
1102
1103        // Skip min/max checks for quote quantities (they will be checked in check_orders_risk using effective_quantity)
1104        if is_quote_quantity {
1105            return None;
1106        }
1107
1108        // Check maximum quantity
1109        if let Some(max_quantity) = instrument.max_quantity()
1110            && quantity_val > max_quantity
1111        {
1112            return Some(format!(
1113                "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
1114            ));
1115        }
1116
1117        // Check minimum quantity
1118        if let Some(min_quantity) = instrument.min_quantity()
1119            && quantity_val < min_quantity
1120        {
1121            return Some(format!(
1122                "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
1123            ));
1124        }
1125
1126        None
1127    }
1128
1129    // -- DENIALS ---------------------------------------------------------------------------------
1130
1131    fn deny_command(&self, command: TradingCommand, reason: &str) {
1132        match command {
1133            TradingCommand::SubmitOrder(command) => {
1134                self.deny_order(command.order, reason);
1135            }
1136            TradingCommand::SubmitOrderList(command) => {
1137                self.deny_order_list(command.order_list, reason);
1138            }
1139            _ => {
1140                panic!("Cannot deny command {command}");
1141            }
1142        }
1143    }
1144
1145    fn deny_order(&self, order: OrderAny, reason: &str) {
1146        log::warn!(
1147            "SubmitOrder for {} DENIED: {}",
1148            order.client_order_id(),
1149            reason
1150        );
1151
1152        if order.status() != OrderStatus::Initialized {
1153            return;
1154        }
1155
1156        // Scope the cache borrow to avoid RefCell conflict when sending to ExecEngine
1157        {
1158            let mut cache = self.cache.borrow_mut();
1159            if !cache.order_exists(&order.client_order_id()) {
1160                cache
1161                    .add_order(order.clone(), None, None, false)
1162                    .map_err(|e| {
1163                        log::error!("Cannot add order to cache: {e}");
1164                    })
1165                    .unwrap();
1166            }
1167        }
1168
1169        let denied = OrderEventAny::Denied(OrderDenied::new(
1170            order.trader_id(),
1171            order.strategy_id(),
1172            order.instrument_id(),
1173            order.client_order_id(),
1174            reason.into(),
1175            UUID4::new(),
1176            self.clock.borrow().timestamp_ns(),
1177            self.clock.borrow().timestamp_ns(),
1178        ));
1179
1180        msgbus::send_any("ExecEngine.process".into(), &denied);
1181    }
1182
1183    fn deny_order_list(&self, order_list: OrderList, reason: &str) {
1184        for order in order_list.orders {
1185            if !order.is_closed() {
1186                self.deny_order(order, reason);
1187            }
1188        }
1189    }
1190
1191    fn reject_modify_order(&self, order: OrderAny, reason: &str) {
1192        let ts_event = self.clock.borrow().timestamp_ns();
1193        let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
1194            order.trader_id(),
1195            order.strategy_id(),
1196            order.instrument_id(),
1197            order.client_order_id(),
1198            reason.into(),
1199            UUID4::new(),
1200            ts_event,
1201            ts_event,
1202            false,
1203            order.venue_order_id(),
1204            order.account_id(),
1205        ));
1206
1207        msgbus::send_any("ExecEngine.process".into(), &denied);
1208    }
1209
1210    // -- EGRESS ----------------------------------------------------------------------------------
1211
1212    fn execution_gateway(&mut self, instrument: InstrumentAny, command: TradingCommand) {
1213        match self.trading_state {
1214            TradingState::Halted => match command {
1215                TradingCommand::SubmitOrder(submit_order) => {
1216                    self.deny_order(submit_order.order, "TradingState::HALTED");
1217                }
1218                TradingCommand::SubmitOrderList(submit_order_list) => {
1219                    self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
1220                }
1221                _ => {}
1222            },
1223            TradingState::Reducing => match command {
1224                TradingCommand::SubmitOrder(submit_order) => {
1225                    let order = submit_order.order;
1226                    if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1227                        self.deny_order(
1228                            order,
1229                            &format!(
1230                                "BUY when TradingState::REDUCING and LONG {}",
1231                                instrument.id()
1232                            ),
1233                        );
1234                    } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1235                        self.deny_order(
1236                            order,
1237                            &format!(
1238                                "SELL when TradingState::REDUCING and SHORT {}",
1239                                instrument.id()
1240                            ),
1241                        );
1242                    }
1243                }
1244                TradingCommand::SubmitOrderList(submit_order_list) => {
1245                    let order_list = submit_order_list.order_list;
1246                    for order in &order_list.orders {
1247                        if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1248                            self.deny_order_list(
1249                                order_list,
1250                                &format!(
1251                                    "BUY when TradingState::REDUCING and LONG {}",
1252                                    instrument.id()
1253                                ),
1254                            );
1255                            return;
1256                        } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1257                            self.deny_order_list(
1258                                order_list,
1259                                &format!(
1260                                    "SELL when TradingState::REDUCING and SHORT {}",
1261                                    instrument.id()
1262                                ),
1263                            );
1264                            return;
1265                        }
1266                    }
1267                }
1268                _ => {}
1269            },
1270            TradingState::Active => match command {
1271                TradingCommand::SubmitOrder(submit_order) => {
1272                    self.throttled_submit_order.send(submit_order);
1273                }
1274                TradingCommand::SubmitOrderList(submit_order_list) => {
1275                    // TODO: implement throttler for order lists
1276                    self.send_to_execution(TradingCommand::SubmitOrderList(submit_order_list));
1277                }
1278                _ => {}
1279            },
1280        }
1281    }
1282
1283    fn send_to_execution(&self, command: TradingCommand) {
1284        msgbus::send_any("ExecEngine.execute".into(), &command);
1285    }
1286
1287    fn handle_event(&mut self, event: OrderEventAny) {
1288        // We intend to extend the risk engine to be able to handle additional events.
1289        // For now we just log.
1290        if self.config.debug {
1291            log::debug!("{RECV}{EVT} {event:?}");
1292        }
1293    }
1294}