1pub mod config;
19
20use std::{cell::RefCell, fmt::Debug, rc::Rc};
21
22use ahash::AHashMap;
23use config::RiskEngineConfig;
24use nautilus_common::{
25 cache::Cache,
26 clock::Clock,
27 logging::{CMD, EVT, RECV},
28 messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
29 msgbus,
30 throttler::Throttler,
31};
32use nautilus_core::UUID4;
33use nautilus_execution::trailing::{
34 trailing_stop_calculate_with_bid_ask, trailing_stop_calculate_with_last,
35};
36use nautilus_model::{
37 accounts::{Account, AccountAny},
38 enums::{
39 InstrumentClass, OrderSide, OrderStatus, PositionSide, TimeInForce, TradingState,
40 TrailingOffsetType, TriggerType,
41 },
42 events::{OrderDenied, OrderEventAny, OrderModifyRejected},
43 identifiers::InstrumentId,
44 instruments::{Instrument, InstrumentAny},
45 orders::{Order, OrderAny, OrderList},
46 types::{Currency, Money, Price, Quantity, quantity::QuantityRaw},
47};
48use nautilus_portfolio::Portfolio;
49use rust_decimal::{Decimal, prelude::ToPrimitive};
50use ustr::Ustr;
51
52type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
53type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
54
55#[allow(dead_code)]
62pub struct RiskEngine {
63 clock: Rc<RefCell<dyn Clock>>,
64 cache: Rc<RefCell<Cache>>,
65 portfolio: Portfolio,
66 pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
67 pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
68 max_notional_per_order: AHashMap<InstrumentId, Decimal>,
69 trading_state: TradingState,
70 config: RiskEngineConfig,
71}
72
73impl Debug for RiskEngine {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct(stringify!(RiskEngine)).finish()
76 }
77}
78
79impl RiskEngine {
80 pub fn new(
82 config: RiskEngineConfig,
83 portfolio: Portfolio,
84 clock: Rc<RefCell<dyn Clock>>,
85 cache: Rc<RefCell<Cache>>,
86 ) -> Self {
87 let throttled_submit_order =
88 Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
89
90 let throttled_modify_order =
91 Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
92
93 Self {
94 clock,
95 cache,
96 portfolio,
97 throttled_submit_order,
98 throttled_modify_order,
99 max_notional_per_order: AHashMap::new(),
100 trading_state: TradingState::Active,
101 config,
102 }
103 }
104
105 fn create_submit_order_throttler(
106 config: &RiskEngineConfig,
107 clock: Rc<RefCell<dyn Clock>>,
108 cache: Rc<RefCell<Cache>>,
109 ) -> Throttler<SubmitOrder, SubmitOrderFn> {
110 let success_handler = {
111 Box::new(move |submit_order: SubmitOrder| {
112 msgbus::send_any(
113 "ExecEngine.execute".into(),
114 &TradingCommand::SubmitOrder(submit_order),
115 );
116 }) as Box<dyn Fn(SubmitOrder)>
117 };
118
119 let failure_handler = {
120 let cache = cache;
121 let clock = clock.clone();
122 Box::new(move |submit_order: SubmitOrder| {
123 let reason = "REJECTED BY THROTTLER";
124 log::warn!(
125 "SubmitOrder for {} DENIED: {}",
126 submit_order.client_order_id(),
127 reason
128 );
129
130 Self::handle_submit_order_cache(&cache, &submit_order);
131
132 let denied = Self::create_order_denied(&submit_order, reason, &clock);
133
134 msgbus::send_any("ExecEngine.process".into(), &denied);
135 }) as Box<dyn Fn(SubmitOrder)>
136 };
137
138 Throttler::new(
139 config.max_order_submit.limit,
140 config.max_order_submit.interval_ns,
141 clock,
142 "ORDER_SUBMIT_THROTTLER".to_string(),
143 success_handler,
144 Some(failure_handler),
145 Ustr::from(UUID4::new().as_str()),
146 )
147 }
148
149 fn create_modify_order_throttler(
150 config: &RiskEngineConfig,
151 clock: Rc<RefCell<dyn Clock>>,
152 cache: Rc<RefCell<Cache>>,
153 ) -> Throttler<ModifyOrder, ModifyOrderFn> {
154 let success_handler = {
155 Box::new(move |order: ModifyOrder| {
156 msgbus::send_any(
157 "ExecEngine.execute".into(),
158 &TradingCommand::ModifyOrder(order),
159 );
160 }) as Box<dyn Fn(ModifyOrder)>
161 };
162
163 let failure_handler = {
164 let cache = cache;
165 let clock = clock.clone();
166 Box::new(move |order: ModifyOrder| {
167 let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
168 log::warn!(
169 "SubmitOrder for {} DENIED: {}",
170 order.client_order_id,
171 reason
172 );
173
174 let order = match Self::get_existing_order(&cache, &order) {
175 Some(order) => order,
176 None => return,
177 };
178
179 let rejected = Self::create_modify_rejected(&order, reason, &clock);
180
181 msgbus::send_any("ExecEngine.process".into(), &rejected);
182 }) as Box<dyn Fn(ModifyOrder)>
183 };
184
185 Throttler::new(
186 config.max_order_modify.limit,
187 config.max_order_modify.interval_ns,
188 clock,
189 "ORDER_MODIFY_THROTTLER".to_string(),
190 success_handler,
191 Some(failure_handler),
192 Ustr::from(UUID4::new().as_str()),
193 )
194 }
195
196 fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
197 let mut cache = cache.borrow_mut();
198 if !cache.order_exists(&submit_order.client_order_id()) {
199 cache
200 .add_order(submit_order.order.clone(), None, None, false)
201 .map_err(|e| {
202 log::error!("Cannot add order to cache: {e}");
203 })
204 .unwrap();
205 }
206 }
207
208 fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
209 let cache = cache.borrow();
210 if let Some(order) = cache.order(&order.client_order_id) {
211 Some(order.clone())
212 } else {
213 log::error!(
214 "Order with command.client_order_id: {} not found",
215 order.client_order_id
216 );
217 None
218 }
219 }
220
221 fn create_order_denied(
222 submit_order: &SubmitOrder,
223 reason: &str,
224 clock: &Rc<RefCell<dyn Clock>>,
225 ) -> OrderEventAny {
226 let timestamp = clock.borrow().timestamp_ns();
227 OrderEventAny::Denied(OrderDenied::new(
228 submit_order.trader_id,
229 submit_order.strategy_id,
230 submit_order.instrument_id,
231 submit_order.client_order_id(),
232 reason.into(),
233 UUID4::new(),
234 timestamp,
235 timestamp,
236 ))
237 }
238
239 fn create_modify_rejected(
240 order: &OrderAny,
241 reason: &str,
242 clock: &Rc<RefCell<dyn Clock>>,
243 ) -> OrderEventAny {
244 let timestamp = clock.borrow().timestamp_ns();
245 OrderEventAny::ModifyRejected(OrderModifyRejected::new(
246 order.trader_id(),
247 order.strategy_id(),
248 order.instrument_id(),
249 order.client_order_id(),
250 reason.into(),
251 UUID4::new(),
252 timestamp,
253 timestamp,
254 false,
255 order.venue_order_id(),
256 None,
257 ))
258 }
259
260 pub fn execute(&mut self, command: TradingCommand) {
262 self.handle_command(command);
264 }
265
266 pub fn process(&mut self, event: OrderEventAny) {
268 self.handle_event(event);
270 }
271
272 pub fn set_trading_state(&mut self, state: TradingState) {
274 if state == self.trading_state {
275 log::warn!("No change to trading state: already set to {state:?}");
276 return;
277 }
278
279 self.trading_state = state;
280
281 let _ts_now = self.clock.borrow().timestamp_ns();
282
283 msgbus::publish("events.risk".into(), &"message"); log::info!("Trading state set to {state:?}");
289 }
290
291 pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
293 self.max_notional_per_order.insert(instrument_id, new_value);
294
295 let new_value_str = new_value.to_string();
296 log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
297 }
298
299 pub fn start(&mut self) {
301 log::info!("Started");
302 }
303
304 pub fn stop(&mut self) {
306 log::info!("Stopped");
307 }
308
309 pub fn reset(&mut self) {
311 self.throttled_submit_order.reset();
312 self.throttled_modify_order.reset();
313 self.max_notional_per_order.clear();
314 self.trading_state = TradingState::Active;
315
316 log::info!("Reset");
317 }
318
319 pub fn dispose(&mut self) {
321 log::info!("Disposed");
322 }
323
324 #[must_use]
326 pub fn clock(&self) -> &Rc<RefCell<dyn Clock>> {
327 &self.clock
328 }
329
330 #[must_use]
332 pub const fn config(&self) -> &RiskEngineConfig {
333 &self.config
334 }
335
336 #[must_use]
338 pub const fn trading_state(&self) -> TradingState {
339 self.trading_state
340 }
341
342 #[must_use]
344 pub const fn max_notional_per_order(&self) -> &AHashMap<InstrumentId, Decimal> {
345 &self.max_notional_per_order
346 }
347
348 fn handle_command(&mut self, command: TradingCommand) {
349 if self.config.debug {
350 log::debug!("{CMD}{RECV} {command:?}");
351 }
352
353 match command {
354 TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
355 TradingCommand::SubmitOrderList(submit_order_list) => {
356 self.handle_submit_order_list(submit_order_list);
357 }
358 TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
359 TradingCommand::QueryAccount(query_account) => {
360 self.send_to_execution(TradingCommand::QueryAccount(query_account));
361 }
362 _ => {
363 log::error!("Cannot handle command: {command}");
364 }
365 }
366 }
367
368 fn handle_submit_order(&mut self, command: SubmitOrder) {
369 if self.config.bypass {
370 self.send_to_execution(TradingCommand::SubmitOrder(command));
371 return;
372 }
373
374 let order = &command.order;
375 if let Some(position_id) = command.position_id
376 && order.is_reduce_only()
377 {
378 let position_exists = {
379 let cache = self.cache.borrow();
380 cache
381 .position(&position_id)
382 .map(|pos| (pos.side, pos.quantity))
383 };
384
385 if let Some((pos_side, pos_quantity)) = position_exists {
386 if !order.would_reduce_only(pos_side, pos_quantity) {
387 self.deny_command(
388 TradingCommand::SubmitOrder(command),
389 &format!("Reduce only order would increase position {position_id}"),
390 );
391 return; }
393 } else {
394 self.deny_command(
395 TradingCommand::SubmitOrder(command),
396 &format!("Position {position_id} not found for reduce-only order"),
397 );
398 return;
399 }
400 }
401
402 let instrument_exists = {
403 let cache = self.cache.borrow();
404 cache.instrument(&command.instrument_id).cloned()
405 };
406
407 let instrument = if let Some(instrument) = instrument_exists {
408 instrument
409 } else {
410 self.deny_command(
411 TradingCommand::SubmitOrder(command.clone()),
412 &format!("Instrument for {} not found", command.instrument_id),
413 );
414 return; };
416
417 if !self.check_order(instrument.clone(), order.clone()) {
421 return; }
423
424 if !self.check_orders_risk(instrument.clone(), Vec::from([order.clone()])) {
425 return; }
427
428 self.execution_gateway(instrument, TradingCommand::SubmitOrder(command));
430 }
431
432 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
433 if self.config.bypass {
434 self.send_to_execution(TradingCommand::SubmitOrderList(command));
435 return;
436 }
437
438 let instrument_exists = {
439 let cache = self.cache.borrow();
440 cache.instrument(&command.instrument_id).cloned()
441 };
442
443 let instrument = if let Some(instrument) = instrument_exists {
444 instrument
445 } else {
446 self.deny_command(
447 TradingCommand::SubmitOrderList(command.clone()),
448 &format!("no instrument found for {}", command.instrument_id),
449 );
450 return; };
452
453 for order in command.order_list.orders.clone() {
457 if !self.check_order(instrument.clone(), order) {
458 return; }
460 }
461
462 if !self.check_orders_risk(instrument.clone(), command.order_list.clone().orders) {
463 self.deny_order_list(
464 command.order_list.clone(),
465 &format!("OrderList {} DENIED", command.order_list.id),
466 );
467 return; }
469
470 self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
471 }
472
473 fn handle_modify_order(&mut self, command: ModifyOrder) {
474 let order_exists = {
478 let cache = self.cache.borrow();
479 cache.order(&command.client_order_id).cloned()
480 };
481
482 let order = if let Some(order) = order_exists {
483 order
484 } else {
485 log::error!(
486 "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
487 command.client_order_id
488 );
489 return;
490 };
491
492 if order.is_closed() {
493 self.reject_modify_order(
494 order,
495 &format!(
496 "Order with command.client_order_id: {} already closed",
497 command.client_order_id
498 ),
499 );
500 return;
501 } else if order.status() == OrderStatus::PendingCancel {
502 self.reject_modify_order(
503 order,
504 &format!(
505 "Order with command.client_order_id: {} is already pending cancel",
506 command.client_order_id
507 ),
508 );
509 return;
510 }
511
512 let maybe_instrument = {
513 let cache = self.cache.borrow();
514 cache.instrument(&command.instrument_id).cloned()
515 };
516
517 let instrument = if let Some(instrument) = maybe_instrument {
518 instrument
519 } else {
520 self.reject_modify_order(
521 order,
522 &format!("no instrument found for {:?}", command.instrument_id),
523 );
524 return; };
526
527 let mut risk_msg = self.check_price(&instrument, command.price);
529 if let Some(risk_msg) = risk_msg {
530 self.reject_modify_order(order, &risk_msg);
531 return; }
533
534 risk_msg = self.check_price(&instrument, command.trigger_price);
536 if let Some(risk_msg) = risk_msg {
537 self.reject_modify_order(order, &risk_msg);
538 return; }
540
541 risk_msg = self.check_quantity(&instrument, command.quantity, order.is_quote_quantity());
543 if let Some(risk_msg) = risk_msg {
544 self.reject_modify_order(order, &risk_msg);
545 return; }
547
548 match self.trading_state {
550 TradingState::Halted => {
551 self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
552 }
553 TradingState::Reducing => {
554 if let Some(quantity) = command.quantity
555 && quantity > order.quantity()
556 && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
557 || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
558 {
559 self.reject_modify_order(
560 order,
561 &format!(
562 "TradingState is REDUCING and update will increase exposure {}",
563 instrument.id()
564 ),
565 );
566 }
567 }
568 _ => {}
569 }
570
571 self.throttled_modify_order.send(command);
572 }
573
574 fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
575 if order.time_in_force() == TimeInForce::Gtd {
579 let expire_time = order.expire_time().unwrap();
581 if expire_time <= self.clock.borrow().timestamp_ns() {
582 self.deny_order(
583 order,
584 &format!("GTD {} already past", expire_time.to_rfc3339()),
585 );
586 return false; }
588 }
589
590 if !self.check_order_price(instrument.clone(), order.clone())
591 || !self.check_order_quantity(instrument, order)
592 {
593 return false; }
595
596 true
597 }
598
599 fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
600 if order.price().is_some() {
604 let risk_msg = self.check_price(&instrument, order.price());
605 if let Some(risk_msg) = risk_msg {
606 self.deny_order(order, &risk_msg);
607 return false; }
609 }
610
611 if order.trigger_price().is_some() {
615 let risk_msg = self.check_price(&instrument, order.trigger_price());
616 if let Some(risk_msg) = risk_msg {
617 self.deny_order(order, &risk_msg);
618 return false; }
620 }
621
622 true
623 }
624
625 fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
626 let risk_msg = self.check_quantity(
627 &instrument,
628 Some(order.quantity()),
629 order.is_quote_quantity(),
630 );
631 if let Some(risk_msg) = risk_msg {
632 self.deny_order(order, &risk_msg);
633 return false; }
635
636 true
637 }
638
639 fn check_orders_risk(&self, instrument: InstrumentAny, orders: Vec<OrderAny>) -> bool {
640 let mut last_px: Option<Price> = None;
644 let mut max_notional: Option<Money> = None;
645
646 let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
648 if let Some(max_notional_setting_val) = max_notional_setting.copied() {
649 max_notional = Some(Money::new(
650 max_notional_setting_val
651 .to_f64()
652 .expect("Invalid decimal conversion"),
653 instrument.quote_currency(),
654 ));
655 }
656
657 let account_exists = {
659 let cache = self.cache.borrow();
660 cache.account_for_venue(&instrument.id().venue).cloned()
661 };
662
663 let account = if let Some(account) = account_exists {
664 account
665 } else {
666 log::debug!("Cannot find account for venue {}", instrument.id().venue);
667 return true; };
669 let cash_account = match account {
670 AccountAny::Cash(cash_account) => cash_account,
671 AccountAny::Margin(_) => return true, };
673 let free = cash_account.balance_free(Some(instrument.quote_currency()));
674 let allow_borrowing = cash_account.allow_borrowing;
675 if self.config.debug {
676 log::debug!("Free cash: {free:?}");
677 }
678
679 let (net_long_qty_raw, pending_sell_qty_raw) = {
682 let cache = self.cache.borrow();
683 let long_qty: QuantityRaw = cache
684 .positions_open(None, Some(&instrument.id()), None, Some(PositionSide::Long))
685 .iter()
686 .map(|pos| pos.quantity.raw)
687 .sum();
688 let pending_sells: QuantityRaw = cache
689 .orders_open(None, Some(&instrument.id()), None, Some(OrderSide::Sell))
690 .iter()
691 .map(|ord| ord.leaves_qty().raw)
692 .sum();
693 (long_qty, pending_sells)
694 };
695
696 let available_long_qty_raw = net_long_qty_raw.saturating_sub(pending_sell_qty_raw);
698
699 if self.config.debug && net_long_qty_raw > 0 {
700 log::debug!(
701 "Net LONG qty (raw): {net_long_qty_raw}, pending sells: {pending_sell_qty_raw}, available: {available_long_qty_raw}"
702 );
703 }
704
705 let mut cum_sell_qty_raw: QuantityRaw = 0;
707
708 let mut cum_notional_buy: Option<Money> = None;
709 let mut cum_notional_sell: Option<Money> = None;
710 let mut base_currency: Option<Currency> = None;
711 for order in &orders {
712 last_px = match order {
714 OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
715 if last_px.is_none() {
716 let cache = self.cache.borrow();
717 if let Some(last_quote) = cache.quote(&instrument.id()) {
718 match order.order_side() {
719 OrderSide::Buy => Some(last_quote.ask_price),
720 OrderSide::Sell => Some(last_quote.bid_price),
721 _ => panic!("Invalid order side"),
722 }
723 } else {
724 let cache = self.cache.borrow();
725 let last_trade = cache.trade(&instrument.id());
726
727 if let Some(last_trade) = last_trade {
728 Some(last_trade.price)
729 } else {
730 log::warn!(
731 "Cannot check MARKET order risk: no prices for {}",
732 instrument.id()
733 );
734 continue;
735 }
736 }
737 } else {
738 last_px
739 }
740 }
741 OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
742 OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
743 if let Some(trigger_price) = order.trigger_price() {
744 Some(trigger_price)
745 } else {
746 let offset_type = order.trailing_offset_type().unwrap();
748 if !matches!(
749 offset_type,
750 TrailingOffsetType::Price
751 | TrailingOffsetType::BasisPoints
752 | TrailingOffsetType::Ticks
753 ) {
754 self.deny_order(
755 order.clone(),
756 &format!("UNSUPPORTED_TRAILING_OFFSET_TYPE: {offset_type:?}"),
757 );
758 return false;
759 }
760
761 let trigger_type = order.trigger_type().unwrap();
762 let cache = self.cache.borrow();
763
764 if trigger_type == TriggerType::BidAsk {
765 if let Some(quote) = cache.quote(&instrument.id()) {
766 match trailing_stop_calculate_with_bid_ask(
767 instrument.price_increment(),
768 order.trailing_offset_type().unwrap(),
769 order.order_side_specified(),
770 order.trailing_offset().unwrap(),
771 quote.bid_price,
772 quote.ask_price,
773 ) {
774 Ok(calculated_trigger) => Some(calculated_trigger),
775 Err(e) => {
776 log::warn!(
777 "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
778 order.order_type()
779 );
780 continue;
781 }
782 }
783 } else {
784 log::warn!(
785 "Cannot check {} order risk: no trigger price set and no bid/ask quotes available for {}",
786 order.order_type(),
787 instrument.id()
788 );
789 continue;
790 }
791 } else if let Some(last_trade) = cache.trade(&instrument.id()) {
792 match trailing_stop_calculate_with_last(
793 instrument.price_increment(),
794 order.trailing_offset_type().unwrap(),
795 order.order_side_specified(),
796 order.trailing_offset().unwrap(),
797 last_trade.price,
798 ) {
799 Ok(calculated_trigger) => Some(calculated_trigger),
800 Err(e) => {
801 log::warn!(
802 "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {}",
803 order.order_type(),
804 e
805 );
806 continue;
807 }
808 }
809 } else if trigger_type == TriggerType::LastOrBidAsk {
810 if let Some(quote) = cache.quote(&instrument.id()) {
812 match trailing_stop_calculate_with_bid_ask(
813 instrument.price_increment(),
814 order.trailing_offset_type().unwrap(),
815 order.order_side_specified(),
816 order.trailing_offset().unwrap(),
817 quote.bid_price,
818 quote.ask_price,
819 ) {
820 Ok(calculated_trigger) => Some(calculated_trigger),
821 Err(e) => {
822 log::warn!(
823 "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
824 order.order_type()
825 );
826 continue;
827 }
828 }
829 } else {
830 log::warn!(
831 "Cannot check {} order risk: no trigger price set and no market data available for {}",
832 order.order_type(),
833 instrument.id()
834 );
835 continue;
836 }
837 } else {
838 log::warn!(
839 "Cannot check {} order risk: no trigger price set and no market data available for {}",
840 order.order_type(),
841 instrument.id()
842 );
843 continue;
844 }
845 }
846 }
847 _ => order.price(),
848 };
849
850 let last_px = if let Some(px) = last_px {
851 px
852 } else {
853 log::error!("Cannot check order risk: no price available");
854 continue;
855 };
856
857 let effective_price = if order.is_quote_quantity()
859 && !instrument.is_inverse()
860 && matches!(order, OrderAny::Limit(_) | OrderAny::StopLimit(_))
861 {
862 let cache = self.cache.borrow();
864 if let Some(quote_tick) = cache.quote(&instrument.id()) {
865 match order.order_side() {
866 OrderSide::Buy => last_px.min(quote_tick.ask_price),
868 OrderSide::Sell => last_px.max(quote_tick.bid_price),
870 _ => last_px,
871 }
872 } else {
873 last_px }
875 } else {
876 last_px
877 };
878
879 let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
880 instrument.calculate_base_quantity(order.quantity(), effective_price)
881 } else {
882 order.quantity()
883 };
884
885 if let Some(max_quantity) = instrument.max_quantity()
887 && effective_quantity > max_quantity
888 {
889 self.deny_order(
890 order.clone(),
891 &format!(
892 "QUANTITY_EXCEEDS_MAXIMUM: effective_quantity={effective_quantity}, max_quantity={max_quantity}"
893 ),
894 );
895 return false; }
897
898 if let Some(min_quantity) = instrument.min_quantity()
899 && effective_quantity < min_quantity
900 {
901 self.deny_order(
902 order.clone(),
903 &format!(
904 "QUANTITY_BELOW_MINIMUM: effective_quantity={effective_quantity}, min_quantity={min_quantity}"
905 ),
906 );
907 return false; }
909
910 let notional =
911 instrument.calculate_notional_value(effective_quantity, last_px, Some(true));
912
913 if self.config.debug {
914 log::debug!("Notional: {notional:?}");
915 }
916
917 if let Some(max_notional_value) = max_notional
919 && notional > max_notional_value
920 {
921 self.deny_order(
922 order.clone(),
923 &format!(
924 "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
925 ),
926 );
927 return false; }
929
930 if let Some(min_notional) = instrument.min_notional()
932 && notional.currency == min_notional.currency
933 && notional < min_notional
934 {
935 self.deny_order(
936 order.clone(),
937 &format!(
938 "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
939 ),
940 );
941 return false; }
943
944 if let Some(max_notional) = instrument.max_notional()
946 && notional.currency == max_notional.currency
947 && notional > max_notional
948 {
949 self.deny_order(
950 order.clone(),
951 &format!(
952 "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
953 ),
954 );
955 return false; }
957
958 let notional = instrument.calculate_notional_value(effective_quantity, last_px, None);
960 let order_balance_impact = match order.order_side() {
961 OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
962 OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
963 OrderSide::NoOrderSide => {
964 panic!("invalid `OrderSide`, was {}", order.order_side());
965 }
966 };
967
968 if self.config.debug {
969 log::debug!("Balance impact: {order_balance_impact}");
970 }
971
972 if !allow_borrowing
974 && let Some(free_val) = free
975 && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
976 {
977 self.deny_order(
978 order.clone(),
979 &format!(
980 "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
981 ),
982 );
983 return false;
984 }
985
986 if base_currency.is_none() {
987 base_currency = instrument.base_currency();
988 }
989 if order.is_buy() {
990 match cum_notional_buy.as_mut() {
991 Some(cum_notional_buy_val) => {
992 cum_notional_buy_val.raw += -order_balance_impact.raw;
993 }
994 None => {
995 cum_notional_buy = Some(Money::from_raw(
996 -order_balance_impact.raw,
997 order_balance_impact.currency,
998 ));
999 }
1000 }
1001
1002 if self.config.debug {
1003 log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
1004 }
1005
1006 if !allow_borrowing
1007 && let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
1008 && cum_notional_buy > free
1009 {
1010 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
1011 return false; }
1013 } else if order.is_sell() {
1014 let is_position_reducing_sell = order.is_reduce_only()
1015 || (cum_sell_qty_raw + effective_quantity.raw) <= available_long_qty_raw;
1016 cum_sell_qty_raw += effective_quantity.raw;
1017
1018 if is_position_reducing_sell {
1019 if self.config.debug {
1020 log::debug!("Position-reducing SELL skips balance check");
1021 }
1022 continue;
1023 }
1024
1025 if cash_account.base_currency.is_some() {
1026 match cum_notional_sell.as_mut() {
1027 Some(cum_notional_buy_val) => {
1028 cum_notional_buy_val.raw += order_balance_impact.raw;
1029 }
1030 None => {
1031 cum_notional_sell = Some(Money::from_raw(
1032 order_balance_impact.raw,
1033 order_balance_impact.currency,
1034 ));
1035 }
1036 }
1037 if self.config.debug {
1038 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1039 }
1040
1041 if !allow_borrowing
1042 && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1043 && cum_notional_sell > free
1044 {
1045 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1046 return false; }
1048 }
1049 else if let Some(base_currency) = base_currency {
1051 let cash_value = Money::from_raw(
1052 effective_quantity
1053 .raw
1054 .try_into()
1055 .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
1056 .unwrap(),
1057 base_currency,
1058 );
1059
1060 if self.config.debug {
1061 log::debug!("Cash value: {cash_value:?}");
1062 log::debug!(
1063 "Total: {:?}",
1064 cash_account.balance_total(Some(base_currency))
1065 );
1066 log::debug!(
1067 "Locked: {:?}",
1068 cash_account.balance_locked(Some(base_currency))
1069 );
1070 log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
1071 }
1072
1073 match cum_notional_sell {
1074 Some(mut value) => {
1075 value.raw += cash_value.raw;
1076 cum_notional_sell = Some(value);
1077 }
1078 None => cum_notional_sell = Some(cash_value),
1079 }
1080
1081 if self.config.debug {
1082 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1083 }
1084 if !allow_borrowing
1085 && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1086 && cum_notional_sell.raw > free.raw
1087 {
1088 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1089 return false; }
1091 }
1092 }
1093 }
1094
1095 true }
1098
1099 fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
1100 let price_val = price?;
1101
1102 if price_val.precision > instrument.price_precision() {
1103 return Some(format!(
1104 "price {} invalid (precision {} > {})",
1105 price_val,
1106 price_val.precision,
1107 instrument.price_precision()
1108 ));
1109 }
1110
1111 if !matches!(
1112 instrument.instrument_class(),
1113 InstrumentClass::Option
1114 | InstrumentClass::FuturesSpread
1115 | InstrumentClass::OptionSpread
1116 ) && price_val.raw <= 0
1117 {
1118 return Some(format!("price {price_val} invalid (<= 0)"));
1119 }
1120
1121 None
1122 }
1123
1124 fn check_quantity(
1125 &self,
1126 instrument: &InstrumentAny,
1127 quantity: Option<Quantity>,
1128 is_quote_quantity: bool,
1129 ) -> Option<String> {
1130 let quantity_val = quantity?;
1131
1132 if quantity_val.precision > instrument.size_precision() {
1134 return Some(format!(
1135 "quantity {} invalid (precision {} > {})",
1136 quantity_val,
1137 quantity_val.precision,
1138 instrument.size_precision()
1139 ));
1140 }
1141
1142 if is_quote_quantity {
1144 return None;
1145 }
1146
1147 if let Some(max_quantity) = instrument.max_quantity()
1149 && quantity_val > max_quantity
1150 {
1151 return Some(format!(
1152 "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
1153 ));
1154 }
1155
1156 if let Some(min_quantity) = instrument.min_quantity()
1158 && quantity_val < min_quantity
1159 {
1160 return Some(format!(
1161 "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
1162 ));
1163 }
1164
1165 None
1166 }
1167
1168 fn deny_command(&self, command: TradingCommand, reason: &str) {
1169 match command {
1170 TradingCommand::SubmitOrder(command) => {
1171 self.deny_order(command.order, reason);
1172 }
1173 TradingCommand::SubmitOrderList(command) => {
1174 self.deny_order_list(command.order_list, reason);
1175 }
1176 _ => {
1177 panic!("Cannot deny command {command}");
1178 }
1179 }
1180 }
1181
1182 fn deny_order(&self, order: OrderAny, reason: &str) {
1183 log::warn!(
1184 "SubmitOrder for {} DENIED: {}",
1185 order.client_order_id(),
1186 reason
1187 );
1188
1189 if order.status() != OrderStatus::Initialized {
1190 return;
1191 }
1192
1193 {
1195 let mut cache = self.cache.borrow_mut();
1196 if !cache.order_exists(&order.client_order_id()) {
1197 cache
1198 .add_order(order.clone(), None, None, false)
1199 .map_err(|e| {
1200 log::error!("Cannot add order to cache: {e}");
1201 })
1202 .unwrap();
1203 }
1204 }
1205
1206 let denied = OrderEventAny::Denied(OrderDenied::new(
1207 order.trader_id(),
1208 order.strategy_id(),
1209 order.instrument_id(),
1210 order.client_order_id(),
1211 reason.into(),
1212 UUID4::new(),
1213 self.clock.borrow().timestamp_ns(),
1214 self.clock.borrow().timestamp_ns(),
1215 ));
1216
1217 msgbus::send_any("ExecEngine.process".into(), &denied);
1218 }
1219
1220 fn deny_order_list(&self, order_list: OrderList, reason: &str) {
1221 for order in order_list.orders {
1222 if !order.is_closed() {
1223 self.deny_order(order, reason);
1224 }
1225 }
1226 }
1227
1228 fn reject_modify_order(&self, order: OrderAny, reason: &str) {
1229 let ts_event = self.clock.borrow().timestamp_ns();
1230 let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
1231 order.trader_id(),
1232 order.strategy_id(),
1233 order.instrument_id(),
1234 order.client_order_id(),
1235 reason.into(),
1236 UUID4::new(),
1237 ts_event,
1238 ts_event,
1239 false,
1240 order.venue_order_id(),
1241 order.account_id(),
1242 ));
1243
1244 msgbus::send_any("ExecEngine.process".into(), &denied);
1245 }
1246
1247 fn execution_gateway(&mut self, instrument: InstrumentAny, command: TradingCommand) {
1248 match self.trading_state {
1249 TradingState::Halted => match command {
1250 TradingCommand::SubmitOrder(submit_order) => {
1251 self.deny_order(submit_order.order, "TradingState::HALTED");
1252 }
1253 TradingCommand::SubmitOrderList(submit_order_list) => {
1254 self.deny_order_list(submit_order_list.order_list, "TradingState::HALTED");
1255 }
1256 _ => {}
1257 },
1258 TradingState::Reducing => match command {
1259 TradingCommand::SubmitOrder(submit_order) => {
1260 let order = submit_order.order;
1261 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1262 self.deny_order(
1263 order,
1264 &format!(
1265 "BUY when TradingState::REDUCING and LONG {}",
1266 instrument.id()
1267 ),
1268 );
1269 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1270 self.deny_order(
1271 order,
1272 &format!(
1273 "SELL when TradingState::REDUCING and SHORT {}",
1274 instrument.id()
1275 ),
1276 );
1277 }
1278 }
1279 TradingCommand::SubmitOrderList(submit_order_list) => {
1280 let order_list = submit_order_list.order_list;
1281 for order in &order_list.orders {
1282 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1283 self.deny_order_list(
1284 order_list,
1285 &format!(
1286 "BUY when TradingState::REDUCING and LONG {}",
1287 instrument.id()
1288 ),
1289 );
1290 return;
1291 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1292 self.deny_order_list(
1293 order_list,
1294 &format!(
1295 "SELL when TradingState::REDUCING and SHORT {}",
1296 instrument.id()
1297 ),
1298 );
1299 return;
1300 }
1301 }
1302 }
1303 _ => {}
1304 },
1305 TradingState::Active => match command {
1306 TradingCommand::SubmitOrder(submit_order) => {
1307 self.throttled_submit_order.send(submit_order);
1308 }
1309 TradingCommand::SubmitOrderList(submit_order_list) => {
1310 self.send_to_execution(TradingCommand::SubmitOrderList(submit_order_list));
1312 }
1313 _ => {}
1314 },
1315 }
1316 }
1317
1318 fn send_to_execution(&self, command: TradingCommand) {
1319 msgbus::send_any("ExecEngine.execute".into(), &command);
1320 }
1321
1322 fn handle_event(&mut self, event: OrderEventAny) {
1323 if self.config.debug {
1326 log::debug!("{RECV}{EVT} {event:?}");
1327 }
1328 }
1329}