1pub mod config;
19
20#[cfg(test)]
21mod tests;
22
23use std::{cell::RefCell, fmt::Debug, rc::Rc};
24
25use ahash::AHashMap;
26use config::RiskEngineConfig;
27use nautilus_common::{
28 cache::Cache,
29 clock::Clock,
30 logging::{CMD, EVT, RECV},
31 messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
32 msgbus,
33 throttler::Throttler,
34};
35use nautilus_core::UUID4;
36use nautilus_execution::trailing::{
37 trailing_stop_calculate_with_bid_ask, trailing_stop_calculate_with_last,
38};
39use nautilus_model::{
40 accounts::{Account, AccountAny},
41 enums::{
42 InstrumentClass, OrderSide, OrderStatus, PositionSide, TimeInForce, TradingState,
43 TrailingOffsetType, TriggerType,
44 },
45 events::{OrderDenied, OrderEventAny, OrderModifyRejected},
46 identifiers::InstrumentId,
47 instruments::{Instrument, InstrumentAny},
48 orders::{Order, OrderAny, OrderList},
49 types::{Currency, Money, Price, Quantity, quantity::QuantityRaw},
50};
51use nautilus_portfolio::Portfolio;
52use rust_decimal::{Decimal, prelude::ToPrimitive};
53use ustr::Ustr;
54
/// Boxed callback that receives a [`SubmitOrder`] command; used as the handler type of the
/// submit-order throttler.
type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
/// Boxed callback that receives a [`ModifyOrder`] command; used as the handler type of the
/// modify-order throttler.
type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
57
/// Pre-trade risk engine.
///
/// Receives trading commands, validates submit and modify requests against instrument
/// constraints, account balances and the current [`TradingState`], and either forwards them
/// towards the execution engine endpoints or emits denial / rejection events.
#[allow(dead_code)]
pub struct RiskEngine {
    /// Clock used to timestamp the events generated by the engine.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache queried for orders, positions, instruments, accounts, quotes and trades.
    cache: Rc<RefCell<Cache>>,
    /// Portfolio queried for net long / net short exposure per instrument.
    portfolio: Portfolio,
    /// Throttler through which approved `SubmitOrder` commands are sent.
    pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
    /// Throttler through which approved `ModifyOrder` commands are sent.
    pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
    /// Optional maximum notional per order, keyed by instrument.
    max_notional_per_order: AHashMap<InstrumentId, Decimal>,
    /// Current trading state; initialised to `TradingState::Active` by `new`.
    trading_state: TradingState,
    /// Engine configuration (`bypass`, `debug` and the throttler rate limits are read from it).
    config: RiskEngineConfig,
}
75
76impl Debug for RiskEngine {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct(stringify!(RiskEngine)).finish()
79 }
80}
81
82impl RiskEngine {
83 pub fn new(
85 config: RiskEngineConfig,
86 portfolio: Portfolio,
87 clock: Rc<RefCell<dyn Clock>>,
88 cache: Rc<RefCell<Cache>>,
89 ) -> Self {
90 let throttled_submit_order =
91 Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
92
93 let throttled_modify_order =
94 Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
95
96 Self {
97 clock,
98 cache,
99 portfolio,
100 throttled_submit_order,
101 throttled_modify_order,
102 max_notional_per_order: AHashMap::new(),
103 trading_state: TradingState::Active,
104 config,
105 }
106 }
107
108 fn create_submit_order_throttler(
109 config: &RiskEngineConfig,
110 clock: Rc<RefCell<dyn Clock>>,
111 cache: Rc<RefCell<Cache>>,
112 ) -> Throttler<SubmitOrder, SubmitOrderFn> {
113 let success_handler = {
114 Box::new(move |submit_order: SubmitOrder| {
115 msgbus::send_any(
116 "ExecEngine.execute".into(),
117 &TradingCommand::SubmitOrder(submit_order),
118 );
119 }) as Box<dyn Fn(SubmitOrder)>
120 };
121
122 let failure_handler = {
123 let cache = cache;
124 let clock = clock.clone();
125 Box::new(move |submit_order: SubmitOrder| {
126 let reason = "REJECTED BY THROTTLER";
127 log::warn!(
128 "SubmitOrder for {} DENIED: {}",
129 submit_order.client_order_id,
130 reason
131 );
132
133 Self::handle_submit_order_cache(&cache, &submit_order);
134
135 let denied = Self::create_order_denied(&submit_order, reason, &clock);
136
137 msgbus::send_any("ExecEngine.process".into(), &denied);
138 }) as Box<dyn Fn(SubmitOrder)>
139 };
140
141 Throttler::new(
142 config.max_order_submit.limit,
143 config.max_order_submit.interval_ns,
144 clock,
145 "ORDER_SUBMIT_THROTTLER".to_string(),
146 success_handler,
147 Some(failure_handler),
148 Ustr::from(UUID4::new().as_str()),
149 )
150 }
151
152 fn create_modify_order_throttler(
153 config: &RiskEngineConfig,
154 clock: Rc<RefCell<dyn Clock>>,
155 cache: Rc<RefCell<Cache>>,
156 ) -> Throttler<ModifyOrder, ModifyOrderFn> {
157 let success_handler = {
158 Box::new(move |order: ModifyOrder| {
159 msgbus::send_any(
160 "ExecEngine.execute".into(),
161 &TradingCommand::ModifyOrder(order),
162 );
163 }) as Box<dyn Fn(ModifyOrder)>
164 };
165
166 let failure_handler = {
167 let cache = cache;
168 let clock = clock.clone();
169 Box::new(move |order: ModifyOrder| {
170 let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
171 log::warn!(
172 "SubmitOrder for {} DENIED: {}",
173 order.client_order_id,
174 reason
175 );
176
177 let order = match Self::get_existing_order(&cache, &order) {
178 Some(order) => order,
179 None => return,
180 };
181
182 let rejected = Self::create_modify_rejected(&order, reason, &clock);
183
184 msgbus::send_any("ExecEngine.process".into(), &rejected);
185 }) as Box<dyn Fn(ModifyOrder)>
186 };
187
188 Throttler::new(
189 config.max_order_modify.limit,
190 config.max_order_modify.interval_ns,
191 clock,
192 "ORDER_MODIFY_THROTTLER".to_string(),
193 success_handler,
194 Some(failure_handler),
195 Ustr::from(UUID4::new().as_str()),
196 )
197 }
198
199 fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
200 let mut cache = cache.borrow_mut();
201 if !cache.order_exists(&submit_order.client_order_id) {
202 cache
203 .add_order(submit_order.order.clone(), None, None, false)
204 .map_err(|e| {
205 log::error!("Cannot add order to cache: {e}");
206 })
207 .unwrap();
208 }
209 }
210
211 fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
212 let cache = cache.borrow();
213 if let Some(order) = cache.order(&order.client_order_id) {
214 Some(order.clone())
215 } else {
216 log::error!(
217 "Order with command.client_order_id: {} not found",
218 order.client_order_id
219 );
220 None
221 }
222 }
223
224 fn create_order_denied(
225 submit_order: &SubmitOrder,
226 reason: &str,
227 clock: &Rc<RefCell<dyn Clock>>,
228 ) -> OrderEventAny {
229 let timestamp = clock.borrow().timestamp_ns();
230 OrderEventAny::Denied(OrderDenied::new(
231 submit_order.trader_id,
232 submit_order.strategy_id,
233 submit_order.instrument_id,
234 submit_order.client_order_id,
235 reason.into(),
236 UUID4::new(),
237 timestamp,
238 timestamp,
239 ))
240 }
241
242 fn create_modify_rejected(
243 order: &OrderAny,
244 reason: &str,
245 clock: &Rc<RefCell<dyn Clock>>,
246 ) -> OrderEventAny {
247 let timestamp = clock.borrow().timestamp_ns();
248 OrderEventAny::ModifyRejected(OrderModifyRejected::new(
249 order.trader_id(),
250 order.strategy_id(),
251 order.instrument_id(),
252 order.client_order_id(),
253 reason.into(),
254 UUID4::new(),
255 timestamp,
256 timestamp,
257 false,
258 order.venue_order_id(),
259 None,
260 ))
261 }
262
    /// Executes the given trading command by passing it to the engine's command handler.
    pub fn execute(&mut self, command: TradingCommand) {
        self.handle_command(command);
    }
