1pub mod config;
19
20#[cfg(test)]
21mod tests;
22
23use std::{cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc};
24
25use config::RiskEngineConfig;
26use nautilus_common::{
27 cache::Cache,
28 clock::Clock,
29 logging::{CMD, EVT, RECV},
30 messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
31 msgbus,
32 throttler::Throttler,
33};
34use nautilus_core::UUID4;
35use nautilus_model::{
36 accounts::{Account, AccountAny},
37 enums::{InstrumentClass, OrderSide, OrderStatus, TimeInForce, TradingState},
38 events::{OrderDenied, OrderEventAny, OrderModifyRejected},
39 identifiers::InstrumentId,
40 instruments::{Instrument, InstrumentAny},
41 orders::{Order, OrderAny, OrderList},
42 types::{Currency, Money, Price, Quantity},
43};
44use nautilus_portfolio::Portfolio;
45use rust_decimal::{Decimal, prelude::ToPrimitive};
46use ustr::Ustr;
47
48type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
49type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
50
51#[allow(dead_code)]
58pub struct RiskEngine {
59 clock: Rc<RefCell<dyn Clock>>,
60 cache: Rc<RefCell<Cache>>,
61 portfolio: Portfolio,
62 pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
63 pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
64 max_notional_per_order: HashMap<InstrumentId, Decimal>,
65 trading_state: TradingState,
66 config: RiskEngineConfig,
67}
68
69impl Debug for RiskEngine {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct(stringify!(RiskEngine)).finish()
72 }
73}
74
75impl RiskEngine {
76 pub fn new(
78 config: RiskEngineConfig,
79 portfolio: Portfolio,
80 clock: Rc<RefCell<dyn Clock>>,
81 cache: Rc<RefCell<Cache>>,
82 ) -> Self {
83 let throttled_submit_order =
84 Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
85
86 let throttled_modify_order =
87 Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
88
89 Self {
90 clock,
91 cache,
92 portfolio,
93 throttled_submit_order,
94 throttled_modify_order,
95 max_notional_per_order: HashMap::new(),
96 trading_state: TradingState::Active,
97 config,
98 }
99 }
100
101 fn create_submit_order_throttler(
102 config: &RiskEngineConfig,
103 clock: Rc<RefCell<dyn Clock>>,
104 cache: Rc<RefCell<Cache>>,
105 ) -> Throttler<SubmitOrder, SubmitOrderFn> {
106 let success_handler = {
107 Box::new(move |submit_order: SubmitOrder| {
108 msgbus::send_any(
109 "ExecEngine.execute".into(),
110 &TradingCommand::SubmitOrder(submit_order),
111 );
112 }) as Box<dyn Fn(SubmitOrder)>
113 };
114
115 let failure_handler = {
116 let cache = cache;
117 let clock = clock.clone();
118 Box::new(move |submit_order: SubmitOrder| {
119 let reason = "REJECTED BY THROTTLER";
120 log::warn!(
121 "SubmitOrder for {} DENIED: {}",
122 submit_order.client_order_id,
123 reason
124 );
125
126 Self::handle_submit_order_cache(&cache, &submit_order);
127
128 let denied = Self::create_order_denied(&submit_order, reason, &clock);
129
130 msgbus::send_any("ExecEngine.process".into(), &denied);
131 }) as Box<dyn Fn(SubmitOrder)>
132 };
133
134 Throttler::new(
135 config.max_order_submit.limit,
136 config.max_order_submit.interval_ns,
137 clock,
138 "ORDER_SUBMIT_THROTTLER".to_string(),
139 success_handler,
140 Some(failure_handler),
141 Ustr::from(&UUID4::new().to_string()),
142 )
143 }
144
145 fn create_modify_order_throttler(
146 config: &RiskEngineConfig,
147 clock: Rc<RefCell<dyn Clock>>,
148 cache: Rc<RefCell<Cache>>,
149 ) -> Throttler<ModifyOrder, ModifyOrderFn> {
150 let success_handler = {
151 Box::new(move |order: ModifyOrder| {
152 msgbus::send_any(
153 "ExecEngine.execute".into(),
154 &TradingCommand::ModifyOrder(order),
155 );
156 }) as Box<dyn Fn(ModifyOrder)>
157 };
158
159 let failure_handler = {
160 let cache = cache;
161 let clock = clock.clone();
162 Box::new(move |order: ModifyOrder| {
163 let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
164 log::warn!(
165 "SubmitOrder for {} DENIED: {}",
166 order.client_order_id,
167 reason
168 );
169
170 let order = match Self::get_existing_order(&cache, &order) {
171 Some(order) => order,
172 None => return,
173 };
174
175 let rejected = Self::create_modify_rejected(&order, reason, &clock);
176
177 msgbus::send_any("ExecEngine.process".into(), &rejected);
178 }) as Box<dyn Fn(ModifyOrder)>
179 };
180
181 Throttler::new(
182 config.max_order_modify.limit,
183 config.max_order_modify.interval_ns,
184 clock,
185 "ORDER_MODIFY_THROTTLER".to_string(),
186 success_handler,
187 Some(failure_handler),
188 Ustr::from(&UUID4::new().to_string()),
189 )
190 }
191
192 fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
193 let mut cache = cache.borrow_mut();
194 if !cache.order_exists(&submit_order.client_order_id) {
195 cache
196 .add_order(submit_order.order.clone(), None, None, false)
197 .map_err(|e| {
198 log::error!("Cannot add order to cache: {e}");
199 })
200 .unwrap();
201 }
202 }
203
204 fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
205 let cache = cache.borrow();
206 if let Some(order) = cache.order(&order.client_order_id) {
207 Some(order.clone())
208 } else {
209 log::error!(
210 "Order with command.client_order_id: {} not found",
211 order.client_order_id
212 );
213 None
214 }
215 }
216
217 fn create_order_denied(
218 submit_order: &SubmitOrder,
219 reason: &str,
220 clock: &Rc<RefCell<dyn Clock>>,
221 ) -> OrderEventAny {
222 let timestamp = clock.borrow().timestamp_ns();
223 OrderEventAny::Denied(OrderDenied::new(
224 submit_order.trader_id,
225 submit_order.strategy_id,
226 submit_order.instrument_id,
227 submit_order.client_order_id,
228 reason.into(),
229 UUID4::new(),
230 timestamp,
231 timestamp,
232 ))
233 }
234
235 fn create_modify_rejected(
236 order: &OrderAny,
237 reason: &str,
238 clock: &Rc<RefCell<dyn Clock>>,
239 ) -> OrderEventAny {
240 let timestamp = clock.borrow().timestamp_ns();
241 OrderEventAny::ModifyRejected(OrderModifyRejected::new(
242 order.trader_id(),
243 order.strategy_id(),
244 order.instrument_id(),
245 order.client_order_id(),
246 reason.into(),
247 UUID4::new(),
248 timestamp,
249 timestamp,
250 false,
251 order.venue_order_id(),
252 None,
253 ))
254 }
255
256 pub fn execute(&mut self, command: TradingCommand) {
260 self.handle_command(command);
262 }
263
264 pub fn process(&mut self, event: OrderEventAny) {
266 self.handle_event(event);
268 }
269
270 pub fn set_trading_state(&mut self, state: TradingState) {
272 if state == self.trading_state {
273 log::warn!("No change to trading state: already set to {state:?}");
274 return;
275 }
276
277 self.trading_state = state;
278
279 let _ts_now = self.clock.borrow().timestamp_ns();
280
281 msgbus::publish("events.risk".into(), &"message"); log::info!("Trading state set to {state:?}");
287 }
288
289 pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
291 self.max_notional_per_order.insert(instrument_id, new_value);
292
293 let new_value_str = new_value.to_string();
294 log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
295 }
296
297 fn handle_command(&mut self, command: TradingCommand) {
301 if self.config.debug {
302 log::debug!("{CMD}{RECV} {command:?}");
303 }
304
305 match command {
306 TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
307 TradingCommand::SubmitOrderList(submit_order_list) => {
308 self.handle_submit_order_list(submit_order_list);
309 }
310 TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
311 TradingCommand::QueryAccount(query_account) => {
312 self.send_to_execution(TradingCommand::QueryAccount(query_account));
313 }
314 _ => {
315 log::error!("Cannot handle command: {command}");
316 }
317 }
318 }
319
320 fn handle_submit_order(&mut self, command: SubmitOrder) {
321 if self.config.bypass {
322 self.send_to_execution(TradingCommand::SubmitOrder(command));
323 return;
324 }
325
326 let order = &command.order;
327 if let Some(position_id) = command.position_id
328 && order.is_reduce_only()
329 {
330 let position_exists = {
331 let cache = self.cache.borrow();
332 cache
333 .position(&position_id)
334 .map(|pos| (pos.side, pos.quantity))
335 };
336
337 if let Some((pos_side, pos_quantity)) = position_exists {
338 if !order.would_reduce_only(pos_side, pos_quantity) {
339 self.deny_command(
340 TradingCommand::SubmitOrder(command),
341 &format!("Reduce only order would increase position {position_id}"),
342 );
343 return; }
345 } else {
346 self.deny_command(
347 TradingCommand::SubmitOrder(command),
348 &format!("Position {position_id} not found for reduce-only order"),
349 );
350 return;
351 }
352 }
353
354 let instrument_exists = {
355 let cache = self.cache.borrow();
356 cache.instrument(&command.instrument_id).cloned()
357 };
358
359 let instrument = if let Some(instrument) = instrument_exists {
360 instrument
361 } else {
362 self.deny_command(
363 TradingCommand::SubmitOrder(command.clone()),
364 &format!("Instrument for {} not found", command.instrument_id),
365 );
366 return; };
368
369 if !self.check_order(instrument.clone(), order.clone()) {
373 return; }
375
376 if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
377 return; }
379
380 self.execution_gateway(instrument, TradingCommand::SubmitOrder(command));
382 }
383
384 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
385 if self.config.bypass {
386 self.send_to_execution(TradingCommand::SubmitOrderList(command));
387 return;
388 }
389
390 let instrument_exists = {
391 let cache = self.cache.borrow();
392 cache.instrument(&command.instrument_id).cloned()
393 };
394
395 let instrument = if let Some(instrument) = instrument_exists {
396 instrument
397 } else {
398 self.deny_command(
399 TradingCommand::SubmitOrderList(command.clone()),
400 &format!("no instrument found for {}", command.instrument_id),
401 );
402 return; };
404
405 for order in command.order_list.orders.clone() {
409 if !self.check_order(instrument.clone(), order) {
410 return; }
412 }
413
414 if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
415 self.deny_order_list(
416 command.order_list.clone(),
417 &format!("OrderList {} DENIED", command.order_list.id),
418 );
419 return; }
421
422 self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
423 }
424
425 fn handle_modify_order(&mut self, command: ModifyOrder) {
426 let order_exists = {
430 let cache = self.cache.borrow();
431 cache.order(&command.client_order_id).cloned()
432 };
433
434 let order = if let Some(order) = order_exists {
435 order
436 } else {
437 log::error!(
438 "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
439 command.client_order_id
440 );
441 return;
442 };
443
444 if order.is_closed() {
445 self.reject_modify_order(
446 order,
447 &format!(
448 "Order with command.client_order_id: {} already closed",
449 command.client_order_id
450 ),
451 );
452 return;
453 } else if order.status() == OrderStatus::PendingCancel {
454 self.reject_modify_order(
455 order,
456 &format!(
457 "Order with command.client_order_id: {} is already pending cancel",
458 command.client_order_id
459 ),
460 );
461 return;
462 }
463
464 let maybe_instrument = {
465 let cache = self.cache.borrow();
466 cache.instrument(&command.instrument_id).cloned()
467 };
468
469 let instrument = if let Some(instrument) = maybe_instrument {
470 instrument
471 } else {
472 self.reject_modify_order(
473 order,
474 &format!("no instrument found for {:?}", command.instrument_id),
475 );
476 return; };
478
479 let mut risk_msg = self.check_price(&instrument, command.price);
481 if let Some(risk_msg) = risk_msg {
482 self.reject_modify_order(order, &risk_msg);
483 return; }
485
486 risk_msg = self.check_price(&instrument, command.trigger_price);
488 if let Some(risk_msg) = risk_msg {
489 self.reject_modify_order(order, &risk_msg);
490 return; }
492
493 risk_msg = self.check_quantity(&instrument, command.quantity);
495 if let Some(risk_msg) = risk_msg {
496 self.reject_modify_order(order, &risk_msg);
497 return; }
499
500 match self.trading_state {
502 TradingState::Halted => {
503 self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
504 }
505 TradingState::Reducing => {
506 if let Some(quantity) = command.quantity
507 && quantity > order.quantity()
508 && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
509 || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
510 {
511 self.reject_modify_order(
512 order,
513 &format!(
514 "TradingState is REDUCING and update will increase exposure {}",
515 instrument.id()
516 ),
517 );
518 }
519 }
520 _ => {}
521 }
522
523 self.throttled_modify_order.send(command);
524 }
525
526 fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
529 if order.time_in_force() == TimeInForce::Gtd {
533 let expire_time = order.expire_time().unwrap();
535 if expire_time <= self.clock.borrow().timestamp_ns() {
536 self.deny_order(
537 order,
538 &format!("GTD {} already past", expire_time.to_rfc3339()),
539 );
540 return false; }
542 }
543
544 if !self.check_order_price(instrument.clone(), order.clone())
545 || !self.check_order_quantity(instrument, order)
546 {
547 return false; }
549
550 true
551 }
552
553 fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
554 if order.price().is_some() {
558 let risk_msg = self.check_price(&instrument, order.price());
559 if let Some(risk_msg) = risk_msg {
560 self.deny_order(order, &risk_msg);
561 return false; }
563 }
564
565 if order.trigger_price().is_some() {
569 let risk_msg = self.check_price(&instrument, order.trigger_price());
570 if let Some(risk_msg) = risk_msg {
571 self.deny_order(order, &risk_msg);
572 return false; }
574 }
575
576 true
577 }
578
579 fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
580 let risk_msg = self.check_quantity(&instrument, Some(order.quantity()));
581 if let Some(risk_msg) = risk_msg {
582 self.deny_order(order, &risk_msg);
583 return false; }
585
586 true
587 }
588
589 fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
590 let mut last_px: Option<Price> = None;
594 let mut max_notional: Option<Money> = None;
595
596 let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
598 if let Some(max_notional_setting_val) = max_notional_setting.copied() {
599 max_notional = Some(Money::new(
600 max_notional_setting_val
601 .to_f64()
602 .expect("Invalid decimal conversion"),
603 instrument.quote_currency(),
604 ));
605 }
606
607 let account_exists = {
609 let cache = self.cache.borrow();
610 cache.account_for_venue(&instrument.id().venue).cloned()
611 };
612
613 let account = if let Some(account) = account_exists {
614 account
615 } else {
616 log::debug!("Cannot find account for venue {}", instrument.id().venue);
617 return true; };
619 let cash_account = match account {
620 AccountAny::Cash(cash_account) => cash_account,
621 AccountAny::Margin(_) => return true, };
623 let free = cash_account.balance_free(Some(instrument.quote_currency()));
624 if self.config.debug {
625 log::debug!("Free cash: {free:?}");
626 }
627
628 let mut cum_notional_buy: Option<Money> = None;
629 let mut cum_notional_sell: Option<Money> = None;
630 let mut base_currency: Option<Currency> = None;
631 for order in &orders {
632 last_px = match order {
634 OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
635 if last_px.is_none() {
636 let cache = self.cache.borrow();
637 if let Some(last_quote) = cache.quote(&instrument.id()) {
638 match order.order_side() {
639 OrderSide::Buy => Some(last_quote.ask_price),
640 OrderSide::Sell => Some(last_quote.bid_price),
641 _ => panic!("Invalid order side"),
642 }
643 } else {
644 let cache = self.cache.borrow();
645 let last_trade = cache.trade(&instrument.id());
646
647 if let Some(last_trade) = last_trade {
648 Some(last_trade.price)
649 } else {
650 log::warn!(
651 "Cannot check MARKET order risk: no prices for {}",
652 instrument.id()
653 );
654 continue;
655 }
656 }
657 } else {
658 last_px
659 }
660 }
661 OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
662 OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
663 if let Some(trigger_price) = order.trigger_price() {
664 Some(trigger_price)
665 } else {
666 log::warn!(
667 "Cannot check {} order risk: no trigger price was set", order.order_type()
669 );
670 continue;
671 }
672 }
673 _ => order.price(),
674 };
675
676 let last_px = if let Some(px) = last_px {
677 px
678 } else {
679 log::error!("Cannot check order risk: no price available");
680 continue;
681 };
682
683 let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
684 instrument.calculate_base_quantity(order.quantity(), last_px)
685 } else {
686 order.quantity()
687 };
688
689 let notional =
690 instrument.calculate_notional_value(effective_quantity, last_px, Some(true));
691
692 if self.config.debug {
693 log::debug!("Notional: {notional:?}");
694 }
695
696 if let Some(max_notional_value) = max_notional
698 && notional > max_notional_value
699 {
700 self.deny_order(
701 order.clone(),
702 &format!(
703 "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
704 ),
705 );
706 return false; }
708
709 if let Some(min_notional) = instrument.min_notional()
711 && notional.currency == min_notional.currency
712 && notional < min_notional
713 {
714 self.deny_order(
715 order.clone(),
716 &format!(
717 "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
718 ),
719 );
720 return false; }
722
723 if let Some(max_notional) = instrument.max_notional()
725 && notional.currency == max_notional.currency
726 && notional > max_notional
727 {
728 self.deny_order(
729 order.clone(),
730 &format!(
731 "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
732 ),
733 );
734 return false; }
736
737 let notional = instrument.calculate_notional_value(effective_quantity, last_px, None);
739 let order_balance_impact = match order.order_side() {
740 OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
741 OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
742 OrderSide::NoOrderSide => {
743 panic!("invalid `OrderSide`, was {}", order.order_side());
744 }
745 };
746
747 if self.config.debug {
748 log::debug!("Balance impact: {order_balance_impact}");
749 }
750
751 if let Some(free_val) = free
752 && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
753 {
754 self.deny_order(
755 order.clone(),
756 &format!(
757 "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
758 ),
759 );
760 return false;
761 }
762
763 if base_currency.is_none() {
764 base_currency = instrument.base_currency();
765 }
766 if order.is_buy() {
767 match cum_notional_buy.as_mut() {
768 Some(cum_notional_buy_val) => {
769 cum_notional_buy_val.raw += -order_balance_impact.raw;
770 }
771 None => {
772 cum_notional_buy = Some(Money::from_raw(
773 -order_balance_impact.raw,
774 order_balance_impact.currency,
775 ));
776 }
777 }
778
779 if self.config.debug {
780 log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
781 }
782
783 if let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
784 && cum_notional_buy > free
785 {
786 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
787 return false; }
789 } else if order.is_sell() {
790 if cash_account.base_currency.is_some() {
791 match cum_notional_sell.as_mut() {
792 Some(cum_notional_buy_val) => {
793 cum_notional_buy_val.raw += order_balance_impact.raw;
794 }
795 None => {
796 cum_notional_sell = Some(Money::from_raw(
797 order_balance_impact.raw,
798 order_balance_impact.currency,
799 ));
800 }
801 }
802 if self.config.debug {
803 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
804 }
805
806 if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
807 && cum_notional_sell > free
808 {
809 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
810 return false; }
812 }
813 else if let Some(base_currency) = base_currency {
815 let cash_value = Money::from_raw(
816 effective_quantity
817 .raw
818 .try_into()
819 .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
820 .unwrap(),
821 base_currency,
822 );
823
824 if self.config.debug {
825 log::debug!("Cash value: {cash_value:?}");
826 log::debug!(
827 "Total: {:?}",
828 cash_account.balance_total(Some(base_currency))
829 );
830 log::debug!(
831 "Locked: {:?}",
832 cash_account.balance_locked(Some(base_currency))
833 );
834 log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
835 }
836
837 match cum_notional_sell {
838 Some(mut cum_notional_sell) => {
839 cum_notional_sell.raw += cash_value.raw;
840 }
841 None => cum_notional_sell = Some(cash_value),
842 }
843
844 if self.config.debug {
845 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
846 }
847 if let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
848 && cum_notional_sell.raw > free.raw
849 {
850 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
851 return false; }
853 }
854 }
855 }
856
857 true }
860
861 fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
862 let price_val = price?;
863
864 if price_val.precision > instrument.price_precision() {
865 return Some(format!(
866 "price {} invalid (precision {} > {})",
867 price_val,
868 price_val.precision,
869 instrument.price_precision()
870 ));
871 }
872
873 if instrument.instrument_class() != InstrumentClass::Option && price_val.raw <= 0 {
874 return Some(format!("price {price_val} invalid (<= 0)"));
875 }
876
877 None
878 }
879
880 fn check_quantity(
881 &self,
882 instrument: &InstrumentAny,
883 quantity: Option<Quantity>,
884 ) -> Option<String> {
885 let quantity_val = quantity?;
886
887 if quantity_val.precision > instrument.size_precision() {
889 return Some(format!(
890 "quantity {} invalid (precision {} > {})",
891 quantity_val,
892 quantity_val.precision,
893 instrument.size_precision()
894 ));
895 }
896
897 if let Some(max_quantity) = instrument.max_quantity()
899 && quantity_val > max_quantity
900 {
901 return Some(format!(
902 "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
903 ));
904 }
905
906 if let Some(min_quantity) = instrument.min_quantity()
908 && quantity_val < min_quantity
909 {
910 return Some(format!(
911 "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
912 ));
913 }
914
915 None
916 }
917
918 fn deny_command(&self, command: TradingCommand, reason: &str) {
921 match command {
922 TradingCommand::SubmitOrder(command) => {
923 self.deny_order(command.order, reason);
924 }
925 TradingCommand::SubmitOrderList(command) => {
926 self.deny_order_list(command.order_list, reason);
927 }
928 _ => {
929 panic!("Cannot deny command {command}");
930 }
931 }
932 }
933
934 fn deny_order(&self, order: OrderAny, reason: &str) {
935 log::warn!(
936 "SubmitOrder for {} DENIED: {}",
937 order.client_order_id(),
938 reason
939 );
940
941 if order.status() != OrderStatus::Initialized {
942 return;
943 }
944
945 let mut cache = self.cache.borrow_mut();
946 if !cache.order_exists(&order.client_order_id()) {
947 cache
948 .add_order(order.clone(), None, None, false)
949 .map_err(|e| {
950 log::error!("Cannot add order to cache: {e}");
951 })
952 .unwrap();
953 }
954
955 let denied = OrderEventAny::Denied(OrderDenied::new(
956 order.trader_id(),
957 order.strategy_id(),
958 order.instrument_id(),
959 order.client_order_id(),
960 reason.into(),
961 UUID4::new(),
962 self.clock.borrow().timestamp_ns(),
963 self.clock.borrow().timestamp_ns(),
964 ));
965
966 msgbus::send_any("ExecEngine.process".into(), &denied);
967 }
968
969 fn deny_order_list(&self, order_list: OrderList, reason: &str) {
970 for order in order_list.orders {
971 if !order.is_closed() {
972 self.deny_order(order, reason);
973 }
974 }
975 }
976
977 fn reject_modify_order(&self, order: OrderAny, reason: &str) {
978 let ts_event = self.clock.borrow().timestamp_ns();
979 let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
980 order.trader_id(),
981 order.strategy_id(),
982 order.instrument_id(),
983 order.client_order_id(),
984 reason.into(),
985 UUID4::new(),
986 ts_event,
987 ts_event,
988 false,
989 order.venue_order_id(),
990 order.account_id(),
991 ));
992
993 msgbus::send_any("ExecEngine.process".into(), &denied);
994 }
995
996 fn execution_gateway(&mut self, instrument: InstrumentAny, command: TradingCommand) {
999 match self.trading_state {
1000 TradingState::Halted => match command {
1001 TradingCommand::SubmitOrder(submit_order) => {
1002 self.deny_order(submit_order.order, "TradingState::HALTED");
1003 }
1004 TradingCommand::SubmitOrderList(submit_order_list) => {
1005 self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
1006 }
1007 _ => {}
1008 },
1009 TradingState::Reducing => match command {
1010 TradingCommand::SubmitOrder(submit_order) => {
1011 let order = submit_order.order;
1012 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1013 self.deny_order(
1014 order,
1015 &format!(
1016 "BUY when TradingState::REDUCING and LONG {}",
1017 instrument.id()
1018 ),
1019 );
1020 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1021 self.deny_order(
1022 order,
1023 &format!(
1024 "SELL when TradingState::REDUCING and SHORT {}",
1025 instrument.id()
1026 ),
1027 );
1028 }
1029 }
1030 TradingCommand::SubmitOrderList(submit_order_list) => {
1031 let order_list = submit_order_list.order_list;
1032 for order in &order_list.orders {
1033 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1034 self.deny_order_list(
1035 order_list,
1036 &format!(
1037 "BUY when TradingState::REDUCING and LONG {}",
1038 instrument.id()
1039 ),
1040 );
1041 return;
1042 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1043 self.deny_order_list(
1044 order_list,
1045 &format!(
1046 "SELL when TradingState::REDUCING and SHORT {}",
1047 instrument.id()
1048 ),
1049 );
1050 return;
1051 }
1052 }
1053 }
1054 _ => {}
1055 },
1056 TradingState::Active => match command {
1057 TradingCommand::SubmitOrder(submit_order) => {
1058 self.throttled_submit_order.send(submit_order);
1059 }
1060 TradingCommand::SubmitOrderList(submit_order_list) => {
1061 self.send_to_execution(TradingCommand::SubmitOrderList(submit_order_list));
1063 }
1064 _ => {}
1065 },
1066 }
1067 }
1068
1069 fn send_to_execution(&self, command: TradingCommand) {
1070 msgbus::send_any("ExecEngine.execute".into(), &command);
1071 }
1072
1073 fn handle_event(&mut self, event: OrderEventAny) {
1074 if self.config.debug {
1077 log::debug!("{RECV}{EVT} {event:?}");
1078 }
1079 }
1080}