1pub mod config;
19
20use std::{cell::RefCell, fmt::Debug, rc::Rc};
21
22use ahash::AHashMap;
23use config::RiskEngineConfig;
24use nautilus_common::{
25 cache::Cache,
26 clock::Clock,
27 logging::{CMD, EVT, RECV},
28 messages::execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
29 msgbus,
30 msgbus::{MessagingSwitchboard, TypedIntoHandler},
31 throttler::Throttler,
32};
33use nautilus_core::{UUID4, WeakCell};
34use nautilus_execution::trailing::{
35 trailing_stop_calculate_with_bid_ask, trailing_stop_calculate_with_last,
36};
37use nautilus_model::{
38 accounts::{Account, AccountAny},
39 enums::{
40 InstrumentClass, OrderSide, OrderStatus, PositionSide, TimeInForce, TradingState,
41 TrailingOffsetType, TriggerType,
42 },
43 events::{OrderDenied, OrderEventAny, OrderModifyRejected},
44 identifiers::InstrumentId,
45 instruments::{Instrument, InstrumentAny},
46 orders::{Order, OrderAny},
47 types::{Currency, Money, Price, Quantity, quantity::QuantityRaw},
48};
49use nautilus_portfolio::Portfolio;
50use rust_decimal::{Decimal, prelude::ToPrimitive};
51use ustr::Ustr;
52
53type SubmitOrderFn = Box<dyn Fn(SubmitOrder)>;
54type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
55
56#[allow(dead_code)]
63pub struct RiskEngine {
64 clock: Rc<RefCell<dyn Clock>>,
65 cache: Rc<RefCell<Cache>>,
66 portfolio: Portfolio,
67 pub throttled_submit_order: Throttler<SubmitOrder, SubmitOrderFn>,
68 pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
69 max_notional_per_order: AHashMap<InstrumentId, Decimal>,
70 trading_state: TradingState,
71 config: RiskEngineConfig,
72}
73
74impl Debug for RiskEngine {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct(stringify!(RiskEngine)).finish()
77 }
78}
79
80impl RiskEngine {
81 pub fn new(
83 config: RiskEngineConfig,
84 portfolio: Portfolio,
85 clock: Rc<RefCell<dyn Clock>>,
86 cache: Rc<RefCell<Cache>>,
87 ) -> Self {
88 let throttled_submit_order =
89 Self::create_submit_order_throttler(&config, clock.clone(), cache.clone());
90
91 let throttled_modify_order =
92 Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
93
94 Self {
95 clock,
96 cache,
97 portfolio,
98 throttled_submit_order,
99 throttled_modify_order,
100 max_notional_per_order: AHashMap::new(),
101 trading_state: TradingState::Active,
102 config,
103 }
104 }
105
106 pub fn register_msgbus_handlers(engine: Rc<RefCell<Self>>) {
108 let weak = WeakCell::from(Rc::downgrade(&engine));
109
110 msgbus::register_trading_command_endpoint(
111 MessagingSwitchboard::risk_engine_execute(),
112 TypedIntoHandler::from(move |cmd: TradingCommand| {
113 if let Some(rc) = weak.upgrade() {
114 rc.borrow_mut().execute(cmd);
115 }
116 }),
117 );
118 }
119
120 fn create_submit_order_throttler(
121 config: &RiskEngineConfig,
122 clock: Rc<RefCell<dyn Clock>>,
123 cache: Rc<RefCell<Cache>>,
124 ) -> Throttler<SubmitOrder, SubmitOrderFn> {
125 let success_handler = {
126 Box::new(move |submit_order: SubmitOrder| {
127 let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
128 msgbus::send_trading_command(endpoint, TradingCommand::SubmitOrder(submit_order));
129 }) as Box<dyn Fn(SubmitOrder)>
130 };
131
132 let failure_handler = {
133 let cache = cache;
134 let clock = clock.clone();
135 Box::new(move |submit_order: SubmitOrder| {
136 let reason = "REJECTED BY THROTTLER";
137 log::warn!(
138 "SubmitOrder for {} DENIED: {}",
139 submit_order.client_order_id,
140 reason
141 );
142
143 Self::handle_submit_order_cache(&cache, &submit_order);
144
145 let denied = Self::create_order_denied(&submit_order, reason, &clock);
146
147 let endpoint = MessagingSwitchboard::exec_engine_process();
148 msgbus::send_order_event(endpoint, denied);
149 }) as Box<dyn Fn(SubmitOrder)>
150 };
151
152 Throttler::new(
153 config.max_order_submit.limit,
154 config.max_order_submit.interval_ns,
155 clock,
156 "ORDER_SUBMIT_THROTTLER".to_string(),
157 success_handler,
158 Some(failure_handler),
159 Ustr::from(UUID4::new().as_str()),
160 )
161 }
162
163 fn create_modify_order_throttler(
164 config: &RiskEngineConfig,
165 clock: Rc<RefCell<dyn Clock>>,
166 cache: Rc<RefCell<Cache>>,
167 ) -> Throttler<ModifyOrder, ModifyOrderFn> {
168 let success_handler = {
169 Box::new(move |order: ModifyOrder| {
170 let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
171 msgbus::send_trading_command(endpoint, TradingCommand::ModifyOrder(order));
172 }) as Box<dyn Fn(ModifyOrder)>
173 };
174
175 let failure_handler = {
176 let cache = cache;
177 let clock = clock.clone();
178 Box::new(move |order: ModifyOrder| {
179 let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
180 log::warn!(
181 "SubmitOrder for {} DENIED: {}",
182 order.client_order_id,
183 reason
184 );
185
186 let order = match Self::get_existing_order(&cache, &order) {
187 Some(order) => order,
188 None => return,
189 };
190
191 let rejected = Self::create_modify_rejected(&order, reason, &clock);
192
193 let endpoint = MessagingSwitchboard::exec_engine_process();
194 msgbus::send_order_event(endpoint, rejected);
195 }) as Box<dyn Fn(ModifyOrder)>
196 };
197
198 Throttler::new(
199 config.max_order_modify.limit,
200 config.max_order_modify.interval_ns,
201 clock,
202 "ORDER_MODIFY_THROTTLER".to_string(),
203 success_handler,
204 Some(failure_handler),
205 Ustr::from(UUID4::new().as_str()),
206 )
207 }
208
209 fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
210 let cache = cache.borrow();
211 if !cache.order_exists(&submit_order.client_order_id) {
212 log::error!(
213 "Order not found in cache for client_order_id: {}",
214 submit_order.client_order_id
215 );
216 }
217 }
218
219 fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
220 let cache = cache.borrow();
221 if let Some(order) = cache.order(&order.client_order_id) {
222 Some(order.clone())
223 } else {
224 log::error!(
225 "Order with command.client_order_id: {} not found",
226 order.client_order_id
227 );
228 None
229 }
230 }
231
232 fn create_order_denied(
233 submit_order: &SubmitOrder,
234 reason: &str,
235 clock: &Rc<RefCell<dyn Clock>>,
236 ) -> OrderEventAny {
237 let timestamp = clock.borrow().timestamp_ns();
238 OrderEventAny::Denied(OrderDenied::new(
239 submit_order.trader_id,
240 submit_order.strategy_id,
241 submit_order.instrument_id,
242 submit_order.client_order_id,
243 reason.into(),
244 UUID4::new(),
245 timestamp,
246 timestamp,
247 ))
248 }
249
250 fn create_modify_rejected(
251 order: &OrderAny,
252 reason: &str,
253 clock: &Rc<RefCell<dyn Clock>>,
254 ) -> OrderEventAny {
255 let timestamp = clock.borrow().timestamp_ns();
256 OrderEventAny::ModifyRejected(OrderModifyRejected::new(
257 order.trader_id(),
258 order.strategy_id(),
259 order.instrument_id(),
260 order.client_order_id(),
261 reason.into(),
262 UUID4::new(),
263 timestamp,
264 timestamp,
265 false,
266 order.venue_order_id(),
267 None,
268 ))
269 }
270
271 pub fn execute(&mut self, command: TradingCommand) {
273 self.handle_command(command);
275 }
276
277 pub fn process(&mut self, event: OrderEventAny) {
279 self.handle_event(event);
281 }
282
283 pub fn set_trading_state(&mut self, state: TradingState) {
285 if state == self.trading_state {
286 log::warn!("No change to trading state: already set to {state:?}");
287 return;
288 }
289
290 self.trading_state = state;
291
292 let _ts_now = self.clock.borrow().timestamp_ns();
293
294 msgbus::publish_any("events.risk".into(), &"message"); log::info!("Trading state set to {state:?}");
300 }
301
302 pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
304 self.max_notional_per_order.insert(instrument_id, new_value);
305
306 let new_value_str = new_value.to_string();
307 log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
308 }
309
310 pub fn start(&mut self) {
312 log::info!("Started");
313 }
314
315 pub fn stop(&mut self) {
317 log::info!("Stopped");
318 }
319
320 pub fn reset(&mut self) {
322 self.throttled_submit_order.reset();
323 self.throttled_modify_order.reset();
324 self.max_notional_per_order.clear();
325 self.trading_state = TradingState::Active;
326
327 log::info!("Reset");
328 }
329
330 pub fn dispose(&mut self) {
332 log::info!("Disposed");
333 }
334
335 #[must_use]
337 pub fn clock(&self) -> &Rc<RefCell<dyn Clock>> {
338 &self.clock
339 }
340
341 #[must_use]
343 pub fn cache(&self) -> &Rc<RefCell<Cache>> {
344 &self.cache
345 }
346
347 #[must_use]
349 pub const fn config(&self) -> &RiskEngineConfig {
350 &self.config
351 }
352
353 #[must_use]
355 pub const fn trading_state(&self) -> TradingState {
356 self.trading_state
357 }
358
359 #[must_use]
361 pub const fn max_notional_per_order(&self) -> &AHashMap<InstrumentId, Decimal> {
362 &self.max_notional_per_order
363 }
364
365 fn handle_command(&mut self, command: TradingCommand) {
366 if self.config.debug {
367 log::debug!("{CMD}{RECV} {command:?}");
368 }
369
370 match command {
371 TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
372 TradingCommand::SubmitOrderList(submit_order_list) => {
373 self.handle_submit_order_list(submit_order_list);
374 }
375 TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
376 TradingCommand::QueryAccount(query_account) => {
377 self.send_to_execution(TradingCommand::QueryAccount(query_account));
378 }
379 _ => {
380 log::error!("Cannot handle command: {command}");
381 }
382 }
383 }
384
385 fn handle_submit_order(&mut self, command: SubmitOrder) {
386 if self.config.bypass {
387 self.send_to_execution(TradingCommand::SubmitOrder(command));
388 return;
389 }
390
391 let order = {
392 let cache = self.cache.borrow();
393 match cache.order(&command.client_order_id) {
394 Some(order) => order.clone(),
395 None => {
396 log::error!(
397 "Cannot handle submit order: order not found in cache for {}",
398 command.client_order_id
399 );
400 return;
401 }
402 }
403 };
404
405 if let Some(position_id) = command.position_id
406 && order.is_reduce_only()
407 {
408 let position_exists = {
409 let cache = self.cache.borrow();
410 cache
411 .position(&position_id)
412 .map(|pos| (pos.side, pos.quantity))
413 };
414
415 if let Some((pos_side, pos_quantity)) = position_exists {
416 if !order.would_reduce_only(pos_side, pos_quantity) {
417 self.deny_command(
418 TradingCommand::SubmitOrder(command),
419 &format!("Reduce only order would increase position {position_id}"),
420 );
421 return; }
423 } else {
424 self.deny_command(
425 TradingCommand::SubmitOrder(command),
426 &format!("Position {position_id} not found for reduce-only order"),
427 );
428 return;
429 }
430 }
431
432 let instrument_exists = {
433 let cache = self.cache.borrow();
434 cache.instrument(&command.instrument_id).cloned()
435 };
436
437 let instrument = if let Some(instrument) = instrument_exists {
438 instrument
439 } else {
440 self.deny_command(
441 TradingCommand::SubmitOrder(command.clone()),
442 &format!("Instrument for {} not found", command.instrument_id),
443 );
444 return; };
446
447 if !self.check_order(instrument.clone(), order.clone()) {
448 return; }
450
451 if !self.check_orders_risk(instrument.clone(), &[order]) {
452 return; }
454
455 self.execution_gateway(instrument, TradingCommand::SubmitOrder(command));
457 }
458
459 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
460 if self.config.bypass {
461 self.send_to_execution(TradingCommand::SubmitOrderList(command));
462 return;
463 }
464
465 let instrument_exists = {
466 let cache = self.cache.borrow();
467 cache.instrument(&command.instrument_id).cloned()
468 };
469
470 let instrument = if let Some(instrument) = instrument_exists {
471 instrument
472 } else {
473 self.deny_command(
474 TradingCommand::SubmitOrderList(command.clone()),
475 &format!("no instrument found for {}", command.instrument_id),
476 );
477 return; };
479
480 let orders: Vec<OrderAny> = self
481 .cache
482 .borrow()
483 .orders_for_ids(&command.order_list.client_order_ids, &command);
484
485 if orders.len() != command.order_list.client_order_ids.len() {
486 self.deny_order_list(
487 &orders,
488 &format!("Incomplete order list: missing orders in cache for {command}"),
489 );
490 return; }
492
493 for order in orders.clone() {
494 if !self.check_order(instrument.clone(), order) {
495 return; }
497 }
498
499 if !self.check_orders_risk(instrument.clone(), &orders) {
500 self.deny_order_list(
501 &orders,
502 &format!("OrderList {} DENIED", command.order_list.id),
503 );
504 return; }
506
507 self.execution_gateway(instrument, TradingCommand::SubmitOrderList(command));
508 }
509
510 fn handle_modify_order(&mut self, command: ModifyOrder) {
511 let order_exists = {
512 let cache = self.cache.borrow();
513 cache.order(&command.client_order_id).cloned()
514 };
515
516 let order = if let Some(order) = order_exists {
517 order
518 } else {
519 log::error!(
520 "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
521 command.client_order_id
522 );
523 return;
524 };
525
526 if order.is_closed() {
527 self.reject_modify_order(
528 order,
529 &format!(
530 "Order with command.client_order_id: {} already closed",
531 command.client_order_id
532 ),
533 );
534 return;
535 } else if order.status() == OrderStatus::PendingCancel {
536 self.reject_modify_order(
537 order,
538 &format!(
539 "Order with command.client_order_id: {} is already pending cancel",
540 command.client_order_id
541 ),
542 );
543 return;
544 }
545
546 let maybe_instrument = {
547 let cache = self.cache.borrow();
548 cache.instrument(&command.instrument_id).cloned()
549 };
550
551 let instrument = if let Some(instrument) = maybe_instrument {
552 instrument
553 } else {
554 self.reject_modify_order(
555 order,
556 &format!("no instrument found for {:?}", command.instrument_id),
557 );
558 return; };
560
561 let mut risk_msg = self.check_price(&instrument, command.price);
563 if let Some(risk_msg) = risk_msg {
564 self.reject_modify_order(order, &risk_msg);
565 return; }
567
568 risk_msg = self.check_price(&instrument, command.trigger_price);
570 if let Some(risk_msg) = risk_msg {
571 self.reject_modify_order(order, &risk_msg);
572 return; }
574
575 risk_msg = self.check_quantity(&instrument, command.quantity, order.is_quote_quantity());
577 if let Some(risk_msg) = risk_msg {
578 self.reject_modify_order(order, &risk_msg);
579 return; }
581
582 match self.trading_state {
584 TradingState::Halted => {
585 self.reject_modify_order(order, "TradingState is HALTED: Cannot modify order");
586 }
587 TradingState::Reducing => {
588 if let Some(quantity) = command.quantity
589 && quantity > order.quantity()
590 && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
591 || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
592 {
593 self.reject_modify_order(
594 order,
595 &format!(
596 "TradingState is REDUCING and update will increase exposure {}",
597 instrument.id()
598 ),
599 );
600 }
601 }
602 _ => {}
603 }
604
605 self.throttled_modify_order.send(command);
606 }
607
608 fn check_order(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
609 if order.time_in_force() == TimeInForce::Gtd {
610 let expire_time = order.expire_time().unwrap();
612 if expire_time <= self.clock.borrow().timestamp_ns() {
613 self.deny_order(
614 order,
615 &format!("GTD {} already past", expire_time.to_rfc3339()),
616 );
617 return false; }
619 }
620
621 if !self.check_order_price(instrument.clone(), order.clone())
622 || !self.check_order_quantity(instrument, order)
623 {
624 return false; }
626
627 true
628 }
629
630 fn check_order_price(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
631 if order.price().is_some() {
632 let risk_msg = self.check_price(&instrument, order.price());
633 if let Some(risk_msg) = risk_msg {
634 self.deny_order(order, &risk_msg);
635 return false; }
637 }
638
639 if order.trigger_price().is_some() {
640 let risk_msg = self.check_price(&instrument, order.trigger_price());
641 if let Some(risk_msg) = risk_msg {
642 self.deny_order(order, &risk_msg);
643 return false; }
645 }
646
647 true
648 }
649
650 fn check_order_quantity(&self, instrument: InstrumentAny, order: OrderAny) -> bool {
651 let risk_msg = self.check_quantity(
652 &instrument,
653 Some(order.quantity()),
654 order.is_quote_quantity(),
655 );
656 if let Some(risk_msg) = risk_msg {
657 self.deny_order(order, &risk_msg);
658 return false; }
660
661 true
662 }
663
664 fn check_orders_risk(&self, instrument: InstrumentAny, orders: &[OrderAny]) -> bool {
665 let mut last_px: Option<Price> = None;
666 let mut max_notional: Option<Money> = None;
667
668 let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
670 if let Some(max_notional_setting_val) = max_notional_setting.copied() {
671 max_notional = Some(Money::new(
672 max_notional_setting_val
673 .to_f64()
674 .expect("Invalid decimal conversion"),
675 instrument.quote_currency(),
676 ));
677 }
678
679 let account_exists = {
681 let cache = self.cache.borrow();
682 cache.account_for_venue(&instrument.id().venue).cloned()
683 };
684
685 let account = if let Some(account) = account_exists {
686 account
687 } else {
688 log::debug!("Cannot find account for venue {}", instrument.id().venue);
689 return true; };
691 let cash_account = match account {
692 AccountAny::Cash(cash_account) => cash_account,
693 AccountAny::Margin(_) => return true, };
695 let free = cash_account.balance_free(Some(instrument.quote_currency()));
696 let allow_borrowing = cash_account.allow_borrowing;
697 if self.config.debug {
698 log::debug!("Free cash: {free:?}");
699 }
700
701 let (net_long_qty_raw, pending_sell_qty_raw) = {
704 let cache = self.cache.borrow();
705 let long_qty: QuantityRaw = cache
706 .positions_open(
707 None,
708 Some(&instrument.id()),
709 None,
710 None,
711 Some(PositionSide::Long),
712 )
713 .iter()
714 .map(|pos| pos.quantity.raw)
715 .sum();
716 let pending_sells: QuantityRaw = cache
717 .orders_open(
718 None,
719 Some(&instrument.id()),
720 None,
721 None,
722 Some(OrderSide::Sell),
723 )
724 .iter()
725 .map(|ord| ord.leaves_qty().raw)
726 .sum();
727 (long_qty, pending_sells)
728 };
729
730 let available_long_qty_raw = net_long_qty_raw.saturating_sub(pending_sell_qty_raw);
732
733 if self.config.debug && net_long_qty_raw > 0 {
734 log::debug!(
735 "Net LONG qty (raw): {net_long_qty_raw}, pending sells: {pending_sell_qty_raw}, available: {available_long_qty_raw}"
736 );
737 }
738
739 let mut cum_sell_qty_raw: QuantityRaw = 0;
741
742 let mut cum_notional_buy: Option<Money> = None;
743 let mut cum_notional_sell: Option<Money> = None;
744 let mut base_currency: Option<Currency> = None;
745 for order in orders {
746 last_px = match order {
748 OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
749 if last_px.is_none() {
750 let cache = self.cache.borrow();
751 if let Some(last_quote) = cache.quote(&instrument.id()) {
752 match order.order_side() {
753 OrderSide::Buy => Some(last_quote.ask_price),
754 OrderSide::Sell => Some(last_quote.bid_price),
755 _ => panic!("Invalid order side"),
756 }
757 } else {
758 let cache = self.cache.borrow();
759 let last_trade = cache.trade(&instrument.id());
760
761 if let Some(last_trade) = last_trade {
762 Some(last_trade.price)
763 } else {
764 log::warn!(
765 "Cannot check MARKET order risk: no prices for {}",
766 instrument.id()
767 );
768 continue;
769 }
770 }
771 } else {
772 last_px
773 }
774 }
775 OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
776 OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
777 if let Some(trigger_price) = order.trigger_price() {
778 Some(trigger_price)
779 } else {
780 let offset_type = order.trailing_offset_type().unwrap();
782 if !matches!(
783 offset_type,
784 TrailingOffsetType::Price
785 | TrailingOffsetType::BasisPoints
786 | TrailingOffsetType::Ticks
787 ) {
788 self.deny_order(
789 order.clone(),
790 &format!("UNSUPPORTED_TRAILING_OFFSET_TYPE: {offset_type:?}"),
791 );
792 return false;
793 }
794
795 let trigger_type = order.trigger_type().unwrap();
796 let cache = self.cache.borrow();
797
798 if trigger_type == TriggerType::BidAsk {
799 if let Some(quote) = cache.quote(&instrument.id()) {
800 match trailing_stop_calculate_with_bid_ask(
801 instrument.price_increment(),
802 order.trailing_offset_type().unwrap(),
803 order.order_side_specified(),
804 order.trailing_offset().unwrap(),
805 quote.bid_price,
806 quote.ask_price,
807 ) {
808 Ok(calculated_trigger) => Some(calculated_trigger),
809 Err(e) => {
810 log::warn!(
811 "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
812 order.order_type()
813 );
814 continue;
815 }
816 }
817 } else {
818 log::warn!(
819 "Cannot check {} order risk: no trigger price set and no bid/ask quotes available for {}",
820 order.order_type(),
821 instrument.id()
822 );
823 continue;
824 }
825 } else if let Some(last_trade) = cache.trade(&instrument.id()) {
826 match trailing_stop_calculate_with_last(
827 instrument.price_increment(),
828 order.trailing_offset_type().unwrap(),
829 order.order_side_specified(),
830 order.trailing_offset().unwrap(),
831 last_trade.price,
832 ) {
833 Ok(calculated_trigger) => Some(calculated_trigger),
834 Err(e) => {
835 log::warn!(
836 "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {}",
837 order.order_type(),
838 e
839 );
840 continue;
841 }
842 }
843 } else if trigger_type == TriggerType::LastOrBidAsk {
844 if let Some(quote) = cache.quote(&instrument.id()) {
846 match trailing_stop_calculate_with_bid_ask(
847 instrument.price_increment(),
848 order.trailing_offset_type().unwrap(),
849 order.order_side_specified(),
850 order.trailing_offset().unwrap(),
851 quote.bid_price,
852 quote.ask_price,
853 ) {
854 Ok(calculated_trigger) => Some(calculated_trigger),
855 Err(e) => {
856 log::warn!(
857 "Cannot check {} order risk: failed to calculate trigger price from trailing offset: {e}",
858 order.order_type()
859 );
860 continue;
861 }
862 }
863 } else {
864 log::warn!(
865 "Cannot check {} order risk: no trigger price set and no market data available for {}",
866 order.order_type(),
867 instrument.id()
868 );
869 continue;
870 }
871 } else {
872 log::warn!(
873 "Cannot check {} order risk: no trigger price set and no market data available for {}",
874 order.order_type(),
875 instrument.id()
876 );
877 continue;
878 }
879 }
880 }
881 _ => order.price(),
882 };
883
884 let last_px = if let Some(px) = last_px {
885 px
886 } else {
887 log::error!("Cannot check order risk: no price available");
888 continue;
889 };
890
891 let effective_price = if order.is_quote_quantity()
893 && !instrument.is_inverse()
894 && matches!(order, OrderAny::Limit(_) | OrderAny::StopLimit(_))
895 {
896 let cache = self.cache.borrow();
898 if let Some(quote_tick) = cache.quote(&instrument.id()) {
899 match order.order_side() {
900 OrderSide::Buy => last_px.min(quote_tick.ask_price),
902 OrderSide::Sell => last_px.max(quote_tick.bid_price),
904 _ => last_px,
905 }
906 } else {
907 last_px }
909 } else {
910 last_px
911 };
912
913 let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
914 instrument.calculate_base_quantity(order.quantity(), effective_price)
915 } else {
916 order.quantity()
917 };
918
919 if let Some(max_quantity) = instrument.max_quantity()
921 && effective_quantity > max_quantity
922 {
923 self.deny_order(
924 order.clone(),
925 &format!(
926 "QUANTITY_EXCEEDS_MAXIMUM: effective_quantity={effective_quantity}, max_quantity={max_quantity}"
927 ),
928 );
929 return false; }
931
932 if let Some(min_quantity) = instrument.min_quantity()
933 && effective_quantity < min_quantity
934 {
935 self.deny_order(
936 order.clone(),
937 &format!(
938 "QUANTITY_BELOW_MINIMUM: effective_quantity={effective_quantity}, min_quantity={min_quantity}"
939 ),
940 );
941 return false; }
943
944 let notional =
945 instrument.calculate_notional_value(effective_quantity, last_px, Some(true));
946
947 if self.config.debug {
948 log::debug!("Notional: {notional:?}");
949 }
950
951 if let Some(max_notional_value) = max_notional
953 && notional > max_notional_value
954 {
955 self.deny_order(
956 order.clone(),
957 &format!(
958 "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
959 ),
960 );
961 return false; }
963
964 if let Some(min_notional) = instrument.min_notional()
966 && notional.currency == min_notional.currency
967 && notional < min_notional
968 {
969 self.deny_order(
970 order.clone(),
971 &format!(
972 "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
973 ),
974 );
975 return false; }
977
978 if let Some(max_notional) = instrument.max_notional()
980 && notional.currency == max_notional.currency
981 && notional > max_notional
982 {
983 self.deny_order(
984 order.clone(),
985 &format!(
986 "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
987 ),
988 );
989 return false; }
991
992 let notional = instrument.calculate_notional_value(effective_quantity, last_px, None);
994 let order_balance_impact = match order.order_side() {
995 OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
996 OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
997 OrderSide::NoOrderSide => {
998 panic!("invalid `OrderSide`, was {}", order.order_side());
999 }
1000 };
1001
1002 if self.config.debug {
1003 log::debug!("Balance impact: {order_balance_impact}");
1004 }
1005
1006 if !allow_borrowing
1008 && let Some(free_val) = free
1009 && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
1010 {
1011 self.deny_order(
1012 order.clone(),
1013 &format!(
1014 "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
1015 ),
1016 );
1017 return false;
1018 }
1019
1020 if base_currency.is_none() {
1021 base_currency = instrument.base_currency();
1022 }
1023 if order.is_buy() {
1024 match cum_notional_buy.as_mut() {
1025 Some(cum_notional_buy_val) => {
1026 cum_notional_buy_val.raw += -order_balance_impact.raw;
1027 }
1028 None => {
1029 cum_notional_buy = Some(Money::from_raw(
1030 -order_balance_impact.raw,
1031 order_balance_impact.currency,
1032 ));
1033 }
1034 }
1035
1036 if self.config.debug {
1037 log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
1038 }
1039
1040 if !allow_borrowing
1041 && let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
1042 && cum_notional_buy > free
1043 {
1044 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
1045 return false; }
1047 } else if order.is_sell() {
1048 let is_position_reducing_sell = order.is_reduce_only()
1049 || (cum_sell_qty_raw + effective_quantity.raw) <= available_long_qty_raw;
1050 cum_sell_qty_raw += effective_quantity.raw;
1051
1052 if is_position_reducing_sell {
1053 if self.config.debug {
1054 log::debug!("Position-reducing SELL skips balance check");
1055 }
1056 continue;
1057 }
1058
1059 if cash_account.base_currency.is_some() {
1060 match cum_notional_sell.as_mut() {
1061 Some(cum_notional_buy_val) => {
1062 cum_notional_buy_val.raw += order_balance_impact.raw;
1063 }
1064 None => {
1065 cum_notional_sell = Some(Money::from_raw(
1066 order_balance_impact.raw,
1067 order_balance_impact.currency,
1068 ));
1069 }
1070 }
1071 if self.config.debug {
1072 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1073 }
1074
1075 if !allow_borrowing
1076 && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1077 && cum_notional_sell > free
1078 {
1079 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1080 return false; }
1082 }
1083 else if let Some(base_currency) = base_currency {
1085 let cash_value = Money::from_raw(
1086 effective_quantity
1087 .raw
1088 .try_into()
1089 .map_err(|e| log::error!("Unable to convert Quantity to f64: {e}"))
1090 .unwrap(),
1091 base_currency,
1092 );
1093
1094 if self.config.debug {
1095 log::debug!("Cash value: {cash_value:?}");
1096 log::debug!(
1097 "Total: {:?}",
1098 cash_account.balance_total(Some(base_currency))
1099 );
1100 log::debug!(
1101 "Locked: {:?}",
1102 cash_account.balance_locked(Some(base_currency))
1103 );
1104 log::debug!("Free: {:?}", cash_account.balance_free(Some(base_currency)));
1105 }
1106
1107 match cum_notional_sell {
1108 Some(mut value) => {
1109 value.raw += cash_value.raw;
1110 cum_notional_sell = Some(value);
1111 }
1112 None => cum_notional_sell = Some(cash_value),
1113 }
1114
1115 if self.config.debug {
1116 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1117 }
1118 if !allow_borrowing
1119 && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1120 && cum_notional_sell.raw > free.raw
1121 {
1122 self.deny_order(order.clone(), &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1123 return false; }
1125 }
1126 }
1127 }
1128
1129 true }
1132
1133 fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
1134 let price_val = price?;
1135
1136 if price_val.precision > instrument.price_precision() {
1137 return Some(format!(
1138 "price {} invalid (precision {} > {})",
1139 price_val,
1140 price_val.precision,
1141 instrument.price_precision()
1142 ));
1143 }
1144
1145 if !matches!(
1146 instrument.instrument_class(),
1147 InstrumentClass::Option
1148 | InstrumentClass::FuturesSpread
1149 | InstrumentClass::OptionSpread
1150 ) && price_val.raw <= 0
1151 {
1152 return Some(format!("price {price_val} invalid (<= 0)"));
1153 }
1154
1155 None
1156 }
1157
1158 fn check_quantity(
1159 &self,
1160 instrument: &InstrumentAny,
1161 quantity: Option<Quantity>,
1162 is_quote_quantity: bool,
1163 ) -> Option<String> {
1164 let quantity_val = quantity?;
1165
1166 if quantity_val.precision > instrument.size_precision() {
1168 return Some(format!(
1169 "quantity {} invalid (precision {} > {})",
1170 quantity_val,
1171 quantity_val.precision,
1172 instrument.size_precision()
1173 ));
1174 }
1175
1176 if is_quote_quantity {
1178 return None;
1179 }
1180
1181 if let Some(max_quantity) = instrument.max_quantity()
1183 && quantity_val > max_quantity
1184 {
1185 return Some(format!(
1186 "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
1187 ));
1188 }
1189
1190 if let Some(min_quantity) = instrument.min_quantity()
1192 && quantity_val < min_quantity
1193 {
1194 return Some(format!(
1195 "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
1196 ));
1197 }
1198
1199 None
1200 }
1201
1202 fn deny_command(&self, command: TradingCommand, reason: &str) {
1203 match command {
1204 TradingCommand::SubmitOrder(command) => {
1205 let order = {
1206 let cache = self.cache.borrow();
1207 cache.order(&command.client_order_id).cloned()
1208 };
1209 if let Some(order) = order {
1210 self.deny_order(order, reason);
1211 } else {
1212 log::error!(
1213 "Cannot deny order: not found in cache for {}",
1214 command.client_order_id
1215 );
1216 }
1217 }
1218 TradingCommand::SubmitOrderList(command) => {
1219 let orders: Vec<OrderAny> = self
1220 .cache
1221 .borrow()
1222 .orders_for_ids(&command.order_list.client_order_ids, &command);
1223 self.deny_order_list(&orders, reason);
1224 }
1225 _ => {
1226 panic!("Cannot deny command {command}");
1227 }
1228 }
1229 }
1230
1231 fn deny_order(&self, order: OrderAny, reason: &str) {
1232 log::warn!(
1233 "SubmitOrder for {} DENIED: {}",
1234 order.client_order_id(),
1235 reason
1236 );
1237
1238 if order.status() != OrderStatus::Initialized {
1239 return;
1240 }
1241
1242 {
1244 let mut cache = self.cache.borrow_mut();
1245 if !cache.order_exists(&order.client_order_id()) {
1246 cache
1247 .add_order(order.clone(), None, None, false)
1248 .map_err(|e| {
1249 log::error!("Cannot add order to cache: {e}");
1250 })
1251 .unwrap();
1252 }
1253 }
1254
1255 let denied = OrderEventAny::Denied(OrderDenied::new(
1256 order.trader_id(),
1257 order.strategy_id(),
1258 order.instrument_id(),
1259 order.client_order_id(),
1260 reason.into(),
1261 UUID4::new(),
1262 self.clock.borrow().timestamp_ns(),
1263 self.clock.borrow().timestamp_ns(),
1264 ));
1265
1266 let endpoint = MessagingSwitchboard::exec_engine_process();
1267 msgbus::send_order_event(endpoint, denied);
1268 }
1269
1270 fn deny_order_list(&self, orders: &[OrderAny], reason: &str) {
1271 for order in orders {
1272 if !order.is_closed() {
1273 self.deny_order(order.clone(), reason);
1274 }
1275 }
1276 }
1277
1278 fn reject_modify_order(&self, order: OrderAny, reason: &str) {
1279 let ts_event = self.clock.borrow().timestamp_ns();
1280 let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
1281 order.trader_id(),
1282 order.strategy_id(),
1283 order.instrument_id(),
1284 order.client_order_id(),
1285 reason.into(),
1286 UUID4::new(),
1287 ts_event,
1288 ts_event,
1289 false,
1290 order.venue_order_id(),
1291 order.account_id(),
1292 ));
1293
1294 let endpoint = MessagingSwitchboard::exec_engine_process();
1295 msgbus::send_order_event(endpoint, denied);
1296 }
1297
1298 fn execution_gateway(&mut self, instrument: InstrumentAny, command: TradingCommand) {
1299 match self.trading_state {
1300 TradingState::Halted => match command {
1301 TradingCommand::SubmitOrder(submit_order) => {
1302 let order = {
1303 let cache = self.cache.borrow();
1304 cache.order(&submit_order.client_order_id).cloned()
1305 };
1306 if let Some(order) = order {
1307 self.deny_order(order, "TradingState::HALTED");
1308 }
1309 }
1310 TradingCommand::SubmitOrderList(submit_order_list) => {
1311 let orders: Vec<OrderAny> = self.cache.borrow().orders_for_ids(
1312 &submit_order_list.order_list.client_order_ids,
1313 &submit_order_list,
1314 );
1315 self.deny_order_list(&orders, "TradingState::HALTED");
1316 }
1317 _ => {}
1318 },
1319 TradingState::Reducing => match command {
1320 TradingCommand::SubmitOrder(submit_order) => {
1321 let order = {
1322 let cache = self.cache.borrow();
1323 cache.order(&submit_order.client_order_id).cloned()
1324 };
1325 if let Some(order) = order {
1326 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1327 self.deny_order(
1328 order,
1329 &format!(
1330 "BUY when TradingState::REDUCING and LONG {}",
1331 instrument.id()
1332 ),
1333 );
1334 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1335 self.deny_order(
1336 order,
1337 &format!(
1338 "SELL when TradingState::REDUCING and SHORT {}",
1339 instrument.id()
1340 ),
1341 );
1342 }
1343 }
1344 }
1345 TradingCommand::SubmitOrderList(submit_order_list) => {
1346 let orders: Vec<OrderAny> = self.cache.borrow().orders_for_ids(
1347 &submit_order_list.order_list.client_order_ids,
1348 &submit_order_list,
1349 );
1350 for order in &orders {
1351 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1352 self.deny_order_list(
1353 &orders,
1354 &format!(
1355 "BUY when TradingState::REDUCING and LONG {}",
1356 instrument.id()
1357 ),
1358 );
1359 return;
1360 } else if order.is_sell() && self.portfolio.is_net_short(&instrument.id()) {
1361 self.deny_order_list(
1362 &orders,
1363 &format!(
1364 "SELL when TradingState::REDUCING and SHORT {}",
1365 instrument.id()
1366 ),
1367 );
1368 return;
1369 }
1370 }
1371 }
1372 _ => {}
1373 },
1374 TradingState::Active => match command {
1375 TradingCommand::SubmitOrder(submit_order) => {
1376 self.throttled_submit_order.send(submit_order);
1377 }
1378 TradingCommand::SubmitOrderList(submit_order_list) => {
1379 self.send_to_execution(TradingCommand::SubmitOrderList(submit_order_list));
1381 }
1382 _ => {}
1383 },
1384 }
1385 }
1386
1387 fn send_to_execution(&self, command: TradingCommand) {
1388 let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
1389 msgbus::send_trading_command(endpoint, command);
1390 }
1391
1392 fn handle_event(&mut self, event: OrderEventAny) {
1393 if self.config.debug {
1396 log::debug!("{RECV}{EVT} {event:?}");
1397 }
1398 }
1399}