Skip to main content

nautilus_deribit/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Deribit API.
17//!
18//! The [`DeribitWebSocketClient`] provides connectivity to Deribit's WebSocket API using
19//! JSON-RPC 2.0. It supports subscribing to market data channels including trades, order books,
20//! and tickers.
21
22use std::{
23    fmt::Debug,
24    sync::{
25        Arc,
26        atomic::{AtomicBool, AtomicU8, Ordering},
27    },
28    time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::{enums::LogColor, live::get_runtime, log_info};
35use nautilus_core::{
36    consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt, time::get_atomic_clock_realtime,
37};
38use nautilus_model::{
39    data::BarType,
40    enums::OrderSide,
41    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
42    instruments::{Instrument, InstrumentAny},
43    types::{Price, Quantity},
44};
45use nautilus_network::{
46    http::USER_AGENT,
47    mode::ConnectionMode,
48    websocket::{
49        AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
50        channel_message_handler,
51    },
52};
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use super::{
57    auth::{AuthState, send_auth_request, spawn_token_refresh_task},
58    enums::{DeribitUpdateInterval, DeribitWsChannel},
59    error::{DeribitWsError, DeribitWsResult},
60    handler::{DeribitWsFeedHandler, HandlerCommand},
61    messages::{
62        DeribitCancelAllByInstrumentParams, DeribitCancelParams, DeribitEditParams,
63        DeribitOrderParams, NautilusWsMessage,
64    },
65};
66use crate::common::{
67    consts::{
68        DERIBIT_TESTNET_WS_URL, DERIBIT_WS_HEARTBEAT_SECS, DERIBIT_WS_ORDER_KEY,
69        DERIBIT_WS_ORDER_QUOTA, DERIBIT_WS_SUBSCRIPTION_KEY, DERIBIT_WS_SUBSCRIPTION_QUOTA,
70        DERIBIT_WS_URL,
71    },
72    credential::Credential,
73    parse::bar_spec_to_resolution,
74};
75
/// Maximum time, in seconds, that [`DeribitWebSocketClient::authenticate`] waits for the
/// authentication result before giving up.
const AUTHENTICATION_TIMEOUT_SECS: u64 = 30;
78
/// WebSocket client for connecting to Deribit.
///
/// The client is `Clone`; state that must be shared between clones (auth state, command
/// sender, instrument cache, connection mode, stop signal) is held behind `Arc`.
#[derive(Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit")
)]
pub struct DeribitWebSocketClient {
    /// WebSocket endpoint URL used by `connect()`.
    url: String,
    /// Whether the client was created for the testnet environment.
    is_testnet: bool,
    /// Optional heartbeat interval forwarded to the socket config and the handler
    /// (presumably seconds, going by `DERIBIT_WS_HEARTBEAT_SECS` - confirm).
    heartbeat_interval: Option<u64>,
    /// API credentials; `None` means the client runs unauthenticated.
    credential: Option<Credential>,
    /// Most recent authentication state, written by the handler task on `Authenticated`.
    auth_state: Arc<tokio::sync::RwLock<Option<AuthState>>>,
    /// Stop flag: reset to `false` in `connect()`, set to `true` in `close()`.
    signal: Arc<AtomicBool>,
    /// Connection mode; starts as `Closed` and is replaced by the socket's atomic on connect.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Tracks in-flight and completed authentication attempts.
    auth_tracker: AuthTracker,
    /// Sender for commands to the feed handler; replaced with a live sender on connect.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for handler output; set on connect and taken by `stream()`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Handle of the background task spawned by `connect()`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Reference-counted subscription bookkeeping, shared with the handler.
    subscriptions_state: SubscriptionState,
    /// Instruments keyed by raw symbol, replayed to the handler on connect.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Token cancelled by `cancel_all_requests()`.
    cancellation_token: CancellationToken,
    /// Account ID passed to the handler when it is created.
    account_id: Option<AccountId>,
    /// Whether bar timestamps use the close time (defaults to `true`).
    bars_timestamp_on_close: bool,
}
103
104impl Debug for DeribitWebSocketClient {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct(stringify!(DeribitWebSocketClient))
107            .field("url", &self.url)
108            .field("is_testnet", &self.is_testnet)
109            .field("has_credentials", &self.credential.is_some())
110            .field("is_authenticated", &self.auth_tracker.is_authenticated())
111            .field(
112                "has_auth_state",
113                &self.auth_state.try_read().is_ok_and(|s| s.is_some()),
114            )
115            .field("heartbeat_interval", &self.heartbeat_interval)
116            .finish_non_exhaustive()
117    }
118}
119
120impl DeribitWebSocketClient {
121    /// Creates a new [`DeribitWebSocketClient`] instance.
122    ///
123    /// Falls back to environment variables if credentials are not provided.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if only one of `api_key` or `api_secret` is provided.
128    pub fn new(
129        url: Option<String>,
130        api_key: Option<String>,
131        api_secret: Option<String>,
132        heartbeat_interval: Option<u64>,
133        is_testnet: bool,
134    ) -> anyhow::Result<Self> {
135        Self::new_inner(
136            url,
137            api_key,
138            api_secret,
139            heartbeat_interval,
140            is_testnet,
141            true,
142        )
143    }
144
145    /// Internal constructor with control over environment variable fallback.
146    fn new_inner(
147        url: Option<String>,
148        api_key: Option<String>,
149        api_secret: Option<String>,
150        heartbeat_interval: Option<u64>,
151        is_testnet: bool,
152        env_fallback: bool,
153    ) -> anyhow::Result<Self> {
154        let url = url.unwrap_or_else(|| {
155            if is_testnet {
156                DERIBIT_TESTNET_WS_URL.to_string()
157            } else {
158                DERIBIT_WS_URL.to_string()
159            }
160        });
161
162        // Resolve credential from config or environment variables (if env_fallback is true)
163        let credential =
164            Credential::resolve_with_env_fallback(api_key, api_secret, is_testnet, env_fallback)?;
165        if credential.is_some() {
166            log::info!("Credentials loaded (testnet={is_testnet})");
167        } else {
168            log::debug!("No credentials configured - unauthenticated mode");
169        }
170
171        let signal = Arc::new(AtomicBool::new(false));
172        let subscriptions_state = SubscriptionState::new('.');
173
174        Ok(Self {
175            url,
176            is_testnet,
177            heartbeat_interval,
178            credential,
179            auth_state: Arc::new(tokio::sync::RwLock::new(None)),
180            signal,
181            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
182                ConnectionMode::Closed.as_u8(),
183            ))),
184            auth_tracker: AuthTracker::new(),
185            cmd_tx: {
186                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
187                Arc::new(tokio::sync::RwLock::new(tx))
188            },
189            out_rx: None,
190            task_handle: None,
191            subscriptions_state,
192            instruments_cache: Arc::new(DashMap::new()),
193            cancellation_token: CancellationToken::new(),
194            account_id: None,
195            bars_timestamp_on_close: true,
196        })
197    }
198
199    /// Creates a new public (unauthenticated) client.
200    ///
201    /// Does NOT fall back to environment variables for credentials.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if initialization fails.
206    pub fn new_public(is_testnet: bool) -> anyhow::Result<Self> {
207        let heartbeat_interval = DERIBIT_WS_HEARTBEAT_SECS;
208        Self::new_inner(
209            None,
210            None,
211            None,
212            Some(heartbeat_interval),
213            is_testnet,
214            false,
215        )
216    }
217
218    /// Creates an unauthenticated client with a custom URL.
219    ///
220    /// Does NOT fall back to environment variables for credentials.
221    /// Useful for testing against mock servers.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if initialization fails.
226    pub fn new_unauthenticated(
227        url: Option<String>,
228        heartbeat_interval: Option<u64>,
229        is_testnet: bool,
230    ) -> anyhow::Result<Self> {
231        Self::new_inner(url, None, None, heartbeat_interval, is_testnet, false)
232    }
233
234    /// Creates an authenticated client with credentials.
235    ///
236    /// Uses environment variables to load credentials:
237    /// - Testnet: `DERIBIT_TESTNET_API_KEY` and `DERIBIT_TESTNET_API_SECRET`
238    /// - Mainnet: `DERIBIT_API_KEY` and `DERIBIT_API_SECRET`
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if credentials are not found in environment variables.
243    pub fn with_credentials(is_testnet: bool) -> anyhow::Result<Self> {
244        let (key_env, secret_env) = if is_testnet {
245            ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
246        } else {
247            ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
248        };
249
250        let api_key = get_or_env_var_opt(None, key_env)
251            .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {key_env}"))?;
252        let api_secret = get_or_env_var_opt(None, secret_env)
253            .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {secret_env}"))?;
254
255        let heartbeat_interval = DERIBIT_WS_HEARTBEAT_SECS;
256        Self::new(
257            None,
258            Some(api_key),
259            Some(api_secret),
260            Some(heartbeat_interval),
261            is_testnet,
262        )
263    }
264
265    /// Returns the current connection mode.
266    fn connection_mode(&self) -> ConnectionMode {
267        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
268        ConnectionMode::from_u8(mode_u8)
269    }
270
271    /// Returns whether the client is actively connected.
272    #[must_use]
273    pub fn is_active(&self) -> bool {
274        self.connection_mode() == ConnectionMode::Active
275    }
276
277    /// Returns the WebSocket URL.
278    #[must_use]
279    pub fn url(&self) -> &str {
280        &self.url
281    }
282
283    /// Returns whether the client is closed.
284    #[must_use]
285    pub fn is_closed(&self) -> bool {
286        self.connection_mode() == ConnectionMode::Disconnect
287    }
288
    /// Cancel all pending WebSocket requests.
    ///
    /// Cancels this client's [`CancellationToken`]; anything observing the token (see
    /// [`Self::cancellation_token`]) is signalled.
    // NOTE(review): nothing in the visible part of this file replaces the token after it
    // has been cancelled - confirm whether a fresh token is needed on reconnect.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