270
    /// Processes the given order event by passing it to the engine's event handler.
    pub fn process(&mut self, event: OrderEventAny) {
        self.handle_event(event);
    }
276
277 pub fn set_trading_state(&mut self, state: TradingState) {
279 if state == self.trading_state {
280 log::warn!("No change to trading state: already set to {state:?}");
281 return;
282 }
283
284 self.trading_state = state;
285
286 let _ts_now = self.clock.borrow().timestamp_ns();
287
288 msgbus::publish("events.risk".into(), &"message"); log::info!("Trading state set to {state:?}");
294 }
295
296 pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
298 self.max_notional_per_order.insert(instrument_id, new_value);
299
300 let new_value_str = new_value.to_string();
301 log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
302 }
303
304 fn handle_command(&mut self, command: TradingCommand) {
308 if self.config.debug {
309 log::debug!("{CMD}{RECV} {command:?}");
310 }
311
312 match command {
313 TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
314 TradingCommand::SubmitOrderList(submit_order_list) => {
315 self.handle_submit_order_list(submit_order_list);
316 }
317 TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
318 TradingCommand::QueryAccount(query_account) => {
319 self.send_to_execution(TradingCommand::QueryAccount(query_account));
320 }
321 _ => {
322 log::error!("Cannot handle command: {command}");
323 }
324 }
325 }
326
327 fn handle_submit_order(&mut self, command: SubmitOrder) {
328 if self.config.bypass {
329 self.send_to_execution(TradingCommand::SubmitOrder(command));
330 return;
331 }
332
333 let order = &command.order;
334 if let Some(position_id) = command.position_id
335 && order.is_reduce_only()
336 {
337 let position_exists = {
338 let cache = self.cache.borrow();
339 cache
340 .position(&position_id)
341 .map(|pos| (pos.side, pos.quantity))
342 };
343
344 if let Some((pos_side, pos_quantity)) = position_exists {
345 if !order.would_reduce_only(pos_side, pos_quantity) {
346 self.deny_command(
347 TradingCommand::SubmitOrder(command),
348 &format!("Reduce only order would increase position {position_id}"),
349 );
350 return; }
352 } else {
353 self.deny_command(
354 TradingCommand::SubmitOrder(command),
355 &format!("Position {position_id} not found for reduce-only order"),
356 );
357 return;
358 }
359 }
360
361 let instrument_exists = {
362 let cache = self.cache.borrow();
363 cache.instrument(&command.instrument_id).cloned()
364 };
365
366 let instrument = if let Some(instrument) = instrument_exists {
367 instrument
368 } else {
369 self.deny_command(
370 TradingCommand::SubmitOrder(command.clone()),
371 &format!("Instrument for {} not found", command.instrument_id),
372 );
373 return; };
375
376 if !self.check_order(instrument.clone(), order.clone()) {
380 return; }
382
383 if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
384 return; }
386
387 self.execution_gateway(instrument, TradingCommand::SubmitOrder(command));
389 }
390
391 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
392 if self.config.bypass {
393 self.send_to_execution(TradingCommand::SubmitOrderList(command));
394 return;
395 }
396
397 let instrument_exists = {
398 let cache = self.cache.borrow();
399 cache.instrument(&command.instrument_id).cloned()
400 };
401
402 let instrument = if let Some(instrument) = instrument_exists {
403 instrument
404 } else {
405 self.deny_command(
406 TradingCommand::SubmitOrderList(command.clone()),
407 &format!("no instrument found for {}", command.instrument_id),
408 );
409 return; };
411
412 for order in command.order_list.orders.clone() {
416 if !self.check_order(instrument.clone(), order) {
417 return; }
419 }
420
421 if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
422 self.deny_order_list(
423 command.order_list.clone(),
424 &format!("OrderList {} DENIED", command.order_list.id),
425 );
426 return; }
428
429 self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
430 }
431
432 fn handle_modify_order(&mut self, command: ModifyOrder) {
433 let order_exists = {
437 let cache = self.cache.borrow();
438 cache.order(&command.client_order_id).cloned()
439 };
440
441 let order = if let Some(order) = order_exists {
442 order
443 } else {
444 log::error!(
445 "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
446 command.client_order_id
447 );
448 return;
449 };
450
451 if order.is_closed() {
452 self.reject_modify_order(
453 order,
454 &format!(
455 "Order with command.client_order_id: {} already closed",
456 command.client_order_id
457 ),
458 );
459 return;
460 } else if order.status() == OrderStatus::PendingCancel {
461 self.reject_modify_order(
462 order,
463 &format!(
464 "Order with command.client_order_id: {} is already pending cancel",
465 command.client_order_id
466 ),
467 );
468 return;
469 }
470
471 let maybe_instrument = {
472 let cache = self.cache.borrow();
473 cache.instrument(&command.instrument_id).cloned()
474 };
475
476 let instrument = if let Some(instrument) = maybe_instrument {
477 instrument
478 } else {
479 self.reject_modify_order(
480 order,
481 &format!("no instrument found for {:?}", command.instrument_id),
482 );
483 return; };
485
486 let mut risk_msg = self.check_price(&instrument, command.price);
488 if let Some(risk_msg) = risk_msg {
489 self.reject_modify_order(order, &risk_msg);
490 return; }
492
493 risk_msg = self.check_price(&instrument, command.trigger_price);
495 if let Some(risk_msg) = risk_msg {
496 self.reject_modify_order(order, &risk_msg);
497 return; }
499
500 risk_msg = self.check_quantity(&instrument, command.quantity, order.is_quote_quantity());
502 if let Some(risk_msg) = risk_msg {
503 self.reject_modify_order(order, &risk_msg);
504 return; }
506
507 match self.trading_state {
509 TradingState::Halted => {
510 self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
511 }
512 TradingState::Reducing => {
513 if let Some(quantity) = command.quantity
514 && quantity > order.quantity()
515 && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
516 || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
517 {
518 self.reject_modify_order(
519 order,
520 &format!(
521 "TradingState is REDUCING and update will increase exposure {}",
522 instrument.id()
523 ),
524 );
525 }
526 }
527 _ => {}
528 }
529
530 self.throttled_modify_order.send(command);
531 }
532
533 fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
536 if order.time_in_force() == TimeInForce::Gtd {
540 let expire_time = order.expire_time().unwrap();
542 if expire_time <= self.clock.borrow().timestamp_ns() {
543 self.deny_order(
544 order,
545 &format!("GTD {} already past", expire_time.to_rfc3339()),
546 );
547 return false; }
549 }
550
551 if !self.check_order_price(instrument.clone(), order.clone())
552 || !self.check_order_quantity(instrument, order)
553 {
554 return false; }
556
557 true
558 }
559
560 fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
561 if order.price().is_some() {
565 let risk_msg = self.check_price(&instrument, order.price());
566 if let Some(risk_msg) = risk_msg {
567 self.deny_order(order, &risk_msg);
568 return false; }
570 }
571
572 if order.trigger_price().is_some() {
576 let risk_msg = self.check_price(&instrument, order.trigger_price());
577 if let Some(risk_msg) = risk_msg {
578 self.deny_order(order, &risk_msg);
579 return false; }
581 }
582
583 true
584 }
585
586 fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
587 let risk_msg = self.check_quantity(
588 &instrument,
589 Some(order.quantity()),
590 order.is_quote_quantity(),
591 );
592 if let Some(risk_msg) = risk_msg {
593 self.deny_order(order, &risk_msg);
594 return false; }
596
597 true
598 }
599
    /// Runs the notional, quantity and balance risk checks for `orders` on `instrument`.
    ///
    /// Returns `false` (after denying the offending order) as soon as one order fails a
    /// check, and `true` otherwise. The checks are skipped entirely, returning `true`, when
    /// no account is cached for the instrument's venue or when that account is a margin
    /// account. Orders for which no reference price can be determined are skipped with a
    /// log message rather than denied.
    ///
    /// # Panics
    ///
    /// Panics if a configured max notional cannot be converted to `f64`, if an order has no
    /// specified side, if a trailing stop order without a trigger price lacks its trailing
    /// offset type, trigger type or trailing offset, or if a raw quantity cannot be
    /// converted into a raw money amount.
    fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
        // Reference price of the most recently processed order; market orders reuse it
        // when it is already set.
        let mut last_px: Option<Price> = None;
        let mut max_notional: Option<Money> = None;

        // Per-instrument limit registered through `set_max_notional_per_order`, expressed
        // in the instrument's quote currency.
        let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
        if let Some(max_notional_setting_val) = max_notional_setting.copied() {
            max_notional = Some(Money::new(
                max_notional_setting_val
                    .to_f64()
                    .expect("Invalid decimal conversion"),
                instrument.quote_currency(),
            ));
        }

        // Clone the account out of the cache so the borrow is released immediately.
        let account_exists = {
            let cache = self.cache.borrow();
            cache.account_for_venue(&instrument.id().venue).cloned()
        };

        let account = if let Some(account) = account_exists {
            account
        } else {
            log::debug!("Cannot find account for venue {}", instrument.id().venue);
            return true;
        };
        // Only cash accounts are checked below; margin accounts pass unconditionally.
        let cash_account = match account {
            AccountAny::Cash(cash_account) => cash_account,
            AccountAny::Margin(_) => return true,
        };
        // Free balance in the instrument's quote currency.
        let free = cash_account.balance_free(Some(instrument.quote_currency()));
        let allow_borrowing = cash_account.allow_borrowing;
        if self.config.debug {
            log::debug!("Free cash: {free:?}");
        }

        // Sum of open LONG position quantities and of the leaves quantity of open SELL
        // orders for this instrument (raw units).
        let (net_long_qty_raw, pending_sell_qty_raw) = {
            let cache = self.cache.borrow();
            let long_qty: QuantityRaw = cache
                .positions_open(None, Some(&instrument.id()), None, Some(PositionSide::Long))
                .iter()
                .map(|pos| pos.quantity.raw)
                .sum();
            let pending_sells: QuantityRaw = cache
                .orders_open(None, Some(&instrument.id()), None, Some(OrderSide::Sell))
                .iter()
                .map(|ord| ord.leaves_qty().raw)
                .sum();
            (long_qty, pending_sells)
        };

        // Long quantity not already covered by pending sells; saturates at zero.
        let available_long_qty_raw = net_long_qty_raw.saturating_sub(pending_sell_qty_raw);

        if self.config.debug && net_long_qty_raw > 0 {
            log::debug!(
                "Net LONG qty (raw): {net_long_qty_raw}, pending sells: {pending_sell_qty_raw}, available: {available_long_qty_raw}"
            );
        }

        // Running total of SELL quantity seen so far in this batch (raw units).
        let mut cum_sell_qty_raw: QuantityRaw = 0;

        let mut cum_notional_buy: Option<Money> = None;
        let mut cum_notional_sell: Option<Money> = None;
        let mut base_currency: Option<Currency> = None;
        for order in &orders {
            // Determine the reference price for this order, depending on its type.
            last_px = match order {
                OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
                    if last_px.is_none() {
                        // Prefer the last quote (ask for BUY, bid for SELL), then the last trade.
                        let cache = self.cache.borrow();
                        if let Some(last_quote) = cache.quote(&instrument.id()) {
                            match order.order_side() {
                                OrderSide::Buy => Some(last_quote.ask_price),
                                OrderSide::Sell => Some(last_quote.bid_price),
                                _ => panic!("Invalid order side"),
                            }
                        } else {
                            let cache = self.cache.borrow();
                            let last_trade = cache.trade(&instrument.id());

                            if let Some(last_trade) = last_trade {
                                Some(last_trade.price)
                            } else {
                                log::warn!(
                                    "Cannot check MARKET order risk: no prices for {}",
                                    instrument.id()
                                );
                                continue;
                            }
                        }
                    } else {
                        last_px
                    }
                }
                OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
                OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
                    if let Some(trigger_price) = order.trigger_price() {
                        Some(trigger_price)
                    } else {
                        // No trigger price set: derive one from the trailing offset and
                        // the latest market data.
                        let offset_type = order.trailing_offset_type().unwrap();
                        if !matches!(
                            offset_type,
                            TrailingOffsetType::Price
                                | TrailingOffsetType::BasisPoints
                                | TrailingOffsetType::Ticks
                        ) {
                            self.deny_order(
                                order.clone(),
                                &format!("UNSUPPORTED_TRAILING_OFFSET_TYPE: {offset_type:?}"),
                            );
                            return false;
                        }

                        let trigger_type = order.trigger_type().unwrap();
                        let cache = self.cache.borrow();

                        if trigger_type == TriggerType::BidAsk {
                            if let Some(quote) = cache.quote(&instrument.id()) {
                                match trailing_stop_calculate_with_bid_ask(
                                    instrument.price_increment(),
                                    order.trailing_offset_type().unwrap(),
                                    order.order_side_specified(),
                                    order.trailing_offset().unwrap(),
                                    quote.bid_price,
                                    quote.ask_price,
                                ) {
                                    Ok(calculated_trigger) => Some(calculated_trigger),
                                    Err(e) => {
                                        log::warn!(
                                            "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
                                            order.order_type()
                                        );
                                        continue;
                                    }
                                }
                            } else {
                                log::warn!(
                                    "Cannot check {} order risk: no trigger price set and no bid/ask quotes available for {}",
                                    order.order_type(),
                                    instrument.id()
                                );
                                continue;
                            }
                        } else if let Some(last_trade) = cache.trade(&instrument.id()) {
                            // Any non-BidAsk trigger type uses the last trade when one exists.
                            match trailing_stop_calculate_with_last(
                                instrument.price_increment(),
                                order.trailing_offset_type().unwrap(),
                                order.order_side_specified(),
                                order.trailing_offset().unwrap(),
                                last_trade.price,
                            ) {
                                Ok(calculated_trigger) => Some(calculated_trigger),
                                Err(e) => {
                                    log::warn!(
                                        "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {}",
                                        order.order_type(),
                                        e
                                    );
                                    continue;
                                }
                            }
                        } else if trigger_type == TriggerType::LastOrBidAsk {
                            // No last trade available: fall back to the bid/ask quote.
                            if let Some(quote) = cache.quote(&instrument.id()) {
                                match trailing_stop_calculate_with_bid_ask(
                                    instrument.price_increment(),
                                    order.trailing_offset_type().unwrap(),
                                    order.order_side_specified(),
                                    order.trailing_offset().unwrap(),
                                    quote.bid_price,
                                    quote.ask_price,
                                ) {
                                    Ok(calculated_trigger) => Some(calculated_trigger),
                                    Err(e) => {
                                        log::warn!(
                                            "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
                                            order.order_type()
                                        );
                                        continue;
                                    }
                                }
                            } else {
                                log::warn!(
                                    "Cannot check {} order risk: no trigger price set and no market data available for {}",
                                    order.order_type(),
                                    instrument.id()
                                );
                                continue;
                            }
                        } else {
                            log::warn!(
                                "Cannot check {} order risk: no trigger price set and no market data available for {}",
                                order.order_type(),
                                instrument.id()
                            );
                            continue;
                        }
                    }
                }
                _ => order.price(),
            };

            // Shadow the outer `Option` with the concrete price for the rest of this iteration.
            let last_px = if let Some(px) = last_px {
                px
            } else {
                log::error!("Cannot check order risk: no price available");
                continue;
            };

            // For quote-quantity LIMIT / STOP_LIMIT orders on non-inverse instruments, use
            // the lower of the order price and the ask (BUY) or the higher of the order
            // price and the bid (SELL) when a quote is available.
            let effective_price = if order.is_quote_quantity()
                && !instrument.is_inverse()
                && matches!(order, OrderAny::Limit(_) | OrderAny::StopLimit(_))
            {
                let cache = self.cache.borrow();
                if let Some(quote_tick) = cache.quote(&instrument.id()) {
                    match order.order_side() {
                        OrderSide::Buy => last_px.min(quote_tick.ask_price),
                        OrderSide::Sell => last_px.max(quote_tick.bid_price),
                        _ => last_px,
                    }
                } else {
                    last_px
                }
            } else {
                last_px
            };

            // Quote-denominated quantities are converted to a base quantity before the
            // size limits are applied.
            let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
                instrument.calculate_base_quantity(order.quantity(), effective_price)
            } else {
                order.quantity()
            };

            if let Some(max_quantity) = instrument.max_quantity()
                && effective_quantity > max_quantity
            {
                self.deny_order(
                    order.clone(),
                    &format!(
                        "QUANTITY_EXCEEDS_MAXIMUM: effective_quantity={effective_quantity}, max_quantity={max_quantity}"
                    ),
                );
                return false;
            }

            if let Some(min_quantity) = instrument.min_quantity()
                && effective_quantity < min_quantity
            {
                self.deny_order(
                    order.clone(),
                    &format!(
                        "QUANTITY_BELOW_MINIMUM: effective_quantity={effective_quantity}, min_quantity={min_quantity}"
                    ),
                );
                return false;
            }

            // NOTE(review): the notional is computed from `last_px`, not `effective_price`,
            // and the meaning of the `Some(true)` argument is not visible here - confirm
            // against `calculate_notional_value`.
            let notional =
                instrument.calculate_notional_value(effective_quantity, last_px, Some(true));

            if self.config.debug {
                log::debug!("Notional: {notional:?}");
            }

            // Engine-level per-order notional limit.
            if let Some(max_notional_value) = max_notional
                && notional > max_notional_value
            {
                self.deny_order(
                    order.clone(),
                    &format!(
                        "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
                    ),
                );
                return false;
            }

            // Instrument-level notional limits are only compared when currencies match.
            if let Some(min_notional) = instrument.min_notional()
                && notional.currency == min_notional.currency
                && notional < min_notional
            {
                self.deny_order(
                    order.clone(),
                    &format!(
                        "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
                    ),
                );
                return false;
            }

            if let Some(max_notional) = instrument.max_notional()
                && notional.currency == max_notional.currency
                && notional > max_notional
            {
                self.deny_order(
                    order.clone(),
                    &format!(
                        "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
                    ),
                );
                return false;
            }

            // Recompute the notional with the default final argument for the balance checks.
            let notional = instrument.calculate_notional_value(effective_quantity, last_px, None);
            // Signed balance effect: negative for BUY, positive for SELL.
            let order_balance_impact = match order.order_side() {
                OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
                OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
                OrderSide::NoOrderSide => {
                    panic!("invalid `OrderSide`, was {}", order.order_side());
                }
            };

            if self.config.debug {
                log::debug!("Balance impact: {order_balance_impact}");
            }

            // Single-order check: the free balance must not go negative.
            if !allow_borrowing
                && let Some(free_val) = free
                && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
            {
                self.deny_order(
                    order.clone(),
                    &format!(
                        "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
                    ),
                );
                return false;
            }

            if base_currency.is_none() {
                base_currency = instrument.base_currency();
            }
            if order.is_buy() {
                // Accumulate the (positive) BUY notional across the batch.
                match cum_notional_buy.as_mut() {
                    Some(cum_notional_buy_val) => {
                        cum_notional_buy_val.raw += -order_balance_impact.raw;
                    }
                    None => {
                        cum_notional_buy = Some(Money::from_raw(
                            -order_balance_impact.raw,
                            order_balance_impact.currency,
                        ));
                    }
                }

                if self.config.debug {
                    log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
                }

                if !allow_borrowing
                    && let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
                    && cum_notional_buy > free
                {
                    self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
                    return false;
                }
            } else if order.is_sell() {
                // A SELL is treated as position-reducing when it is reduce-only or when
                // the cumulative SELL quantity still fits into the available long quantity.
                let is_position_reducing_sell = order.is_reduce_only()
                    || (cum_sell_qty_raw + effective_quantity.raw) <= available_long_qty_raw;
                cum_sell_qty_raw += effective_quantity.raw;

                if is_position_reducing_sell {
                    if self.config.debug {
                        log::debug!("Position-reducing SELL skips balance check");
                    }
                    continue;
                }

                if cash_account.base_currency.is_some() {
                    // The binding below is named `..._buy_val` but refers to the SELL accumulator.
                    match cum_notional_sell.as_mut() {
                        Some(cum_notional_buy_val) => {
                            cum_notional_buy_val.raw += order_balance_impact.raw;
                        }
                        None => {
                            cum_notional_sell = Some(Money::from_raw(
                                order_balance_impact.raw,
                                order_balance_impact.currency,
                            ));
                        }
                    }
                    if self.config.debug {
                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
                    }

                    if !allow_borrowing
                        && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
                        && cum_notional_sell > free
                    {
                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
                        return false;
                    }
                }
                else if let Some(base_currency) = base_currency {
                    // The account has no base currency: value the SELL as its raw
                    // quantity in the instrument's base currency.
                    let cash_value = Money::from_raw(
                        effective_quantity
                            .raw
                            .try_into()
                            .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
                            .unwrap(),
                        base_currency,
                    );

                    if self.config.debug {
                        log::debug!("Cash value: {cash_value:?}");
                        log::debug!(
                            "Total: {:?}",
                            cash_account.balance_total(Some(base_currency))
                        );
                        log::debug!(
                            "Locked: {:?}",
                            cash_account.balance_locked(Some(base_currency))
                        );
                        log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
                    }

                    match cum_notional_sell {
                        Some(mut value) => {
                            value.raw += cash_value.raw;
                            cum_notional_sell = Some(value);
                        }
                        None => cum_notional_sell = Some(cash_value),
                    }

                    if self.config.debug {
                        log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
                    }
                    // NOTE(review): this compares raw units of a base-currency amount with
                    // `free`, which was read for the quote currency above - confirm intended.
                    if !allow_borrowing
                        && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
                        && cum_notional_sell.raw > free.raw
                    {
                        self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
                        return false;
                    }
                }
            }
        }

        // Every order was either checked successfully or skipped.
        true
    }
