1use std::{future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures_util::{StreamExt, pin_mut};
23use nautilus_common::{
24 clients::ExecutionClient,
25 live::{get_runtime, runner::get_exec_event_sender},
26 messages::execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28 GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
29 GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
30 GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
31 SubmitOrderList,
32 },
33};
34use nautilus_core::{
35 MUTEX_POISONED, UnixNanos,
36 datetime::NANOSECONDS_IN_SECOND,
37 time::{AtomicTime, get_atomic_clock_realtime},
38};
39use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
40use nautilus_model::{
41 accounts::AccountAny,
42 enums::{AccountType, OmsType, OrderSide, OrderType, TimeInForce, TriggerType},
43 events::OrderEventAny,
44 identifiers::{AccountId, ClientId, Venue},
45 orders::{Order, OrderAny},
46 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47 types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52 common::consts::{DERIBIT_VENUE, DERIBIT_WS_HEARTBEAT_SECS},
53 config::DeribitExecClientConfig,
54 http::{client::DeribitHttpClient, models::DeribitCurrency, query::GetOrderStateParams},
55 websocket::{
56 auth::DERIBIT_EXECUTION_SESSION_NAME,
57 client::DeribitWebSocketClient,
58 messages::{DeribitOrderParams, NautilusWsMessage},
59 parse::parse_user_order_msg,
60 },
61};
62
/// Live execution client for the Deribit venue.
///
/// Combines the shared execution-client core with an HTTP client, a WebSocket
/// client, an event emitter, and the handles of the background tasks it spawns.
#[derive(Debug)]
pub struct DeribitExecutionClient {
    /// Shared execution client state (identifiers, cache access, lifecycle flags).
    core: ExecutionClientCore,
    /// Clock used to timestamp events and reports generated by this client.
    clock: &'static AtomicTime,
    /// Configuration this client was created with.
    config: DeribitExecClientConfig,
    /// Emitter used to publish account states, order events, and reports.
    emitter: ExecutionEventEmitter,
    /// HTTP client used for instruments, account state, and report requests.
    http_client: DeribitHttpClient,
    /// WebSocket client used for order commands and user channel subscriptions.
    ws_client: DeribitWebSocketClient,
    /// Handle of the task that dispatches WebSocket messages, if one was spawned.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of background tasks started through `spawn_task`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
75
76impl DeribitExecutionClient {
77 pub fn new(core: ExecutionClientCore, config: DeribitExecClientConfig) -> anyhow::Result<Self> {
83 let http_client = if config.has_api_credentials() {
84 DeribitHttpClient::new_with_env(
85 config.api_key.clone(),
86 config.api_secret.clone(),
87 config.use_testnet,
88 config.http_timeout_secs,
89 config.max_retries,
90 config.retry_delay_initial_ms,
91 config.retry_delay_max_ms,
92 None, )?
94 } else {
95 DeribitHttpClient::new(
96 config.base_url_http.clone(),
97 config.use_testnet,
98 config.http_timeout_secs,
99 config.max_retries,
100 config.retry_delay_initial_ms,
101 config.retry_delay_max_ms,
102 None, )?
104 };
105
106 let mut ws_client = DeribitWebSocketClient::new(
107 config.base_url_ws.clone(),
108 config.api_key.clone(),
109 config.api_secret.clone(),
110 Some(DERIBIT_WS_HEARTBEAT_SECS),
111 config.use_testnet,
112 )
113 .context("failed to create WebSocket client for execution")?;
114 ws_client.set_account_id(core.account_id);
116
117 let clock = get_atomic_clock_realtime();
118 let emitter = ExecutionEventEmitter::new(
119 clock,
120 core.trader_id,
121 core.account_id,
122 AccountType::Margin,
123 None,
124 );
125
126 Ok(Self {
127 core,
128 clock,
129 config,
130 emitter,
131 http_client,
132 ws_client,
133 ws_stream_handle: None,
134 pending_tasks: Mutex::new(Vec::new()),
135 })
136 }
137
138 fn spawn_task<F>(&self, description: &'static str, fut: F)
140 where
141 F: Future<Output = anyhow::Result<()>> + Send + 'static,
142 {
143 let runtime = get_runtime();
144 let handle = runtime.spawn(async move {
145 if let Err(e) = fut.await {
146 log::warn!("{description} failed: {e:?}");
147 }
148 });
149
150 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
151 tasks.retain(|handle| !handle.is_finished());
152 tasks.push(handle);
153 }
154
155 fn abort_pending_tasks(&self) {
157 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
158 for handle in tasks.drain(..) {
159 handle.abort();
160 }
161 }
162
163 fn build_order_params(order: &dyn Order) -> DeribitOrderParams {
165 let order_type = match order.order_type() {
166 OrderType::Limit => "limit",
167 OrderType::Market => "market",
168 OrderType::StopLimit => "stop_limit",
169 OrderType::StopMarket => "stop_market",
170 other => {
171 log::warn!(
172 "Unsupported order type {other:?} for Deribit, falling back to limit order"
173 );
174 "limit"
175 }
176 }
177 .to_string();
178
179 let time_in_force = Some(
180 match order.time_in_force() {
181 TimeInForce::Gtc => "good_til_cancelled",
182 TimeInForce::Ioc => "immediate_or_cancel",
183 TimeInForce::Fok => "fill_or_kill",
184 TimeInForce::Gtd => {
185 if order.expire_time().is_some() {
186 log::warn!(
187 "Deribit GTD orders expire at 8:00 UTC only - custom expire_time is ignored. \
188 For custom expiry times, use managed GTD with emulation_trigger"
189 );
190 }
191 "good_til_day"
192 }
193 other => {
194 log::warn!(
195 "Unsupported time_in_force {other:?} for Deribit, falling back to GTC"
196 );
197 "good_til_cancelled"
198 }
199 }
200 .to_string(),
201 );
202
203 let valid_until = None;
206
207 let trigger = order.trigger_type().and_then(|tt| {
209 match tt {
210 TriggerType::LastPrice => Some("last_price".to_string()),
211 TriggerType::MarkPrice => Some("mark_price".to_string()),
212 TriggerType::IndexPrice => Some("index_price".to_string()),
213 TriggerType::Default => Some("last_price".to_string()), _ => None,
215 }
216 });
217
218 DeribitOrderParams {
219 instrument_name: order.instrument_id().symbol.to_string(),
220 amount: order.quantity().as_decimal(),
221 order_type,
222 label: Some(order.client_order_id().to_string()),
223 price: order.price().map(|p| p.as_decimal()),
224 time_in_force,
225 post_only: if order.is_post_only() {
226 Some(true)
227 } else {
228 None
229 },
230 reject_post_only: if order.is_post_only() {
231 Some(true)
232 } else {
233 None
234 },
235 reduce_only: if order.is_reduce_only() {
236 Some(true)
237 } else {
238 None
239 },
240 trigger_price: order.trigger_price().map(|p| p.as_decimal()),
241 trigger,
242 max_show: None,
243 valid_until,
244 }
245 }
246
247 fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) -> anyhow::Result<()> {
251 if order.is_closed() {
252 log::warn!("Cannot submit closed order {}", order.client_order_id());
253 return Ok(());
254 }
255
256 let params = Self::build_order_params(order);
257 let client_order_id = order.client_order_id();
258 let trader_id = order.trader_id();
259 let strategy_id = order.strategy_id();
260 let instrument_id = order.instrument_id();
261 let order_side = order.order_side();
262
263 log::debug!("OrderSubmitted client_order_id={client_order_id}");
264 self.emitter.emit_order_submitted(order);
265
266 let ws_client = self.ws_client.clone();
267 let emitter = self.emitter.clone();
268 let clock = self.clock;
269
270 self.spawn_task(task_name, async move {
271 let result = ws_client
272 .submit_order(
273 order_side,
274 params,
275 client_order_id,
276 trader_id,
277 strategy_id,
278 instrument_id,
279 )
280 .await;
281
282 if let Err(e) = result {
283 let ts_event = clock.get_time_ns();
284 emitter.emit_order_rejected_event(
285 strategy_id,
286 instrument_id,
287 client_order_id,
288 &format!("{task_name}-error: {e}"),
289 ts_event,
290 false,
291 );
292 return Err(e.into());
293 }
294
295 Ok(())
296 });
297
298 Ok(())
299 }
300
301 fn spawn_stream_handler(
303 &mut self,
304 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
305 ) {
306 if self.ws_stream_handle.is_some() {
307 return;
308 }
309
310 let emitter = self.emitter.clone();
311
312 let handle = get_runtime().spawn(async move {
313 pin_mut!(stream);
314 while let Some(message) = stream.next().await {
315 dispatch_ws_message(message, &emitter);
316 }
317 });
318
319 self.ws_stream_handle = Some(handle);
320 log::info!("WebSocket stream handler started");
321 }
322}
323
324#[async_trait(?Send)]
325impl ExecutionClient for DeribitExecutionClient {
    /// Returns whether the underlying core reports a connected state.
    fn is_connected(&self) -> bool {
        self.core.is_connected()
    }
329
    /// Returns the client ID held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
333
    /// Returns the account ID held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
337
    /// Returns the venue, copied from the shared `DERIBIT_VENUE` constant.
    fn venue(&self) -> Venue {
        *DERIBIT_VENUE
    }
341
    /// Returns the order management system type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
345
    /// Returns a clone of this client's account from the cache, or `None` if the
    /// cache holds no account for the client's account ID.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.cache().account(&self.core.account_id).cloned()
    }
349
    /// Emits an account state built from the given balances and margins.
    ///
    /// All arguments are forwarded unchanged to the emitter; this method itself
    /// always returns `Ok(())`.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.emitter
            .emit_account_state(balances, margins, reported, ts_event);
        Ok(())
    }