293
    /// Returns the cancellation token for this client.
    ///
    /// This is the token cancelled by [`Self::cancel_all_requests`].
    #[must_use]
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
299
300    /// Waits until the client is active or timeout expires.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the timeout expires before the client becomes active.
305    pub async fn wait_until_active(&self, timeout_secs: f64) -> DeribitWsResult<()> {
306        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
307
308        tokio::time::timeout(timeout, async {
309            while !self.is_active() {
310                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
311            }
312        })
313        .await
314        .map_err(|_| {
315            DeribitWsError::Timeout(format!(
316                "WebSocket connection timeout after {timeout_secs} seconds"
317            ))
318        })?;
319
320        Ok(())
321    }
322
323    /// Caches instruments for use during message parsing.
324    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
325        for inst in instruments {
326            self.instruments_cache
327                .insert(inst.raw_symbol().inner(), inst);
328        }
329        log::debug!("Cached {} instruments", self.instruments_cache.len());
330    }
331
332    /// Caches a single instrument.
333    pub fn cache_instrument(&self, instrument: InstrumentAny) {
334        let symbol = instrument.raw_symbol().inner();
335        self.instruments_cache.insert(symbol, instrument);
336
337        // If connected, send update to handler
338        if self.is_active() {
339            let tx = self.cmd_tx.clone();
340            let inst = self.instruments_cache.get(&symbol).map(|r| r.clone());
341            if let Some(inst) = inst {
342                get_runtime().spawn(async move {
343                    let _ = tx
344                        .read()
345                        .await
346                        .send(HandlerCommand::UpdateInstrument(Box::new(inst)));
347                });
348            }
349        }
350    }
351
    /// Connects to the Deribit WebSocket API and starts the background handler task.
    ///
    /// The method performs these steps in order:
    /// 1. Clears the stop signal and opens the socket via [`WebSocketClient::connect`].
    /// 2. Creates fresh command and output channels, replacing the stored command sender
    ///    and output receiver.
    /// 3. Builds a [`DeribitWsFeedHandler`] and queues commands handing it the socket, the
    ///    cached instruments (if any) and the heartbeat interval (if configured).
    /// 4. Spawns a task on the shared runtime that drives the handler and reacts to
    ///    `Reconnected` and `Authenticated` messages; every other message is ignored by
    ///    this task.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection fails.
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        log_info!(
            "Connecting to WebSocket: {}",
            self.url,
            color = LogColor::Blue
        );

        // Reset stop signal (it is set to `true` by `close()`)
        self.signal.store(false, Ordering::Relaxed);

        // Create message handler and channel
        let (message_handler, raw_rx) = channel_message_handler();

        // No-op ping handler: handler responds to pings directly
        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
            // Handler responds to pings internally
        });

        // Configure WebSocket client
        let config = WebSocketConfig {
            url: self.url.clone(),
            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
            heartbeat: self.heartbeat_interval,
            heartbeat_msg: None, // Deribit uses JSON-RPC heartbeat, not text ping
            reconnect_timeout_ms: Some(5_000),
            reconnect_delay_initial_ms: None,
            reconnect_delay_max_ms: None,
            reconnect_backoff_factor: None,
            reconnect_jitter_ms: None,
            reconnect_max_attempts: None,
        };

        // Configure rate limits: separate keyed quotas for subscriptions and orders
        let keyed_quotas = vec![
            (
                DERIBIT_WS_SUBSCRIPTION_KEY.to_string(),
                *DERIBIT_WS_SUBSCRIPTION_QUOTA,
            ),
            (DERIBIT_WS_ORDER_KEY.to_string(), *DERIBIT_WS_ORDER_QUOTA),
        ];

        // Connect the WebSocket
        let ws_client = WebSocketClient::connect(
            config,
            Some(message_handler),
            Some(ping_handler),
            None, // post_reconnection
            keyed_quotas,
            Some(*DERIBIT_WS_SUBSCRIPTION_QUOTA), // Default quota for non-order operations
        )
        .await?;

        // Store connection mode: from here on `connection_mode()` reads the socket's atomic
        self.connection_mode
            .store(ws_client.connection_mode_atomic());

        // Create message channels
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();

        // Store command sender and output receiver
        *self.cmd_tx.write().await = cmd_tx.clone();
        self.out_rx = Some(Arc::new(out_rx));

        // Create handler
        let mut handler = DeribitWsFeedHandler::new(
            self.signal.clone(),
            cmd_rx,
            raw_rx,
            out_tx,
            self.auth_tracker.clone(),
            self.subscriptions_state.clone(),
            self.account_id,
            self.bars_timestamp_on_close,
        );

        // Send client to handler (queued; picked up once the handler task starts polling)
        let _ = cmd_tx.send(HandlerCommand::SetClient(ws_client));

        // Replay cached instruments
        let instruments: Vec<InstrumentAny> =
            self.instruments_cache.iter().map(|r| r.clone()).collect();
        if !instruments.is_empty() {
            log::debug!(
                "Sending {} cached instruments to handler",
                instruments.len()
            );
            let _ = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments));
        }

        // Enable heartbeat if configured
        if let Some(interval) = self.heartbeat_interval {
            let _ = cmd_tx.send(HandlerCommand::SetHeartbeat { interval });
        }

        // Clone the shared state the spawned task needs to own
        let subscriptions_state = self.subscriptions_state.clone();
        let credential = self.credential.clone();
        let auth_tracker = self.auth_tracker.clone();
        let auth_state = self.auth_state.clone();

        // Spawn handler task
        let task_handle = get_runtime().spawn(async move {
            // Track if we're waiting for re-authentication after reconnection
            let mut pending_reauth = false;

            loop {
                match handler.next().await {
                    Some(msg) => match msg {
                        NautilusWsMessage::Reconnected => {
                            log::info!("Reconnected to WebSocket");

                            // Get all subscriptions that should be restored
                            // all_topics() returns confirmed + pending_subscribe, excluding pending_unsubscribe
                            let channels = subscriptions_state.all_topics();

                            // Mark each channel as failed (transitions confirmed → pending_subscribe)
                            for channel in &channels {
                                subscriptions_state.mark_failure(channel);
                            }

                            // Check if we need to re-authenticate
                            if let Some(cred) = &credential {
                                log::info!("Re-authenticating after reconnection...");

                                // Begin auth attempt so succeed() will update state
                                let _rx = auth_tracker.begin();
                                // Resubscription is deferred to the `Authenticated` arm below
                                pending_reauth = true;

                                // Get the previously used scope for re-authentication
                                let previous_scope = auth_state
                                    .read()
                                    .await
                                    .as_ref()
                                    .map(|s| s.scope.clone());

                                // Send re-authentication request
                                send_auth_request(cred, previous_scope, &cmd_tx);
                            } else {
                                // No credentials - resubscribe immediately
                                if !channels.is_empty() {
                                    let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
                                }
                            }
                        }
                        NautilusWsMessage::Authenticated(result) => {
                            // Record the new auth state together with the time it was received
                            let timestamp = get_atomic_clock_realtime().get_time_ms();
                            let new_auth_state = AuthState::from_auth_result(&result, timestamp);
                            *auth_state.write().await = Some(new_auth_state);

                            // Spawn background token refresh task
                            spawn_token_refresh_task(
                                result.expires_in,
                                result.refresh_token.clone(),
                                cmd_tx.clone(),
                            );

                            if pending_reauth {
                                pending_reauth = false;
                                log::info!(
                                    "Re-authentication successful (scope: {}), resubscribing to channels",
                                    result.scope
                                );

                                // Now resubscribe to all channels using all_topics()
                                let channels = subscriptions_state.all_topics();

                                if !channels.is_empty() {
                                    let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
                                }
                            } else {
                                // Initial authentication completed
                                log::debug!(
                                    "Auth state stored: scope={}, expires_in={}s",
                                    result.scope,
                                    result.expires_in
                                );
                            }
                        }
                        // Every other message is ignored by this task
                        _ => {}
                    },
                    None => {
                        log::debug!("Handler returned None, stopping task");
                        break;
                    }
                }
            }
        });

        self.task_handle = Some(Arc::new(task_handle));
        log::info!("Connected to WebSocket");

        Ok(())
    }
