Skip to main content

nautilus_risk/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Risk management engine implementation.
17
18pub mod config;
19
20use std::{cell::RefCell, fmt::Debug, rc::Rc};
21
22use ahash::AHashMap;
23use config::RiskEngineConfig;
24use nautilus_common::{
25    cache::Cache,
26    clock::Clock,
27    logging::{CMD, EVT, RECV},
28    messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
29    msgbus,
30    msgbus::{MessagingSwitchboard, TypedIntoHandler},
31    throttler::Throttler,
32};
33use nautilus_core::{UUID4, WeakCell};
34use nautilus_execution::trailing::{
35    trailing_stop_calculate_with_bid_ask, trailing_stop_calculate_with_last,
36};
37use nautilus_model::{
38    accounts::{Account, AccountAny},
39    enums::{
40        InstrumentClass, OrderSide, OrderStatus, PositionSide, TimeInForce, TradingState,
41        TrailingOffsetType, TriggerType,
42    },
43    events::{OrderDenied, OrderEventAny, OrderModifyRejected},
44    identifiers::InstrumentId,
45    instruments::{Instrument, InstrumentAny},
46    orders::{Order, OrderAny},
47    types::{Currency, Money, Price, Quantity, quantity::QuantityRaw},
48};
49use nautilus_portfolio::Portfolio;
50use rust_decimal::{Decimal, prelude::ToPrimitive};
51use ustr::Ustr;
52
53type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
54type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
55
56/// Central risk management engine that validates and controls trading operations.
57///
58/// The `RiskEngine` provides comprehensive pre-trade risk checks including order validation,
59/// balance verification, position sizing limits, and trading state management. It acts as
60/// a gateway between strategy orders and execution, ensuring all trades comply with
61/// defined risk parameters and regulatory constraints.
62#[allow(dead_code)]
63pub struct RiskEngine {
64    clock: Rc<RefCell<dyn Clock>>,
65    cache: Rc<RefCell<Cache>>,
66    portfolio: Portfolio,
67    pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
68    pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
69    max_notional_per_order: AHashMap<InstrumentId, Decimal>,
70    trading_state: TradingState,
71    config: RiskEngineConfig,
72}
73
74impl Debug for RiskEngine {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct(stringify!(RiskEngine)).finish()
77    }
78}
79
80impl RiskEngine {
81    /// Creates a new [`RiskEngine`] instance.
82    pub fn new(
83        config: RiskEngineConfig,
84        portfolio: Portfolio,
85        clock: Rc<RefCell<dyn Clock>>,
86        cache: Rc<RefCell<Cache>>,
87    ) -> Self {
88        let throttled_submit_order =
89            Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
90
91        let throttled_modify_order =
92            Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
93
94        Self {
95            clock,
96            cache,
97            portfolio,
98            throttled_submit_order,
99            throttled_modify_order,
100            max_notional_per_order: AHashMap::new(),
101            trading_state: TradingState::Active,
102            config,
103        }
104    }
105
106    /// Registers all message bus handlers for the risk engine.
107    pub fn register_msgbus_handlers(engine: Rc<RefCell<Self>>) {
108        let weak = WeakCell::from(Rc::downgrade(&engine));
109
110        msgbus::register_trading_command_endpoint(
111            MessagingSwitchboard::risk_engine_execute(),
112            TypedIntoHandler::from(move |cmd: TradingCommand| {
113                if let Some(rc) = weak.upgrade() {
114                    rc.borrow_mut().execute(cmd);
115                }
116            }),
117        );
118    }
119
120    fn create_submit_order_throttler(
121        config: &RiskEngineConfig,
122        clock: Rc<RefCell<dyn Clock>>,
123        cache: Rc<RefCell<Cache>>,
124    ) -> Throttler<SubmitOrder, SubmitOrderFn> {
125        let success_handler = {
126            Box::new(move |submit_order: SubmitOrder| {
127                let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
128                msgbus::send_trading_command(endpoint, TradingCommand::SubmitOrder(submit_order));
129            }) as Box<dyn Fn(SubmitOrder)>
130        };
131
132        let failure_handler = {
133            let cache = cache;
134            let clock = clock.clone();
135            Box::new(move |submit_order: SubmitOrder| {
136                let reason = "REJECTED BY THROTTLER";
137                log::warn!(
138                    "SubmitOrder for {} DENIED: {}",
139                    submit_order.client_order_id,
140                    reason
141                );
142
143                Self::handle_submit_order_cache(&cache, &submit_order);
144
145                let denied = Self::create_order_denied(&submit_order, reason, &clock);
146
147                let endpoint = MessagingSwitchboard::exec_engine_process();
148                msgbus::send_order_event(endpoint, denied);
149            }) as Box<dyn Fn(SubmitOrder)>
150        };
151
152        Throttler::new(
153            config.max_order_submit.limit,
154            config.max_order_submit.interval_ns,
155            clock,
156            "ORDER_SUBMIT_THROTTLER".to_string(),
157            success_handler,
158            Some(failure_handler),
159            Ustr::from(UUID4::new().as_str()),
160        )
161    }
162
163    fn create_modify_order_throttler(
164        config: &RiskEngineConfig,
165        clock: Rc<RefCell<dyn Clock>>,
166        cache: Rc<RefCell<Cache>>,
167    ) -> Throttler<ModifyOrder, ModifyOrderFn> {
168        let success_handler = {
169            Box::new(move |order: ModifyOrder| {
170                let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
171                msgbus::send_trading_command(endpoint, TradingCommand::ModifyOrder(order));
172            }) as Box<dyn Fn(ModifyOrder)>
173        };
174
175        let failure_handler = {
176            let cache = cache;
177            let clock = clock.clone();
178            Box::new(move |order: ModifyOrder| {
179                let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
180                log::warn!(
181                    "SubmitOrder for {} DENIED: {}",
182                    order.client_order_id,
183                    reason
184                );
185
186                let order = match Self::get_existing_order(&cache, &order) {
187                    Some(order) => order,
188                    None => return,
189                };
190
191                let rejected = Self::create_modify_rejected(&order, reason, &clock);
192
193                let endpoint = MessagingSwitchboard::exec_engine_process();
194                msgbus::send_order_event(endpoint, rejected);
195            }) as Box<dyn Fn(ModifyOrder)>
196        };
197
198        Throttler::new(
199            config.max_order_modify.limit,
200            config.max_order_modify.interval_ns,
201            clock,
202            "ORDER_MODIFY_THROTTLER".to_string(),
203            success_handler,
204            Some(failure_handler),
205            Ustr::from(UUID4::new().as_str()),
206        )
207    }
208
209    fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
210        let cache = cache.borrow();
211        if !cache.order_exists(&submit_order.client_order_id) {
212            log::error!(
213                "Order not found in cache for client_order_id: {}",
214                submit_order.client_order_id
215            );
216        }
217    }
218
219    fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
220        let cache = cache.borrow();
221        if let Some(order) = cache.order(&order.client_order_id) {
222            Some(order.clone())
223        } else {
224            log::error!(
225                "Order with command.client_order_id: {} not found",
226                order.client_order_id
227            );
228            None
229        }
230    }
231
232    fn create_order_denied(
233        submit_order: &SubmitOrder,
234        reason: &str,
235        clock: &Rc<RefCell<dyn Clock>>,
236    ) -> OrderEventAny {
237        let timestamp = clock.borrow().timestamp_ns();
238        OrderEventAny::Denied(OrderDenied::new(
239            submit_order.trader_id,
240            submit_order.strategy_id,
241            submit_order.instrument_id,
242            submit_order.client_order_id,
243            reason.into(),
244            UUID4::new(),
245            timestamp,
246            timestamp,
247        ))
248    }
249
250    fn create_modify_rejected(
251        order: &OrderAny,
252        reason: &str,
253        clock: &Rc<RefCell<dyn Clock>>,
254    ) -> OrderEventAny {
255        let timestamp = clock.borrow().timestamp_ns();
256        OrderEventAny::ModifyRejected(OrderModifyRejected::new(
257            order.trader_id(),
258            order.strategy_id(),
259            order.instrument_id(),
260            order.client_order_id(),
261            reason.into(),
262            UUID4::new(),
263            timestamp,
264            timestamp,
265            false,
266            order.venue_order_id(),
267            None,
268        ))
269    }
270
271    /// Executes a trading command through the risk management pipeline.
272    pub fn execute(&mut self, command: TradingCommand) {
273        // This will extend to other commands such as `RiskCommand`
274        self.handle_command(command);
275    }
276
277    /// Processes an order event for risk monitoring and state updates.
278    pub fn process(&mut self, event: OrderEventAny) {
279        // This will extend to other events such as `RiskEvent`
280        self.handle_event(event);
281    }
282
283    /// Sets the trading state for risk control enforcement.
284    pub fn set_trading_state(&mut self, state: TradingState) {
285        if state == self.trading_state {
286            log::warn!("No change to trading state: already set to {state:?}");
287            return;
288        }
289
290        self.trading_state = state;
291
292        let _ts_now = self.clock.borrow().timestamp_ns();
293
294        // TODO: Create a new Event "TradingStateChanged" in OrderEventAny enum.
295        // let event = OrderEventAny::TradingStateChanged(TradingStateChanged::new(..,self.trading_state,..));
296
297        msgbus::publish_any("events.risk".into(), &"message"); // TODO: Send the new Event here
298
299        log::info!("Trading state set to {state:?}");
300    }
301
302    /// Sets the maximum notional value per order for the specified instrument.
303    pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
304        self.max_notional_per_order.insert(instrument_id, new_value);
305
306        let new_value_str = new_value.to_string();
307        log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
308    }
309
310    /// Starts the risk engine.
311    pub fn start(&mut self) {
312        log::info!("Started");
313    }
314
315    /// Stops the risk engine.
316    pub fn stop(&mut self) {
317        log::info!("Stopped");
318    }
319
320    /// Resets the risk engine to its initial state.
321    pub fn reset(&mut self) {
322        self.throttled_submit_order.reset();
323        self.throttled_modify_order.reset();
324        self.max_notional_per_order.clear();
325        self.trading_state = TradingState::Active;
326
327        log::info!("Reset");
328    }
329
330    /// Disposes of the risk engine, releasing resources.
331    pub fn dispose(&mut self) {
332        log::info!("Disposed");
333    }
334
335    /// Returns a reference to the clock.
336    #[must_use]
337    pub fn clock(&self) -> &Rc<RefCell<dyn Clock>> {
338        &self.clock
339    }
340
341    /// Returns a reference to the cache.
342    #[must_use]
343    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
344        &self.cache
345    }
346
347    /// Returns a reference to the configuration.
348    #[must_use]
349    pub const fn config(&self) -> &RiskEngineConfig {
350        &self.config
351    }
352
353    /// Returns the current trading state.
354    #[must_use]
355    pub const fn trading_state(&self) -> TradingState {
356        self.trading_state
357    }
358
359    /// Returns a reference to the max notional per order settings.
360    #[must_use]
361    pub const fn max_notional_per_order(&self) -> &AHashMap<InstrumentId, Decimal> {
362        &self.max_notional_per_order
363    }
364
365    fn handle_command(&mut self, command: TradingCommand) {
366        if self.config.debug {
367            log::debug!("{CMD}{RECV} {command:?}");
368        }
369
370        match command {
371            TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
372            TradingCommand::SubmitOrderList(submit_order_list) => {
373                self.handle_submit_order_list(submit_order_list);
374            }
375            TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
376            TradingCommand::QueryAccount(query_account) => {
377                self.send_to_execution(TradingCommand::QueryAccount(query_account));
378            }
379            _ => {
380                log::error!("Cannot handle command: {command}");
381            }
382        }
383    }
384
385    fn handle_submit_order(&mut self, command: SubmitOrder) {
386        if self.config.bypass {
387            self.send_to_execution(TradingCommand::SubmitOrder(command));
388            return;
389        }
390
391        let order = {
392            let cache = self.cache.borrow();
393            match cache.order(&command.client_order_id) {
394                Some(order) => order.clone(),
395                None => {
396                    log::error!(
397                        "Cannot handle submit order: order not found in cache for {}",
398                        command.client_order_id
399                    );
400                    return;
401                }
402            }
403        };
404
405        if let Some(position_id) = command.position_id
406            && order.is_reduce_only()
407        {
408            let position_exists = {
409                let cache = self.cache.borrow();
410                cache
411                    .position(&position_id)
412                    .map(|pos| (pos.side, pos.quantity))
413            };
414
415            if let Some((pos_side, pos_quantity)) = position_exists {
416                if !order.would_reduce_only(pos_side, pos_quantity) {
417                    self.deny_command(
418                        TradingCommand::SubmitOrder(command),
419                        &format!("Reduce only order would increase position {position_id}"),
420                    );
421                    return; // Denied
422                }
423            } else {
424                self.deny_command(
425                    TradingCommand::SubmitOrder(command),
426                    &format!("Position {position_id} not found for reduce-only order"),
427                );
428                return;
429            }
430        }
431
432        let instrument_exists = {
433            let cache = self.cache.borrow();
434            cache.instrument(&command.instrument_id).cloned()
435        };
436
437        let instrument = if let Some(instrument) = instrument_exists {
438            instrument
439        } else {
440            self.deny_command(
441                TradingCommand::SubmitOrder(command.clone()),
442                &format!("Instrument for {} not found", command.instrument_id),
443            );
444            return; // Denied
445        };
446
447        if !self.check_order(instrument.clone(), order.clone()) {
448            return; // Denied
449        }
450
451        if !self.check_orders_risk(instrument.clone(), &[order]) {
452            return; // Denied
453        }
454
455        // Route through execution gateway for TradingState checks & throttling
456        self.execution_gateway(instrument, TradingCommand::SubmitOrder(command));
457    }
458
459    fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
460        if self.config.bypass {
461            self.send_to_execution(TradingCommand::SubmitOrderList(command));
462            return;
463        }
464
465        let instrument_exists = {
466            let cache = self.cache.borrow();
467            cache.instrument(&command.instrument_id).cloned()
468        };
469
470        let instrument = if let Some(instrument) = instrument_exists {
471            instrument
472        } else {
473            self.deny_command(
474                TradingCommand::SubmitOrderList(command.clone()),
475                &format!("no instrument found for {}", command.instrument_id),
476            );
477            return; // Denied
478        };
479
480        let orders: Vec<OrderAny> = self
481            .cache
482            .borrow()
483            .orders_for_ids(&command.order_list.client_order_ids, &command);
484
485        if orders.len() != command.order_list.client_order_ids.len() {
486            self.deny_order_list(
487                &orders,
488                &format!("Incomplete order list: missing orders in cache for {command}"),
489            );
490            return; // Denied
491        }
492
493        for order in orders.clone() {
494            if !self.check_order(instrument.clone(), order) {
495                return; // Denied
496            }
497        }
498
499        if !self.check_orders_risk(instrument.clone(), &orders) {
500            self.deny_order_list(
501                &orders,
502                &format!("OrderList {} DENIED", command.order_list.id),
503            );
504            return; // Denied
505        }
506
507        self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
508    }
509
510    fn handle_modify_order(&mut self, command: ModifyOrder) {
511        let order_exists = {
512            let cache = self.cache.borrow();
513            cache.order(&command.client_order_id).cloned()
514        };
515
516        let order = if let Some(order) = order_exists {
517            order
518        } else {
519            log::error!(
520                "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
521                command.client_order_id
522            );
523            return;
524        };
525
526        if order.is_closed() {
527            self.reject_modify_order(
528                order,
529                &format!(
530                    "Order with command.client_order_id: {} already closed",
531                    command.client_order_id
532                ),
533            );
534            return;
535        } else if order.status() == OrderStatus::PendingCancel {
536            self.reject_modify_order(
537                order,
538                &format!(
539                    "Order with command.client_order_id: {} is already pending cancel",
540                    command.client_order_id
541                ),
542            );
543            return;
544        }
545
546        let maybe_instrument = {
547            let cache = self.cache.borrow();
548            cache.instrument(&command.instrument_id).cloned()
549        };
550
551        let instrument = if let Some(instrument) = maybe_instrument {
552            instrument
553        } else {
554            self.reject_modify_order(
555                order,
556                &format!("no instrument found for {:?}", command.instrument_id),
557            );
558            return; // Denied
559        };
560
561        // Check Price
562        let mut risk_msg = self.check_price(&instrument, command.price);
563        if let Some(risk_msg) = risk_msg {
564            self.reject_modify_order(order, &risk_msg);
565            return; // Denied
566        }
567
568        // Check Trigger
569        risk_msg = self.check_price(&instrument, command.trigger_price);
570        if let Some(risk_msg) = risk_msg {
571            self.reject_modify_order(order, &risk_msg);
572            return; // Denied
573        }
574
575        // Check Quantity
576        risk_msg = self.check_quantity(&instrument, command.quantity, order.is_quote_quantity());
577        if let Some(risk_msg) = risk_msg {
578            self.reject_modify_order(order, &risk_msg);
579            return; // Denied
580        }
581
582        // Check TradingState
583        match self.trading_state {
584            TradingState::Halted => {
585                self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
586            }
587            TradingState::Reducing => {
588                if let Some(quantity) = command.quantity
589                    && quantity > order.quantity()
590                    && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
591                        || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
592                {
593                    self.reject_modify_order(
594                        order,
595                        &format!(
596                            "TradingState is REDUCING and update will increase exposure {}",
597                            instrument.id()
598                        ),
599                    );
600                }
601            }
602            _ => {}
603        }
604
605        self.throttled_modify_order.send(command);
606    }
607
608    fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
609        if order.time_in_force() == TimeInForce::Gtd {
610            // SAFETY: GTD guarantees an expire time
611            let expire_time = order.expire_time().unwrap();
612            if expire_time <= self.clock.borrow().timestamp_ns() {
613                self.deny_order(
614                    order,
615                    &format!("GTD {} already past", expire_time.to_rfc3339()),
616                );
617                return false; // Denied
618            }
619        }
620
621        if !self.check_order_price(instrument.clone(), order.clone())
622            || !self.check_order_quantity(instrument, order)
623        {
624            return false; // Denied
625        }
626
627        true
628    }
629
630    fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
631        if order.price().is_some() {
632            let risk_msg = self.check_price(&instrument, order.price());
633            if let Some(risk_msg) = risk_msg {
634                self.deny_order(order, &risk_msg);
635                return false; // Denied
636            }
637        }
638
639        if order.trigger_price().is_some() {
640            let risk_msg = self.check_price(&instrument, order.trigger_price());
641            if let Some(risk_msg) = risk_msg {
642                self.deny_order(order, &risk_msg);
643                return false; // Denied
644            }
645        }
646
647        true
648    }
649
650    fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
651        let risk_msg = self.check_quantity(
652            &instrument,
653            Some(order.quantity()),
654            order.is_quote_quantity(),
655        );
656        if let Some(risk_msg) = risk_msg {
657            self.deny_order(order, &risk_msg);
658            return false; // Denied
659        }
660
661        true
662    }
663
664    fn check_orders_risk(&self, instrument: InstrumentAny, orders: &[OrderAny]) -> bool {
665        let mut last_px: Option<Price> = None;
666        let mut max_notional: Option<Money> = None;
667
668        // Determine max notional
669        let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
670        if let Some(max_notional_setting_val) = max_notional_setting.copied() {
671            max_notional = Some(Money::new(
672                max_notional_setting_val
673                    .to_f64()
674                    .expect("Invalid decimal conversion"),
675                instrument.quote_currency(),
676            ));
677        }
678
679        // Get account for risk checks
680        let account_exists = {
681            let cache = self.cache.borrow();
682            cache.account_for_venue(&instrument.id().venue).cloned()
683        };
684
685        let account = if let Some(account) = account_exists {
686            account
687        } else {
688            log::debug!("Cannot find account for venue {}", instrument.id().venue);
689            return true; // TODO: Temporary early return until handling routing/multiple venues
690        };
691        let cash_account = match account {
692            AccountAny::Cash(cash_account) => cash_account,
693            AccountAny::Margin(_) => return true, // TODO: Determine risk controls for margin
694        };
695        let free = cash_account.balance_free(Some(instrument.quote_currency()));
696        let allow_borrowing = cash_account.allow_borrowing;
697        if self.config.debug {
698            log::debug!("Free cash: {free:?}");
699        }
700
701        // Get net LONG position quantity for this instrument (for position-reducing sell checks),
702        // accounting for already submitted (but unfilled) SELL orders to prevent overselling.
703        let (net_long_qty_raw, pending_sell_qty_raw) = {
704            let cache = self.cache.borrow();
705            let long_qty: QuantityRaw = cache
706                .positions_open(
707                    None,
708                    Some(&instrument.id()),
709                    None,
710                    None,
711                    Some(PositionSide::Long),
712                )
713                .iter()
714                .map(|pos| pos.quantity.raw)
715                .sum();
716            let pending_sells: QuantityRaw = cache
717                .orders_open(
718                    None,
719                    Some(&instrument.id()),
720                    None,
721                    None,
722                    Some(OrderSide::Sell),
723                )
724                .iter()
725                .map(|ord| ord.leaves_qty().raw)
726                .sum();
727            (long_qty, pending_sells)
728        };
729
730        // Available quantity is long position minus pending sells
731        let available_long_qty_raw = net_long_qty_raw.saturating_sub(pending_sell_qty_raw);
732
733        if self.config.debug && net_long_qty_raw > 0 {
734            log::debug!(
735                "Net LONG qty (raw): {net_long_qty_raw}, pending sells: {pending_sell_qty_raw}, available: {available_long_qty_raw}"
736            );
737        }
738
739        // Track cumulative sell quantity to determine position-reducing vs position-opening sells
740        let mut cum_sell_qty_raw: QuantityRaw = 0;
741
742        let mut cum_notional_buy: Option<Money> = None;
743        let mut cum_notional_sell: Option<Money> = None;
744        let mut base_currency: Option<Currency> = None;
745        for order in orders {
746            // Determine last price based on order type
747            last_px = match order {
748                OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
749                    if last_px.is_none() {
750                        let cache = self.cache.borrow();
751                        if let Some(last_quote) = cache.quote(&instrument.id()) {
752                            match order.order_side() {
753                                OrderSide::Buy => Some(last_quote.ask_price),
754                                OrderSide::Sell => Some(last_quote.bid_price),
755                                _ => panic!("Invalid order side"),
756                            }
757                        } else {
758                            let cache = self.cache.borrow();
759                            let last_trade = cache.trade(&instrument.id());
760
761                            if let Some(last_trade) = last_trade {
762                                Some(last_trade.price)
763                            } else {
764                                log::warn!(
765                                    "Cannot check MARKET order risk: no prices for {}",
766                                    instrument.id()
767                                );
768                                continue;
769                            }
770                        }
771                    } else {
772                        last_px
773                    }
774                }
775                OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
776                OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
777                    if let Some(trigger_price) = order.trigger_price() {
778                        Some(trigger_price)
779                    } else {
780                        // Validate trailing offset type is supported
781                        let offset_type = order.trailing_offset_type().unwrap();
782                        if !matches!(
783                            offset_type,
784                            TrailingOffsetType::Price
785                                | TrailingOffsetType::BasisPoints
786                                | TrailingOffsetType::Ticks
787                        ) {
788                            self.deny_order(
789                                order.clone(),
790                                &format!("UNSUPPORTED_TRAILING_OFFSET_TYPE: {offset_type:?}"),
791                            );
792                            return false;
793                        }
794
795                        let trigger_type = order.trigger_type().unwrap();
796                        let cache = self.cache.borrow();
797
798                        if trigger_type == TriggerType::BidAsk {
799                            if let Some(quote) = cache.quote(&instrument.id()) {
800                                match trailing_stop_calculate_with_bid_ask(
801                                    instrument.price_increment(),
802                                    order.trailing_offset_type().unwrap(),
803                                    order.order_side_specified(),
804                                    order.trailing_offset().unwrap(),
805                                    quote.bid_price,
806                                    quote.ask_price,
807                                ) {
808                                    Ok(calculated_trigger) => Some(calculated_trigger),
809                                    Err(e) => {
810                                        log::warn!(
811                                            "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
812                                            order.order_type()
813                                        );
814                                        continue;
815                                    }
816                                }
817                            } else {
818                                log::warn!(
819                                    "Cannot check {} order risk: no trigger price set and no bid/ask quotes available for {}",
820                                    order.order_type(),
821                                    instrument.id()
822                                );
823                                continue;
824                            }
825                        } else if let Some(last_trade) = cache.trade(&instrument.id()) {
826                            match trailing_stop_calculate_with_last(
827                                instrument.price_increment(),
828                                order.trailing_offset_type().unwrap(),
829                                order.order_side_specified(),
830                                order.trailing_offset().unwrap(),
831                                last_trade.price,
832                            ) {
833                                Ok(calculated_trigger) => Some(calculated_trigger),
834                                Err(e) => {
835                                    log::warn!(
836                                        "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {}",
837                                        order.order_type(),
838                                        e
839                                    );
840                                    continue;
841                                }
842                            }
843                        } else if trigger_type == TriggerType::LastOrBidAsk {
844                            // Fallback to bid/ask when no trade data available
845                            if let Some(quote) = cache.quote(&instrument.id()) {
846                                match trailing_stop_calculate_with_bid_ask(
847                                    instrument.price_increment(),
848                                    order.trailing_offset_type().unwrap(),
849                                    order.order_side_specified(),
850                                    order.trailing_offset().unwrap(),
851                                    quote.bid_price,
852                                    quote.ask_price,
853                                ) {
854                                    Ok(calculated_trigger) => Some(calculated_trigger),
855                                    Err(e) => {
856                                        log::warn!(
857                                            "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
858                                            order.order_type()
859                                        );
860                                        continue;
861                                    }
862                                }
863                            } else {
864                                log::warn!(
865                                    "Cannot check {} order risk: no trigger price set and no market data available for {}",
866                                    order.order_type(),
867                                    instrument.id()
868                                );
869                                continue;
870                            }
871                        } else {
872                            log::warn!(
873                                "Cannot check {} order risk: no trigger price set and no market data available for {}",
874                                order.order_type(),
875                                instrument.id()
876                            );
877                            continue;
878                        }
879                    }
880                }
881                _ => order.price(),
882            };
883
884            let last_px = if let Some(px) = last_px {
885                px
886            } else {
887                log::error!("Cannot check order risk: no price available");
888                continue;
889            };
890
891            // For quote quantity limit orders, use worst-case execution price
892            let effective_price = if order.is_quote_quantity()
893                && !instrument.is_inverse()
894                && matches!(order, OrderAny::Limit(_) | OrderAny::StopLimit(_))
895            {
896                // Get current market price for worst-case execution
897                let cache = self.cache.borrow();
898                if let Some(quote_tick) = cache.quote(&instrument.id()) {
899                    match order.order_side() {
900                        // BUY: could execute at best ask if below limit (more quantity)
901                        OrderSide::Buy => last_px.min(quote_tick.ask_price),
902                        // SELL: could execute at best bid if above limit (but less quantity, so use limit)
903                        OrderSide::Sell => last_px.max(quote_tick.bid_price),
904                        _ => last_px,
905                    }
906                } else {
907                    last_px // No market data, use limit price
908                }
909            } else {
910                last_px
911            };
912
913            let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
914                instrument.calculate_base_quantity(order.quantity(), effective_price)
915            } else {
916                order.quantity()
917            };
918
919            // Check min/max quantity against effective quantity
920            if let Some(max_quantity) = instrument.max_quantity()
921                && effective_quantity > max_quantity
922            {
923                self.deny_order(
924                    order.clone(),
925                    &format!(
926                        "QUANTITY_EXCEEDS_MAXIMUM: effective_quantity={effective_quantity}, max_quantity={max_quantity}"
927                    ),
928                );
929                return false; // Denied
930            }
931
932            if let Some(min_quantity) = instrument.min_quantity()
933                && effective_quantity < min_quantity
934            {
935                self.deny_order(
936                    order.clone(),
937                    &format!(
938                        "QUANTITY_BELOW_MINIMUM: effective_quantity={effective_quantity}, min_quantity={min_quantity}"
939                    ),
940                );
941                return false; // Denied
942            }
943
944            let notional =
945                instrument.calculate_notional_value(effective_quantity, last_px, Some(true));
946
947            if self.config.debug {
948                log::debug!("Notional: {notional:?}");
949            }
950
951            // Check MAX notional per order limit
952            if let Some(max_notional_value) = max_notional
953                && notional > max_notional_value
954            {
955                self.deny_order(
956                        order.clone(),
957                        &format!(
958                            "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
959                        ),
960                    );
961                return false; // Denied
962            }
963
964            // Check MIN notional instrument limit
965            if let Some(min_notional) = instrument.min_notional()
966                && notional.currency == min_notional.currency
967                && notional < min_notional
968            {
969                self.deny_order(
970                        order.clone(),
971                        &format!(
972                            "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
973                        ),
974                    );
975                return false; // Denied
976            }
977
978            // // Check MAX notional instrument limit
979            if let Some(max_notional) = instrument.max_notional()
980                && notional.currency == max_notional.currency
981                && notional > max_notional
982            {
983                self.deny_order(
984                        order.clone(),
985                        &format!(
986                            "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
987                        ),
988                    );
989                return false; // Denied
990            }
991
992            // Calculate OrderBalanceImpact (valid for CashAccount only)
993            let notional = instrument.calculate_notional_value(effective_quantity, last_px, None);
994            let order_balance_impact = match order.order_side() {
995                OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
996                OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
997                OrderSide::NoOrderSide => {
998                    panic!("invalid `OrderSide`, was {}", order.order_side());
999                }
1000            };
1001
1002            if self.config.debug {
1003                log::debug!("Balance impact: {order_balance_impact}");
1004            }
1005
1006            // Skip balance check when borrowing is enabled (e.g. spot margin trading)
1007            if !allow_borrowing
1008                && let Some(free_val) = free
1009                && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
1010            {
1011                self.deny_order(
1012                    order.clone(),
1013                    &format!(
1014                        "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
1015                    ),
1016                );
1017                return false;
1018            }
1019
1020            if base_currency.is_none() {
1021                base_currency = instrument.base_currency();
1022            }
1023            if order.is_buy() {
1024                match cum_notional_buy.as_mut() {
1025                    Some(cum_notional_buy_val) => {
1026                        cum_notional_buy_val.raw += -order_balance_impact.raw;
1027                    }
1028                    None => {
1029                        cum_notional_buy = Some(Money::from_raw(
1030                            -order_balance_impact.raw,
1031                            order_balance_impact.currency,
1032                        ));
1033                    }
1034                }
1035
1036                if self.config.debug {
1037                    log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
1038                }
1039
1040                if !allow_borrowing
1041                    && let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
1042                    && cum_notional_buy > free
1043                {
1044                    self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
1045                    return false; // Denied
1046                }
1047            } else if order.is_sell() {
1048                let is_position_reducing_sell = order.is_reduce_only()
1049                    || (cum_sell_qty_raw + effective_quantity.raw) <= available_long_qty_raw;
1050                cum_sell_qty_raw += effective_quantity.raw;
1051
1052                if is_position_reducing_sell {
1053                    if self.config.debug {
1054                        log::debug!("Position-reducing SELL skips balance check");
1055                    }
1056                    continue;
1057                }
1058
1059                if cash_account.base_currency.is_some() {
1060                    match cum_notional_sell.as_mut() {
1061                        Some(cum_notional_buy_val) => {
1062                            cum_notional_buy_val.raw += order_balance_impact.raw;
1063                        }
1064                        None => {
1065                            cum_notional_sell = Some(Money::from_raw(
1066                                order_balance_impact.raw,
1067                                order_balance_impact.currency,
1068                            ));
1069                        }
1070                    }
1071                    if self.config.debug {
1072                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1073                    }
1074
1075                    if !allow_borrowing
1076                        && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1077                        && cum_notional_sell > free
1078                    {
1079                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1080                        return false; // Denied
1081                    }
1082                }
1083                // Account is already of type Cash, so no check
1084                else if let Some(base_currency) = base_currency {
1085                    let cash_value = Money::from_raw(
1086                        effective_quantity
1087                            .raw
1088                            .try_into()
1089                            .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
1090                            .unwrap(),
1091                        base_currency,
1092                    );
1093
1094                    if self.config.debug {
1095                        log::debug!("Cash value: {cash_value:?}");
1096                        log::debug!(
1097                            "Total: {:?}",
1098                            cash_account.balance_total(Some(base_currency))
1099                        );
1100                        log::debug!(
1101                            "Locked: {:?}",
1102                            cash_account.balance_locked(Some(base_currency))
1103                        );
1104                        log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
1105                    }
1106
1107                    match cum_notional_sell {
1108                        Some(mut value) => {
1109                            value.raw += cash_value.raw;
1110                            cum_notional_sell = Some(value);
1111                        }
1112                        None => cum_notional_sell = Some(cash_value),
1113                    }
1114
1115                    if self.config.debug {
1116                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1117                    }
1118                    if !allow_borrowing
1119                        && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1120                        && cum_notional_sell.raw > free.raw
1121                    {
1122                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1123                        return false; // Denied
1124                    }
1125                }
1126            }
1127        }
1128
1129        // Finally
1130        true // Passed
1131    }
1132
1133    fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
1134        let price_val = price?;
1135
1136        if price_val.precision > instrument.price_precision() {
1137            return Some(format!(
1138                "price {} invalid (precision {} > {})",
1139                price_val,
1140                price_val.precision,
1141                instrument.price_precision()
1142            ));
1143        }
1144
1145        if !matches!(
1146            instrument.instrument_class(),
1147            InstrumentClass::Option
1148                | InstrumentClass::FuturesSpread
1149                | InstrumentClass::OptionSpread
1150        ) && price_val.raw <= 0
1151        {
1152            return Some(format!("price {price_val} invalid (<= 0)"));
1153        }
1154
1155        None
1156    }
1157
1158    fn check_quantity(
1159        &self,
1160        instrument: &InstrumentAny,
1161        quantity: Option<Quantity>,
1162        is_quote_quantity: bool,
1163    ) -> Option<String> {
1164        let quantity_val = quantity?;
1165
1166        // Check precision
1167        if quantity_val.precision > instrument.size_precision() {
1168            return Some(format!(
1169                "quantity {} invalid (precision {} > {})",
1170                quantity_val,
1171                quantity_val.precision,
1172                instrument.size_precision()
1173            ));
1174        }
1175
1176        // Skip min/max checks for quote quantities (they will be checked in check_orders_risk using effective_quantity)
1177        if is_quote_quantity {
1178            return None;
1179        }
1180
1181        // Check maximum quantity
1182        if let Some(max_quantity) = instrument.max_quantity()
1183            && quantity_val > max_quantity
1184        {
1185            return Some(format!(
1186                "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
1187            ));
1188        }
1189
1190        // Check minimum quantity
1191        if let Some(min_quantity) = instrument.min_quantity()
1192            && quantity_val < min_quantity
1193        {
1194            return Some(format!(
1195                "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
1196            ));
1197        }
1198
1199        None
1200    }
1201
1202    fn deny_command(&self, command: TradingCommand, reason: &str) {
1203        match command {
1204            TradingCommand::SubmitOrder(command) => {
1205                let order = {
1206                    let cache = self.cache.borrow();
1207                    cache.order(&command.client_order_id).cloned()
1208                };
1209                if let Some(order) = order {
1210                    self.deny_order(order, reason);
1211                } else {
1212                    log::error!(
1213                        "Cannot deny order: not found in cache for {}",
1214                        command.client_order_id
1215                    );
1216                }
1217            }
1218            TradingCommand::SubmitOrderList(command) => {
1219                let orders: Vec<OrderAny> = self
1220                    .cache
1221                    .borrow()
1222                    .orders_for_ids(&command.order_list.client_order_ids, &command);
1223                self.deny_order_list(&orders, reason);
1224            }
1225            _ => {
1226                panic!("Cannot deny command {command}");
1227            }
1228        }
1229    }
1230
1231    fn deny_order(&self, order: OrderAny, reason: &str) {
1232        log::warn!(
1233            "SubmitOrder for {} DENIED: {}",
1234            order.client_order_id(),
1235            reason
1236        );
1237
1238        if order.status() != OrderStatus::Initialized {
1239            return;
1240        }
1241
1242        // Scope the cache borrow to avoid RefCell conflict when sending to ExecEngine
1243        {
1244            let mut cache = self.cache.borrow_mut();
1245            if !cache.order_exists(&order.client_order_id()) {
1246                cache
1247                    .add_order(order.clone(), None, None, false)
1248                    .map_err(|e| {
1249                        log::error!("Cannot add order to cache: {e}");
1250                    })
1251                    .unwrap();
1252            }
1253        }
1254
1255        let denied = OrderEventAny::Denied(OrderDenied::new(
1256            order.trader_id(),
1257            order.strategy_id(),
1258            order.instrument_id(),
1259            order.client_order_id(),
1260            reason.into(),
1261            UUID4::new(),
1262            self.clock.borrow().timestamp_ns(),
1263            self.clock.borrow().timestamp_ns(),
1264        ));
1265
1266        let endpoint = MessagingSwitchboard::exec_engine_process();
1267        msgbus::send_order_event(endpoint, denied);
1268    }
1269
1270    fn deny_order_list(&self, orders: &[OrderAny], reason: &str) {
1271        for order in orders {
1272            if !order.is_closed() {
1273                self.deny_order(order.clone(), reason);
1274            }
1275        }
1276    }
1277
1278    fn reject_modify_order(&self, order: OrderAny, reason: &str) {
1279        let ts_event = self.clock.borrow().timestamp_ns();
1280        let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
1281            order.trader_id(),
1282            order.strategy_id(),
1283            order.instrument_id(),
1284            order.client_order_id(),
1285            reason.into(),
1286            UUID4::new(),
1287            ts_event,
1288            ts_event,
1289            false,
1290            order.venue_order_id(),
1291            order.account_id(),
1292        ));
1293
1294        let endpoint = MessagingSwitchboard::exec_engine_process();
1295        msgbus::send_order_event(endpoint, denied);
1296    }
1297
1298    fn execution_gateway(&mut self, instrument: InstrumentAny, command: TradingCommand) {
1299        match self.trading_state {
1300            TradingState::Halted => match command {
1301                TradingCommand::SubmitOrder(submit_order) => {
1302                    let order = {
1303                        let cache = self.cache.borrow();
1304                        cache.order(&submit_order.client_order_id).cloned()
1305                    };
1306                    if let Some(order) = order {
1307                        self.deny_order(order, "TradingState::HALTED");
1308                    }
1309                }
1310                TradingCommand::SubmitOrderList(submit_order_list) => {
1311                    let orders: Vec<OrderAny> = self.cache.borrow().orders_for_ids(
1312                        &submit_order_list.order_list.client_order_ids,
1313                        &submit_order_list,
1314                    );
1315                    self.deny_order_list(&orders, "TradingState::HALTED");
1316                }
1317                _ => {}
1318            },
1319            TradingState::Reducing => match command {
1320                TradingCommand::SubmitOrder(submit_order) => {
1321                    let order = {
1322                        let cache = self.cache.borrow();
1323                        cache.order(&submit_order.client_order_id).cloned()
1324                    };
1325                    if let Some(order) = order {
1326                        if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1327                            self.deny_order(
1328                                order,
1329                                &format!(
1330                                    "BUY when TradingState::REDUCING and LONG {}",
1331                                    instrument.id()
1332                                ),
1333                            );
1334                        } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1335                            self.deny_order(
1336                                order,
1337                                &format!(
1338                                    "SELL when TradingState::REDUCING and SHORT {}",
1339                                    instrument.id()
1340                                ),
1341                            );
1342                        }
1343                    }
1344                }
1345                TradingCommand::SubmitOrderList(submit_order_list) => {
1346                    let orders: Vec<OrderAny> = self.cache.borrow().orders_for_ids(
1347                        &submit_order_list.order_list.client_order_ids,
1348                        &submit_order_list,
1349                    );
1350                    for order in &orders {
1351                        if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1352                            self.deny_order_list(
1353                                &orders,
1354                                &format!(
1355                                    "BUY when TradingState::REDUCING and LONG {}",
1356                                    instrument.id()
1357                                ),
1358                            );
1359                            return;
1360                        } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1361                            self.deny_order_list(
1362                                &orders,
1363                                &format!(
1364                                    "SELL when TradingState::REDUCING and SHORT {}",
1365                                    instrument.id()
1366                                ),
1367                            );
1368                            return;
1369                        }
1370                    }
1371                }
1372                _ => {}
1373            },
1374            TradingState::Active => match command {
1375                TradingCommand::SubmitOrder(submit_order) => {
1376                    self.throttled_submit_order.send(submit_order);
1377                }
1378                TradingCommand::SubmitOrderList(submit_order_list) => {
1379                    // TODO: implement throttler for order lists
1380                    self.send_to_execution(TradingCommand::SubmitOrderList(submit_order_list));
1381                }
1382                _ => {}
1383            },
1384        }
1385    }
1386
1387    fn send_to_execution(&self, command: TradingCommand) {
1388        let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
1389        msgbus::send_trading_command(endpoint, command);
1390    }
1391
1392    fn handle_event(&mut self, event: OrderEventAny) {
1393        // We intend to extend the risk engine to be able to handle additional events.
1394        // For now we just log.
1395        if self.config.debug {
1396            log::debug!("{RECV}{EVT} {event:?}");
1397        }
1398    }
1399}