Skip to main content

nautilus_deribit/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Deribit adapter.
17
18use std::{future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures_util::{StreamExt, pin_mut};
23use nautilus_common::{
24    clients::ExecutionClient,
25    live::{get_runtime, runner::get_exec_event_sender},
26    messages::execution::{
27        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28        GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
29        GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
30        GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
31        SubmitOrderList,
32    },
33};
34use nautilus_core::{
35    MUTEX_POISONED, UnixNanos,
36    datetime::NANOSECONDS_IN_SECOND,
37    time::{AtomicTime, get_atomic_clock_realtime},
38};
39use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
40use nautilus_model::{
41    accounts::AccountAny,
42    enums::{AccountType, OmsType, OrderSide, OrderType, TimeInForce, TriggerType},
43    events::OrderEventAny,
44    identifiers::{AccountId, ClientId, Venue},
45    orders::{Order, OrderAny},
46    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47    types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52    common::consts::{DERIBIT_VENUE, DERIBIT_WS_HEARTBEAT_SECS},
53    config::DeribitExecClientConfig,
54    http::{client::DeribitHttpClient, models::DeribitCurrency, query::GetOrderStateParams},
55    websocket::{
56        auth::DERIBIT_EXECUTION_SESSION_NAME,
57        client::DeribitWebSocketClient,
58        messages::{DeribitOrderParams, NautilusWsMessage},
59        parse::parse_user_order_msg,
60    },
61};
62
63/// Deribit live execution client.
64#[derive(Debug)]
65pub struct DeribitExecutionClient {
66    core: ExecutionClientCore,
67    clock: &'static AtomicTime,
68    config: DeribitExecClientConfig,
69    emitter: ExecutionEventEmitter,
70    http_client: DeribitHttpClient,
71    ws_client: DeribitWebSocketClient,
72    ws_stream_handle: Option<JoinHandle<()>>,
73    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
74}
75
76impl DeribitExecutionClient {
77    /// Creates a new [`DeribitExecutionClient`].
78    ///
79    /// # Errors
80    ///
81    /// Returns an error if the client fails to initialize.
82    pub fn new(core: ExecutionClientCore, config: DeribitExecClientConfig) -> anyhow::Result<Self> {
83        let http_client = if config.has_api_credentials() {
84            DeribitHttpClient::new_with_env(
85                config.api_key.clone(),
86                config.api_secret.clone(),
87                config.use_testnet,
88                config.http_timeout_secs,
89                config.max_retries,
90                config.retry_delay_initial_ms,
91                config.retry_delay_max_ms,
92                None, // proxy_url
93            )?
94        } else {
95            DeribitHttpClient::new(
96                config.base_url_http.clone(),
97                config.use_testnet,
98                config.http_timeout_secs,
99                config.max_retries,
100                config.retry_delay_initial_ms,
101                config.retry_delay_max_ms,
102                None, // proxy_url
103            )?
104        };
105
106        let mut ws_client = DeribitWebSocketClient::new(
107            config.base_url_ws.clone(),
108            config.api_key.clone(),
109            config.api_secret.clone(),
110            Some(DERIBIT_WS_HEARTBEAT_SECS),
111            config.use_testnet,
112        )
113        .context("failed to create WebSocket client for execution")?;
114        // Set account ID for order/fill reports
115        ws_client.set_account_id(core.account_id);
116
117        let clock = get_atomic_clock_realtime();
118        let emitter = ExecutionEventEmitter::new(
119            clock,
120            core.trader_id,
121            core.account_id,
122            AccountType::Margin,
123            None,
124        );
125
126        Ok(Self {
127            core,
128            clock,
129            config,
130            emitter,
131            http_client,
132            ws_client,
133            ws_stream_handle: None,
134            pending_tasks: Mutex::new(Vec::new()),
135        })
136    }
137
138    /// Spawns an async task for execution operations.
139    fn spawn_task<F>(&self, description: &'static str, fut: F)
140    where
141        F: Future<Output = anyhow::Result<()>> + Send + 'static,
142    {
143        let runtime = get_runtime();
144        let handle = runtime.spawn(async move {
145            if let Err(e) = fut.await {
146                log::warn!("{description} failed: {e:?}");
147            }
148        });
149
150        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
151        tasks.retain(|handle| !handle.is_finished());
152        tasks.push(handle);
153    }
154
155    /// Aborts all pending async tasks.
156    fn abort_pending_tasks(&self) {
157        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
158        for handle in tasks.drain(..) {
159            handle.abort();
160        }
161    }
162
163    /// Builds Deribit order parameters from a Nautilus order.
164    fn build_order_params(order: &dyn Order) -> DeribitOrderParams {
165        let order_type = match order.order_type() {
166            OrderType::Limit => "limit",
167            OrderType::Market => "market",
168            OrderType::StopLimit => "stop_limit",
169            OrderType::StopMarket => "stop_market",
170            other => {
171                log::warn!(
172                    "Unsupported order type {other:?} for Deribit, falling back to limit order"
173                );
174                "limit"
175            }
176        }
177        .to_string();
178
179        let time_in_force = Some(
180            match order.time_in_force() {
181                TimeInForce::Gtc => "good_til_cancelled",
182                TimeInForce::Ioc => "immediate_or_cancel",
183                TimeInForce::Fok => "fill_or_kill",
184                TimeInForce::Gtd => {
185                    if order.expire_time().is_some() {
186                        log::warn!(
187                            "Deribit GTD orders expire at 8:00 UTC only - custom expire_time is ignored. \
188                            For custom expiry times, use managed GTD with emulation_trigger"
189                        );
190                    }
191                    "good_til_day"
192                }
193                other => {
194                    log::warn!(
195                        "Unsupported time_in_force {other:?} for Deribit, falling back to GTC"
196                    );
197                    "good_til_cancelled"
198                }
199            }
200            .to_string(),
201        );
202
203        // Deribit's `valid_until` is a REQUEST timeout, not order expiry.
204        // Deribit's `good_til_day` expires at end of trading session (8 UTC).
205        let valid_until = None;
206
207        // Map trigger type for stop orders
208        let trigger = order.trigger_type().and_then(|tt| {
209            match tt {
210                TriggerType::LastPrice => Some("last_price".to_string()),
211                TriggerType::MarkPrice => Some("mark_price".to_string()),
212                TriggerType::IndexPrice => Some("index_price".to_string()),
213                TriggerType::Default => Some("last_price".to_string()), // Deribit default
214                _ => None,
215            }
216        });
217
218        DeribitOrderParams {
219            instrument_name: order.instrument_id().symbol.to_string(),
220            amount: order.quantity().as_decimal(),
221            order_type,
222            label: Some(order.client_order_id().to_string()),
223            price: order.price().map(|p| p.as_decimal()),
224            time_in_force,
225            post_only: if order.is_post_only() {
226                Some(true)
227            } else {
228                None
229            },
230            reject_post_only: if order.is_post_only() {
231                Some(true)
232            } else {
233                None
234            },
235            reduce_only: if order.is_reduce_only() {
236                Some(true)
237            } else {
238                None
239            },
240            trigger_price: order.trigger_price().map(|p| p.as_decimal()),
241            trigger,
242            max_show: None,
243            valid_until,
244        }
245    }
246
247    /// Submits a single order to Deribit.
248    ///
249    /// This is the core submission logic shared by `submit_order` and `submit_order_list`.
250    fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) -> anyhow::Result<()> {
251        if order.is_closed() {
252            log::warn!("Cannot submit closed order {}", order.client_order_id());
253            return Ok(());
254        }
255
256        let params = Self::build_order_params(order);
257        let client_order_id = order.client_order_id();
258        let trader_id = order.trader_id();
259        let strategy_id = order.strategy_id();
260        let instrument_id = order.instrument_id();
261        let order_side = order.order_side();
262
263        log::debug!("OrderSubmitted client_order_id={client_order_id}");
264        self.emitter.emit_order_submitted(order);
265
266        let ws_client = self.ws_client.clone();
267        let emitter = self.emitter.clone();
268        let clock = self.clock;
269
270        self.spawn_task(task_name, async move {
271            let result = ws_client
272                .submit_order(
273                    order_side,
274                    params,
275                    client_order_id,
276                    trader_id,
277                    strategy_id,
278                    instrument_id,
279                )
280                .await;
281
282            if let Err(e) = result {
283                let ts_event = clock.get_time_ns();
284                emitter.emit_order_rejected_event(
285                    strategy_id,
286                    instrument_id,
287                    client_order_id,
288                    &format!("{task_name}-error: {e}"),
289                    ts_event,
290                    false,
291                );
292                return Err(e.into());
293            }
294
295            Ok(())
296        });
297
298        Ok(())
299    }
300
301    /// Spawns a stream handler to dispatch WebSocket messages to the execution engine.
302    fn spawn_stream_handler(
303        &mut self,
304        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
305    ) {
306        if self.ws_stream_handle.is_some() {
307            return;
308        }
309
310        let emitter = self.emitter.clone();
311
312        let handle = get_runtime().spawn(async move {
313            pin_mut!(stream);
314            while let Some(message) = stream.next().await {
315                dispatch_ws_message(message, &emitter);
316            }
317        });
318
319        self.ws_stream_handle = Some(handle);
320        log::info!("WebSocket stream handler started");
321    }
322}
323
324#[async_trait(?Send)]
325impl ExecutionClient for DeribitExecutionClient {
326    fn is_connected(&self) -> bool {
327        self.core.is_connected()
328    }
329
330    fn client_id(&self) -> ClientId {
331        self.core.client_id
332    }
333
334    fn account_id(&self) -> AccountId {
335        self.core.account_id
336    }
337
338    fn venue(&self) -> Venue {
339        *DERIBIT_VENUE
340    }
341
342    fn oms_type(&self) -> OmsType {
343        self.core.oms_type
344    }
345
346    fn get_account(&self) -> Option<AccountAny> {
347        self.core.cache().account(&self.core.account_id).cloned()
348    }
349
350    fn generate_account_state(
351        &self,
352        balances: Vec<AccountBalance>,
353        margins: Vec<MarginBalance>,
354        reported: bool,
355        ts_event: UnixNanos,
356    ) -> anyhow::Result<()> {
357        self.emitter
358            .emit_account_state(balances, margins, reported, ts_event);
359        Ok(())
360    }
361
362    fn start(&mut self) -> anyhow::Result<()> {
363        if self.core.is_started() {
364            return Ok(());
365        }
366
367        let sender = get_exec_event_sender();
368        self.emitter.set_sender(sender);
369        self.core.set_started();
370
371        log::info!(
372            "Started: client_id={}, account_id={}, account_type={:?}, product_types={:?}, use_testnet={}",
373            self.core.client_id,
374            self.core.account_id,
375            self.core.account_type,
376            self.config.product_types,
377            self.config.use_testnet
378        );
379        Ok(())
380    }
381
382    fn stop(&mut self) -> anyhow::Result<()> {
383        if self.core.is_stopped() {
384            return Ok(());
385        }
386
387        self.core.set_stopped();
388        self.core.set_disconnected();
389        self.abort_pending_tasks();
390        log::info!("Stopped: client_id={}", self.core.client_id);
391        Ok(())
392    }
393
394    async fn connect(&mut self) -> anyhow::Result<()> {
395        if self.core.is_connected() {
396            return Ok(());
397        }
398
399        // Check if credentials are available before requesting account state
400        if !self.config.has_api_credentials() {
401            anyhow::bail!("Missing API credentials; set Deribit environment variables");
402        }
403
404        // Set account ID for order/fill reports
405        self.ws_client.set_account_id(self.core.account_id);
406
407        // Fetch and cache instruments in both HTTP client and WebSocket client
408        if !self.core.instruments_initialized() {
409            for product_type in &self.config.product_types {
410                let instruments = self
411                    .http_client
412                    .request_instruments(DeribitCurrency::ANY, Some(*product_type))
413                    .await
414                    .with_context(|| {
415                        format!("failed to request instruments for {product_type:?}")
416                    })?;
417
418                if instruments.is_empty() {
419                    log::warn!("No instruments returned for {product_type:?}");
420                    continue;
421                }
422
423                log::info!("Fetched {} {product_type:?} instruments", instruments.len());
424                self.ws_client.cache_instruments(instruments.clone());
425                self.http_client.cache_instruments(instruments);
426            }
427            self.core.set_instruments_initialized();
428        }
429
430        // Fetch initial account state
431        let account_state = self
432            .http_client
433            .request_account_state(self.core.account_id)
434            .await
435            .context("failed to request account state")?;
436
437        self.emitter.send_account_state(account_state);
438
439        self.ws_client
440            .connect()
441            .await
442            .context("failed to connect WebSocket client for execution")?;
443
444        self.ws_client
445            .authenticate_session(DERIBIT_EXECUTION_SESSION_NAME)
446            .await
447            .map_err(|e| anyhow::anyhow!("failed to authenticate WebSocket session: {e}"))?;
448
449        log::info!("WebSocket client authenticated for execution");
450
451        // Subscribe to user order and trade updates for all instruments
452        self.ws_client
453            .subscribe_user_orders()
454            .await
455            .map_err(|e| anyhow::anyhow!("failed to subscribe to user orders: {e}"))?;
456        self.ws_client
457            .subscribe_user_trades()
458            .await
459            .map_err(|e| anyhow::anyhow!("failed to subscribe to user trades: {e}"))?;
460        self.ws_client
461            .subscribe_user_portfolio()
462            .await
463            .map_err(|e| anyhow::anyhow!("failed to subscribe to user portfolio: {e}"))?;
464
465        log::info!("Subscribed to user order, trade, and portfolio updates");
466
467        // Spawn stream handler to dispatch WebSocket messages to the execution engine
468        let stream = self.ws_client.stream();
469        self.spawn_stream_handler(stream);
470
471        self.core.set_connected();
472        log::info!("Connected: client_id={}", self.core.client_id);
473        Ok(())
474    }
475
476    async fn disconnect(&mut self) -> anyhow::Result<()> {
477        if self.core.is_disconnected() {
478            return Ok(());
479        }
480
481        self.abort_pending_tasks();
482
483        // Abort stream handler
484        if let Some(handle) = self.ws_stream_handle.take() {
485            handle.abort();
486        }
487
488        // Close WebSocket client
489        if let Err(e) = self.ws_client.close().await {
490            log::warn!("Error closing WebSocket client: {e}");
491        }
492
493        self.core.set_disconnected();
494        log::info!("Disconnected: client_id={}", self.core.client_id);
495        Ok(())
496    }
497
498    async fn generate_order_status_report(
499        &self,
500        cmd: &GenerateOrderStatusReport,
501    ) -> anyhow::Result<Option<OrderStatusReport>> {
502        // If venue_order_id is provided, fetch the specific order by ID
503        if let Some(venue_order_id) = &cmd.venue_order_id {
504            let params = GetOrderStateParams {
505                order_id: venue_order_id.to_string(),
506            };
507            let ts_init = self.clock.get_time_ns();
508
509            match self.http_client.inner.get_order_state(params).await {
510                Ok(response) => {
511                    if let Some(order) = response.result {
512                        let symbol = ustr::Ustr::from(&order.instrument_name);
513                        if let Some(instrument) = self.http_client.get_instrument(&symbol) {
514                            let report = parse_user_order_msg(
515                                &order,
516                                &instrument,
517                                self.core.account_id,
518                                ts_init,
519                            )?;
520                            return Ok(Some(report));
521                        } else {
522                            log::warn!(
523                                "Instrument {} not in cache for order {}",
524                                order.instrument_name,
525                                order.order_id
526                            );
527                        }
528                    }
529                }
530                Err(e) => {
531                    log::warn!("Failed to get order state: {e}");
532                }
533            }
534            return Ok(None);
535        }
536
537        // If client_order_id is provided, search through open orders
538        if let Some(client_order_id) = &cmd.client_order_id {
539            let reports = self
540                .http_client
541                .request_order_status_reports(
542                    self.core.account_id,
543                    cmd.instrument_id,
544                    None,
545                    None,
546                    true, // open_only for efficiency
547                )
548                .await?;
549
550            // Filter by client_order_id
551            for report in reports {
552                if report.client_order_id == Some(*client_order_id) {
553                    return Ok(Some(report));
554                }
555            }
556        }
557
558        Ok(None)
559    }
560
561    async fn generate_order_status_reports(
562        &self,
563        cmd: &GenerateOrderStatusReports,
564    ) -> anyhow::Result<Vec<OrderStatusReport>> {
565        self.http_client
566            .request_order_status_reports(
567                self.core.account_id,
568                cmd.instrument_id,
569                cmd.start,
570                cmd.end,
571                cmd.open_only,
572            )
573            .await
574    }
575
576    async fn generate_fill_reports(
577        &self,
578        cmd: GenerateFillReports,
579    ) -> anyhow::Result<Vec<FillReport>> {
580        let mut reports = self
581            .http_client
582            .request_fill_reports(self.core.account_id, cmd.instrument_id, cmd.start, cmd.end)
583            .await?;
584
585        // Filter by venue_order_id if provided
586        if let Some(venue_order_id) = &cmd.venue_order_id {
587            reports.retain(|r| r.venue_order_id.to_string() == venue_order_id.to_string());
588        }
589
590        Ok(reports)
591    }
592
593    async fn generate_position_status_reports(
594        &self,
595        cmd: &GeneratePositionStatusReports,
596    ) -> anyhow::Result<Vec<PositionStatusReport>> {
597        self.http_client
598            .request_position_status_reports(self.core.account_id, cmd.instrument_id)
599            .await
600    }
601
602    async fn generate_mass_status(
603        &self,
604        lookback_mins: Option<u64>,
605    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
606        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
607        let ts_now = self.clock.get_time_ns();
608        let start = lookback_mins.map(|mins| {
609            let lookback_ns = mins
610                .saturating_mul(60)
611                .saturating_mul(NANOSECONDS_IN_SECOND);
612            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
613        });
614
615        let order_cmd = GenerateOrderStatusReportsBuilder::default()
616            .ts_init(ts_now)
617            .open_only(false) // get all orders for mass status
618            .start(start)
619            .build()
620            .context("Failed to build GenerateOrderStatusReports")?;
621
622        let fill_cmd = GenerateFillReportsBuilder::default()
623            .ts_init(ts_now)
624            .start(start)
625            .build()
626            .context("Failed to build GenerateFillReports")?;
627
628        let position_cmd = GeneratePositionStatusReportsBuilder::default()
629            .ts_init(ts_now)
630            .start(start)
631            .build()
632            .context("Failed to build GeneratePositionStatusReports")?;
633
634        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
635            self.generate_order_status_reports(&order_cmd),
636            self.generate_fill_reports(fill_cmd),
637            self.generate_position_status_reports(&position_cmd),
638        )?;
639
640        log::info!("Received {} OrderStatusReports", order_reports.len());
641        log::info!("Received {} FillReports", fill_reports.len());
642        log::info!("Received {} PositionReports", position_reports.len());
643
644        let mut mass_status = ExecutionMassStatus::new(
645            self.core.client_id,
646            self.core.account_id,
647            *DERIBIT_VENUE,
648            ts_now,
649            None,
650        );
651
652        mass_status.add_order_reports(order_reports);
653        mass_status.add_fill_reports(fill_reports);
654        mass_status.add_position_reports(position_reports);
655
656        Ok(Some(mass_status))
657    }
658
659    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
660        let http_client = self.http_client.clone();
661        let account_id = self.core.account_id;
662        let emitter = self.emitter.clone();
663
664        self.spawn_task("query_account", async move {
665            let account_state = http_client
666                .request_account_state(account_id)
667                .await
668                .context("failed to query account state (check API credentials are valid)")?;
669
670            emitter.send_account_state(account_state);
671            Ok(())
672        });
673
674        Ok(())
675    }
676
677    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
678        let ws_client = self.ws_client.clone();
679
680        // Extract venue order ID (Deribit's order_id)
681        let order_id = cmd
682            .venue_order_id
683            .as_ref()
684            .ok_or_else(|| anyhow::anyhow!("venue_order_id required for query_order"))?
685            .to_string();
686
687        let client_order_id = cmd.client_order_id;
688        let trader_id = cmd.trader_id;
689        let strategy_id = cmd.strategy_id;
690        let instrument_id = cmd.instrument_id;
691
692        log::info!("Querying order state: order_id={order_id}, client_order_id={client_order_id}");
693
694        // Spawn async task to query order state via WebSocket
695        // Response will be dispatched through the WebSocket stream handler as OrderStatusReport
696        self.spawn_task("query_order", async move {
697            ws_client
698                .query_order(
699                    &order_id,
700                    client_order_id,
701                    trader_id,
702                    strategy_id,
703                    instrument_id,
704                )
705                .await
706                .map_err(|e| anyhow::anyhow!("Query order state failed: {e}"))?;
707            Ok(())
708        });
709
710        Ok(())
711    }
712
713    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
714        let order = self
715            .core
716            .cache()
717            .order(&cmd.client_order_id)
718            .cloned()
719            .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
720        self.submit_single_order(&order, "submit_order")
721    }
722
723    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
724        if cmd.order_list.client_order_ids.is_empty() {
725            log::debug!("submit_order_list called with empty order list");
726            return Ok(());
727        }
728
729        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
730
731        log::info!(
732            "Submitting order list {} with {} orders for instrument={}",
733            cmd.order_list.id,
734            orders.len(),
735            cmd.instrument_id
736        );
737
738        // Deribit doesn't have native batch order submission
739        // Loop through and submit each order individually using shared helper
740        for order in &orders {
741            self.submit_single_order(order, "submit_order_list_item")?;
742        }
743
744        Ok(())
745    }
746
747    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
748        let ws_client = self.ws_client.clone();
749
750        // Extract venue order ID (Deribit's order_id)
751        let order_id = cmd
752            .venue_order_id
753            .as_ref()
754            .ok_or_else(|| anyhow::anyhow!("venue_order_id required for modify_order"))?
755            .to_string();
756
757        // Extract quantity - if not provided, get from order in cache
758        let quantity = if let Some(qty) = cmd.quantity {
759            qty
760        } else {
761            // Get order from cache to use its current quantity
762            let cache = self.core.cache();
763            let order = cache
764                .order(&cmd.client_order_id)
765                .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
766            order.quantity()
767        };
768
769        let price = cmd
770            .price
771            .ok_or_else(|| anyhow::anyhow!("price required for modify_order"))?;
772
773        let client_order_id = cmd.client_order_id;
774        let trader_id = cmd.trader_id;
775        let strategy_id = cmd.strategy_id;
776        let instrument_id = cmd.instrument_id;
777        let venue_order_id = cmd.venue_order_id;
778        let emitter = self.emitter.clone();
779        let clock = self.clock;
780
781        log::info!(
782            "Modifying order: order_id={order_id}, quantity={quantity}, price={price}, client_order_id={client_order_id}"
783        );
784
785        // Spawn async task to send modify via WebSocket
786        self.spawn_task("modify_order", async move {
787            if let Err(e) = ws_client
788                .modify_order(
789                    &order_id,
790                    quantity,
791                    price,
792                    client_order_id,
793                    trader_id,
794                    strategy_id,
795                    instrument_id,
796                )
797                .await
798            {
799                log::error!(
800                    "Modify order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
801                );
802
803                let ts_event = clock.get_time_ns();
804                emitter.emit_order_modify_rejected_event(
805                    strategy_id,
806                    instrument_id,
807                    client_order_id,
808                    venue_order_id,
809                    &format!("modify-order-error: {e}"),
810                    ts_event,
811                );
812
813                anyhow::bail!("Modify order failed: {e}");
814            }
815            Ok(())
816        });
817
818        Ok(())
819    }
820
821    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
822        let ws_client = self.ws_client.clone();
823
824        // Extract venue order ID (Deribit's order_id)
825        let order_id = cmd
826            .venue_order_id
827            .as_ref()
828            .ok_or_else(|| anyhow::anyhow!("venue_order_id required for cancel_order"))?
829            .to_string();
830
831        let client_order_id = cmd.client_order_id;
832        let trader_id = cmd.trader_id;
833        let strategy_id = cmd.strategy_id;
834        let instrument_id = cmd.instrument_id;
835        let venue_order_id = cmd.venue_order_id;
836        let emitter = self.emitter.clone();
837        let clock = self.clock;
838
839        log::info!("Canceling order: order_id={order_id}, client_order_id={client_order_id}");
840
841        // Spawn async task to send cancel via WebSocket
842        self.spawn_task("cancel_order", async move {
843            if let Err(e) = ws_client
844                .cancel_order(
845                    &order_id,
846                    client_order_id,
847                    trader_id,
848                    strategy_id,
849                    instrument_id,
850                )
851                .await
852            {
853                log::error!(
854                    "Cancel order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
855                );
856
857                let ts_event = clock.get_time_ns();
858                emitter.emit_order_cancel_rejected_event(
859                    strategy_id,
860                    instrument_id,
861                    client_order_id,
862                    venue_order_id,
863                    &format!("cancel-order-error: {e}"),
864                    ts_event,
865                );
866
867                anyhow::bail!("Cancel order failed: {e}");
868            }
869            Ok(())
870        });
871
872        Ok(())
873    }
874
875    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
876        let instrument_id = cmd.instrument_id;
877
878        // If NoOrderSide, use efficient bulk cancel via Deribit API
879        if cmd.order_side == OrderSide::NoOrderSide {
880            log::info!(
881                "Cancelling all orders: instrument={instrument_id}, order_side=NoOrderSide (bulk)"
882            );
883
884            let ws_client = self.ws_client.clone();
885            self.spawn_task("cancel_all_orders", async move {
886                if let Err(e) = ws_client.cancel_all_orders(instrument_id, None).await {
887                    log::error!("Cancel all orders failed for instrument {instrument_id}: {e}");
888                    anyhow::bail!("Cancel all orders failed: {e}");
889                }
890                Ok(())
891            });
892
893            return Ok(());
894        }
895
896        // For specific side (Buy/Sell), filter from cache and cancel individually
897        // Deribit API doesn't support side filtering, so we implement it locally
898        log::info!(
899            "Cancelling orders by side: instrument={}, order_side={}",
900            instrument_id,
901            cmd.order_side
902        );
903
904        let orders_to_cancel: Vec<_> = {
905            let cache = self.core.cache();
906            let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
907
908            open_orders
909                .into_iter()
910                .filter(|order| order.order_side() == cmd.order_side)
911                .filter_map(|order| {
912                    let venue_order_id = order.venue_order_id()?;
913                    Some((
914                        venue_order_id.to_string(),
915                        order.client_order_id(),
916                        order.instrument_id(),
917                        Some(venue_order_id),
918                    ))
919                })
920                .collect()
921        };
922
923        if orders_to_cancel.is_empty() {
924            log::debug!(
925                "No open {} orders to cancel for {}",
926                cmd.order_side,
927                instrument_id
928            );
929            return Ok(());
930        }
931
932        log::info!(
933            "Cancelling {} {} orders for {}",
934            orders_to_cancel.len(),
935            cmd.order_side,
936            instrument_id
937        );
938
939        // Cancel each matching order individually
940        for (venue_order_id_str, client_order_id, order_instrument_id, venue_order_id) in
941            orders_to_cancel
942        {
943            let ws_client = self.ws_client.clone();
944            let trader_id = cmd.trader_id;
945            let strategy_id = cmd.strategy_id;
946            let emitter = self.emitter.clone();
947            let clock = self.clock;
948
949            self.spawn_task("cancel_order_by_side", async move {
950                if let Err(e) = ws_client
951                    .cancel_order(
952                        &venue_order_id_str,
953                        client_order_id,
954                        trader_id,
955                        strategy_id,
956                        order_instrument_id,
957                    )
958                    .await
959                {
960                    log::error!(
961                        "Cancel order failed: order_id={venue_order_id_str}, client_order_id={client_order_id}, error={e}"
962                    );
963
964                    let ts_event = clock.get_time_ns();
965                    emitter.emit_order_cancel_rejected_event(
966                        strategy_id,
967                        order_instrument_id,
968                        client_order_id,
969                        venue_order_id,
970                        &format!("cancel-order-error: {e}"),
971                        ts_event,
972                    );
973                }
974                Ok(())
975            });
976        }
977
978        Ok(())
979    }
980
981    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
982        if cmd.cancels.is_empty() {
983            log::debug!("batch_cancel_orders called with empty cancels list");
984            return Ok(());
985        }
986
987        log::info!(
988            "Batch cancelling {} orders for instrument={}",
989            cmd.cancels.len(),
990            cmd.instrument_id
991        );
992
993        // Deribit doesn't have native batch cancel by order ID
994        // Loop through and cancel each order individually
995        for cancel in &cmd.cancels {
996            let order_id = match &cancel.venue_order_id {
997                Some(id) => id.to_string(),
998                None => {
999                    log::warn!(
1000                        "Cannot cancel order {} - no venue_order_id",
1001                        cancel.client_order_id
1002                    );
1003
1004                    // Emit OrderCancelRejected event for missing venue_order_id
1005                    let ts_event = self.clock.get_time_ns();
1006                    self.emitter.emit_order_cancel_rejected_event(
1007                        cancel.strategy_id,
1008                        cancel.instrument_id,
1009                        cancel.client_order_id,
1010                        None,
1011                        "venue_order_id required for cancel",
1012                        ts_event,
1013                    );
1014                    continue;
1015                }
1016            };
1017
1018            let ws_client = self.ws_client.clone();
1019            let emitter = self.emitter.clone();
1020            let clock = self.clock;
1021            let client_order_id = cancel.client_order_id;
1022            let trader_id = cancel.trader_id;
1023            let strategy_id = cancel.strategy_id;
1024            let instrument_id = cancel.instrument_id;
1025
1026            self.spawn_task("batch_cancel_order", async move {
1027                if let Err(e) = ws_client
1028                    .cancel_order(
1029                        &order_id,
1030                        client_order_id,
1031                        trader_id,
1032                        strategy_id,
1033                        instrument_id,
1034                    )
1035                    .await
1036                {
1037                    log::error!(
1038                        "Batch cancel order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
1039                    );
1040
1041                    let ts_event = clock.get_time_ns();
1042                    emitter.emit_order_cancel_rejected_event(
1043                        strategy_id,
1044                        instrument_id,
1045                        client_order_id,
1046                        None,
1047                        &format!("batch-cancel-error: {e}"),
1048                        ts_event,
1049                    );
1050
1051                    anyhow::bail!("Batch cancel order failed: {e}");
1052                }
1053                Ok(())
1054            });
1055        }
1056
1057        Ok(())
1058    }
1059}
1060
1061/// Dispatches a WebSocket message using the event emitter.
1062fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
1063    match message {
1064        NautilusWsMessage::AccountState(state) => {
1065            emitter.send_account_state(state);
1066        }
1067        NautilusWsMessage::OrderStatusReports(reports) => {
1068            log::debug!("Processing {} order status report(s)", reports.len());
1069            for report in reports {
1070                emitter.send_order_status_report(report);
1071            }
1072        }
1073        NautilusWsMessage::FillReports(reports) => {
1074            log::debug!("Processing {} fill report(s)", reports.len());
1075            for report in reports {
1076                emitter.send_fill_report(report);
1077            }
1078        }
1079        NautilusWsMessage::OrderRejected(event) => {
1080            emitter.send_order_event(OrderEventAny::Rejected(event));
1081        }
1082        NautilusWsMessage::OrderAccepted(event) => {
1083            emitter.send_order_event(OrderEventAny::Accepted(event));
1084        }
1085        NautilusWsMessage::OrderCanceled(event) => {
1086            emitter.send_order_event(OrderEventAny::Canceled(event));
1087        }
1088        NautilusWsMessage::OrderExpired(event) => {
1089            emitter.send_order_event(OrderEventAny::Expired(event));
1090        }
1091        NautilusWsMessage::OrderUpdated(event) => {
1092            emitter.send_order_event(OrderEventAny::Updated(event));
1093        }
1094        NautilusWsMessage::OrderCancelRejected(event) => {
1095            emitter.send_order_event(OrderEventAny::CancelRejected(event));
1096        }
1097        NautilusWsMessage::OrderModifyRejected(event) => {
1098            emitter.send_order_event(OrderEventAny::ModifyRejected(event));
1099        }
1100        NautilusWsMessage::Error(e) => {
1101            log::warn!("WebSocket error: {e}");
1102        }
1103        NautilusWsMessage::Reconnected => {
1104            log::info!("WebSocket reconnected");
1105        }
1106        NautilusWsMessage::Authenticated(auth) => {
1107            log::debug!("WebSocket authenticated: scope={}", auth.scope);
1108        }
1109        NautilusWsMessage::Data(_)
1110        | NautilusWsMessage::Deltas(_)
1111        | NautilusWsMessage::Instrument(_)
1112        | NautilusWsMessage::FundingRates(_)
1113        | NautilusWsMessage::Raw(_) => {
1114            // Data messages are handled by the data client, not execution
1115            log::trace!("Ignoring data message in execution client");
1116        }
1117    }
1118}