1pub mod config;
19
20#[cfg(test)]
21mod tests;
22
23use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
24
25use config::RiskEngineConfig;
26use nautilus_common::{
27 cache::Cache,
28 clock::Clock,
29 logging::{CMD, EVT, RECV},
30 messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
31 msgbus,
32 throttler::Throttler,
33};
34use nautilus_core::UUID4;
35use nautilus_model::{
36 accounts::{Account, AccountAny},
37 enums::{InstrumentClass, OrderSide, OrderStatus, TimeInForce, TradingState},
38 events::{OrderDenied, OrderEventAny, OrderModifyRejected},
39 identifiers::InstrumentId,
40 instruments::{Instrument, InstrumentAny},
41 orders::{Order, OrderAny, OrderList},
42 types::{Currency, Money, Price, Quantity},
43};
44use nautilus_portfolio::Portfolio;
45use rust_decimal::{Decimal, prelude::ToPrimitive};
46use ustr::Ustr;
47
/// Boxed callback that receives a [`SubmitOrder`] command; used as the handler
/// type of the submit-order throttler.
type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
/// Boxed callback that receives a [`ModifyOrder`] command; used as the handler
/// type of the modify-order throttler.
type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
50
/// Pre-trade risk engine.
///
/// Receives trading commands, validates them (price, quantity, notional and
/// trading-state checks) and either forwards them towards the execution engine
/// over the message bus or emits denial/rejection events.
#[allow(dead_code)]
pub struct RiskEngine {
    // Shared clock used to timestamp generated events and to validate GTD expiry.
    clock: Rc<RefCell<dyn Clock>>,
    // Shared cache queried for orders, positions, instruments, accounts and prices.
    cache: Rc<RefCell<Cache>>,
    // Portfolio queried for net long/short exposure while in the REDUCING state.
    portfolio: Portfolio,
    // Rate limiter applied to `SubmitOrder` commands before they reach execution.
    pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
    // Rate limiter applied to `ModifyOrder` commands before they reach execution.
    pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
    // Optional per-instrument cap on the notional value of a single order.
    max_notional_per_order: HashMap<InstrumentId, Decimal>,
    // Current trading state; starts as `TradingState::Active` in `new`.
    trading_state: TradingState,
    // Engine configuration (`bypass`, `debug` and throttler limits are read from it).
    config: RiskEngineConfig,
}
68
69impl Debug for RiskEngine {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct(stringify!(RiskEngine)).finish()
72 }
73}
74
75impl RiskEngine {
76 pub fn new(
78 config: RiskEngineConfig,
79 portfolio: Portfolio,
80 clock: Rc<RefCell<dyn Clock>>,
81 cache: Rc<RefCell<Cache>>,
82 ) -> Self {
83 let throttled_submit_order =
84 Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
85
86 let throttled_modify_order =
87 Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
88
89 Self {
90 clock,
91 cache,
92 portfolio,
93 throttled_submit_order,
94 throttled_modify_order,
95 max_notional_per_order: HashMap::new(),
96 trading_state: TradingState::Active,
97 config,
98 }
99 }
100
101 fn create_submit_order_throttler(
102 config: &RiskEngineConfig,
103 clock: Rc<RefCell<dyn Clock>>,
104 cache: Rc<RefCell<Cache>>,
105 ) -> Throttler<SubmitOrder, SubmitOrderFn> {
106 let success_handler = {
107 Box::new(move |submit_order: SubmitOrder| {
108 msgbus::send_any(
109 "ExecEngine.execute".into(),
110 &TradingCommand::SubmitOrder(submit_order),
111 );
112 }) as Box<dyn Fn(SubmitOrder)>
113 };
114
115 let failure_handler = {
116 let cache = cache;
117 let clock = clock.clone();
118 Box::new(move |submit_order: SubmitOrder| {
119 let reason = "REJECTED BY THROTTLER";
120 log::warn!(
121 "SubmitOrder for {} DENIED: {}",
122 submit_order.client_order_id,
123 reason
124 );
125
126 Self::handle_submit_order_cache(&cache, &submit_order);
127
128 let denied = Self::create_order_denied(&submit_order, reason, &clock);
129
130 msgbus::send_any("ExecEngine.process".into(), &denied);
131 }) as Box<dyn Fn(SubmitOrder)>
132 };
133
134 Throttler::new(
135 config.max_order_submit.limit,
136 config.max_order_submit.interval_ns,
137 clock,
138 "ORDER_SUBMIT_THROTTLER".to_string(),
139 success_handler,
140 Some(failure_handler),
141 Ustr::from(&UUID4::new().to_string()),
142 )
143 }
144
145 fn create_modify_order_throttler(
146 config: &RiskEngineConfig,
147 clock: Rc<RefCell<dyn Clock>>,
148 cache: Rc<RefCell<Cache>>,
149 ) -> Throttler<ModifyOrder, ModifyOrderFn> {
150 let success_handler = {
151 Box::new(move |order: ModifyOrder| {
152 msgbus::send_any(
153 "ExecEngine.execute".into(),
154 &TradingCommand::ModifyOrder(order),
155 );
156 }) as Box<dyn Fn(ModifyOrder)>
157 };
158
159 let failure_handler = {
160 let cache = cache;
161 let clock = clock.clone();
162 Box::new(move |order: ModifyOrder| {
163 let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
164 log::warn!(
165 "SubmitOrder for {} DENIED: {}",
166 order.client_order_id,
167 reason
168 );
169
170 let order = match Self::get_existing_order(&cache, &order) {
171 Some(order) => order,
172 None => return,
173 };
174
175 let rejected = Self::create_modify_rejected(&order, reason, &clock);
176
177 msgbus::send_any("ExecEngine.process".into(), &rejected);
178 }) as Box<dyn Fn(ModifyOrder)>
179 };
180
181 Throttler::new(
182 config.max_order_modify.limit,
183 config.max_order_modify.interval_ns,
184 clock,
185 "ORDER_MODIFY_THROTTLER".to_string(),
186 success_handler,
187 Some(failure_handler),
188 Ustr::from(&UUID4::new().to_string()),
189 )
190 }
191
192 fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
193 let mut cache = cache.borrow_mut();
194 if !cache.order_exists(&submit_order.client_order_id) {
195 cache
196 .add_order(submit_order.order.clone(), None, None, false)
197 .map_err(|e| {
198 log::error!("Cannot add order to cache: {e}");
199 })
200 .unwrap();
201 }
202 }
203
204 fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
205 let cache = cache.borrow();
206 if let Some(order) = cache.order(&order.client_order_id) {
207 Some(order.clone())
208 } else {
209 log::error!(
210 "Order with command.client_order_id: {} not found",
211 order.client_order_id
212 );
213 None
214 }
215 }
216
217 fn create_order_denied(
218 submit_order: &SubmitOrder,
219 reason: &str,
220 clock: &Rc<RefCell<dyn Clock>>,
221 ) -> OrderEventAny {
222 let timestamp = clock.borrow().timestamp_ns();
223 OrderEventAny::Denied(OrderDenied::new(
224 submit_order.trader_id,
225 submit_order.strategy_id,
226 submit_order.instrument_id,
227 submit_order.client_order_id,
228 reason.into(),
229 UUID4::new(),
230 timestamp,
231 timestamp,
232 ))
233 }
234
235 fn create_modify_rejected(
236 order: &OrderAny,
237 reason: &str,
238 clock: &Rc<RefCell<dyn Clock>>,
239 ) -> OrderEventAny {
240 let timestamp = clock.borrow().timestamp_ns();
241 OrderEventAny::ModifyRejected(OrderModifyRejected::new(
242 order.trader_id(),
243 order.strategy_id(),
244 order.instrument_id(),
245 order.client_order_id(),
246 reason.into(),
247 UUID4::new(),
248 timestamp,
249 timestamp,
250 false,
251 order.venue_order_id(),
252 None,
253 ))
254 }
255
/// Executes the given trading command by passing it to the engine's command
/// handler.
pub fn execute(&mut self, command: TradingCommand) {
    self.handle_command(command);
}
263
/// Processes the given order event by passing it to the engine's event
/// handler.
pub fn process(&mut self, event: OrderEventAny) {
    self.handle_event(event);
}
269
270 pub fn set_trading_state(&mut self, state: TradingState) {
272 if state == self.trading_state {
273 log::warn!("No change to trading state: already set to {state:?}");
274 return;
275 }
276
277 self.trading_state = state;
278
279 let _ts_now = self.clock.borrow().timestamp_ns();
280
281 msgbus::publish("events.risk".into(), &"message"); log::info!("Trading state set to {state:?}");
287 }
288
289 pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
291 self.max_notional_per_order.insert(instrument_id, new_value);
292
293 let new_value_str = new_value.to_string();
294 log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
295 }
296
297 fn handle_command(&mut self, command: TradingCommand) {
301 if self.config.debug {
302 log::debug!("{CMD}{RECV} {command:?}");
303 }
304
305 match command {
306 TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
307 TradingCommand::SubmitOrderList(submit_order_list) => {
308 self.handle_submit_order_list(submit_order_list);
309 }
310 TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
311 TradingCommand::QueryAccount(query_account) => {
312 self.send_to_execution(TradingCommand::QueryAccount(query_account));
313 }
314 _ => {
315 log::error!("Cannot handle command: {command}");
316 }
317 }
318 }
319
320 fn handle_submit_order(&mut self, command: SubmitOrder) {
321 if self.config.bypass {
322 self.send_to_execution(TradingCommand::SubmitOrder(command));
323 return;
324 }
325
326 let order = &command.order;
327 if let Some(position_id) = command.position_id
328 && order.is_reduce_only()
329 {
330 let position_exists = {
331 let cache = self.cache.borrow();
332 cache
333 .position(&position_id)
334 .map(|pos| (pos.side, pos.quantity))
335 };
336
337 if let Some((pos_side, pos_quantity)) = position_exists {
338 if !order.would_reduce_only(pos_side, pos_quantity) {
339 self.deny_command(
340 TradingCommand::SubmitOrder(command),
341 &format!("Reduce only order would increase position {position_id}"),
342 );
343 return; }
345 } else {
346 self.deny_command(
347 TradingCommand::SubmitOrder(command),
348 &format!("Position {position_id} not found for reduce-only order"),
349 );
350 return;
351 }
352 }
353
354 let instrument_exists = {
355 let cache = self.cache.borrow();
356 cache.instrument(&command.instrument_id).cloned()
357 };
358
359 let instrument = if let Some(instrument) = instrument_exists {
360 instrument
361 } else {
362 self.deny_command(
363 TradingCommand::SubmitOrder(command.clone()),
364 &format!("Instrument for {} not found", command.instrument_id),
365 );
366 return; };
368
369 if !self.check_order(instrument.clone(), order.clone()) {
373 return; }
375
376 if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
377 return; }
379
380 self.execution_gateway(instrument, TradingCommand::SubmitOrder(command));
382 }
383
384 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
385 if self.config.bypass {
386 self.send_to_execution(TradingCommand::SubmitOrderList(command));
387 return;
388 }
389
390 let instrument_exists = {
391 let cache = self.cache.borrow();
392 cache.instrument(&command.instrument_id).cloned()
393 };
394
395 let instrument = if let Some(instrument) = instrument_exists {
396 instrument
397 } else {
398 self.deny_command(
399 TradingCommand::SubmitOrderList(command.clone()),
400 &format!("no instrument found for {}", command.instrument_id),
401 );
402 return; };
404
405 for order in command.order_list.orders.clone() {
409 if !self.check_order(instrument.clone(), order) {
410 return; }
412 }
413
414 if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
415 self.deny_order_list(
416 command.order_list.clone(),
417 &format!("OrderList {} DENIED", command.order_list.id),
418 );
419 return; }
421
422 self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
423 }
424
425 fn handle_modify_order(&mut self, command: ModifyOrder) {
426 let order_exists = {
430 let cache = self.cache.borrow();
431 cache.order(&command.client_order_id).cloned()
432 };
433
434 let order = if let Some(order) = order_exists {
435 order
436 } else {
437 log::error!(
438 "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
439 command.client_order_id
440 );
441 return;
442 };
443
444 if order.is_closed() {
445 self.reject_modify_order(
446 order,
447 &format!(
448 "Order with command.client_order_id: {} already closed",
449 command.client_order_id
450 ),
451 );
452 return;
453 } else if order.status() == OrderStatus::PendingCancel {
454 self.reject_modify_order(
455 order,
456 &format!(
457 "Order with command.client_order_id: {} is already pending cancel",
458 command.client_order_id
459 ),
460 );
461 return;
462 }
463
464 let maybe_instrument = {
465 let cache = self.cache.borrow();
466 cache.instrument(&command.instrument_id).cloned()
467 };
468
469 let instrument = if let Some(instrument) = maybe_instrument {
470 instrument
471 } else {
472 self.reject_modify_order(
473 order,
474 &format!("no instrument found for {:?}", command.instrument_id),
475 );
476 return; };
478
479 let mut risk_msg = self.check_price(&instrument, command.price);
481 if let Some(risk_msg) = risk_msg {
482 self.reject_modify_order(order, &risk_msg);
483 return; }
485
486 risk_msg = self.check_price(&instrument, command.trigger_price);
488 if let Some(risk_msg) = risk_msg {
489 self.reject_modify_order(order, &risk_msg);
490 return; }
492
493 risk_msg = self.check_quantity(&instrument, command.quantity, order.is_quote_quantity());
495 if let Some(risk_msg) = risk_msg {
496 self.reject_modify_order(order, &risk_msg);
497 return; }
499
500 match self.trading_state {
502 TradingState::Halted => {
503 self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
504 }
505 TradingState::Reducing => {
506 if let Some(quantity) = command.quantity
507 && quantity > order.quantity()
508 && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
509 || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
510 {
511 self.reject_modify_order(
512 order,
513 &format!(
514 "TradingState is REDUCING and update will increase exposure {}",
515 instrument.id()
516 ),
517 );
518 }
519 }
520 _ => {}
521 }
522
523 self.throttled_modify_order.send(command);
524 }
525
526 fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
529 if order.time_in_force() == TimeInForce::Gtd {
533 let expire_time = order.expire_time().unwrap();
535 if expire_time <= self.clock.borrow().timestamp_ns() {
536 self.deny_order(
537 order,
538 &format!("GTD {} already past", expire_time.to_rfc3339()),
539 );
540 return false; }
542 }
543
544 if !self.check_order_price(instrument.clone(), order.clone())
545 || !self.check_order_quantity(instrument, order)
546 {
547 return false; }
549
550 true
551 }
552
553 fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
554 if order.price().is_some() {
558 let risk_msg = self.check_price(&instrument, order.price());
559 if let Some(risk_msg) = risk_msg {
560 self.deny_order(order, &risk_msg);
561 return false; }
563 }
564
565 if order.trigger_price().is_some() {
569 let risk_msg = self.check_price(&instrument, order.trigger_price());
570 if let Some(risk_msg) = risk_msg {
571 self.deny_order(order, &risk_msg);
572 return false; }
574 }
575
576 true
577 }
578
579 fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
580 let risk_msg = self.check_quantity(
581 &instrument,
582 Some(order.quantity()),
583 order.is_quote_quantity(),
584 );
585 if let Some(risk_msg) = risk_msg {
586 self.deny_order(order, &risk_msg);
587 return false; }
589
590 true
591 }
592
593 fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
594 let mut last_px: Option<Price> = None;
598 let mut max_notional: Option<Money> = None;
599
600 let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
602 if let Some(max_notional_setting_val) = max_notional_setting.copied() {
603 max_notional = Some(Money::new(
604 max_notional_setting_val
605 .to_f64()
606 .expect("Invalid decimal conversion"),
607 instrument.quote_currency(),
608 ));
609 }
610
611 let account_exists = {
613 let cache = self.cache.borrow();
614 cache.account_for_venue(&instrument.id().venue).cloned()
615 };
616
617 let account = if let Some(account) = account_exists {
618 account
619 } else {
620 log::debug!("Cannot find account for venue {}", instrument.id().venue);
621 return true; };
623 let cash_account = match account {
624 AccountAny::Cash(cash_account) => cash_account,
625 AccountAny::Margin(_) => return true, };
627 let free = cash_account.balance_free(Some(instrument.quote_currency()));
628 if self.config.debug {
629 log::debug!("Free cash: {free:?}");
630 }
631
632 let mut cum_notional_buy: Option<Money> = None;
633 let mut cum_notional_sell: Option<Money> = None;
634 let mut base_currency: Option<Currency> = None;
635 for order in &orders {
636 last_px = match order {
638 OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
639 if last_px.is_none() {
640 let cache = self.cache.borrow();
641 if let Some(last_quote) = cache.quote(&instrument.id()) {
642 match order.order_side() {
643 OrderSide::Buy => Some(last_quote.ask_price),
644 OrderSide::Sell => Some(last_quote.bid_price),
645 _ => panic!("Invalid order side"),
646 }
647 } else {
648 let cache = self.cache.borrow();
649 let last_trade = cache.trade(&instrument.id());
650
651 if let Some(last_trade) = last_trade {
652 Some(last_trade.price)
653 } else {
654 log::warn!(
655 "Cannot check MARKET order risk: no prices for {}",
656 instrument.id()
657 );
658 continue;
659 }
660 }
661 } else {
662 last_px
663 }
664 }
665 OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
666 OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
667 if let Some(trigger_price) = order.trigger_price() {
668 Some(trigger_price)
669 } else {
670 log::warn!(
671 "Cannot check {} order risk: no trigger price was set", order.order_type()
673 );
674 continue;
675 }
676 }
677 _ => order.price(),
678 };
679
680 let last_px = if let Some(px) = last_px {
681 px
682 } else {
683 log::error!("Cannot check order risk: no price available");
684 continue;
685 };
686
687 let effective_price = if order.is_quote_quantity()
689 && !instrument.is_inverse()
690 && matches!(order, OrderAny::Limit(_) | OrderAny::StopLimit(_))
691 {
692 let cache = self.cache.borrow();
694 if let Some(quote_tick) = cache.quote(&instrument.id()) {
695 match order.order_side() {
696 OrderSide::Buy => last_px.min(quote_tick.ask_price),
698 OrderSide::Sell => last_px.max(quote_tick.bid_price),
700 _ => last_px,
701 }
702 } else {
703 last_px }
705 } else {
706 last_px
707 };
708
709 let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
710 instrument.calculate_base_quantity(order.quantity(), effective_price)
711 } else {
712 order.quantity()
713 };
714
715 if let Some(max_quantity) = instrument.max_quantity()
717 && effective_quantity > max_quantity
718 {
719 self.deny_order(
720 order.clone(),
721 &format!(
722 "QUANTITY_EXCEEDS_MAXIMUM: effective_quantity={effective_quantity}, max_quantity={max_quantity}"
723 ),
724 );
725 return false; }
727
728 if let Some(min_quantity) = instrument.min_quantity()
729 && effective_quantity < min_quantity
730 {
731 self.deny_order(
732 order.clone(),
733 &format!(
734 "QUANTITY_BELOW_MINIMUM: effective_quantity={effective_quantity}, min_quantity={min_quantity}"
735 ),
736 );
737 return false; }
739
740 let notional =
741 instrument.calculate_notional_value(effective_quantity, last_px, Some(true));
742
743 if self.config.debug {
744 log::debug!("Notional: {notional:?}");
745 }
746
747 if let Some(max_notional_value) = max_notional
749 && notional > max_notional_value
750 {
751 self.deny_order(
752 order.clone(),
753 &format!(
754 "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
755 ),
756 );
757 return false; }
759
760 if let Some(min_notional) = instrument.min_notional()
762 && notional.currency == min_notional.currency
763 && notional < min_notional
764 {
765 self.deny_order(
766 order.clone(),
767 &format!(
768 "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
769 ),
770 );
771 return false; }
773
774 if let Some(max_notional) = instrument.max_notional()
776 && notional.currency == max_notional.currency
777 && notional > max_notional
778 {
779 self.deny_order(
780 order.clone(),
781 &format!(
782 "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
783 ),
784 );
785 return false; }
787
788 let notional = instrument.calculate_notional_value(effective_quantity, last_px, None);
790 let order_balance_impact = match order.order_side() {
791 OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
792 OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
793 OrderSide::NoOrderSide => {
794 panic!("invalid `OrderSide`, was {}", order.order_side());
795 }
796 };
797
798 if self.config.debug {
799 log::debug!("Balance impact: {order_balance_impact}");
800 }
801
802 if let Some(free_val) = free
803 && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
804 {
805 self.deny_order(
806 order.clone(),
807 &format!(
808 "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
809 ),
810 );
811 return false;
812 }
813
814 if base_currency.is_none() {
815 base_currency = instrument.base_currency();
816 }
817 if order.is_buy() {
818 match cum_notional_buy.as_mut() {
819 Some(cum_notional_buy_val) => {
820 cum_notional_buy_val.raw += -order_balance_impact.raw;
821 }
822 None => {
823 cum_notional_buy = Some(Money::from_raw(
824 -order_balance_impact.raw,
825 order_balance_impact.currency,
826 ));
827 }
828 }
829
830 if self.config.debug {
831 log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
832 }
833
834 if let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
835 && cum_notional_buy > free
836 {
837 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
838 return false; }
840 } else if order.is_sell() {
841 if cash_account.base_currency.is_some() {
842 if order.is_reduce_only() {
843 if self.config.debug {
844 log::debug!(
845 "Reduce-only SELL skips cumulative notional free-balance check"
846 );
847 }
848 } else {
849 match cum_notional_sell.as_mut() {
850 Some(cum_notional_buy_val) => {
851 cum_notional_buy_val.raw += order_balance_impact.raw;
852 }
853 None => {
854 cum_notional_sell = Some(Money::from_raw(
855 order_balance_impact.raw,
856 order_balance_impact.currency,
857 ));
858 }
859 }
860 if self.config.debug {
861 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
862 }
863
864 if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
865 && cum_notional_sell > free
866 {
867 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
868 return false; }
870 }
871 }
872 else if let Some(base_currency) = base_currency {
874 if order.is_reduce_only() {
875 if self.config.debug {
876 log::debug!(
877 "Reduce-only SELL skips base-currency cumulative free check"
878 );
879 }
880 continue;
881 }
882
883 let cash_value = Money::from_raw(
884 effective_quantity
885 .raw
886 .try_into()
887 .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
888 .unwrap(),
889 base_currency,
890 );
891
892 if self.config.debug {
893 log::debug!("Cash value: {cash_value:?}");
894 log::debug!(
895 "Total: {:?}",
896 cash_account.balance_total(Some(base_currency))
897 );
898 log::debug!(
899 "Locked: {:?}",
900 cash_account.balance_locked(Some(base_currency))
901 );
902 log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
903 }
904
905 match cum_notional_sell {
906 Some(mut cum_notional_sell) => {
907 cum_notional_sell.raw += cash_value.raw;
908 }
909 None => cum_notional_sell = Some(cash_value),
910 }
911
912 if self.config.debug {
913 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
914 }
915 if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
916 && cum_notional_sell.raw > free.raw
917 {
918 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
919 return false; }
921 }
922 }
923 }
924
925 true }
928
929 fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
930 let price_val = price?;
931
932 if price_val.precision > instrument.price_precision() {
933 return Some(format!(
934 "price {} invalid (precision {} > {})",
935 price_val,
936 price_val.precision,
937 instrument.price_precision()
938 ));
939 }
940
941 if instrument.instrument_class() != InstrumentClass::Option && price_val.raw <= 0 {
942 return Some(format!("price {price_val} invalid (<= 0)"));
943 }
944
945 None
946 }
947
948 fn check_quantity(
949 &self,
950 instrument: &InstrumentAny,
951 quantity: Option<Quantity>,
952 is_quote_quantity: bool,
953 ) -> Option<String> {
954 let quantity_val = quantity?;
955
956 if quantity_val.precision > instrument.size_precision() {
958 return Some(format!(
959 "quantity {} invalid (precision {} > {})",
960 quantity_val,
961 quantity_val.precision,
962 instrument.size_precision()
963 ));
964 }
965
966 if is_quote_quantity {
968 return None;
969 }
970
971 if let Some(max_quantity) = instrument.max_quantity()
973 && quantity_val > max_quantity
974 {
975 return Some(format!(
976 "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
977 ));
978 }
979
980 if let Some(min_quantity) = instrument.min_quantity()
982 && quantity_val < min_quantity
983 {
984 return Some(format!(
985 "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
986 ));
987 }
988
989 None
990 }
991
992 fn deny_command(&self, command: TradingCommand, reason: &str) {
995 match command {
996 TradingCommand::SubmitOrder(command) => {
997 self.deny_order(command.order, reason);
998 }
999 TradingCommand::SubmitOrderList(command) => {
1000 self.deny_order_list(command.order_list, reason);
1001 }
1002 _ => {
1003 panic!("Cannot deny command {command}");
1004 }
1005 }
1006 }
1007
1008 fn deny_order(&self, order: OrderAny, reason: &str) {
1009 log::warn!(
1010 "SubmitOrder for {} DENIED: {}",
1011 order.client_order_id(),
1012 reason
1013 );
1014
1015 if order.status() != OrderStatus::Initialized {
1016 return;
1017 }
1018
1019 let mut cache = self.cache.borrow_mut();
1020 if !cache.order_exists(&order.client_order_id()) {
1021 cache
1022 .add_order(order.clone(), None, None, false)
1023 .map_err(|e| {
1024 log::error!("Cannot add order to cache: {e}");
1025 })
1026 .unwrap();
1027 }
1028
1029 let denied = OrderEventAny::Denied(OrderDenied::new(
1030 order.trader_id(),
1031 order.strategy_id(),
1032 order.instrument_id(),
1033 order.client_order_id(),
1034 reason.into(),
1035 UUID4::new(),
1036 self.clock.borrow().timestamp_ns(),
1037 self.clock.borrow().timestamp_ns(),
1038 ));
1039
1040 msgbus::send_any("ExecEngine.process".into(), &denied);
1041 }
1042
1043 fn deny_order_list(&self, order_list: OrderList, reason: &str) {
1044 for order in order_list.orders {
1045 if !order.is_closed() {
1046 self.deny_order(order, reason);
1047 }
1048 }
1049 }
1050
1051 fn reject_modify_order(&self, order: OrderAny, reason: &str) {
1052 let ts_event = self.clock.borrow().timestamp_ns();
1053 let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
1054 order.trader_id(),
1055 order.strategy_id(),
1056 order.instrument_id(),
1057 order.client_order_id(),
1058 reason.into(),
1059 UUID4::new(),
1060 ts_event,
1061 ts_event,
1062 false,
1063 order.venue_order_id(),
1064 order.account_id(),
1065 ));
1066
1067 msgbus::send_any("ExecEngine.process".into(), &denied);
1068 }
1069
1070 fn execution_gateway(&mut self, instrument: InstrumentAny, command: TradingCommand) {
1073 match self.trading_state {
1074 TradingState::Halted => match command {
1075 TradingCommand::SubmitOrder(submit_order) => {
1076 self.deny_order(submit_order.order, "TradingState::HALTED");
1077 }
1078 TradingCommand::SubmitOrderList(submit_order_list) => {
1079 self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
1080 }
1081 _ => {}
1082 },
1083 TradingState::Reducing => match command {
1084 TradingCommand::SubmitOrder(submit_order) => {
1085 let order = submit_order.order;
1086 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1087 self.deny_order(
1088 order,
1089 &format!(
1090 "BUY when TradingState::REDUCING and LONG {}",
1091 instrument.id()
1092 ),
1093 );
1094 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1095 self.deny_order(
1096 order,
1097 &format!(
1098 "SELL when TradingState::REDUCING and SHORT {}",
1099 instrument.id()
1100 ),
1101 );
1102 }
1103 }
1104 TradingCommand::SubmitOrderList(submit_order_list) => {
1105 let order_list = submit_order_list.order_list;
1106 for order in &order_list.orders {
1107 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1108 self.deny_order_list(
1109 order_list,
1110 &format!(
1111 "BUY when TradingState::REDUCING and LONG {}",
1112 instrument.id()
1113 ),
1114 );
1115 return;
1116 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1117 self.deny_order_list(
1118 order_list,
1119 &format!(
1120 "SELL when TradingState::REDUCING and SHORT {}",
1121 instrument.id()
1122 ),
1123 );
1124 return;
1125 }
1126 }
1127 }
1128 _ => {}
1129 },
1130 TradingState::Active => match command {
1131 TradingCommand::SubmitOrder(submit_order) => {
1132 self.throttled_submit_order.send(submit_order);
1133 }
1134 TradingCommand::SubmitOrderList(submit_order_list) => {
1135 self.send_to_execution(TradingCommand::SubmitOrderList(submit_order_list));
1137 }
1138 _ => {}
1139 },
1140 }
1141 }
1142
/// Sends the command to the `ExecEngine.execute` message bus endpoint.
fn send_to_execution(&self, command: TradingCommand) {
    msgbus::send_any("ExecEngine.execute".into(), &command);
}
1146
/// Handles an order event; the only action visible here is a debug log of the
/// event when `config.debug` is enabled.
fn handle_event(&mut self, event: OrderEventAny) {
    if self.config.debug {
        log::debug!("{RECV}{EVT} {event:?}");
    }
}
1154}