1059
1060 fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
1061 let price_val = price?;
1062
1063 if price_val.precision > instrument.price_precision() {
1064 return Some(format!(
1065 "price {} invalid (precision {} > {})",
1066 price_val,
1067 price_val.precision,
1068 instrument.price_precision()
1069 ));
1070 }
1071
1072 if !matches!(
1073 instrument.instrument_class(),
1074 InstrumentClass::Option
1075 | InstrumentClass::FuturesSpread
1076 | InstrumentClass::OptionSpread
1077 ) && price_val.raw <= 0
1078 {
1079 return Some(format!("price {price_val} invalid (<= 0)"));
1080 }
1081
1082 None
1083 }
1084
1085 fn check_quantity(
1086 &self,
1087 instrument: &InstrumentAny,
1088 quantity: Option<Quantity>,
1089 is_quote_quantity: bool,
1090 ) -> Option<String> {
1091 let quantity_val = quantity?;
1092
1093 if quantity_val.precision > instrument.size_precision() {
1095 return Some(format!(
1096 "quantity {} invalid (precision {} > {})",
1097 quantity_val,
1098 quantity_val.precision,
1099 instrument.size_precision()
1100 ));
1101 }
1102
1103 if is_quote_quantity {
1105 return None;
1106 }
1107
1108 if let Some(max_quantity) = instrument.max_quantity()
1110 && quantity_val > max_quantity
1111 {
1112 return Some(format!(
1113 "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
1114 ));
1115 }
1116
1117 if let Some(min_quantity) = instrument.min_quantity()
1119 && quantity_val < min_quantity
1120 {
1121 return Some(format!(
1122 "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
1123 ));
1124 }
1125
1126 None
1127 }
1128
1129 fn deny_command(&self, command: TradingCommand, reason: &str) {
1132 match command {
1133 TradingCommand::SubmitOrder(command) => {
1134 self.deny_order(command.order, reason);
1135 }
1136 TradingCommand::SubmitOrderList(command) => {
1137 self.deny_order_list(command.order_list, reason);
1138 }
1139 _ => {
1140 panic!("Cannot deny command {command}");
1141 }
1142 }
1143 }
1144
1145 fn deny_order(&self, order: OrderAny, reason: &str) {
1146 log::warn!(
1147 "SubmitOrder for {} DENIED: {}",
1148 order.client_order_id(),
1149 reason
1150 );
1151
1152 if order.status() != OrderStatus::Initialized {
1153 return;
1154 }
1155
1156 {
1158 let mut cache = self.cache.borrow_mut();
1159 if !cache.order_exists(&order.client_order_id()) {
1160 cache
1161 .add_order(order.clone(), None, None, false)
1162 .map_err(|e| {
1163 log::error!("Cannot add order to cache: {e}");
1164 })
1165 .unwrap();
1166 }
1167 }
1168
1169 let denied = OrderEventAny::Denied(OrderDenied::new(
1170 order.trader_id(),
1171 order.strategy_id(),
1172 order.instrument_id(),
1173 order.client_order_id(),
1174 reason.into(),
1175 UUID4::new(),
1176 self.clock.borrow().timestamp_ns(),
1177 self.clock.borrow().timestamp_ns(),
1178 ));
1179
1180 msgbus::send_any("ExecEngine.process".into(), &denied);
1181 }
1182
1183 fn deny_order_list(&self, order_list: OrderList, reason: &str) {
1184 for order in order_list.orders {
1185 if !order.is_closed() {
1186 self.deny_order(order, reason);
1187 }
1188 }
1189 }
1190
1191 fn reject_modify_order(&self, order: OrderAny, reason: &str) {
1192 let ts_event = self.clock.borrow().timestamp_ns();
1193 let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
1194 order.trader_id(),
1195 order.strategy_id(),
1196 order.instrument_id(),
1197 order.client_order_id(),
1198 reason.into(),
1199 UUID4::new(),
1200 ts_event,
1201 ts_event,
1202 false,
1203 order.venue_order_id(),
1204 order.account_id(),
1205 ));
1206
1207 msgbus::send_any("ExecEngine.process".into(), &denied);
1208 }
1209
1210 fn execution_gateway(&mut self, instrument: InstrumentAny, command: TradingCommand) {
1213 match self.trading_state {
1214 TradingState::Halted => match command {
1215 TradingCommand::SubmitOrder(submit_order) => {
1216 self.deny_order(submit_order.order, "TradingState::HALTED");
1217 }
1218 TradingCommand::SubmitOrderList(submit_order_list) => {
1219 self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
1220 }
1221 _ => {}
1222 },
1223 TradingState::Reducing => match command {
1224 TradingCommand::SubmitOrder(submit_order) => {
1225 let order = submit_order.order;
1226 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1227 self.deny_order(
1228 order,
1229 &format!(
1230 "BUY when TradingState::REDUCING and LONG {}",
1231 instrument.id()
1232 ),
1233 );
1234 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1235 self.deny_order(
1236 order,
1237 &format!(
1238 "SELL when TradingState::REDUCING and SHORT {}",
1239 instrument.id()
1240 ),
1241 );
1242 }
1243 }
1244 TradingCommand::SubmitOrderList(submit_order_list) => {
1245 let order_list = submit_order_list.order_list;
1246 for order in &order_list.orders {
1247 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1248 self.deny_order_list(
1249 order_list,
1250 &format!(
1251 "BUY when TradingState::REDUCING and LONG {}",
1252 instrument.id()
1253 ),
1254 );
1255 return;
1256 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1257 self.deny_order_list(
1258 order_list,
1259 &format!(
1260 "SELL when TradingState::REDUCING and SHORT {}",
1261 instrument.id()
1262 ),
1263 );
1264 return;
1265 }
1266 }
1267 }
1268 _ => {}
1269 },
1270 TradingState::Active => match command {
1271 TradingCommand::SubmitOrder(submit_order) => {
1272 self.throttled_submit_order.send(submit_order);
1273 }
1274 TradingCommand::SubmitOrderList(submit_order_list) => {
1275 self.send_to_execution(TradingCommand::SubmitOrderList(submit_order_list));
1277 }
1278 _ => {}
1279 },
1280 }
1281 }
1282
    /// Sends the command to the `ExecEngine.execute` message bus endpoint.
    fn send_to_execution(&self, command: TradingCommand) {
        msgbus::send_any("ExecEngine.execute".into(), &command);
    }
1286
    /// Handles an incoming order event.
    ///
    /// Currently the event is only logged at debug level when `config.debug` is set.
    fn handle_event(&mut self, event: OrderEventAny) {
        if self.config.debug {
            log::debug!("{RECV}{EVT} {event:?}");
        }
    }
1294}