nautilus_risk/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Risk management engine implementation.
17
18pub mod config;
19
20#[cfg(test)]
21mod tests;
22
23use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
24
25use config::RiskEngineConfig;
26use nautilus_common::{
27    cache::Cache,
28    clock::Clock,
29    logging::{CMD, EVT, RECV},
30    messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
31    msgbus,
32    throttler::Throttler,
33};
34use nautilus_core::UUID4;
35use nautilus_execution::trailing::{
36    trailing_stop_calculate_with_bid_ask, trailing_stop_calculate_with_last,
37};
38use nautilus_model::{
39    accounts::{Account, AccountAny},
40    enums::{
41        InstrumentClass, OrderSide, OrderStatus, TimeInForce, TradingState, TrailingOffsetType,
42        TriggerType,
43    },
44    events::{OrderDenied, OrderEventAny, OrderModifyRejected},
45    identifiers::InstrumentId,
46    instruments::{Instrument, InstrumentAny},
47    orders::{Order, OrderAny, OrderList},
48    types::{Currency, Money, Price, Quantity},
49};
50use nautilus_portfolio::Portfolio;
51use rust_decimal::{Decimal, prelude::ToPrimitive};
52use ustr::Ustr;
53
/// Boxed callback taking a [`SubmitOrder`] command; the handler type used by the submit-order
/// throttler for both its success and failure paths.
type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
/// Boxed callback taking a [`ModifyOrder`] command; the handler type used by the modify-order
/// throttler for both its success and failure paths.
type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
56
/// Central risk management engine that validates and controls trading operations.
///
/// The `RiskEngine` provides comprehensive pre-trade risk checks including order validation,
/// balance verification, position sizing limits, and trading state management. It acts as
/// a gateway between strategy orders and execution, ensuring all trades comply with
/// defined risk parameters and regulatory constraints.
#[allow(dead_code)]
pub struct RiskEngine {
    /// Shared clock, read for event timestamps and for GTD expiry checks.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache, queried for orders, positions, instruments, accounts and market data.
    cache: Rc<RefCell<Cache>>,
    /// Portfolio, queried for net long/short exposure when trading state is `Reducing`.
    portfolio: Portfolio,
    /// Throttler for `SubmitOrder` commands, configured from `config.max_order_submit`.
    pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
    /// Throttler for `ModifyOrder` commands, configured from `config.max_order_modify`.
    pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
    /// Per-instrument maximum notional per order; populated via `set_max_notional_per_order`.
    max_notional_per_order: HashMap<InstrumentId, Decimal>,
    /// Current trading state; starts as `TradingState::Active` and is changed via
    /// `set_trading_state`.
    trading_state: TradingState,
    /// Engine configuration (throttle limits plus the `bypass` and `debug` flags).
    config: RiskEngineConfig,
}
74
75impl Debug for RiskEngine {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct(stringify!(RiskEngine)).finish()
78    }
79}
80
81impl RiskEngine {
82    /// Creates a new [`RiskEngine`] instance.
83    pub fn new(
84        config: RiskEngineConfig,
85        portfolio: Portfolio,
86        clock: Rc<RefCell<dyn Clock>>,
87        cache: Rc<RefCell<Cache>>,
88    ) -> Self {
89        let throttled_submit_order =
90            Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
91
92        let throttled_modify_order =
93            Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
94
95        Self {
96            clock,
97            cache,
98            portfolio,
99            throttled_submit_order,
100            throttled_modify_order,
101            max_notional_per_order: HashMap::new(),
102            trading_state: TradingState::Active,
103            config,
104        }
105    }
106
107    fn create_submit_order_throttler(
108        config: &RiskEngineConfig,
109        clock: Rc<RefCell<dyn Clock>>,
110        cache: Rc<RefCell<Cache>>,
111    ) -> Throttler<SubmitOrder, SubmitOrderFn> {
112        let success_handler = {
113            Box::new(move |submit_order: SubmitOrder| {
114                msgbus::send_any(
115                    "ExecEngine.execute".into(),
116                    &TradingCommand::SubmitOrder(submit_order),
117                );
118            }) as Box<dyn Fn(SubmitOrder)>
119        };
120
121        let failure_handler = {
122            let cache = cache;
123            let clock = clock.clone();
124            Box::new(move |submit_order: SubmitOrder| {
125                let reason = "REJECTED BY THROTTLER";
126                log::warn!(
127                    "SubmitOrder for {} DENIED: {}",
128                    submit_order.client_order_id,
129                    reason
130                );
131
132                Self::handle_submit_order_cache(&cache, &submit_order);
133
134                let denied = Self::create_order_denied(&submit_order, reason, &clock);
135
136                msgbus::send_any("ExecEngine.process".into(), &denied);
137            }) as Box<dyn Fn(SubmitOrder)>
138        };
139
140        Throttler::new(
141            config.max_order_submit.limit,
142            config.max_order_submit.interval_ns,
143            clock,
144            "ORDER_SUBMIT_THROTTLER".to_string(),
145            success_handler,
146            Some(failure_handler),
147            Ustr::from(&UUID4::new().to_string()),
148        )
149    }
150
151    fn create_modify_order_throttler(
152        config: &RiskEngineConfig,
153        clock: Rc<RefCell<dyn Clock>>,
154        cache: Rc<RefCell<Cache>>,
155    ) -> Throttler<ModifyOrder, ModifyOrderFn> {
156        let success_handler = {
157            Box::new(move |order: ModifyOrder| {
158                msgbus::send_any(
159                    "ExecEngine.execute".into(),
160                    &TradingCommand::ModifyOrder(order),
161                );
162            }) as Box<dyn Fn(ModifyOrder)>
163        };
164
165        let failure_handler = {
166            let cache = cache;
167            let clock = clock.clone();
168            Box::new(move |order: ModifyOrder| {
169                let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
170                log::warn!(
171                    "SubmitOrder for {} DENIED: {}",
172                    order.client_order_id,
173                    reason
174                );
175
176                let order = match Self::get_existing_order(&cache, &order) {
177                    Some(order) => order,
178                    None => return,
179                };
180
181                let rejected = Self::create_modify_rejected(&order, reason, &clock);
182
183                msgbus::send_any("ExecEngine.process".into(), &rejected);
184            }) as Box<dyn Fn(ModifyOrder)>
185        };
186
187        Throttler::new(
188            config.max_order_modify.limit,
189            config.max_order_modify.interval_ns,
190            clock,
191            "ORDER_MODIFY_THROTTLER".to_string(),
192            success_handler,
193            Some(failure_handler),
194            Ustr::from(&UUID4::new().to_string()),
195        )
196    }
197
198    fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
199        let mut cache = cache.borrow_mut();
200        if !cache.order_exists(&submit_order.client_order_id) {
201            cache
202                .add_order(submit_order.order.clone(), None, None, false)
203                .map_err(|e| {
204                    log::error!("Cannot add order to cache: {e}");
205                })
206                .unwrap();
207        }
208    }
209
210    fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
211        let cache = cache.borrow();
212        if let Some(order) = cache.order(&order.client_order_id) {
213            Some(order.clone())
214        } else {
215            log::error!(
216                "Order with command.client_order_id: {} not found",
217                order.client_order_id
218            );
219            None
220        }
221    }
222
223    fn create_order_denied(
224        submit_order: &SubmitOrder,
225        reason: &str,
226        clock: &Rc<RefCell<dyn Clock>>,
227    ) -> OrderEventAny {
228        let timestamp = clock.borrow().timestamp_ns();
229        OrderEventAny::Denied(OrderDenied::new(
230            submit_order.trader_id,
231            submit_order.strategy_id,
232            submit_order.instrument_id,
233            submit_order.client_order_id,
234            reason.into(),
235            UUID4::new(),
236            timestamp,
237            timestamp,
238        ))
239    }
240
241    fn create_modify_rejected(
242        order: &OrderAny,
243        reason: &str,
244        clock: &Rc<RefCell<dyn Clock>>,
245    ) -> OrderEventAny {
246        let timestamp = clock.borrow().timestamp_ns();
247        OrderEventAny::ModifyRejected(OrderModifyRejected::new(
248            order.trader_id(),
249            order.strategy_id(),
250            order.instrument_id(),
251            order.client_order_id(),
252            reason.into(),
253            UUID4::new(),
254            timestamp,
255            timestamp,
256            false,
257            order.venue_order_id(),
258            None,
259        ))
260    }
261
262    // -- COMMANDS --------------------------------------------------------------------------------
263
    /// Executes a trading command through the risk management pipeline.
    ///
    /// Currently a thin pass-through: the command is handed unchanged to `handle_command`.
    pub fn execute(&mut self, command: TradingCommand) {
        // This will extend to other commands such as `RiskCommand`
        self.handle_command(command);
    }