361
362 fn start(&mut self) -> anyhow::Result<()> {
363 if self.core.is_started() {
364 return Ok(());
365 }
366
367 let sender = get_exec_event_sender();
368 self.emitter.set_sender(sender);
369 self.core.set_started();
370
371 log::info!(
372 "Started: client_id={}, account_id={}, account_type={:?}, product_types={:?}, use_testnet={}",
373 self.core.client_id,
374 self.core.account_id,
375 self.core.account_type,
376 self.config.product_types,
377 self.config.use_testnet
378 );
379 Ok(())
380 }
381
382 fn stop(&mut self) -> anyhow::Result<()> {
383 if self.core.is_stopped() {
384 return Ok(());
385 }
386
387 self.core.set_stopped();
388 self.core.set_disconnected();
389 self.abort_pending_tasks();
390 log::info!("Stopped: client_id={}", self.core.client_id);
391 Ok(())
392 }
393
    /// Connects the client to Deribit.
    ///
    /// Loads instruments (once), publishes the current account state, then opens,
    /// authenticates, and subscribes the WebSocket session before starting the
    /// message handler. Returns immediately if the core is already connected.
    ///
    /// # Errors
    ///
    /// Returns an error if API credentials are missing, or if an instrument or
    /// account state request, the WebSocket connection, the session
    /// authentication, or any of the subscriptions fails.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.core.is_connected() {
            return Ok(());
        }

        // Fail fast before any network request is made
        if !self.config.has_api_credentials() {
            anyhow::bail!("Missing API credentials; set Deribit environment variables");
        }

        self.ws_client.set_account_id(self.core.account_id);

        // Instruments are requested only until the core is flagged as initialized
        if !self.core.instruments_initialized() {
            for product_type in &self.config.product_types {
                let instruments = self
                    .http_client
                    .request_instruments(DeribitCurrency::ANY, Some(*product_type))
                    .await
                    .with_context(|| {
                        format!("failed to request instruments for {product_type:?}")
                    })?;

                if instruments.is_empty() {
                    log::warn!("No instruments returned for {product_type:?}");
                    continue;
                }

                log::info!("Fetched {} {product_type:?} instruments", instruments.len());
                // Both clients keep their own instrument cache
                self.ws_client.cache_instruments(instruments.clone());
                self.http_client.cache_instruments(instruments);
            }
            self.core.set_instruments_initialized();
        }

        // Publish the initial account state before the WebSocket is opened
        let account_state = self
            .http_client
            .request_account_state(self.core.account_id)
            .await
            .context("failed to request account state")?;

        self.emitter.send_account_state(account_state);

        self.ws_client
            .connect()
            .await
            .context("failed to connect WebSocket client for execution")?;

        self.ws_client
            .authenticate_session(DERIBIT_EXECUTION_SESSION_NAME)
            .await
            .map_err(|e| anyhow::anyhow!("failed to authenticate WebSocket session: {e}"))?;

        log::info!("WebSocket client authenticated for execution");

        // Subscriptions are issued after the session has been authenticated
        self.ws_client
            .subscribe_user_orders()
            .await
            .map_err(|e| anyhow::anyhow!("failed to subscribe to user orders: {e}"))?;
        self.ws_client
            .subscribe_user_trades()
            .await
            .map_err(|e| anyhow::anyhow!("failed to subscribe to user trades: {e}"))?;
        self.ws_client
            .subscribe_user_portfolio()
            .await
            .map_err(|e| anyhow::anyhow!("failed to subscribe to user portfolio: {e}"))?;

        log::info!("Subscribed to user order, trade, and portfolio updates");

        // Hand the message stream to a background task that dispatches to the emitter
        let stream = self.ws_client.stream();
        self.spawn_stream_handler(stream);

        self.core.set_connected();
        log::info!("Connected: client_id={}", self.core.client_id);
        Ok(())
    }
