// Risk engine configuration module; `RiskEngineConfig` is imported from it below.
pub mod config;

// Test module, compiled only for test builds.
#[cfg(test)]
mod tests;
22
23use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
24
25use config::RiskEngineConfig;
26use nautilus_common::{
27 cache::Cache,
28 clock::Clock,
29 logging::{CMD, EVT, RECV},
30 messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
31 msgbus,
32 throttler::Throttler,
33};
34use nautilus_core::UUID4;
35use nautilus_execution::trailing::{
36 trailing_stop_calculate_with_bid_ask, trailing_stop_calculate_with_last,
37};
38use nautilus_model::{
39 accounts::{Account, AccountAny},
40 enums::{
41 InstrumentClass, OrderSide, OrderStatus, TimeInForce, TradingState, TrailingOffsetType,
42 TriggerType,
43 },
44 events::{OrderDenied, OrderEventAny, OrderModifyRejected},
45 identifiers::InstrumentId,
46 instruments::{Instrument, InstrumentAny},
47 orders::{Order, OrderAny, OrderList},
48 types::{Currency, Money, Price, Quantity},
49};
50use nautilus_portfolio::Portfolio;
51use rust_decimal::{Decimal, prelude::ToPrimitive};
52use ustr::Ustr;
53
/// Boxed callback invoked by the submit-order throttler with a [`SubmitOrder`] command.
type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
/// Boxed callback invoked by the modify-order throttler with a [`ModifyOrder`] command.
type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
56
/// Pre-trade risk engine.
///
/// Receives trading commands, validates the contained orders (price, quantity,
/// notional and balance checks), and either forwards them towards the
/// `ExecEngine.execute` endpoint or emits denial / rejection events to the
/// `ExecEngine.process` endpoint.
#[allow(dead_code)]
pub struct RiskEngine {
    /// Shared clock used to timestamp generated events and to check GTD expiry.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache queried for orders, positions, instruments, accounts, quotes and trades.
    cache: Rc<RefCell<Cache>>,
    /// Portfolio queried for net long / net short exposure per instrument.
    portfolio: Portfolio,
    /// Throttler through which `SubmitOrder` commands are sent when trading is active.
    pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
    /// Throttler through which validated `ModifyOrder` commands are sent.
    pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
    /// Optional per-instrument cap on the notional value of a single order.
    max_notional_per_order: HashMap<InstrumentId, Decimal>,
    /// Current trading state; starts as `TradingState::Active`.
    trading_state: TradingState,
    /// Engine configuration (`bypass`, `debug`, and the throttler limits are read from it).
    config: RiskEngineConfig,
}
74
75impl Debug for RiskEngine {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct(stringify!(RiskEngine)).finish()
78 }
79}
80
81impl RiskEngine {
82 pub fn new(
84 config: RiskEngineConfig,
85 portfolio: Portfolio,
86 clock: Rc<RefCell<dyn Clock>>,
87 cache: Rc<RefCell<Cache>>,
88 ) -> Self {
89 let throttled_submit_order =
90 Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
91
92 let throttled_modify_order =
93 Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
94
95 Self {
96 clock,
97 cache,
98 portfolio,
99 throttled_submit_order,
100 throttled_modify_order,
101 max_notional_per_order: HashMap::new(),
102 trading_state: TradingState::Active,
103 config,
104 }
105 }
106
107 fn create_submit_order_throttler(
108 config: &RiskEngineConfig,
109 clock: Rc<RefCell<dyn Clock>>,
110 cache: Rc<RefCell<Cache>>,
111 ) -> Throttler<SubmitOrder, SubmitOrderFn> {
112 let success_handler = {
113 Box::new(move |submit_order: SubmitOrder| {
114 msgbus::send_any(
115 "ExecEngine.execute".into(),
116 &TradingCommand::SubmitOrder(submit_order),
117 );
118 }) as Box<dyn Fn(SubmitOrder)>
119 };
120
121 let failure_handler = {
122 let cache = cache;
123 let clock = clock.clone();
124 Box::new(move |submit_order: SubmitOrder| {
125 let reason = "REJECTED BY THROTTLER";
126 log::warn!(
127 "SubmitOrder for {} DENIED: {}",
128 submit_order.client_order_id,
129 reason
130 );
131
132 Self::handle_submit_order_cache(&cache, &submit_order);
133
134 let denied = Self::create_order_denied(&submit_order, reason, &clock);
135
136 msgbus::send_any("ExecEngine.process".into(), &denied);
137 }) as Box<dyn Fn(SubmitOrder)>
138 };
139
140 Throttler::new(
141 config.max_order_submit.limit,
142 config.max_order_submit.interval_ns,
143 clock,
144 "ORDER_SUBMIT_THROTTLER".to_string(),
145 success_handler,
146 Some(failure_handler),
147 Ustr::from(&UUID4::new().to_string()),
148 )
149 }
150
151 fn create_modify_order_throttler(
152 config: &RiskEngineConfig,
153 clock: Rc<RefCell<dyn Clock>>,
154 cache: Rc<RefCell<Cache>>,
155 ) -> Throttler<ModifyOrder, ModifyOrderFn> {
156 let success_handler = {
157 Box::new(move |order: ModifyOrder| {
158 msgbus::send_any(
159 "ExecEngine.execute".into(),
160 &TradingCommand::ModifyOrder(order),
161 );
162 }) as Box<dyn Fn(ModifyOrder)>
163 };
164
165 let failure_handler = {
166 let cache = cache;
167 let clock = clock.clone();
168 Box::new(move |order: ModifyOrder| {
169 let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
170 log::warn!(
171 "SubmitOrder for {} DENIED: {}",
172 order.client_order_id,
173 reason
174 );
175
176 let order = match Self::get_existing_order(&cache, &order) {
177 Some(order) => order,
178 None => return,
179 };
180
181 let rejected = Self::create_modify_rejected(&order, reason, &clock);
182
183 msgbus::send_any("ExecEngine.process".into(), &rejected);
184 }) as Box<dyn Fn(ModifyOrder)>
185 };
186
187 Throttler::new(
188 config.max_order_modify.limit,
189 config.max_order_modify.interval_ns,
190 clock,
191 "ORDER_MODIFY_THROTTLER".to_string(),
192 success_handler,
193 Some(failure_handler),
194 Ustr::from(&UUID4::new().to_string()),
195 )
196 }
197
198 fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
199 let mut cache = cache.borrow_mut();
200 if !cache.order_exists(&submit_order.client_order_id) {
201 cache
202 .add_order(submit_order.order.clone(), None, None, false)
203 .map_err(|e| {
204 log::error!("Cannot add order to cache: {e}");
205 })
206 .unwrap();
207 }
208 }
209
210 fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
211 let cache = cache.borrow();
212 if let Some(order) = cache.order(&order.client_order_id) {
213 Some(order.clone())
214 } else {
215 log::error!(
216 "Order with command.client_order_id: {} not found",
217 order.client_order_id
218 );
219 None
220 }
221 }
222
223 fn create_order_denied(
224 submit_order: &SubmitOrder,
225 reason: &str,
226 clock: &Rc<RefCell<dyn Clock>>,
227 ) -> OrderEventAny {
228 let timestamp = clock.borrow().timestamp_ns();
229 OrderEventAny::Denied(OrderDenied::new(
230 submit_order.trader_id,
231 submit_order.strategy_id,
232 submit_order.instrument_id,
233 submit_order.client_order_id,
234 reason.into(),
235 UUID4::new(),
236 timestamp,
237 timestamp,
238 ))
239 }
240
241 fn create_modify_rejected(
242 order: &OrderAny,
243 reason: &str,
244 clock: &Rc<RefCell<dyn Clock>>,
245 ) -> OrderEventAny {
246 let timestamp = clock.borrow().timestamp_ns();
247 OrderEventAny::ModifyRejected(OrderModifyRejected::new(
248 order.trader_id(),
249 order.strategy_id(),
250 order.instrument_id(),
251 order.client_order_id(),
252 reason.into(),
253 UUID4::new(),
254 timestamp,
255 timestamp,
256 false,
257 order.venue_order_id(),
258 None,
259 ))
260 }
261
262 pub fn execute(&mut self, command: TradingCommand) {
266 self.handle_command(command);
268 }
269
270 pub fn process(&mut self, event: OrderEventAny) {
272 self.handle_event(event);
274 }
275
276 pub fn set_trading_state(&mut self, state: TradingState) {
278 if state == self.trading_state {
279 log::warn!("No change to trading state: already set to {state:?}");
280 return;
281 }
282
283 self.trading_state = state;
284
285 let _ts_now = self.clock.borrow().timestamp_ns();
286
287 msgbus::publish("events.risk".into(), &"message"); log::info!("Trading state set to {state:?}");
293 }
294
295 pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
297 self.max_notional_per_order.insert(instrument_id, new_value);
298
299 let new_value_str = new_value.to_string();
300 log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
301 }
302
303 fn handle_command(&mut self, command: TradingCommand) {
307 if self.config.debug {
308 log::debug!("{CMD}{RECV} {command:?}");
309 }
310
311 match command {
312 TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
313 TradingCommand::SubmitOrderList(submit_order_list) => {
314 self.handle_submit_order_list(submit_order_list);
315 }
316 TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
317 TradingCommand::QueryAccount(query_account) => {
318 self.send_to_execution(TradingCommand::QueryAccount(query_account));
319 }
320 _ => {
321 log::error!("Cannot handle command: {command}");
322 }
323 }
324 }
325
326 fn handle_submit_order(&mut self, command: SubmitOrder) {
327 if self.config.bypass {
328 self.send_to_execution(TradingCommand::SubmitOrder(command));
329 return;
330 }
331
332 let order = &command.order;
333 if let Some(position_id) = command.position_id
334 && order.is_reduce_only()
335 {
336 let position_exists = {
337 let cache = self.cache.borrow();
338 cache
339 .position(&position_id)
340 .map(|pos| (pos.side, pos.quantity))
341 };
342
343 if let Some((pos_side, pos_quantity)) = position_exists {
344 if !order.would_reduce_only(pos_side, pos_quantity) {
345 self.deny_command(
346 TradingCommand::SubmitOrder(command),
347 &format!("Reduce only order would increase position {position_id}"),
348 );
349 return; }
351 } else {
352 self.deny_command(
353 TradingCommand::SubmitOrder(command),
354 &format!("Position {position_id} not found for reduce-only order"),
355 );
356 return;
357 }
358 }
359
360 let instrument_exists = {
361 let cache = self.cache.borrow();
362 cache.instrument(&command.instrument_id).cloned()
363 };
364
365 let instrument = if let Some(instrument) = instrument_exists {
366 instrument
367 } else {
368 self.deny_command(
369 TradingCommand::SubmitOrder(command.clone()),
370 &format!("Instrument for {} not found", command.instrument_id),
371 );
372 return; };
374
375 if !self.check_order(instrument.clone(), order.clone()) {
379 return; }
381
382 if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
383 return; }
385
386 self.execution_gateway(instrument, TradingCommand::SubmitOrder(command));
388 }
389
390 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
391 if self.config.bypass {
392 self.send_to_execution(TradingCommand::SubmitOrderList(command));
393 return;
394 }
395
396 let instrument_exists = {
397 let cache = self.cache.borrow();
398 cache.instrument(&command.instrument_id).cloned()
399 };
400
401 let instrument = if let Some(instrument) = instrument_exists {
402 instrument
403 } else {
404 self.deny_command(
405 TradingCommand::SubmitOrderList(command.clone()),
406 &format!("no instrument found for {}", command.instrument_id),
407 );
408 return; };
410
411 for order in command.order_list.orders.clone() {
415 if !self.check_order(instrument.clone(), order) {
416 return; }
418 }
419
420 if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
421 self.deny_order_list(
422 command.order_list.clone(),
423 &format!("OrderList {} DENIED", command.order_list.id),
424 );
425 return; }
427
428 self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
429 }
430
431 fn handle_modify_order(&mut self, command: ModifyOrder) {
432 let order_exists = {
436 let cache = self.cache.borrow();
437 cache.order(&command.client_order_id).cloned()
438 };
439
440 let order = if let Some(order) = order_exists {
441 order
442 } else {
443 log::error!(
444 "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
445 command.client_order_id
446 );
447 return;
448 };
449
450 if order.is_closed() {
451 self.reject_modify_order(
452 order,
453 &format!(
454 "Order with command.client_order_id: {} already closed",
455 command.client_order_id
456 ),
457 );
458 return;
459 } else if order.status() == OrderStatus::PendingCancel {
460 self.reject_modify_order(
461 order,
462 &format!(
463 "Order with command.client_order_id: {} is already pending cancel",
464 command.client_order_id
465 ),
466 );
467 return;
468 }
469
470 let maybe_instrument = {
471 let cache = self.cache.borrow();
472 cache.instrument(&command.instrument_id).cloned()
473 };
474
475 let instrument = if let Some(instrument) = maybe_instrument {
476 instrument
477 } else {
478 self.reject_modify_order(
479 order,
480 &format!("no instrument found for {:?}", command.instrument_id),
481 );
482 return; };
484
485 let mut risk_msg = self.check_price(&instrument, command.price);
487 if let Some(risk_msg) = risk_msg {
488 self.reject_modify_order(order, &risk_msg);
489 return; }
491
492 risk_msg = self.check_price(&instrument, command.trigger_price);
494 if let Some(risk_msg) = risk_msg {
495 self.reject_modify_order(order, &risk_msg);
496 return; }
498
499 risk_msg = self.check_quantity(&instrument, command.quantity, order.is_quote_quantity());
501 if let Some(risk_msg) = risk_msg {
502 self.reject_modify_order(order, &risk_msg);
503 return; }
505
506 match self.trading_state {
508 TradingState::Halted => {
509 self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
510 }
511 TradingState::Reducing => {
512 if let Some(quantity) = command.quantity
513 && quantity > order.quantity()
514 && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
515 || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
516 {
517 self.reject_modify_order(
518 order,
519 &format!(
520 "TradingState is REDUCING and update will increase exposure {}",
521 instrument.id()
522 ),
523 );
524 }
525 }
526 _ => {}
527 }
528
529 self.throttled_modify_order.send(command);
530 }
531
532 fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
535 if order.time_in_force() == TimeInForce::Gtd {
539 let expire_time = order.expire_time().unwrap();
541 if expire_time <= self.clock.borrow().timestamp_ns() {
542 self.deny_order(
543 order,
544 &format!("GTD {} already past", expire_time.to_rfc3339()),
545 );
546 return false; }
548 }
549
550 if !self.check_order_price(instrument.clone(), order.clone())
551 || !self.check_order_quantity(instrument, order)
552 {
553 return false; }
555
556 true
557 }
558
559 fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
560 if order.price().is_some() {
564 let risk_msg = self.check_price(&instrument, order.price());
565 if let Some(risk_msg) = risk_msg {
566 self.deny_order(order, &risk_msg);
567 return false; }
569 }
570
571 if order.trigger_price().is_some() {
575 let risk_msg = self.check_price(&instrument, order.trigger_price());
576 if let Some(risk_msg) = risk_msg {
577 self.deny_order(order, &risk_msg);
578 return false; }
580 }
581
582 true
583 }
584
585 fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
586 let risk_msg = self.check_quantity(
587 &instrument,
588 Some(order.quantity()),
589 order.is_quote_quantity(),
590 );
591 if let Some(risk_msg) = risk_msg {
592 self.deny_order(order, &risk_msg);
593 return false; }
595
596 true
597 }
598
/// Runs the notional and balance risk checks over `orders` for `instrument`.
///
/// Returns `false` as soon as one order is denied, `true` otherwise. The checks
/// are skipped entirely (returning `true`) when no account is cached for the
/// instrument's venue or when that account is a margin account. An individual
/// order is skipped (via `continue`) when no price can be determined for it.
///
/// For each remaining order, in sequence:
/// 1. Resolve a reference price (`last_px`) based on the order type.
/// 2. Derive the effective quantity (converting quote quantities to base).
/// 3. Check instrument min/max quantity.
/// 4. Check the per-order max notional setting and instrument min/max notional.
/// 5. Check the order's balance impact against the free balance.
/// 6. Check the running cumulative BUY / SELL totals against the free balance.
///
/// # Panics
///
/// Panics on an order with no specified side, on a trailing-stop order without
/// offset type / trigger type / offset when no trigger price is set, when the
/// configured max notional cannot be converted to `f64`, or when the effective
/// quantity's raw value cannot be converted for the base-currency cash value.
fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
    // Declared outside the loop: a MARKET order reuses the price left here by
    // the previous iteration if there is one (see the MARKET branch below).
    let mut last_px: Option<Price> = None;
    let mut max_notional: Option<Money> = None;

    // Per-instrument max notional configured via `set_max_notional_per_order`.
    let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
    if let Some(max_notional_setting_val) = max_notional_setting.copied() {
        max_notional = Some(Money::new(
            max_notional_setting_val
                .to_f64()
                .expect("Invalid decimal conversion"),
            instrument.quote_currency(),
        ));
    }

    // Scope the cache borrow so it is released before any denial is issued.
    let account_exists = {
        let cache = self.cache.borrow();
        cache.account_for_venue(&instrument.id().venue).cloned()
    };

    let account = if let Some(account) = account_exists {
        account
    } else {
        // No account to check against: treat the orders as passing.
        log::debug!("Cannot find account for venue {}", instrument.id().venue);
        return true;
    };
    // Only cash accounts are checked here; margin accounts pass unconditionally.
    let cash_account = match account {
        AccountAny::Cash(cash_account) => cash_account,
        AccountAny::Margin(_) => return true,
    };
    // Free balance in the instrument's quote currency.
    let free = cash_account.balance_free(Some(instrument.quote_currency()));
    if self.config.debug {
        log::debug!("Free cash: {free:?}");
    }

    // Running totals across the whole batch of orders.
    let mut cum_notional_buy: Option<Money> = None;
    let mut cum_notional_sell: Option<Money> = None;
    let mut base_currency: Option<Currency> = None;
    for order in &orders {
        // Step 1: resolve the reference price for this order type.
        last_px = match order {
            OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
                if last_px.is_none() {
                    // Prefer the latest quote (ask for BUY, bid for SELL),
                    // falling back to the latest trade price.
                    let cache = self.cache.borrow();
                    if let Some(last_quote) = cache.quote(&instrument.id()) {
                        match order.order_side() {
                            OrderSide::Buy => Some(last_quote.ask_price),
                            OrderSide::Sell => Some(last_quote.bid_price),
                            _ => panic!("Invalid order side"),
                        }
                    } else {
                        let cache = self.cache.borrow();
                        let last_trade = cache.trade(&instrument.id());

                        if let Some(last_trade) = last_trade {
                            Some(last_trade.price)
                        } else {
                            log::warn!(
                                "Cannot check MARKET order risk: no prices for {}",
                                instrument.id()
                            );
                            continue;
                        }
                    }
                } else {
                    // Reuse the price carried over from the previous iteration.
                    last_px
                }
            }
            OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
            OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
                if let Some(trigger_price) = order.trigger_price() {
                    Some(trigger_price)
                } else {
                    // No trigger price yet: derive one from the trailing offset
                    // and current market data.
                    let offset_type = order.trailing_offset_type().unwrap();
                    if !matches!(
                        offset_type,
                        TrailingOffsetType::Price
                            | TrailingOffsetType::BasisPoints
                            | TrailingOffsetType::Ticks
                    ) {
                        self.deny_order(
                            order.clone(),
                            &format!("UNSUPPORTED_TRAILING_OFFSET_TYPE: {offset_type:?}"),
                        );
                        return false;
                    }

                    let trigger_type = order.trigger_type().unwrap();
                    let cache = self.cache.borrow();

                    if trigger_type == TriggerType::BidAsk {
                        // Bid/ask triggers require a quote.
                        if let Some(quote) = cache.quote(&instrument.id()) {
                            match trailing_stop_calculate_with_bid_ask(
                                instrument.price_increment(),
                                order.trailing_offset_type().unwrap(),
                                order.order_side_specified(),
                                order.trailing_offset().unwrap(),
                                quote.bid_price,
                                quote.ask_price,
                            ) {
                                Ok(calculated_trigger) => Some(calculated_trigger),
                                Err(e) => {
                                    log::warn!(
                                        "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
                                        order.order_type()
                                    );
                                    continue;
                                }
                            }
                        } else {
                            log::warn!(
                                "Cannot check {} order risk: no trigger price set and no bid/ask quotes available for {}",
                                order.order_type(),
                                instrument.id()
                            );
                            continue;
                        }
                    } else if let Some(last_trade) = cache.trade(&instrument.id()) {
                        // Any other trigger type: use the last trade when available.
                        match trailing_stop_calculate_with_last(
                            instrument.price_increment(),
                            order.trailing_offset_type().unwrap(),
                            order.order_side_specified(),
                            order.trailing_offset().unwrap(),
                            last_trade.price,
                        ) {
                            Ok(calculated_trigger) => Some(calculated_trigger),
                            Err(e) => {
                                log::warn!(
                                    "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {}",
                                    order.order_type(),
                                    e
                                );
                                continue;
                            }
                        }
                    } else if trigger_type == TriggerType::LastOrBidAsk {
                        // No trade available: LAST_OR_BID_ASK may fall back to a quote.
                        if let Some(quote) = cache.quote(&instrument.id()) {
                            match trailing_stop_calculate_with_bid_ask(
                                instrument.price_increment(),
                                order.trailing_offset_type().unwrap(),
                                order.order_side_specified(),
                                order.trailing_offset().unwrap(),
                                quote.bid_price,
                                quote.ask_price,
                            ) {
                                Ok(calculated_trigger) => Some(calculated_trigger),
                                Err(e) => {
                                    log::warn!(
                                        "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
                                        order.order_type()
                                    );
                                    continue;
                                }
                            }
                        } else {
                            log::warn!(
                                "Cannot check {} order risk: no trigger price set and no market data available for {}",
                                order.order_type(),
                                instrument.id()
                            );
                            continue;
                        }
                    } else {
                        log::warn!(
                            "Cannot check {} order risk: no trigger price set and no market data available for {}",
                            order.order_type(),
                            instrument.id()
                        );
                        continue;
                    }
                }
            }
            // All remaining order types use their own price.
            _ => order.price(),
        };

        // Shadows the outer `last_px` for the rest of this iteration only.
        let last_px = if let Some(px) = last_px {
            px
        } else {
            log::error!("Cannot check order risk: no price available");
            continue;
        };

        // Step 2: for quote-quantity LIMIT / STOP_LIMIT orders on non-inverse
        // instruments, bound the conversion price by the current quote
        // (BUY: min with ask, SELL: max with bid) when a quote is cached.
        let effective_price = if order.is_quote_quantity()
            && !instrument.is_inverse()
            && matches!(order, OrderAny::Limit(_) | OrderAny::StopLimit(_))
        {
            let cache = self.cache.borrow();
            if let Some(quote_tick) = cache.quote(&instrument.id()) {
                match order.order_side() {
                    OrderSide::Buy => last_px.min(quote_tick.ask_price),
                    OrderSide::Sell => last_px.max(quote_tick.bid_price),
                    _ => last_px,
                }
            } else {
                last_px
            }
        } else {
            last_px
        };

        // Quote quantities are converted to a base quantity at the effective price.
        let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
            instrument.calculate_base_quantity(order.quantity(), effective_price)
        } else {
            order.quantity()
        };

        // Step 3: instrument quantity limits, applied to the effective quantity.
        if let Some(max_quantity) = instrument.max_quantity()
            && effective_quantity > max_quantity
        {
            self.deny_order(
                order.clone(),
                &format!(
                    "QUANTITY_EXCEEDS_MAXIMUM: effective_quantity={effective_quantity}, max_quantity={max_quantity}"
                ),
            );
            return false;
        }

        if let Some(min_quantity) = instrument.min_quantity()
            && effective_quantity < min_quantity
        {
            self.deny_order(
                order.clone(),
                &format!(
                    "QUANTITY_BELOW_MINIMUM: effective_quantity={effective_quantity}, min_quantity={min_quantity}"
                ),
            );
            return false;
        }

        // Step 4: notional limits. Note the notional uses `last_px`, not
        // `effective_price`, and passes `Some(true)` as the third argument.
        let notional =
            instrument.calculate_notional_value(effective_quantity, last_px, Some(true));

        if self.config.debug {
            log::debug!("Notional: {notional:?}");
        }

        if let Some(max_notional_value) = max_notional
            && notional > max_notional_value
        {
            self.deny_order(
                order.clone(),
                &format!(
                    "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
                ),
            );
            return false;
        }

        // Instrument notional limits are only enforced when currencies match.
        if let Some(min_notional) = instrument.min_notional()
            && notional.currency == min_notional.currency
            && notional < min_notional
        {
            self.deny_order(
                order.clone(),
                &format!(
                    "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
                ),
            );
            return false;
        }

        if let Some(max_notional) = instrument.max_notional()
            && notional.currency == max_notional.currency
            && notional > max_notional
        {
            self.deny_order(
                order.clone(),
                &format!(
                    "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
                ),
            );
            return false;
        }

        // Step 5: balance impact. The notional is recomputed with `None` as the
        // third argument; BUY is a negative impact, SELL a positive one.
        let notional = instrument.calculate_notional_value(effective_quantity, last_px, None);
        let order_balance_impact = match order.order_side() {
            OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
            OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
            OrderSide::NoOrderSide => {
                panic!("invalid `OrderSide`, was {}", order.order_side());
            }
        };

        if self.config.debug {
            log::debug!("Balance impact: {order_balance_impact}");
        }

        if let Some(free_val) = free
            && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
        {
            self.deny_order(
                order.clone(),
                &format!(
                    "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
                ),
            );
            return false;
        }

        // Step 6: cumulative checks across the batch.
        if base_currency.is_none() {
            base_currency = instrument.base_currency();
        }
        if order.is_buy() {
            // Accumulate the (positive) notional of all BUY orders so far.
            match cum_notional_buy.as_mut() {
                Some(cum_notional_buy_val) => {
                    cum_notional_buy_val.raw += -order_balance_impact.raw;
                }
                None => {
                    cum_notional_buy = Some(Money::from_raw(
                        -order_balance_impact.raw,
                        order_balance_impact.currency,
                    ));
                }
            }

            if self.config.debug {
                log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
            }

            if let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
                && cum_notional_buy > free
            {
                self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
                return false;
            }
        } else if order.is_sell() {
            if cash_account.base_currency.is_some() {
                // Account has a base currency: accumulate SELL notional.
                if order.is_reduce_only() {
                    if self.config.debug {
                        log::debug!(
                            "Reduce-only SELL skips cumulative notional free-balance check"
                        );
                    }
                } else {
                    // NOTE: the binding is named `cum_notional_buy_val` but it
                    // refers to the SELL accumulator here.
                    match cum_notional_sell.as_mut() {
                        Some(cum_notional_buy_val) => {
                            cum_notional_buy_val.raw += order_balance_impact.raw;
                        }
                        None => {
                            cum_notional_sell = Some(Money::from_raw(
                                order_balance_impact.raw,
                                order_balance_impact.currency,
                            ));
                        }
                    }
                    if self.config.debug {
                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
                    }

                    if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
                        && cum_notional_sell > free
                    {
                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
                        return false;
                    }
                }
            } else if let Some(base_currency) = base_currency {
                // Account has no base currency: accumulate the SELL quantity as
                // a cash value in the instrument's base currency.
                if order.is_reduce_only() {
                    if self.config.debug {
                        log::debug!(
                            "Reduce-only SELL skips base-currency cumulative free check"
                        );
                    }
                    continue;
                }

                let cash_value = Money::from_raw(
                    effective_quantity
                        .raw
                        .try_into()
                        .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
                        .unwrap(),
                    base_currency,
                );

                if self.config.debug {
                    log::debug!("Cash value: {cash_value:?}");
                    log::debug!(
                        "Total: {:?}",
                        cash_account.balance_total(Some(base_currency))
                    );
                    log::debug!(
                        "Locked: {:?}",
                        cash_account.balance_locked(Some(base_currency))
                    );
                    log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
                }

                match cum_notional_sell {
                    Some(mut value) => {
                        value.raw += cash_value.raw;
                        cum_notional_sell = Some(value);
                    }
                    None => cum_notional_sell = Some(cash_value),
                }

                if self.config.debug {
                    log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
                }
                // NOTE(review): `free` was fetched above for the QUOTE currency,
                // while `cum_notional_sell` here is denominated in the BASE
                // currency, and only raw values are compared — confirm whether
                // the base-currency free balance was intended.
                if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
                    && cum_notional_sell.raw > free.raw
                {
                    self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
                    return false;
                }
            }
        }
    }

    // No order was denied.
    true
}
1029
1030 fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
1031 let price_val = price?;
1032
1033 if price_val.precision > instrument.price_precision() {
1034 return Some(format!(
1035 "price {} invalid (precision {} > {})",
1036 price_val,
1037 price_val.precision,
1038 instrument.price_precision()
1039 ));
1040 }
1041
1042 if !matches!(
1043 instrument.instrument_class(),
1044 InstrumentClass::Option
1045 | InstrumentClass::FuturesSpread
1046 | InstrumentClass::OptionSpread
1047 ) && price_val.raw <= 0
1048 {
1049 return Some(format!("price {price_val} invalid (<= 0)"));
1050 }
1051
1052 None
1053 }
1054
1055 fn check_quantity(
1056 &self,
1057 instrument: &InstrumentAny,
1058 quantity: Option<Quantity>,
1059 is_quote_quantity: bool,
1060 ) -> Option<String> {
1061 let quantity_val = quantity?;
1062
1063 if quantity_val.precision > instrument.size_precision() {
1065 return Some(format!(
1066 "quantity {} invalid (precision {} > {})",
1067 quantity_val,
1068 quantity_val.precision,
1069 instrument.size_precision()
1070 ));
1071 }
1072
1073 if is_quote_quantity {
1075 return None;
1076 }
1077
1078 if let Some(max_quantity) = instrument.max_quantity()
1080 && quantity_val > max_quantity
1081 {
1082 return Some(format!(
1083 "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
1084 ));
1085 }
1086
1087 if let Some(min_quantity) = instrument.min_quantity()
1089 && quantity_val < min_quantity
1090 {
1091 return Some(format!(
1092 "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
1093 ));
1094 }
1095
1096 None
1097 }
1098
1099 fn deny_command(&self, command: TradingCommand, reason: &str) {
1102 match command {
1103 TradingCommand::SubmitOrder(command) => {
1104 self.deny_order(command.order, reason);
1105 }
1106 TradingCommand::SubmitOrderList(command) => {
1107 self.deny_order_list(command.order_list, reason);
1108 }
1109 _ => {
1110 panic!("Cannot deny command {command}");
1111 }
1112 }
1113 }
1114
1115 fn deny_order(&self, order: OrderAny, reason: &str) {
1116 log::warn!(
1117 "SubmitOrder for {} DENIED: {}",
1118 order.client_order_id(),
1119 reason
1120 );
1121
1122 if order.status() != OrderStatus::Initialized {
1123 return;
1124 }
1125
1126 let mut cache = self.cache.borrow_mut();
1127 if !cache.order_exists(&order.client_order_id()) {
1128 cache
1129 .add_order(order.clone(), None, None, false)
1130 .map_err(|e| {
1131 log::error!("Cannot add order to cache: {e}");
1132 })
1133 .unwrap();
1134 }
1135
1136 let denied = OrderEventAny::Denied(OrderDenied::new(
1137 order.trader_id(),
1138 order.strategy_id(),
1139 order.instrument_id(),
1140 order.client_order_id(),
1141 reason.into(),
1142 UUID4::new(),
1143 self.clock.borrow().timestamp_ns(),
1144 self.clock.borrow().timestamp_ns(),
1145 ));
1146
1147 msgbus::send_any("ExecEngine.process".into(), &denied);
1148 }
1149
1150 fn deny_order_list(&self, order_list: OrderList, reason: &str) {
1151 for order in order_list.orders {
1152 if !order.is_closed() {
1153 self.deny_order(order, reason);
1154 }
1155 }
1156 }
1157
1158 fn reject_modify_order(&self, order: OrderAny, reason: &str) {
1159 let ts_event = self.clock.borrow().timestamp_ns();
1160 let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
1161 order.trader_id(),
1162 order.strategy_id(),
1163 order.instrument_id(),
1164 order.client_order_id(),
1165 reason.into(),
1166 UUID4::new(),
1167 ts_event,
1168 ts_event,
1169 false,
1170 order.venue_order_id(),
1171 order.account_id(),
1172 ));
1173
1174 msgbus::send_any("ExecEngine.process".into(), &denied);
1175 }
1176
1177 fn execution_gateway(&mut self, instrument: InstrumentAny, command: TradingCommand) {
1180 match self.trading_state {
1181 TradingState::Halted => match command {
1182 TradingCommand::SubmitOrder(submit_order) => {
1183 self.deny_order(submit_order.order, "TradingState::HALTED");
1184 }
1185 TradingCommand::SubmitOrderList(submit_order_list) => {
1186 self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
1187 }
1188 _ => {}
1189 },
1190 TradingState::Reducing => match command {
1191 TradingCommand::SubmitOrder(submit_order) => {
1192 let order = submit_order.order;
1193 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1194 self.deny_order(
1195 order,
1196 &format!(
1197 "BUY when TradingState::REDUCING and LONG {}",
1198 instrument.id()
1199 ),
1200 );
1201 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1202 self.deny_order(
1203 order,
1204 &format!(
1205 "SELL when TradingState::REDUCING and SHORT {}",
1206 instrument.id()
1207 ),
1208 );
1209 }
1210 }
1211 TradingCommand::SubmitOrderList(submit_order_list) => {
1212 let order_list = submit_order_list.order_list;
1213 for order in &order_list.orders {
1214 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1215 self.deny_order_list(
1216 order_list,
1217 &format!(
1218 "BUY when TradingState::REDUCING and LONG {}",
1219 instrument.id()
1220 ),
1221 );
1222 return;
1223 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1224 self.deny_order_list(
1225 order_list,
1226 &format!(
1227 "SELL when TradingState::REDUCING and SHORT {}",
1228 instrument.id()
1229 ),
1230 );
1231 return;
1232 }
1233 }
1234 }
1235 _ => {}
1236 },
1237 TradingState::Active => match command {
1238 TradingCommand::SubmitOrder(submit_order) => {
1239 self.throttled_submit_order.send(submit_order);
1240 }
1241 TradingCommand::SubmitOrderList(submit_order_list) => {
1242 self.send_to_execution(TradingCommand::SubmitOrderList(submit_order_list));
1244 }
1245 _ => {}
1246 },
1247 }
1248 }
1249
1250 fn send_to_execution(&self, command: TradingCommand) {
1251 msgbus::send_any("ExecEngine.execute".into(), &command);
1252 }
1253
1254 fn handle_event(&mut self, event: OrderEventAny) {
1255 if self.config.debug {
1258 log::debug!("{RECV}{EVT} {event:?}");
1259 }
1260 }
1261}