550
551    /// Closes the WebSocket connection.
552    ///
553    /// # Errors
554    ///
555    /// Returns an error if the close operation fails.
556    pub async fn close(&self) -> DeribitWsResult<()> {
557        log::info!("Closing WebSocket connection");
558        self.signal.store(true, Ordering::Relaxed);
559
560        let _ = self.cmd_tx.read().await.send(HandlerCommand::Disconnect);
561
562        // Wait for task to complete
563        if let Some(_handle) = &self.task_handle {
564            let _ = tokio::time::timeout(Duration::from_secs(5), async {
565                // Can't actually await the handle since we don't own it
566                tokio::time::sleep(Duration::from_millis(100)).await;
567            })
568            .await;
569        }
570
571        self.auth_tracker.invalidate();
572
573        Ok(())
574    }
575
576    /// Returns a stream of WebSocket messages.
577    ///
578    /// # Panics
579    ///
580    /// Panics if called before `connect()` or if called twice.
581    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + 'static {
582        let rx = self
583            .out_rx
584            .take()
585            .expect("Data stream receiver already taken or not connected");
586        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
587
588        async_stream::stream! {
589            while let Some(msg) = rx.recv().await {
590                yield msg;
591            }
592        }
593    }
594
    /// Returns whether the client has credentials configured.
    ///
    /// This only reports that credentials are present, not that authentication has
    /// succeeded; see [`Self::is_authenticated`] for that.
    #[must_use]
    pub fn has_credentials(&self) -> bool {
        self.credential.is_some()
    }