475
476 async fn disconnect(&mut self) -> anyhow::Result<()> {
477 if self.core.is_disconnected() {
478 return Ok(());
479 }
480
481 self.abort_pending_tasks();
482
483 if let Some(handle) = self.ws_stream_handle.take() {
485 handle.abort();
486 }
487
488 if let Err(e) = self.ws_client.close().await {
490 log::warn!("Error closing WebSocket client: {e}");
491 }
492
493 self.core.set_disconnected();
494 log::info!("Disconnected: client_id={}", self.core.client_id);
495 Ok(())
496 }
497
498 async fn generate_order_status_report(
499 &self,
500 cmd: &GenerateOrderStatusReport,
501 ) -> anyhow::Result<Option<OrderStatusReport>> {
502 if let Some(venue_order_id) = &cmd.venue_order_id {
504 let params = GetOrderStateParams {
505 order_id: venue_order_id.to_string(),
506 };
507 let ts_init = self.clock.get_time_ns();
508
509 match self.http_client.inner.get_order_state(params).await {
510 Ok(response) => {
511 if let Some(order) = response.result {
512 let symbol = ustr::Ustr::from(&order.instrument_name);
513 if let Some(instrument) = self.http_client.get_instrument(&symbol) {
514 let report = parse_user_order_msg(
515 &order,
516 &instrument,
517 self.core.account_id,
518 ts_init,
519 )?;
520 return Ok(Some(report));
521 } else {
522 log::warn!(
523 "Instrument {} not in cache for order {}",
524 order.instrument_name,
525 order.order_id
526 );
527 }
528 }
529 }
530 Err(e) => {
531 log::warn!("Failed to get order state: {e}");
532 }
533 }
534 return Ok(None);
535 }
536
537 if let Some(client_order_id) = &cmd.client_order_id {
539 let reports = self
540 .http_client
541 .request_order_status_reports(
542 self.core.account_id,
543 cmd.instrument_id,
544 None,
545 None,
546 true, )
548 .await?;
549
550 for report in reports {
552 if report.client_order_id == Some(*client_order_id) {
553 return Ok(Some(report));
554 }
555 }
556 }
557
558 Ok(None)
559 }
560
    /// Requests order status reports over HTTP for this client's account.
    ///
    /// The instrument, time range, and open-only flag are taken from `cmd` and
    /// forwarded unchanged; the result of the HTTP client call is returned as is.
    async fn generate_order_status_reports(
        &self,
        cmd: &GenerateOrderStatusReports,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.http_client
            .request_order_status_reports(
                self.core.account_id,
                cmd.instrument_id,
                cmd.start,
                cmd.end,
                cmd.open_only,
            )
            .await
    }