269
    /// Processes an order event for risk monitoring and state updates.
    ///
    /// Currently a thin pass-through: the event is handed unchanged to `handle_event`.
    pub fn process(&mut self, event: OrderEventAny) {
        // This will extend to other events such as `RiskEvent`
        self.handle_event(event);
    }
275
276    /// Sets the trading state for risk control enforcement.
277    pub fn set_trading_state(&mut self, state: TradingState) {
278        if state == self.trading_state {
279            log::warn!("No change to trading state: already set to {state:?}");
280            return;
281        }
282
283        self.trading_state = state;
284
285        let _ts_now = self.clock.borrow().timestamp_ns();
286
287        // TODO: Create a new Event "TradingStateChanged" in OrderEventAny enum.
288        // let event = OrderEventAny::TradingStateChanged(TradingStateChanged::new(..,self.trading_state,..));
289
290        msgbus::publish("events.risk".into(), &"message"); // TODO: Send the new Event here
291
292        log::info!("Trading state set to {state:?}");
293    }
294
295    /// Sets the maximum notional value per order for the specified instrument.
296    pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
297        self.max_notional_per_order.insert(instrument_id, new_value);
298
299        let new_value_str = new_value.to_string();
300        log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
301    }
302
303    // -- COMMAND HANDLERS ------------------------------------------------------------------------
304
305    // Renamed from `execute_command`
306    fn handle_command(&mut self, command: TradingCommand) {
307        if self.config.debug {
308            log::debug!("{CMD}{RECV} {command:?}");
309        }
310
311        match command {
312            TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
313            TradingCommand::SubmitOrderList(submit_order_list) => {
314                self.handle_submit_order_list(submit_order_list);
315            }
316            TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
317            TradingCommand::QueryAccount(query_account) => {
318                self.send_to_execution(TradingCommand::QueryAccount(query_account));
319            }
320            _ => {
321                log::error!("Cannot handle command: {command}");
322            }
323        }
324    }
325
326    fn handle_submit_order(&mut self, command: SubmitOrder) {
327        if self.config.bypass {
328            self.send_to_execution(TradingCommand::SubmitOrder(command));
329            return;
330        }
331
332        let order = &command.order;
333        if let Some(position_id) = command.position_id
334            && order.is_reduce_only()
335        {
336            let position_exists = {
337                let cache = self.cache.borrow();
338                cache
339                    .position(&position_id)
340                    .map(|pos| (pos.side, pos.quantity))
341            };
342
343            if let Some((pos_side, pos_quantity)) = position_exists {
344                if !order.would_reduce_only(pos_side, pos_quantity) {
345                    self.deny_command(
346                        TradingCommand::SubmitOrder(command),
347                        &format!("Reduce only order would increase position {position_id}"),
348                    );
349                    return; // Denied
350                }
351            } else {
352                self.deny_command(
353                    TradingCommand::SubmitOrder(command),
354                    &format!("Position {position_id} not found for reduce-only order"),
355                );
356                return;
357            }
358        }
359
360        let instrument_exists = {
361            let cache = self.cache.borrow();
362            cache.instrument(&command.instrument_id).cloned()
363        };
364
365        let instrument = if let Some(instrument) = instrument_exists {
366            instrument
367        } else {
368            self.deny_command(
369                TradingCommand::SubmitOrder(command.clone()),
370                &format!("Instrument for {} not found", command.instrument_id),
371            );
372            return; // Denied
373        };
374
375        ////////////////////////////////////////////////////////////////////////////////
376        // PRE-TRADE ORDER(S) CHECKS
377        ////////////////////////////////////////////////////////////////////////////////
378        if !self.check_order(instrument.clone(), order.clone()) {
379            return; // Denied
380        }
381
382        if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
383            return; // Denied
384        }
385
386        // Route through execution gateway for TradingState checks & throttling
387        self.execution_gateway(instrument, TradingCommand::SubmitOrder(command));
388    }
389
390    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
391        if self.config.bypass {
392            self.send_to_execution(TradingCommand::SubmitOrderList(command));
393            return;
394        }
395
396        let instrument_exists = {
397            let cache = self.cache.borrow();
398            cache.instrument(&command.instrument_id).cloned()
399        };
400
401        let instrument = if let Some(instrument) = instrument_exists {
402            instrument
403        } else {
404            self.deny_command(
405                TradingCommand::SubmitOrderList(command.clone()),
406                &format!("no instrument found for {}", command.instrument_id),
407            );
408            return; // Denied
409        };
410
411        ////////////////////////////////////////////////////////////////////////////////
412        // PRE-TRADE ORDER(S) CHECKS
413        ////////////////////////////////////////////////////////////////////////////////
414        for order in command.order_list.orders.clone() {
415            if !self.check_order(instrument.clone(), order) {
416                return; // Denied
417            }
418        }
419
420        if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
421            self.deny_order_list(
422                command.order_list.clone(),
423                &format!("OrderList {} DENIED", command.order_list.id),
424            );
425            return; // Denied
426        }
427
428        self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
429    }
430
431    fn handle_modify_order(&mut self, command: ModifyOrder) {
432        ////////////////////////////////////////////////////////////////////////////////
433        // VALIDATE COMMAND
434        ////////////////////////////////////////////////////////////////////////////////
435        let order_exists = {
436            let cache = self.cache.borrow();
437            cache.order(&command.client_order_id).cloned()
438        };
439
440        let order = if let Some(order) = order_exists {
441            order
442        } else {
443            log::error!(
444                "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
445                command.client_order_id
446            );
447            return;
448        };
449
450        if order.is_closed() {
451            self.reject_modify_order(
452                order,
453                &format!(
454                    "Order with command.client_order_id: {} already closed",
455                    command.client_order_id
456                ),
457            );
458            return;
459        } else if order.status() == OrderStatus::PendingCancel {
460            self.reject_modify_order(
461                order,
462                &format!(
463                    "Order with command.client_order_id: {} is already pending cancel",
464                    command.client_order_id
465                ),
466            );
467            return;
468        }
469
470        let maybe_instrument = {
471            let cache = self.cache.borrow();
472            cache.instrument(&command.instrument_id).cloned()
473        };
474
475        let instrument = if let Some(instrument) = maybe_instrument {
476            instrument
477        } else {
478            self.reject_modify_order(
479                order,
480                &format!("no instrument found for {:?}", command.instrument_id),
481            );
482            return; // Denied
483        };
484
485        // Check Price
486        let mut risk_msg = self.check_price(&instrument, command.price);
487        if let Some(risk_msg) = risk_msg {
488            self.reject_modify_order(order, &risk_msg);
489            return; // Denied
490        }
491
492        // Check Trigger
493        risk_msg = self.check_price(&instrument, command.trigger_price);
494        if let Some(risk_msg) = risk_msg {
495            self.reject_modify_order(order, &risk_msg);
496            return; // Denied
497        }
498
499        // Check Quantity
500        risk_msg = self.check_quantity(&instrument, command.quantity, order.is_quote_quantity());
501        if let Some(risk_msg) = risk_msg {
502            self.reject_modify_order(order, &risk_msg);
503            return; // Denied
504        }
505
506        // Check TradingState
507        match self.trading_state {
508            TradingState::Halted => {
509                self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
510            }
511            TradingState::Reducing => {
512                if let Some(quantity) = command.quantity
513                    && quantity > order.quantity()
514                    && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
515                        || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
516                {
517                    self.reject_modify_order(
518                        order,
519                        &format!(
520                            "TradingState is REDUCING and update will increase exposure {}",
521                            instrument.id()
522                        ),
523                    );
524                }
525            }
526            _ => {}
527        }
528
529        self.throttled_modify_order.send(command);
530    }
531
532    // -- PRE-TRADE CHECKS ------------------------------------------------------------------------
533
534    fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
535        ////////////////////////////////////////////////////////////////////////////////
536        // VALIDATION CHECKS
537        ////////////////////////////////////////////////////////////////////////////////
538        if order.time_in_force() == TimeInForce::Gtd {
539            // SAFETY: GTD guarantees an expire time
540            let expire_time = order.expire_time().unwrap();
541            if expire_time <= self.clock.borrow().timestamp_ns() {
542                self.deny_order(
543                    order,
544                    &format!("GTD {} already past", expire_time.to_rfc3339()),
545                );
546                return false; // Denied
547            }
548        }
549
550        if !self.check_order_price(instrument.clone(), order.clone())
551            || !self.check_order_quantity(instrument, order)
552        {
553            return false; // Denied
554        }
555
556        true
557    }
558
559    fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
560        ////////////////////////////////////////////////////////////////////////////////
561        // CHECK PRICE
562        ////////////////////////////////////////////////////////////////////////////////
563        if order.price().is_some() {
564            let risk_msg = self.check_price(&instrument, order.price());
565            if let Some(risk_msg) = risk_msg {
566                self.deny_order(order, &risk_msg);
567                return false; // Denied
568            }
569        }
570
571        ////////////////////////////////////////////////////////////////////////////////
572        // CHECK TRIGGER
573        ////////////////////////////////////////////////////////////////////////////////
574        if order.trigger_price().is_some() {
575            let risk_msg = self.check_price(&instrument, order.trigger_price());
576            if let Some(risk_msg) = risk_msg {
577                self.deny_order(order, &risk_msg);
578                return false; // Denied
579            }
580        }
581
582        true
583    }
584
585    fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
586        let risk_msg = self.check_quantity(
587            &instrument,
588            Some(order.quantity()),
589            order.is_quote_quantity(),
590        );
591        if let Some(risk_msg) = risk_msg {
592            self.deny_order(order, &risk_msg);
593            return false; // Denied
594        }
595
596        true
597    }
598
599    fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
600        ////////////////////////////////////////////////////////////////////////////////
601        // CHECK TRIGGER
602        ////////////////////////////////////////////////////////////////////////////////
603        let mut last_px: Option<Price> = None;
604        let mut max_notional: Option<Money> = None;
605
606        // Determine max notional
607        let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
608        if let Some(max_notional_setting_val) = max_notional_setting.copied() {
609            max_notional = Some(Money::new(
610                max_notional_setting_val
611                    .to_f64()
612                    .expect("Invalid decimal conversion"),
613                instrument.quote_currency(),
614            ));
615        }
616
617        // Get account for risk checks
618        let account_exists = {
619            let cache = self.cache.borrow();
620            cache.account_for_venue(&instrument.id().venue).cloned()
621        };
622
623        let account = if let Some(account) = account_exists {
624            account
625        } else {
626            log::debug!("Cannot find account for venue {}", instrument.id().venue);
627            return true; // TODO: Temporary early return until handling routing/multiple venues
628        };
629        let cash_account = match account {
630            AccountAny::Cash(cash_account) => cash_account,
631            AccountAny::Margin(_) => return true, // TODO: Determine risk controls for margin
632        };
633        let free = cash_account.balance_free(Some(instrument.quote_currency()));
634        if self.config.debug {
635            log::debug!("Free cash: {free:?}");
636        }
637
638        let mut cum_notional_buy: Option<Money> = None;
639        let mut cum_notional_sell: Option<Money> = None;
640        let mut base_currency: Option<Currency> = None;
641        for order in &orders {
642            // Determine last price based on order type
643            last_px = match order {
644                OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
645                    if last_px.is_none() {
646                        let cache = self.cache.borrow();
647                        if let Some(last_quote) = cache.quote(&instrument.id()) {
648                            match order.order_side() {
649                                OrderSide::Buy => Some(last_quote.ask_price),
650                                OrderSide::Sell => Some(last_quote.bid_price),
651                                _ => panic!("Invalid order side"),
652                            }
653                        } else {
654                            let cache = self.cache.borrow();
655                            let last_trade = cache.trade(&instrument.id());
656
657                            if let Some(last_trade) = last_trade {
658                                Some(last_trade.price)
659                            } else {
660                                log::warn!(
661                                    "Cannot check MARKET order risk: no prices for {}",
662                                    instrument.id()
663                                );
664                                continue;
665                            }
666                        }
667                    } else {
668                        last_px
669                    }
670                }
671                OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
672                OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
673                    if let Some(trigger_price) = order.trigger_price() {
674                        Some(trigger_price)
675                    } else {
676                        // Validate trailing offset type is supported
677                        let offset_type = order.trailing_offset_type().unwrap();
678                        if !matches!(
679                            offset_type,
680                            TrailingOffsetType::Price
681                                | TrailingOffsetType::BasisPoints
682                                | TrailingOffsetType::Ticks
683                        ) {
684                            self.deny_order(
685                                order.clone(),
686                                &format!("UNSUPPORTED_TRAILING_OFFSET_TYPE: {offset_type:?}"),
687                            );
688                            return false;
689                        }
690
691                        let trigger_type = order.trigger_type().unwrap();
692                        let cache = self.cache.borrow();
693
694                        if trigger_type == TriggerType::BidAsk {
695                            if let Some(quote) = cache.quote(&instrument.id()) {
696                                match trailing_stop_calculate_with_bid_ask(
697                                    instrument.price_increment(),
698                                    order.trailing_offset_type().unwrap(),
699                                    order.order_side_specified(),
700                                    order.trailing_offset().unwrap(),
701                                    quote.bid_price,
702                                    quote.ask_price,
703                                ) {
704                                    Ok(calculated_trigger) => Some(calculated_trigger),
705                                    Err(e) => {
706                                        log::warn!(
707                                            "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
708                                            order.order_type()
709                                        );
710                                        continue;
711                                    }
712                                }
713                            } else {
714                                log::warn!(
715                                    "Cannot check {} order risk: no trigger price set and no bid/ask quotes available for {}",
716                                    order.order_type(),
717                                    instrument.id()
718                                );
719                                continue;
720                            }
721                        } else if let Some(last_trade) = cache.trade(&instrument.id()) {
722                            match trailing_stop_calculate_with_last(
723                                instrument.price_increment(),
724                                order.trailing_offset_type().unwrap(),
725                                order.order_side_specified(),
726                                order.trailing_offset().unwrap(),
727                                last_trade.price,
728                            ) {
729                                Ok(calculated_trigger) => Some(calculated_trigger),
730                                Err(e) => {
731                                    log::warn!(
732                                        "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {}",
733                                        order.order_type(),
734                                        e
735                                    );
736                                    continue;
737                                }
738                            }
739                        } else if trigger_type == TriggerType::LastOrBidAsk {
740                            // Fallback to bid/ask when no trade data available
741                            if let Some(quote) = cache.quote(&instrument.id()) {
742                                match trailing_stop_calculate_with_bid_ask(
743                                    instrument.price_increment(),
744                                    order.trailing_offset_type().unwrap(),
745                                    order.order_side_specified(),
746                                    order.trailing_offset().unwrap(),
747                                    quote.bid_price,
748                                    quote.ask_price,
749                                ) {
750                                    Ok(calculated_trigger) => Some(calculated_trigger),
751                                    Err(e) => {
752                                        log::warn!(
753                                            "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
754                                            order.order_type()
755                                        );
756                                        continue;
757                                    }
758                                }
759                            } else {
760                                log::warn!(
761                                    "Cannot check {} order risk: no trigger price set and no market data available for {}",
762                                    order.order_type(),
763                                    instrument.id()
764                                );
765                                continue;
766                            }
767                        } else {
768                            log::warn!(
769                                "Cannot check {} order risk: no trigger price set and no market data available for {}",
770                                order.order_type(),
771                                instrument.id()
772                            );
773                            continue;
774                        }
775                    }
776                }
777                _ => order.price(),
778            };
779
780            let last_px = if let Some(px) = last_px {
781                px
782            } else {
783                log::error!("Cannot check order risk: no price available");
784                continue;
785            };
786
787            // For quote quantity limit orders, use worst-case execution price
788            let effective_price = if order.is_quote_quantity()
789                && !instrument.is_inverse()
790                && matches!(order, OrderAny::Limit(_) | OrderAny::StopLimit(_))
791            {
792                // Get current market price for worst-case execution
793                let cache = self.cache.borrow();
794                if let Some(quote_tick) = cache.quote(&instrument.id()) {
795                    match order.order_side() {
796                        // BUY: could execute at best ask if below limit (more quantity)
797                        OrderSide::Buy => last_px.min(quote_tick.ask_price),
798                        // SELL: could execute at best bid if above limit (but less quantity, so use limit)
799                        OrderSide::Sell => last_px.max(quote_tick.bid_price),
800                        _ => last_px,
801                    }
802                } else {
803                    last_px // No market data, use limit price
804                }
805            } else {
806                last_px
807            };
808
809            let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
810                instrument.calculate_base_quantity(order.quantity(), effective_price)
811            } else {
812                order.quantity()
813            };
814
815            // Check min/max quantity against effective quantity
816            if let Some(max_quantity) = instrument.max_quantity()
817                && effective_quantity > max_quantity
818            {
819                self.deny_order(
820                    order.clone(),
821                    &format!(
822                        "QUANTITY_EXCEEDS_MAXIMUM: effective_quantity={effective_quantity}, max_quantity={max_quantity}"
823                    ),
824                );
825                return false; // Denied
826            }
827
828            if let Some(min_quantity) = instrument.min_quantity()
829                && effective_quantity < min_quantity
830            {
831                self.deny_order(
832                    order.clone(),
833                    &format!(
834                        "QUANTITY_BELOW_MINIMUM: effective_quantity={effective_quantity}, min_quantity={min_quantity}"
835                    ),
836                );
837                return false; // Denied
838            }
839
840            let notional =
841                instrument.calculate_notional_value(effective_quantity, last_px, Some(true));
842
843            if self.config.debug {
844                log::debug!("Notional: {notional:?}");
845            }
846
847            // Check MAX notional per order limit
848            if let Some(max_notional_value) = max_notional
849                && notional > max_notional_value
850            {
851                self.deny_order(
852                        order.clone(),
853                        &format!(
854                            "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
855                        ),
856                    );
857                return false; // Denied
858            }
859
860            // Check MIN notional instrument limit
861            if let Some(min_notional) = instrument.min_notional()
862                && notional.currency == min_notional.currency
863                && notional < min_notional
864            {
865                self.deny_order(
866                        order.clone(),
867                        &format!(
868                            "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
869                        ),
870                    );
871                return false; // Denied
872            }
873
874            // // Check MAX notional instrument limit
875            if let Some(max_notional) = instrument.max_notional()
876                && notional.currency == max_notional.currency
877                && notional > max_notional
878            {
879                self.deny_order(
880                        order.clone(),
881                        &format!(
882                            "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
883                        ),
884                    );
885                return false; // Denied
886            }
887
888            // Calculate OrderBalanceImpact (valid for CashAccount only)
889            let notional = instrument.calculate_notional_value(effective_quantity, last_px, None);
890            let order_balance_impact = match order.order_side() {
891                OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
892                OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
893                OrderSide::NoOrderSide => {
894                    panic!("invalid `OrderSide`, was {}", order.order_side());
895                }
896            };
897
898            if self.config.debug {
899                log::debug!("Balance impact: {order_balance_impact}");
900            }
901
902            if let Some(free_val) = free
903                && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
904            {
905                self.deny_order(
906                    order.clone(),
907                    &format!(
908                        "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
909                    ),
910                );
911                return false;
912            }
913
914            if base_currency.is_none() {
915                base_currency = instrument.base_currency();
916            }
917            if order.is_buy() {
918                match cum_notional_buy.as_mut() {
919                    Some(cum_notional_buy_val) => {
920                        cum_notional_buy_val.raw += -order_balance_impact.raw;
921                    }
922                    None => {
923                        cum_notional_buy = Some(Money::from_raw(
924                            -order_balance_impact.raw,
925                            order_balance_impact.currency,
926                        ));
927                    }
928                }
929
930                if self.config.debug {
931                    log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
932                }
933
934                if let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
935                    && cum_notional_buy > free
936                {
937                    self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
938                    return false; // Denied
939                }
940            } else if order.is_sell() {
941                if cash_account.base_currency.is_some() {
942                    if order.is_reduce_only() {
943                        if self.config.debug {
944                            log::debug!(
945                                "Reduce-only SELL skips cumulative notional free-balance check"
946                            );
947                        }
948                    } else {
949                        match cum_notional_sell.as_mut() {
950                            Some(cum_notional_buy_val) => {
951                                cum_notional_buy_val.raw += order_balance_impact.raw;
952                            }
953                            None => {
954                                cum_notional_sell = Some(Money::from_raw(
955                                    order_balance_impact.raw,
956                                    order_balance_impact.currency,
957                                ));
958                            }
959                        }
960                        if self.config.debug {
961                            log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
962                        }
963
964                        if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
965                            && cum_notional_sell > free
966                        {
967                            self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
968                            return false; // Denied
969                        }
970                    }
971                }
972                // Account is already of type Cash, so no check
973                else if let Some(base_currency) = base_currency {
974                    if order.is_reduce_only() {
975                        if self.config.debug {
976                            log::debug!(
977                                "Reduce-only SELL skips base-currency cumulative free check"
978                            );
979                        }
980                        continue;
981                    }
982
983                    let cash_value = Money::from_raw(
984                        effective_quantity
985                            .raw
986                            .try_into()
987                            .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
988                            .unwrap(),
989                        base_currency,
990                    );
991
992                    if self.config.debug {
993                        log::debug!("Cash value: {cash_value:?}");
994                        log::debug!(
995                            "Total: {:?}",
996                            cash_account.balance_total(Some(base_currency))
997                        );
998                        log::debug!(
999                            "Locked: {:?}",
1000                            cash_account.balance_locked(Some(base_currency))
1001                        );
1002                        log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
1003                    }
1004
1005                    match cum_notional_sell {
1006                        Some(mut value) => {
1007                            value.raw += cash_value.raw;
1008                            cum_notional_sell = Some(value);
1009                        }
1010                        None => cum_notional_sell = Some(cash_value),
1011                    }
1012
1013                    if self.config.debug {
1014                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1015                    }
1016                    if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1017                        && cum_notional_sell.raw > free.raw
1018                    {
1019                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1020                        return false; // Denied
1021                    }
1022                }
1023            }
1024        }
1025
1026        // Finally
1027        true // Passed
1028    }
1029
1030    fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
1031        let price_val = price?;
1032
1033        if price_val.precision > instrument.price_precision() {
1034            return Some(format!(
1035                "price {} invalid (precision {} > {})",
1036                price_val,
1037                price_val.precision,
1038                instrument.price_precision()
1039            ));
1040        }
1041
1042        if !matches!(
1043            instrument.instrument_class(),
1044            InstrumentClass::Option
1045                | InstrumentClass::FuturesSpread
1046                | InstrumentClass::OptionSpread
1047        ) && price_val.raw <= 0
1048        {
1049            return Some(format!("price {price_val} invalid (<= 0)"));
1050        }
1051
1052        None
1053    }
1054
1055    fn check_quantity(
1056        &self,
1057        instrument: &InstrumentAny,
1058        quantity: Option<Quantity>,
1059        is_quote_quantity: bool,
1060    ) -> Option<String> {
1061        let quantity_val = quantity?;
1062
1063        // Check precision
1064        if quantity_val.precision > instrument.size_precision() {
1065            return Some(format!(
1066                "quantity {} invalid (precision {} > {})",
1067                quantity_val,
1068                quantity_val.precision,
1069                instrument.size_precision()
1070            ));
1071        }
1072
1073        // Skip min/max checks for quote quantities (they will be checked in check_orders_risk using effective_quantity)
1074        if is_quote_quantity {
1075            return None;
1076        }
1077
1078        // Check maximum quantity
1079        if let Some(max_quantity) = instrument.max_quantity()
1080            && quantity_val > max_quantity
1081        {
1082            return Some(format!(
1083                "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
1084            ));
1085        }
1086
1087        // Check minimum quantity
1088        if let Some(min_quantity) = instrument.min_quantity()
1089            && quantity_val < min_quantity
1090        {
1091            return Some(format!(
1092                "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
1093            ));
1094        }
1095
1096        None
1097    }
1098
1099    // -- DENIALS ---------------------------------------------------------------------------------
1100
1101    fn deny_command(&self, command: TradingCommand, reason: &str) {
1102        match command {
1103            TradingCommand::SubmitOrder(command) => {
1104                self.deny_order(command.order, reason);
1105            }
1106            TradingCommand::SubmitOrderList(command) => {
1107                self.deny_order_list(command.order_list, reason);
1108            }
1109            _ => {
1110                panic!("Cannot deny command {command}");
1111            }
1112        }
1113    }
1114
1115    fn deny_order(&self, order: OrderAny, reason: &str) {
1116        log::warn!(
1117            "SubmitOrder for {} DENIED: {}",
1118            order.client_order_id(),
1119            reason
1120        );
1121
1122        if order.status() != OrderStatus::Initialized {
1123            return;
1124        }
1125
1126        let mut cache = self.cache.borrow_mut();
1127        if !cache.order_exists(&order.client_order_id()) {
1128            cache
1129                .add_order(order.clone(), None, None, false)
1130                .map_err(|e| {
1131                    log::error!("Cannot add order to cache: {e}");
1132                })
1133                .unwrap();
1134        }
1135
1136        let denied = OrderEventAny::Denied(OrderDenied::new(
1137            order.trader_id(),
1138            order.strategy_id(),
1139            order.instrument_id(),
1140            order.client_order_id(),
1141            reason.into(),
1142            UUID4::new(),
1143            self.clock.borrow().timestamp_ns(),
1144            self.clock.borrow().timestamp_ns(),
1145        ));
1146
1147        msgbus::send_any("ExecEngine.process".into(), &denied);
1148    }
1149
1150    fn deny_order_list(&self, order_list: OrderList, reason: &str) {
1151        for order in order_list.orders {
1152            if !order.is_closed() {
1153                self.deny_order(order, reason);
1154            }
1155        }
1156    }
1157
1158    fn reject_modify_order(&self, order: OrderAny, reason: &str) {
1159        let ts_event = self.clock.borrow().timestamp_ns();
1160        let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
1161            order.trader_id(),
1162            order.strategy_id(),
1163            order.instrument_id(),
1164            order.client_order_id(),
1165            reason.into(),
1166            UUID4::new(),
1167            ts_event,
1168            ts_event,
1169            false,
1170            order.venue_order_id(),
1171            order.account_id(),
1172        ));
1173
1174        msgbus::send_any("ExecEngine.process".into(), &denied);
1175    }
1176
1177    // -- EGRESS ----------------------------------------------------------------------------------
1178
1179    fn execution_gateway(&mut self, instrument: InstrumentAny, command: TradingCommand) {
1180        match self.trading_state {
1181            TradingState::Halted => match command {
1182                TradingCommand::SubmitOrder(submit_order) => {
1183                    self.deny_order(submit_order.order, "TradingState::HALTED");
1184                }
1185                TradingCommand::SubmitOrderList(submit_order_list) => {
1186                    self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
1187                }
1188                _ => {}
1189            },
1190            TradingState::Reducing => match command {
1191                TradingCommand::SubmitOrder(submit_order) => {
1192                    let order = submit_order.order;
1193                    if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1194                        self.deny_order(
1195                            order,
1196                            &format!(
1197                                "BUY when TradingState::REDUCING and LONG {}",
1198                                instrument.id()
1199                            ),
1200                        );
1201                    } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1202                        self.deny_order(
1203                            order,
1204                            &format!(
1205                                "SELL when TradingState::REDUCING and SHORT {}",
1206                                instrument.id()
1207                            ),
1208                        );
1209                    }
1210                }
1211                TradingCommand::SubmitOrderList(submit_order_list) => {
1212                    let order_list = submit_order_list.order_list;
1213                    for order in &order_list.orders {
1214                        if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1215                            self.deny_order_list(
1216                                order_list,
1217                                &format!(
1218                                    "BUY when TradingState::REDUCING and LONG {}",
1219                                    instrument.id()
1220                                ),
1221                            );
1222                            return;
1223                        } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1224                            self.deny_order_list(
1225                                order_list,
1226                                &format!(
1227                                    "SELL when TradingState::REDUCING and SHORT {}",
1228                                    instrument.id()
1229                                ),
1230                            );
1231                            return;
1232                        }
1233                    }
1234                }
1235                _ => {}
1236            },
1237            TradingState::Active => match command {
1238                TradingCommand::SubmitOrder(submit_order) => {
1239                    self.throttled_submit_order.send(submit_order);
1240                }
1241                TradingCommand::SubmitOrderList(submit_order_list) => {
1242                    // TODO: implement throttler for order lists
1243                    self.send_to_execution(TradingCommand::SubmitOrderList(submit_order_list));
1244                }
1245                _ => {}
1246            },
1247        }
1248    }
1249
1250    fn send_to_execution(&self, command: TradingCommand) {
1251        msgbus::send_any("ExecEngine.execute".into(), &command);
1252    }
1253
1254    fn handle_event(&mut self, event: OrderEventAny) {
1255        // We intend to extend the risk engine to be able to handle additional events.
1256        // For now we just log.
1257        if self.config.debug {
1258            log::debug!("{RECV}{EVT} {event:?}");
1259        }
1260    }
1261}