600
    /// Returns whether the client is authenticated.
    ///
    /// The answer comes from the shared authentication tracker, which `close()`
    /// invalidates.
    #[must_use]
    pub fn is_authenticated(&self) -> bool {
        self.auth_tracker.is_authenticated()
    }
606
607    /// Authenticates the WebSocket session with Deribit.
608    ///
609    /// Uses the `client_signature` grant type with HMAC-SHA256 signature.
610    /// This must be called before subscribing to raw data streams.
611    ///
612    /// # Arguments
613    ///
614    /// * `session_name` - Optional session name for session-scoped authentication.
615    ///   When provided, uses `session:<name>` scope which allows skipping `access_token`
616    ///   in subsequent private requests. When `None`, uses default `connection` scope.
617    ///   Recommended to use session scope for order execution compatibility.
618    ///
619    /// # Errors
620    ///
621    /// Returns an error if:
622    /// - No credentials are configured
623    /// - The authentication request fails
624    /// - The authentication times out
625    pub async fn authenticate(&self, session_name: Option<&str>) -> DeribitWsResult<()> {
626        let credential = self.credential.as_ref().ok_or_else(|| {
627            DeribitWsError::Authentication("API credentials not configured".to_string())
628        })?;
629
630        // Determine scope
631        let scope = session_name.map(|name| format!("session:{name}"));
632
633        log::info!("Authenticating WebSocket...");
634
635        let rx = self.auth_tracker.begin();
636
637        // Send authentication request
638        let cmd_tx = self.cmd_tx.read().await;
639        send_auth_request(credential, scope, &cmd_tx);
640        drop(cmd_tx);
641
642        // Wait for authentication result with timeout
643        match self
644            .auth_tracker
645            .wait_for_result::<DeribitWsError>(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
646            .await
647        {
648            Ok(()) => {
649                log::info!("WebSocket authenticated successfully");
650                Ok(())
651            }
652            Err(e) => {
653                log::error!("WebSocket authentication failed: error={e}");
654                Err(e)
655            }
656        }
657    }
658
659    /// Authenticates with session scope using the provided session name.
660    ///
661    /// Use `DERIBIT_DATA_SESSION_NAME` for data clients and
662    /// `DERIBIT_EXECUTION_SESSION_NAME` for execution clients.
663    ///
664    /// # Errors
665    ///
666    /// Returns an error if authentication fails.
667    pub async fn authenticate_session(&self, session_name: &str) -> DeribitWsResult<()> {
668        self.authenticate(Some(session_name)).await
669    }
670
671    /// Returns the current authentication state containing tokens.
672    ///
673    /// Returns `None` if not authenticated or tokens haven't been stored yet.
674    pub async fn auth_state(&self) -> Option<AuthState> {
675        self.auth_state.read().await.clone()
676    }
677
678    /// Returns the current access token if available.
679    pub async fn access_token(&self) -> Option<String> {
680        self.auth_state
681            .read()
682            .await
683            .as_ref()
684            .map(|s| s.access_token.clone())
685    }
686
    /// Sets the account ID for order/fill reports.
    ///
    /// The value is handed to the feed handler when `connect()` creates it, so it must be
    /// set before connecting to take effect for that connection.
    pub fn set_account_id(&mut self, account_id: AccountId) {
        self.account_id = Some(account_id);
    }
691
    /// Sets whether bar timestamps should use the close time.
    ///
    /// When `true` (default), bar `ts_event` is set to the bar's close time.
    /// The value is handed to the feed handler when `connect()` creates it, so it must be
    /// set before connecting to take effect for that connection.
    pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
        self.bars_timestamp_on_close = value;
    }