575
576 async fn generate_fill_reports(
577 &self,
578 cmd: GenerateFillReports,
579 ) -> anyhow::Result<Vec<FillReport>> {
580 let mut reports = self
581 .http_client
582 .request_fill_reports(self.core.account_id, cmd.instrument_id, cmd.start, cmd.end)
583 .await?;
584
585 if let Some(venue_order_id) = &cmd.venue_order_id {
587 reports.retain(|r| r.venue_order_id.to_string() == venue_order_id.to_string());
588 }
589
590 Ok(reports)
591 }
592
    /// Requests position status reports over HTTP for this client's account,
    /// forwarding the instrument from `cmd`.
    async fn generate_position_status_reports(
        &self,
        cmd: &GeneratePositionStatusReports,
    ) -> anyhow::Result<Vec<PositionStatusReport>> {
        self.http_client
            .request_position_status_reports(self.core.account_id, cmd.instrument_id)
            .await
    }
601
602 async fn generate_mass_status(
603 &self,
604 lookback_mins: Option<u64>,
605 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
606 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
607 let ts_now = self.clock.get_time_ns();
608 let start = lookback_mins.map(|mins| {
609 let lookback_ns = mins
610 .saturating_mul(60)
611 .saturating_mul(NANOSECONDS_IN_SECOND);
612 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
613 });
614
615 let order_cmd = GenerateOrderStatusReportsBuilder::default()
616 .ts_init(ts_now)
617 .open_only(false) .start(start)
619 .build()
620 .context("Failed to build GenerateOrderStatusReports")?;
621
622 let fill_cmd = GenerateFillReportsBuilder::default()
623 .ts_init(ts_now)
624 .start(start)
625 .build()
626 .context("Failed to build GenerateFillReports")?;
627
628 let position_cmd = GeneratePositionStatusReportsBuilder::default()
629 .ts_init(ts_now)
630 .start(start)
631 .build()
632 .context("Failed to build GeneratePositionStatusReports")?;
633
634 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
635 self.generate_order_status_reports(&order_cmd),
636 self.generate_fill_reports(fill_cmd),
637 self.generate_position_status_reports(&position_cmd),
638 )?;
639
640 log::info!("Received {} OrderStatusReports", order_reports.len());
641 log::info!("Received {} FillReports", fill_reports.len());
642 log::info!("Received {} PositionReports", position_reports.len());
643
644 let mut mass_status = ExecutionMassStatus::new(
645 self.core.client_id,
646 self.core.account_id,
647 *DERIBIT_VENUE,
648 ts_now,
649 None,
650 );
651
652 mass_status.add_order_reports(order_reports);
653 mass_status.add_fill_reports(fill_reports);
654 mass_status.add_position_reports(position_reports);
655
656 Ok(Some(mass_status))
657 }
658
659 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
660 let http_client = self.http_client.clone();
661 let account_id = self.core.account_id;
662 let emitter = self.emitter.clone();
663
664 self.spawn_task("query_account", async move {
665 let account_state = http_client
666 .request_account_state(account_id)
667 .await
668 .context("failed to query account state (check API credentials are valid)")?;
669
670 emitter.send_account_state(account_state);
671 Ok(())
672 });
673
674 Ok(())
675 }
676
677 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
678 let ws_client = self.ws_client.clone();
679
680 let order_id = cmd
682 .venue_order_id
683 .as_ref()
684 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for query_order"))?
685 .to_string();
686
687 let client_order_id = cmd.client_order_id;
688 let trader_id = cmd.trader_id;
689 let strategy_id = cmd.strategy_id;
690 let instrument_id = cmd.instrument_id;
691
692 log::info!("Querying order state: order_id={order_id}, client_order_id={client_order_id}");
693
694 self.spawn_task("query_order", async move {
697 ws_client
698 .query_order(
699 &order_id,
700 client_order_id,
701 trader_id,
702 strategy_id,
703 instrument_id,
704 )
705 .await
706 .map_err(|e| anyhow::anyhow!("Query order state failed: {e}"))?;
707 Ok(())
708 });
709
710 Ok(())
711 }
712
713 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
714 let order = self
715 .core
716 .cache()
717 .order(&cmd.client_order_id)
718 .cloned()
719 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
720 self.submit_single_order(&order, "submit_order")
721 }
722
723 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
724 if cmd.order_list.client_order_ids.is_empty() {
725 log::debug!("submit_order_list called with empty order list");
726 return Ok(());
727 }
728
729 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
730
731 log::info!(
732 "Submitting order list {} with {} orders for instrument={}",
733 cmd.order_list.id,
734 orders.len(),
735 cmd.instrument_id
736 );
737
738 for order in &orders {
741 self.submit_single_order(order, "submit_order_list_item")?;
742 }
743
744 Ok(())
745 }
746
747 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
748 let ws_client = self.ws_client.clone();
749
750 let order_id = cmd
752 .venue_order_id
753 .as_ref()
754 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for modify_order"))?
755 .to_string();
756
757 let quantity = if let Some(qty) = cmd.quantity {
759 qty
760 } else {
761 let cache = self.core.cache();
763 let order = cache
764 .order(&cmd.client_order_id)
765 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
766 order.quantity()
767 };
768
769 let price = cmd
770 .price
771 .ok_or_else(|| anyhow::anyhow!("price required for modify_order"))?;
772
773 let client_order_id = cmd.client_order_id;
774 let trader_id = cmd.trader_id;
775 let strategy_id = cmd.strategy_id;
776 let instrument_id = cmd.instrument_id;
777 let venue_order_id = cmd.venue_order_id;
778 let emitter = self.emitter.clone();
779 let clock = self.clock;
780
781 log::info!(
782 "Modifying order: order_id={order_id}, quantity={quantity}, price={price}, client_order_id={client_order_id}"
783 );
784
785 self.spawn_task("modify_order", async move {
787 if let Err(e) = ws_client
788 .modify_order(
789 &order_id,
790 quantity,
791 price,
792 client_order_id,
793 trader_id,
794 strategy_id,
795 instrument_id,
796 )
797 .await
798 {
799 log::error!(
800 "Modify order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
801 );
802
803 let ts_event = clock.get_time_ns();
804 emitter.emit_order_modify_rejected_event(
805 strategy_id,
806 instrument_id,
807 client_order_id,
808 venue_order_id,
809 &format!("modify-order-error: {e}"),
810 ts_event,
811 );
812
813 anyhow::bail!("Modify order failed: {e}");
814 }
815 Ok(())
816 });
817
818 Ok(())
819 }
820
821 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
822 let ws_client = self.ws_client.clone();
823
824 let order_id = cmd
826 .venue_order_id
827 .as_ref()
828 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for cancel_order"))?
829 .to_string();
830
831 let client_order_id = cmd.client_order_id;
832 let trader_id = cmd.trader_id;
833 let strategy_id = cmd.strategy_id;
834 let instrument_id = cmd.instrument_id;
835 let venue_order_id = cmd.venue_order_id;
836 let emitter = self.emitter.clone();
837 let clock = self.clock;
838
839 log::info!("Canceling order: order_id={order_id}, client_order_id={client_order_id}");
840
841 self.spawn_task("cancel_order", async move {
843 if let Err(e) = ws_client
844 .cancel_order(
845 &order_id,
846 client_order_id,
847 trader_id,
848 strategy_id,
849 instrument_id,
850 )
851 .await
852 {
853 log::error!(
854 "Cancel order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
855 );
856
857 let ts_event = clock.get_time_ns();
858 emitter.emit_order_cancel_rejected_event(
859 strategy_id,
860 instrument_id,
861 client_order_id,
862 venue_order_id,
863 &format!("cancel-order-error: {e}"),
864 ts_event,
865 );
866
867 anyhow::bail!("Cancel order failed: {e}");
868 }
869 Ok(())
870 });
871
872 Ok(())
873 }
874
875 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
876 let instrument_id = cmd.instrument_id;
877
878 if cmd.order_side == OrderSide::NoOrderSide {
880 log::info!(
881 "Cancelling all orders: instrument={instrument_id}, order_side=NoOrderSide (bulk)"
882 );
883
884 let ws_client = self.ws_client.clone();
885 self.spawn_task("cancel_all_orders", async move {
886 if let Err(e) = ws_client.cancel_all_orders(instrument_id, None).await {
887 log::error!("Cancel all orders failed for instrument {instrument_id}: {e}");
888 anyhow::bail!("Cancel all orders failed: {e}");
889 }
890 Ok(())
891 });
892
893 return Ok(());
894 }
895
896 log::info!(
899 "Cancelling orders by side: instrument={}, order_side={}",
900 instrument_id,
901 cmd.order_side
902 );
903
904 let orders_to_cancel: Vec<_> = {
905 let cache = self.core.cache();
906 let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
907
908 open_orders
909 .into_iter()
910 .filter(|order| order.order_side() == cmd.order_side)
911 .filter_map(|order| {
912 let venue_order_id = order.venue_order_id()?;
913 Some((
914 venue_order_id.to_string(),
915 order.client_order_id(),
916 order.instrument_id(),
917 Some(venue_order_id),
918 ))
919 })
920 .collect()
921 };
922
923 if orders_to_cancel.is_empty() {
924 log::debug!(
925 "No open {} orders to cancel for {}",
926 cmd.order_side,
927 instrument_id
928 );
929 return Ok(());
930 }
931
932 log::info!(
933 "Cancelling {} {} orders for {}",
934 orders_to_cancel.len(),
935 cmd.order_side,
936 instrument_id
937 );
938
939 for (venue_order_id_str, client_order_id, order_instrument_id, venue_order_id) in
941 orders_to_cancel
942 {
943 let ws_client = self.ws_client.clone();
944 let trader_id = cmd.trader_id;
945 let strategy_id = cmd.strategy_id;
946 let emitter = self.emitter.clone();
947 let clock = self.clock;
948
949 self.spawn_task("cancel_order_by_side", async move {
950 if let Err(e) = ws_client
951 .cancel_order(
952 &venue_order_id_str,
953 client_order_id,
954 trader_id,
955 strategy_id,
956 order_instrument_id,
957 )
958 .await
959 {
960 log::error!(
961 "Cancel order failed: order_id={venue_order_id_str}, client_order_id={client_order_id}, error={e}"
962 );
963
964 let ts_event = clock.get_time_ns();
965 emitter.emit_order_cancel_rejected_event(
966 strategy_id,
967 order_instrument_id,
968 client_order_id,
969 venue_order_id,
970 &format!("cancel-order-error: {e}"),
971 ts_event,
972 );
973 }
974 Ok(())
975 });
976 }
977
978 Ok(())
979 }
980
981 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
982 if cmd.cancels.is_empty() {
983 log::debug!("batch_cancel_orders called with empty cancels list");
984 return Ok(());
985 }
986
987 log::info!(
988 "Batch cancelling {} orders for instrument={}",
989 cmd.cancels.len(),
990 cmd.instrument_id
991 );
992
993 for cancel in &cmd.cancels {
996 let order_id = match &cancel.venue_order_id {
997 Some(id) => id.to_string(),
998 None => {
999 log::warn!(
1000 "Cannot cancel order {} - no venue_order_id",
1001 cancel.client_order_id
1002 );
1003
1004 let ts_event = self.clock.get_time_ns();
1006 self.emitter.emit_order_cancel_rejected_event(
1007 cancel.strategy_id,
1008 cancel.instrument_id,
1009 cancel.client_order_id,
1010 None,
1011 "venue_order_id required for cancel",
1012 ts_event,
1013 );
1014 continue;
1015 }
1016 };
1017
1018 let ws_client = self.ws_client.clone();
1019 let emitter = self.emitter.clone();
1020 let clock = self.clock;
1021 let client_order_id = cancel.client_order_id;
1022 let trader_id = cancel.trader_id;
1023 let strategy_id = cancel.strategy_id;
1024 let instrument_id = cancel.instrument_id;
1025
1026 self.spawn_task("batch_cancel_order", async move {
1027 if let Err(e) = ws_client
1028 .cancel_order(
1029 &order_id,
1030 client_order_id,
1031 trader_id,
1032 strategy_id,
1033 instrument_id,
1034 )
1035 .await
1036 {
1037 log::error!(
1038 "Batch cancel order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
1039 );
1040
1041 let ts_event = clock.get_time_ns();
1042 emitter.emit_order_cancel_rejected_event(
1043 strategy_id,
1044 instrument_id,
1045 client_order_id,
1046 None,
1047 &format!("batch-cancel-error: {e}"),
1048 ts_event,
1049 );
1050
1051 anyhow::bail!("Batch cancel order failed: {e}");
1052 }
1053 Ok(())
1054 });
1055 }
1056
1057 Ok(())
1058 }
1059}
1060
1061fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
1063 match message {
1064 NautilusWsMessage::AccountState(state) => {
1065 emitter.send_account_state(state);
1066 }
1067 NautilusWsMessage::OrderStatusReports(reports) => {
1068 log::debug!("Processing {} order status report(s)", reports.len());
1069 for report in reports {
1070 emitter.send_order_status_report(report);
1071 }
1072 }
1073 NautilusWsMessage::FillReports(reports) => {
1074 log::debug!("Processing {} fill report(s)", reports.len());
1075 for report in reports {
1076 emitter.send_fill_report(report);
1077 }
1078 }
1079 NautilusWsMessage::OrderRejected(event) => {
1080 emitter.send_order_event(OrderEventAny::Rejected(event));
1081 }
1082 NautilusWsMessage::OrderAccepted(event) => {
1083 emitter.send_order_event(OrderEventAny::Accepted(event));
1084 }
1085 NautilusWsMessage::OrderCanceled(event) => {
1086 emitter.send_order_event(OrderEventAny::Canceled(event));
1087 }
1088 NautilusWsMessage::OrderExpired(event) => {
1089 emitter.send_order_event(OrderEventAny::Expired(event));
1090 }
1091 NautilusWsMessage::OrderUpdated(event) => {
1092 emitter.send_order_event(OrderEventAny::Updated(event));
1093 }
1094 NautilusWsMessage::OrderCancelRejected(event) => {
1095 emitter.send_order_event(OrderEventAny::CancelRejected(event));
1096 }
1097 NautilusWsMessage::OrderModifyRejected(event) => {
1098 emitter.send_order_event(OrderEventAny::ModifyRejected(event));
1099 }
1100 NautilusWsMessage::Error(e) => {
1101 log::warn!("WebSocket error: {e}");
1102 }
1103 NautilusWsMessage::Reconnected => {
1104 log::info!("WebSocket reconnected");
1105 }
1106 NautilusWsMessage::Authenticated(auth) => {
1107 log::debug!("WebSocket authenticated: scope={}", auth.scope);
1108 }
1109 NautilusWsMessage::Data(_)
1110 | NautilusWsMessage::Deltas(_)
1111 | NautilusWsMessage::Instrument(_)
1112 | NautilusWsMessage::FundingRates(_)
1113 | NautilusWsMessage::Raw(_) => {
1114 log::trace!("Ignoring data message in execution client");
1116 }
1117 }
1118}