698
699    async fn send_subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
700        let mut channels_to_subscribe = Vec::new();
701
702        for channel in channels {
703            if self.subscriptions_state.add_reference(&channel) {
704                self.subscriptions_state.mark_subscribe(&channel);
705                channels_to_subscribe.push(channel);
706            } else {
707                log::debug!("Already subscribed to {channel}, skipping duplicate subscription");
708            }
709        }
710
711        if channels_to_subscribe.is_empty() {
712            return Ok(());
713        }
714
715        self.cmd_tx
716            .read()
717            .await
718            .send(HandlerCommand::Subscribe {
719                channels: channels_to_subscribe.clone(),
720            })
721            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
722
723        log::debug!(
724            "Sent subscribe for {} channels",
725            channels_to_subscribe.len()
726        );
727        Ok(())
728    }
729
730    async fn send_unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
731        let mut channels_to_unsubscribe = Vec::new();
732
733        for channel in channels {
734            if self.subscriptions_state.remove_reference(&channel) {
735                self.subscriptions_state.mark_unsubscribe(&channel);
736                channels_to_unsubscribe.push(channel);
737            } else {
738                log::debug!("Still has references to {channel}, skipping unsubscription");
739            }
740        }
741
742        if channels_to_unsubscribe.is_empty() {
743            return Ok(());
744        }
745
746        self.cmd_tx
747            .read()
748            .await
749            .send(HandlerCommand::Unsubscribe {
750                channels: channels_to_unsubscribe.clone(),
751            })
752            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
753
754        log::debug!(
755            "Sent unsubscribe for {} channels",
756            channels_to_unsubscribe.len()
757        );
758        Ok(())
759    }
760
761    /// Subscribes to trade updates for an instrument.
762    ///
763    /// # Arguments
764    ///
765    /// * `instrument_id` - The instrument to subscribe to
766    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
767    ///
768    /// # Errors
769    ///
770    /// Returns an error if subscription fails or raw is requested without authentication.
771    pub async fn subscribe_trades(
772        &self,
773        instrument_id: InstrumentId,
774        interval: Option<DeribitUpdateInterval>,
775    ) -> DeribitWsResult<()> {
776        let interval = interval.unwrap_or_default();
777        self.check_auth_requirement(interval)?;
778        let channel =
779            DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
780        self.send_subscribe(vec![channel]).await
781    }
782
783    /// Unsubscribes from trade updates for an instrument.
784    ///
785    /// # Errors
786    ///
787    /// Returns an error if unsubscription fails.
788    pub async fn unsubscribe_trades(
789        &self,
790        instrument_id: InstrumentId,
791        interval: Option<DeribitUpdateInterval>,
792    ) -> DeribitWsResult<()> {
793        let interval = interval.unwrap_or_default();
794        let channel =
795            DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
796        self.send_unsubscribe(vec![channel]).await
797    }
798
799    /// Subscribes to order book updates for an instrument.
800    ///
801    /// # Arguments
802    ///
803    /// * `instrument_id` - The instrument to subscribe to
804    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
805    ///
806    /// # Errors
807    ///
808    /// Returns an error if subscription fails or raw is requested without authentication.
809    pub async fn subscribe_book(
810        &self,
811        instrument_id: InstrumentId,
812        interval: Option<DeribitUpdateInterval>,
813    ) -> DeribitWsResult<()> {
814        let interval = interval.unwrap_or_default();
815        self.check_auth_requirement(interval)?;
816        let channel =
817            DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
818        self.send_subscribe(vec![channel]).await
819    }
820
821    /// Unsubscribes from order book updates for an instrument.
822    ///
823    /// # Errors
824    ///
825    /// Returns an error if unsubscription fails.
826    pub async fn unsubscribe_book(
827        &self,
828        instrument_id: InstrumentId,
829        interval: Option<DeribitUpdateInterval>,
830    ) -> DeribitWsResult<()> {
831        let interval = interval.unwrap_or_default();
832        let channel =
833            DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
834        self.send_unsubscribe(vec![channel]).await
835    }
836
837    /// Subscribes to grouped (depth-limited) order book updates for an instrument.
838    ///
839    /// Uses the Deribit grouped book channel format: `book.{instrument}.{group}.{depth}.{interval}`
840    ///
841    /// Depth is normalized to Deribit supported values: 1, 10, or 20.
842    ///
843    /// # Errors
844    ///
845    /// Returns an error if subscription fails or raw is requested without authentication.
846    pub async fn subscribe_book_grouped(
847        &self,
848        instrument_id: InstrumentId,
849        group: &str,
850        depth: u32,
851        interval: Option<DeribitUpdateInterval>,
852    ) -> DeribitWsResult<()> {
853        // Grouped book channel only supports 100ms and agg2, not raw
854        let interval = match interval {
855            Some(DeribitUpdateInterval::Raw) | None => DeribitUpdateInterval::Ms100,
856            Some(i) => i,
857        };
858
859        let normalized_depth = if depth < 5 {
860            1
861        } else if depth < 15 {
862            10
863        } else {
864            20
865        };
866
867        let channel = format!(
868            "book.{}.{}.{}.{}",
869            instrument_id.symbol,
870            group,
871            normalized_depth,
872            interval.as_str()
873        );
874        log::debug!("Subscribing to grouped book channel: {channel}");
875        self.send_subscribe(vec![channel]).await
876    }
877
878    /// Unsubscribes from grouped (depth-limited) order book updates for an instrument.
879    ///
880    /// Depth is normalized to Deribit supported values: 1, 10, or 20.
881    ///
882    /// # Errors
883    ///
884    /// Returns an error if unsubscription fails.
885    pub async fn unsubscribe_book_grouped(
886        &self,
887        instrument_id: InstrumentId,
888        group: &str,
889        depth: u32,
890        interval: Option<DeribitUpdateInterval>,
891    ) -> DeribitWsResult<()> {
892        // Grouped book channel only supports 100ms and agg2, not raw
893        let interval = match interval {
894            Some(DeribitUpdateInterval::Raw) | None => DeribitUpdateInterval::Ms100,
895            Some(i) => i,
896        };
897
898        let normalized_depth = if depth < 5 {
899            1
900        } else if depth < 15 {
901            10
902        } else {
903            20
904        };
905
906        let channel = format!(
907            "book.{}.{}.{}.{}",
908            instrument_id.symbol,
909            group,
910            normalized_depth,
911            interval.as_str()
912        );
913        self.send_unsubscribe(vec![channel]).await
914    }
915
916    /// Subscribes to ticker updates for an instrument.
917    ///
918    /// # Arguments
919    ///
920    /// * `instrument_id` - The instrument to subscribe to
921    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
922    ///
923    /// # Errors
924    ///
925    /// Returns an error if subscription fails or raw is requested without authentication.
926    pub async fn subscribe_ticker(
927        &self,
928        instrument_id: InstrumentId,
929        interval: Option<DeribitUpdateInterval>,
930    ) -> DeribitWsResult<()> {
931        let interval = interval.unwrap_or_default();
932        self.check_auth_requirement(interval)?;
933        let channel =
934            DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
935        self.send_subscribe(vec![channel]).await
936    }
937
938    /// Unsubscribes from ticker updates for an instrument.
939    ///
940    /// # Errors
941    ///
942    /// Returns an error if unsubscription fails.
943    pub async fn unsubscribe_ticker(
944        &self,
945        instrument_id: InstrumentId,
946        interval: Option<DeribitUpdateInterval>,
947    ) -> DeribitWsResult<()> {
948        let interval = interval.unwrap_or_default();
949        let channel =
950            DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
951        self.send_unsubscribe(vec![channel]).await
952    }
953
954    /// Subscribes to quote (best bid/ask) updates for an instrument.
955    ///
956    /// Note: Quote channel does not support interval parameter.
957    ///
958    /// # Errors
959    ///
960    /// Returns an error if subscription fails.
961    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
962        let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
963        self.send_subscribe(vec![channel]).await
964    }
965
966    /// Unsubscribes from quote updates for an instrument.
967    ///
968    /// # Errors
969    ///
970    /// Returns an error if unsubscription fails.
971    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
972        let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
973        self.send_unsubscribe(vec![channel]).await
974    }
975
976    /// Subscribes to instrument state changes for lifecycle notifications.
977    ///
978    /// Channel format: `instrument.state.{kind}.{currency}`
979    ///
980    /// # Errors
981    ///
982    /// Returns an error if subscription fails.
983    pub async fn subscribe_instrument_state(
984        &self,
985        kind: &str,
986        currency: &str,
987    ) -> DeribitWsResult<()> {
988        let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
989        self.send_subscribe(vec![channel]).await
990    }
991
992    /// Unsubscribes from instrument state changes.
993    ///
994    /// # Errors
995    ///
996    /// Returns an error if unsubscription fails.
997    pub async fn unsubscribe_instrument_state(
998        &self,
999        kind: &str,
1000        currency: &str,
1001    ) -> DeribitWsResult<()> {
1002        let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
1003        self.send_unsubscribe(vec![channel]).await
1004    }
1005
1006    /// Subscribes to perpetual interest rates updates.
1007    ///
1008    /// Channel format: `perpetual.{instrument_name}.{interval}`
1009    ///
1010    /// # Errors
1011    ///
1012    /// Returns an error if subscription fails.
1013    pub async fn subscribe_perpetual_interests_rates_updates(
1014        &self,
1015        instrument_id: InstrumentId,
1016        interval: Option<DeribitUpdateInterval>,
1017    ) -> DeribitWsResult<()> {
1018        let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
1019        let channel = DeribitWsChannel::Perpetual
1020            .format_channel(instrument_id.symbol.as_str(), Some(interval));
1021
1022        self.send_subscribe(vec![channel]).await
1023    }
1024
1025    /// Unsubscribes from perpetual interest rates updates.
1026    ///
1027    /// # Errors
1028    ///
1029    /// Returns an error if subscription fails.
1030    pub async fn unsubscribe_perpetual_interest_rates_updates(
1031        &self,
1032        instrument_id: InstrumentId,
1033        interval: Option<DeribitUpdateInterval>,
1034    ) -> DeribitWsResult<()> {
1035        let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
1036        let channel = DeribitWsChannel::Perpetual
1037            .format_channel(instrument_id.symbol.as_str(), Some(interval));
1038
1039        self.send_unsubscribe(vec![channel]).await
1040    }
1041
1042    /// Subscribes to chart/OHLC bar updates for an instrument.
1043    ///
1044    /// # Arguments
1045    ///
1046    /// * `instrument_id` - The instrument to subscribe to
1047    /// * `resolution` - Bar resolution: "1", "3", "5", "10", "15", "30", "60", "120", "180",
1048    ///   "360", "720", "1D" (minutes or 1D for daily)
1049    ///
1050    /// # Errors
1051    ///
1052    /// Returns an error if subscription fails.
1053    pub async fn subscribe_chart(
1054        &self,
1055        instrument_id: InstrumentId,
1056        resolution: &str,
1057    ) -> DeribitWsResult<()> {
1058        // Chart channel format: chart.trades.{instrument}.{resolution}
1059        let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1060        self.send_subscribe(vec![channel]).await
1061    }
1062
1063    /// Unsubscribes from chart/OHLC bar updates.
1064    ///
1065    /// # Errors
1066    ///
1067    /// Returns an error if unsubscription fails.
1068    pub async fn unsubscribe_chart(
1069        &self,
1070        instrument_id: InstrumentId,
1071        resolution: &str,
1072    ) -> DeribitWsResult<()> {
1073        let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1074        self.send_unsubscribe(vec![channel]).await
1075    }
1076
1077    /// Subscribes to bar updates for an instrument using a BarType specification.
1078    ///
1079    /// Converts the BarType to the nearest supported Deribit resolution and subscribes
1080    /// to the chart channel.
1081    ///
1082    /// # Errors
1083    ///
1084    /// Returns an error if the subscription request fails.
1085    pub async fn subscribe_bars(&self, bar_type: BarType) -> DeribitWsResult<()> {
1086        let resolution = bar_spec_to_resolution(&bar_type);
1087        self.subscribe_chart(bar_type.instrument_id(), &resolution)
1088            .await
1089    }
1090
1091    /// Unsubscribes from bar updates for an instrument using a BarType specification.
1092    ///
1093    /// # Errors
1094    ///
1095    /// Returns an error if the unsubscription request fails.
1096    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> DeribitWsResult<()> {
1097        let resolution = bar_spec_to_resolution(&bar_type);
1098        self.unsubscribe_chart(bar_type.instrument_id(), &resolution)
1099            .await
1100    }
1101
1102    /// Checks if authentication is required for the given interval.
1103    ///
1104    /// # Errors
1105    ///
1106    /// Returns an error if raw interval is requested but client is not authenticated.
1107    fn check_auth_requirement(&self, interval: DeribitUpdateInterval) -> DeribitWsResult<()> {
1108        if interval.requires_auth() && !self.is_authenticated() {
1109            return Err(DeribitWsError::Authentication(
1110                "Raw streams require authentication. Call authenticate() first.".to_string(),
1111            ));
1112        }
1113        Ok(())
1114    }
1115
1116    /// Subscribes to user order updates for all instruments.
1117    ///
1118    /// Requires authentication. Subscribes to `user.orders.any.any.raw` channel.
1119    ///
1120    /// # Errors
1121    ///
1122    /// Returns an error if client is not authenticated or subscription fails.
1123    pub async fn subscribe_user_orders(&self) -> DeribitWsResult<()> {
1124        if !self.is_authenticated() {
1125            return Err(DeribitWsError::Authentication(
1126                "User orders subscription requires authentication".to_string(),
1127            ));
1128        }
1129        self.send_subscribe(vec!["user.orders.any.any.raw".to_string()])
1130            .await
1131    }
1132
1133    /// Unsubscribes from user order updates for all instruments.
1134    ///
1135    /// # Errors
1136    ///
1137    /// Returns an error if unsubscription fails.
1138    pub async fn unsubscribe_user_orders(&self) -> DeribitWsResult<()> {
1139        self.send_unsubscribe(vec!["user.orders.any.any.raw".to_string()])
1140            .await
1141    }
1142
1143    /// Subscribes to user trade/fill updates for all instruments.
1144    ///
1145    /// Requires authentication. Subscribes to `user.trades.any.any.raw` channel.
1146    ///
1147    /// # Errors
1148    ///
1149    /// Returns an error if client is not authenticated or subscription fails.
1150    pub async fn subscribe_user_trades(&self) -> DeribitWsResult<()> {
1151        if !self.is_authenticated() {
1152            return Err(DeribitWsError::Authentication(
1153                "User trades subscription requires authentication".to_string(),
1154            ));
1155        }
1156        self.send_subscribe(vec!["user.trades.any.any.raw".to_string()])
1157            .await
1158    }
1159
1160    /// Unsubscribes from user trade/fill updates for all instruments.
1161    ///
1162    /// # Errors
1163    ///
1164    /// Returns an error if unsubscription fails.
1165    pub async fn unsubscribe_user_trades(&self) -> DeribitWsResult<()> {
1166        self.send_unsubscribe(vec!["user.trades.any.any.raw".to_string()])
1167            .await
1168    }
1169
1170    /// Subscribes to user portfolio updates for all currencies.
1171    ///
1172    /// Requires authentication. Subscribes to `user.portfolio.any` channel which
1173    /// provides real-time account balance and margin updates for all currencies
1174    /// (BTC, ETH, USDC, USDT, etc.).
1175    ///
1176    /// # Errors
1177    ///
1178    /// Returns an error if client is not authenticated or subscription fails.
1179    pub async fn subscribe_user_portfolio(&self) -> DeribitWsResult<()> {
1180        if !self.is_authenticated() {
1181            return Err(DeribitWsError::Authentication(
1182                "User portfolio subscription requires authentication".to_string(),
1183            ));
1184        }
1185        self.send_subscribe(vec!["user.portfolio.any".to_string()])
1186            .await
1187    }
1188
1189    /// Unsubscribes from user portfolio updates for all currencies.
1190    ///
1191    /// # Errors
1192    ///
1193    /// Returns an error if unsubscription fails.
1194    pub async fn unsubscribe_user_portfolio(&self) -> DeribitWsResult<()> {
1195        self.send_unsubscribe(vec!["user.portfolio.any".to_string()])
1196            .await
1197    }
1198
1199    /// Subscribes to multiple channels at once.
1200    ///
1201    /// # Errors
1202    ///
1203    /// Returns an error if subscription fails.
1204    pub async fn subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
1205        self.send_subscribe(channels).await
1206    }
1207
1208    /// Unsubscribes from multiple channels at once.
1209    ///
1210    /// # Errors
1211    ///
1212    /// Returns an error if unsubscription fails.
1213    pub async fn unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
1214        self.send_unsubscribe(channels).await
1215    }
1216
1217    /// Submits an order to Deribit via WebSocket.
1218    ///
1219    /// Routes to `private/buy` or `private/sell` JSON-RPC method based on order side.
1220    /// Requires authentication (call `authenticate_session()` first).
1221    ///
1222    /// # Errors
1223    ///
1224    /// Returns an error if:
1225    /// - The client is not authenticated
1226    /// - The command fails to send
1227    pub async fn submit_order(
1228        &self,
1229        order_side: OrderSide,
1230        params: DeribitOrderParams,
1231        client_order_id: ClientOrderId,
1232        trader_id: TraderId,
1233        strategy_id: StrategyId,
1234        instrument_id: InstrumentId,
1235    ) -> DeribitWsResult<()> {
1236        if !self.is_authenticated() {
1237            return Err(DeribitWsError::Authentication(
1238                "Submit order requires authentication. Call authenticate_session() first."
1239                    .to_string(),
1240            ));
1241        }
1242
1243        log::debug!(
1244            "Sending {} order: instrument={}, amount={}, price={:?}, client_order_id={}",
1245            order_side,
1246            params.instrument_name,
1247            params.amount,
1248            params.price,
1249            client_order_id
1250        );
1251
1252        let cmd = match order_side {
1253            OrderSide::Buy => HandlerCommand::Buy {
1254                params,
1255                client_order_id,
1256                trader_id,
1257                strategy_id,
1258                instrument_id,
1259            },
1260            OrderSide::Sell => HandlerCommand::Sell {
1261                params,
1262                client_order_id,
1263                trader_id,
1264                strategy_id,
1265                instrument_id,
1266            },
1267            _ => {
1268                return Err(DeribitWsError::ClientError(format!(
1269                    "Invalid order side: {order_side}"
1270                )));
1271            }
1272        };
1273
1274        self.cmd_tx
1275            .read()
1276            .await
1277            .send(cmd)
1278            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1279
1280        Ok(())
1281    }
1282
1283    /// Modifies an existing order on Deribit via WebSocket.
1284    ///
1285    /// The order parameters are sent using the `private/edit` JSON-RPC method.
1286    /// Requires authentication (call `authenticate_session()` first).
1287    ///
1288    /// # Errors
1289    ///
1290    /// Returns an error if:
1291    /// - The client is not authenticated
1292    /// - The command fails to send
1293    #[allow(clippy::too_many_arguments)]
1294    pub async fn modify_order(
1295        &self,
1296        order_id: &str,
1297        quantity: Quantity,
1298        price: Price,
1299        client_order_id: ClientOrderId,
1300        trader_id: TraderId,
1301        strategy_id: StrategyId,
1302        instrument_id: InstrumentId,
1303    ) -> DeribitWsResult<()> {
1304        if !self.is_authenticated() {
1305            return Err(DeribitWsError::Authentication(
1306                "Modify order requires authentication. Call authenticate_session() first."
1307                    .to_string(),
1308            ));
1309        }
1310
1311        let params = DeribitEditParams {
1312            order_id: order_id.to_string(),
1313            amount: quantity.as_decimal(),
1314            price: Some(price.as_decimal()),
1315            post_only: None,
1316            reject_post_only: None,
1317            reduce_only: None,
1318            trigger_price: None,
1319        };
1320
1321        log::debug!(
1322            "Sending modify order: order_id={order_id}, quantity={quantity}, price={price}, client_order_id={client_order_id}"
1323        );
1324
1325        self.cmd_tx
1326            .read()
1327            .await
1328            .send(HandlerCommand::Edit {
1329                params,
1330                client_order_id,
1331                trader_id,
1332                strategy_id,
1333                instrument_id,
1334            })
1335            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1336
1337        Ok(())
1338    }
1339
1340    /// Cancels an existing order on Deribit via WebSocket.
1341    ///
1342    /// The order is cancelled using the `private/cancel` JSON-RPC method.
1343    /// Requires authentication (call `authenticate_session()` first).
1344    ///
1345    /// # Errors
1346    ///
1347    /// Returns an error if:
1348    /// - The client is not authenticated
1349    /// - The command fails to send
1350    pub async fn cancel_order(
1351        &self,
1352        order_id: &str,
1353        client_order_id: ClientOrderId,
1354        trader_id: TraderId,
1355        strategy_id: StrategyId,
1356        instrument_id: InstrumentId,
1357    ) -> DeribitWsResult<()> {
1358        if !self.is_authenticated() {
1359            return Err(DeribitWsError::Authentication(
1360                "Cancel order requires authentication. Call authenticate_session() first."
1361                    .to_string(),
1362            ));
1363        }
1364
1365        let params = DeribitCancelParams {
1366            order_id: order_id.to_string(),
1367        };
1368
1369        log::debug!("Sending cancel order: order_id={order_id}, client_order_id={client_order_id}");
1370
1371        self.cmd_tx
1372            .read()
1373            .await
1374            .send(HandlerCommand::Cancel {
1375                params,
1376                client_order_id,
1377                trader_id,
1378                strategy_id,
1379                instrument_id,
1380            })
1381            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1382
1383        Ok(())
1384    }
1385
1386    /// Cancels all orders for a specific instrument on Deribit via WebSocket.
1387    ///
1388    /// Uses the `private/cancel_all_by_instrument` JSON-RPC method.
1389    /// Requires authentication (call `authenticate_session()` first).
1390    ///
1391    /// # Errors
1392    ///
1393    /// Returns an error if:
1394    /// - The client is not authenticated
1395    /// - The command fails to send
1396    pub async fn cancel_all_orders(
1397        &self,
1398        instrument_id: InstrumentId,
1399        order_type: Option<String>,
1400    ) -> DeribitWsResult<()> {
1401        if !self.is_authenticated() {
1402            return Err(DeribitWsError::Authentication(
1403                "Cancel all orders requires authentication. Call authenticate_session() first."
1404                    .to_string(),
1405            ));
1406        }
1407
1408        let instrument_name = instrument_id.symbol.to_string();
1409        let params = DeribitCancelAllByInstrumentParams {
1410            instrument_name: instrument_name.clone(),
1411            order_type,
1412        };
1413
1414        log::debug!("Sending cancel_all_orders: instrument={instrument_name}");
1415
1416        self.cmd_tx
1417            .read()
1418            .await
1419            .send(HandlerCommand::CancelAllByInstrument {
1420                params,
1421                instrument_id,
1422            })
1423            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1424
1425        Ok(())
1426    }
1427
1428    /// Queries the state of an order on Deribit via WebSocket.
1429    ///
1430    /// Uses the `private/get_order_state` JSON-RPC method.
1431    /// Requires authentication (call `authenticate_session()` first).
1432    ///
1433    /// # Errors
1434    ///
1435    /// Returns an error if:
1436    /// - The client is not authenticated
1437    /// - The command fails to send
1438    pub async fn query_order(
1439        &self,
1440        order_id: &str,
1441        client_order_id: ClientOrderId,
1442        trader_id: TraderId,
1443        strategy_id: StrategyId,
1444        instrument_id: InstrumentId,
1445    ) -> DeribitWsResult<()> {
1446        if !self.is_authenticated() {
1447            return Err(DeribitWsError::Authentication(
1448                "Query order state requires authentication. Call authenticate_session() first."
1449                    .to_string(),
1450            ));
1451        }
1452
1453        log::debug!("Sending query_order: order_id={order_id}, client_order_id={client_order_id}");
1454
1455        self.cmd_tx
1456            .read()
1457            .await
1458            .send(HandlerCommand::GetOrderState {
1459                order_id: order_id.to_string(),
1460                client_order_id,
1461                trader_id,
1462                strategy_id,
1463                instrument_id,
1464            })
1465            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1466
1467        Ok(())
1468    }
1469}