nautilus_deribit/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Deribit API.
17//!
18//! The [`DeribitWebSocketClient`] provides connectivity to Deribit's WebSocket API using
19//! JSON-RPC 2.0. It supports subscribing to market data channels including trades, order books,
20//! and tickers.
21
22use std::{
23    fmt::Debug,
24    num::NonZeroU32,
25    sync::{
26        Arc, LazyLock,
27        atomic::{AtomicBool, AtomicU8, Ordering},
28    },
29    time::Duration,
30};
31
32use arc_swap::ArcSwap;
33use dashmap::DashMap;
34use futures_util::Stream;
35use nautilus_common::live::get_runtime;
36use nautilus_core::{
37    consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt, time::get_atomic_clock_realtime,
38};
39use nautilus_model::{
40    identifiers::InstrumentId,
41    instruments::{Instrument, InstrumentAny},
42};
43use nautilus_network::{
44    http::USER_AGENT,
45    mode::ConnectionMode,
46    ratelimiter::quota::Quota,
47    websocket::{
48        AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
49        channel_message_handler,
50    },
51};
52use tokio_util::sync::CancellationToken;
53use ustr::Ustr;
54
55use super::{
56    auth::{AuthState, send_auth_request, spawn_token_refresh_task},
57    enums::{DeribitUpdateInterval, DeribitWsChannel},
58    error::{DeribitWsError, DeribitWsResult},
59    handler::{DeribitWsFeedHandler, HandlerCommand},
60    messages::NautilusWsMessage,
61};
62use crate::common::{
63    consts::{DERIBIT_TESTNET_WS_URL, DERIBIT_WS_URL},
64    credential::Credential,
65};
66
67/// Default Deribit WebSocket subscription rate limit: 20 requests per second.
68pub static DERIBIT_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
69    LazyLock::new(|| Quota::per_second(NonZeroU32::new(20).unwrap()));
70
71/// Authentication timeout in seconds.
72const AUTHENTICATION_TIMEOUT_SECS: u64 = 30;
73
74/// WebSocket client for connecting to Deribit.
75#[derive(Clone)]
76#[cfg_attr(
77    feature = "python",
78    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit")
79)]
80pub struct DeribitWebSocketClient {
81    url: String,
82    is_testnet: bool,
83    heartbeat_interval: Option<u64>,
84    credential: Option<Credential>,
85    is_authenticated: Arc<AtomicBool>,
86    auth_state: Arc<tokio::sync::RwLock<Option<AuthState>>>,
87    signal: Arc<AtomicBool>,
88    connection_mode: Arc<ArcSwap<AtomicU8>>,
89    auth_tracker: AuthTracker,
90    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
91    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
92    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
93    subscriptions_state: SubscriptionState,
94    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
95    cancellation_token: CancellationToken,
96}
97
98impl Debug for DeribitWebSocketClient {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct(stringify!(DeribitWebSocketClient))
101            .field("url", &self.url)
102            .field("is_testnet", &self.is_testnet)
103            .field("has_credentials", &self.credential.is_some())
104            .field(
105                "is_authenticated",
106                &self.is_authenticated.load(Ordering::Relaxed),
107            )
108            .field(
109                "has_auth_state",
110                &self
111                    .auth_state
112                    .try_read()
113                    .map(|s| s.is_some())
114                    .unwrap_or(false),
115            )
116            .field("heartbeat_interval", &self.heartbeat_interval)
117            .finish_non_exhaustive()
118    }
119}
120
121impl DeribitWebSocketClient {
122    /// Creates a new [`DeribitWebSocketClient`] instance.
123    ///
124    /// # Errors
125    ///
126    /// Returns an error if only one of `api_key` or `api_secret` is provided.
127    pub fn new(
128        url: Option<String>,
129        api_key: Option<String>,
130        api_secret: Option<String>,
131        heartbeat_interval: Option<u64>,
132        is_testnet: bool,
133    ) -> anyhow::Result<Self> {
134        let url = url.unwrap_or_else(|| {
135            if is_testnet {
136                DERIBIT_TESTNET_WS_URL.to_string()
137            } else {
138                DERIBIT_WS_URL.to_string()
139            }
140        });
141
142        // Resolve credential from config or environment variables
143        let credential = Credential::resolve(api_key, api_secret, is_testnet);
144        if credential.is_some() {
145            log::info!("Deribit credentials loaded (testnet={is_testnet})");
146        } else {
147            log::debug!("No Deribit credentials configured - unauthenticated mode");
148        }
149
150        let signal = Arc::new(AtomicBool::new(false));
151        let subscriptions_state = SubscriptionState::new('.');
152
153        Ok(Self {
154            url,
155            is_testnet,
156            heartbeat_interval,
157            credential,
158            is_authenticated: Arc::new(AtomicBool::new(false)),
159            auth_state: Arc::new(tokio::sync::RwLock::new(None)),
160            signal,
161            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
162                ConnectionMode::Closed.as_u8(),
163            ))),
164            auth_tracker: AuthTracker::new(),
165            cmd_tx: {
166                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
167                Arc::new(tokio::sync::RwLock::new(tx))
168            },
169            out_rx: None,
170            task_handle: None,
171            subscriptions_state,
172            instruments_cache: Arc::new(DashMap::new()),
173            cancellation_token: CancellationToken::new(),
174        })
175    }
176
177    /// Creates a new public (unauthenticated) client.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if initialization fails.
182    pub fn new_public(is_testnet: bool) -> anyhow::Result<Self> {
183        let heartbeat_interval = 10;
184        Self::new(None, None, None, Some(heartbeat_interval), is_testnet)
185    }
186
187    /// Creates an authenticated client with credentials.
188    ///
189    /// Uses environment variables to load credentials:
190    /// - Testnet: `DERIBIT_TESTNET_API_KEY` and `DERIBIT_TESTNET_API_SECRET`
191    /// - Mainnet: `DERIBIT_API_KEY` and `DERIBIT_API_SECRET`
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if credentials are not found in environment variables.
196    pub fn with_credentials(is_testnet: bool) -> anyhow::Result<Self> {
197        let (key_env, secret_env) = if is_testnet {
198            ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
199        } else {
200            ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
201        };
202
203        let api_key = get_or_env_var_opt(None, key_env)
204            .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {key_env}"))?;
205        let api_secret = get_or_env_var_opt(None, secret_env)
206            .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {secret_env}"))?;
207
208        let heartbeat_interval = 10;
209        Self::new(
210            None,
211            Some(api_key),
212            Some(api_secret),
213            Some(heartbeat_interval),
214            is_testnet,
215        )
216    }
217
218    /// Returns the current connection mode.
219    fn connection_mode(&self) -> ConnectionMode {
220        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
221        ConnectionMode::from_u8(mode_u8)
222    }
223
224    /// Returns whether the client is actively connected.
225    #[must_use]
226    pub fn is_active(&self) -> bool {
227        self.connection_mode() == ConnectionMode::Active
228    }
229
230    /// Returns the WebSocket URL.
231    #[must_use]
232    pub fn url(&self) -> &str {
233        &self.url
234    }
235
236    /// Returns whether the client is closed.
237    #[must_use]
238    pub fn is_closed(&self) -> bool {
239        self.connection_mode() == ConnectionMode::Disconnect
240    }
241
242    /// Cancel all pending WebSocket requests.
243    pub fn cancel_all_requests(&self) {
244        self.cancellation_token.cancel();
245    }
246
247    /// Returns the cancellation token for this client.
248    #[must_use]
249    pub fn cancellation_token(&self) -> &CancellationToken {
250        &self.cancellation_token
251    }
252
253    /// Waits until the client is active or timeout expires.
254    ///
255    /// # Errors
256    ///
257    /// Returns an error if the timeout expires before the client becomes active.
258    pub async fn wait_until_active(&self, timeout_secs: f64) -> DeribitWsResult<()> {
259        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
260
261        tokio::time::timeout(timeout, async {
262            while !self.is_active() {
263                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
264            }
265        })
266        .await
267        .map_err(|_| {
268            DeribitWsError::Timeout(format!(
269                "WebSocket connection timeout after {timeout_secs} seconds"
270            ))
271        })?;
272
273        Ok(())
274    }
275
276    /// Caches instruments for use during message parsing.
277    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
278        self.instruments_cache.clear();
279        for inst in instruments {
280            self.instruments_cache
281                .insert(inst.raw_symbol().inner(), inst);
282        }
283        log::debug!("Cached {} instruments", self.instruments_cache.len());
284    }
285
286    /// Caches a single instrument.
287    pub fn cache_instrument(&self, instrument: InstrumentAny) {
288        let symbol = instrument.raw_symbol().inner();
289        self.instruments_cache.insert(symbol, instrument);
290
291        // If connected, send update to handler
292        if self.is_active() {
293            let tx = self.cmd_tx.clone();
294            let inst = self.instruments_cache.get(&symbol).map(|r| r.clone());
295            if let Some(inst) = inst {
296                get_runtime().spawn(async move {
297                    let _ = tx
298                        .read()
299                        .await
300                        .send(HandlerCommand::UpdateInstrument(Box::new(inst)));
301                });
302            }
303        }
304    }
305
306    /// Connects to the Deribit WebSocket API.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if the connection fails.
311    pub async fn connect(&mut self) -> anyhow::Result<()> {
312        log::info!("Connecting to Deribit WebSocket: {}", self.url);
313
314        // Reset stop signal
315        self.signal.store(false, Ordering::Relaxed);
316
317        // Create message handler and channel
318        let (message_handler, raw_rx) = channel_message_handler();
319
320        // No-op ping handler: handler responds to pings directly
321        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
322            // Handler responds to pings internally
323        });
324
325        // Configure WebSocket client
326        let config = WebSocketConfig {
327            url: self.url.clone(),
328            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
329            heartbeat: self.heartbeat_interval,
330            heartbeat_msg: None, // Deribit uses JSON-RPC heartbeat, not text ping
331            reconnect_timeout_ms: Some(5_000),
332            reconnect_delay_initial_ms: None,
333            reconnect_delay_max_ms: None,
334            reconnect_backoff_factor: None,
335            reconnect_jitter_ms: None,
336            reconnect_max_attempts: None,
337        };
338
339        // Configure rate limits
340        let keyed_quotas = vec![("subscription".to_string(), *DERIBIT_WS_SUBSCRIPTION_QUOTA)];
341
342        // Connect the WebSocket
343        let ws_client = WebSocketClient::connect(
344            config,
345            Some(message_handler),
346            Some(ping_handler),
347            None, // post_reconnection
348            keyed_quotas,
349            Some(*DERIBIT_WS_SUBSCRIPTION_QUOTA), // Default quota
350        )
351        .await?;
352
353        // Store connection mode
354        self.connection_mode
355            .store(ws_client.connection_mode_atomic());
356
357        // Create message channels
358        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
359        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
360
361        // Store command sender and output receiver
362        *self.cmd_tx.write().await = cmd_tx.clone();
363        self.out_rx = Some(Arc::new(out_rx));
364
365        // Create handler
366        let mut handler = DeribitWsFeedHandler::new(
367            self.signal.clone(),
368            cmd_rx,
369            raw_rx,
370            out_tx,
371            self.auth_tracker.clone(),
372            self.subscriptions_state.clone(),
373        );
374
375        // Send client to handler
376        let _ = cmd_tx.send(HandlerCommand::SetClient(ws_client));
377
378        // Replay cached instruments
379        let instruments: Vec<InstrumentAny> =
380            self.instruments_cache.iter().map(|r| r.clone()).collect();
381        if !instruments.is_empty() {
382            let _ = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments));
383        }
384
385        // Enable heartbeat if configured
386        if let Some(interval) = self.heartbeat_interval {
387            let _ = cmd_tx.send(HandlerCommand::SetHeartbeat { interval });
388        }
389
390        // Spawn handler task
391        let subscriptions_state = self.subscriptions_state.clone();
392        let credential = self.credential.clone();
393        let is_authenticated = self.is_authenticated.clone();
394        let auth_state = self.auth_state.clone();
395
396        let task_handle = get_runtime().spawn(async move {
397            // Track if we're waiting for re-authentication after reconnection
398            let mut pending_reauth = false;
399
400            loop {
401                match handler.next().await {
402                    Some(msg) => match msg {
403                        NautilusWsMessage::Reconnected => {
404                            log::info!("Reconnected to Deribit WebSocket");
405
406                            // Get all subscriptions that should be restored
407                            // all_topics() returns confirmed + pending_subscribe, excluding pending_unsubscribe
408                            let channels = subscriptions_state.all_topics();
409
410                            // Mark each channel as failed (transitions confirmed → pending_subscribe)
411                            for channel in &channels {
412                                subscriptions_state.mark_failure(channel);
413                            }
414
415                            // Check if we need to re-authenticate
416                            if let Some(cred) = &credential {
417                                log::info!("Re-authenticating after reconnection...");
418
419                                // Reset authenticated state
420                                is_authenticated.store(false, Ordering::Release);
421                                pending_reauth = true;
422
423                                // Get the previously used scope for re-authentication
424                                let previous_scope = auth_state
425                                    .read()
426                                    .await
427                                    .as_ref()
428                                    .map(|s| s.scope.clone());
429
430                                // Send re-authentication request
431                                send_auth_request(cred, previous_scope, &cmd_tx);
432                            } else {
433                                // No credentials - resubscribe immediately
434                                if !channels.is_empty() {
435                                    let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
436                                }
437                            }
438                        }
439                        NautilusWsMessage::Authenticated(result) => {
440                            let timestamp = get_atomic_clock_realtime().get_time_ms();
441                            let new_auth_state = AuthState::from_auth_result(&result, timestamp);
442                            *auth_state.write().await = Some(new_auth_state);
443
444                            // Spawn background token refresh task
445                            spawn_token_refresh_task(
446                                result.expires_in,
447                                result.refresh_token.clone(),
448                                cmd_tx.clone(),
449                            );
450
451                            if pending_reauth {
452                                pending_reauth = false;
453                                is_authenticated.store(true, Ordering::Release);
454                                log::info!(
455                                    "Re-authentication successful (scope: {}), resubscribing to channels",
456                                    result.scope
457                                );
458
459                                // Now resubscribe to all channels using all_topics()
460                                let channels = subscriptions_state.all_topics();
461
462                                if !channels.is_empty() {
463                                    let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
464                                }
465                            } else {
466                                // Initial authentication completed
467                                is_authenticated.store(true, Ordering::Release);
468                                log::debug!(
469                                    "Auth state stored: scope={}, expires_in={}s",
470                                    result.scope,
471                                    result.expires_in
472                                );
473                            }
474                        }
475                        _ => {}
476                    },
477                    None => {
478                        log::debug!("Handler returned None, stopping task");
479                        break;
480                    }
481                }
482            }
483        });
484
485        self.task_handle = Some(Arc::new(task_handle));
486        log::info!("Connected to Deribit WebSocket");
487
488        Ok(())
489    }
490
491    /// Closes the WebSocket connection.
492    ///
493    /// # Errors
494    ///
495    /// Returns an error if the close operation fails.
496    pub async fn close(&self) -> DeribitWsResult<()> {
497        log::info!("Closing Deribit WebSocket connection");
498        self.signal.store(true, Ordering::Relaxed);
499
500        let _ = self.cmd_tx.read().await.send(HandlerCommand::Disconnect);
501
502        // Wait for task to complete
503        if let Some(_handle) = &self.task_handle {
504            let _ = tokio::time::timeout(Duration::from_secs(5), async {
505                // Can't actually await the handle since we don't own it
506                tokio::time::sleep(Duration::from_millis(100)).await;
507            })
508            .await;
509        }
510
511        Ok(())
512    }
513
514    /// Returns a stream of WebSocket messages.
515    ///
516    /// # Panics
517    ///
518    /// Panics if called before `connect()` or if called twice.
519    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + 'static {
520        let rx = self
521            .out_rx
522            .take()
523            .expect("Data stream receiver already taken or not connected");
524        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
525
526        async_stream::stream! {
527            while let Some(msg) = rx.recv().await {
528                yield msg;
529            }
530        }
531    }
532
533    /// Returns whether the client has credentials configured.
534    #[must_use]
535    pub fn has_credentials(&self) -> bool {
536        self.credential.is_some()
537    }
538
539    /// Returns whether the client is authenticated.
540    #[must_use]
541    pub fn is_authenticated(&self) -> bool {
542        self.is_authenticated.load(Ordering::Acquire)
543    }
544
545    /// Authenticates the WebSocket session with Deribit.
546    ///
547    /// Uses the `client_signature` grant type with HMAC-SHA256 signature.
548    /// This must be called before subscribing to raw data streams.
549    ///
550    /// # Arguments
551    ///
552    /// * `session_name` - Optional session name for session-scoped authentication.
553    ///   When provided, uses `session:<name>` scope which allows skipping `access_token`
554    ///   in subsequent private requests. When `None`, uses default `connection` scope.
555    ///   Recommended to use session scope for order execution compatibility.
556    ///
557    /// # Errors
558    ///
559    /// Returns an error if:
560    /// - No credentials are configured
561    /// - The authentication request fails
562    /// - The authentication times out
563    pub async fn authenticate(&self, session_name: Option<&str>) -> DeribitWsResult<()> {
564        let credential = self.credential.as_ref().ok_or_else(|| {
565            DeribitWsError::Authentication("API credentials not configured".to_string())
566        })?;
567
568        // Determine scope
569        let scope = session_name.map(|name| format!("session:{name}"));
570
571        log::info!(
572            "Authenticating WebSocket with API key: {}, scope: {}",
573            credential.api_key_masked(),
574            scope.as_deref().unwrap_or("connection (default)")
575        );
576
577        let rx = self.auth_tracker.begin();
578
579        // Send authentication request
580        let cmd_tx = self.cmd_tx.read().await;
581        send_auth_request(credential, scope, &cmd_tx);
582        drop(cmd_tx);
583
584        // Wait for authentication result with timeout
585        match self
586            .auth_tracker
587            .wait_for_result::<DeribitWsError>(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
588            .await
589        {
590            Ok(()) => {
591                self.is_authenticated.store(true, Ordering::Release);
592                log::info!("WebSocket authenticated successfully");
593                Ok(())
594            }
595            Err(e) => {
596                log::error!("WebSocket authentication failed: error={e}");
597                Err(e)
598            }
599        }
600    }
601
602    /// Authenticates with session scope using the provided session name.
603    ///
604    /// Use `DERIBIT_DATA_SESSION_NAME` for data clients and
605    /// `DERIBIT_EXECUTION_SESSION_NAME` for execution clients.
606    ///
607    /// # Errors
608    ///
609    /// Returns an error if authentication fails.
610    pub async fn authenticate_session(&self, session_name: &str) -> DeribitWsResult<()> {
611        self.authenticate(Some(session_name)).await
612    }
613
614    /// Returns the current authentication state containing tokens.
615    ///
616    /// Returns `None` if not authenticated or tokens haven't been stored yet.
617    pub async fn auth_state(&self) -> Option<AuthState> {
618        self.auth_state.read().await.clone()
619    }
620
621    /// Returns the current access token if available.
622    pub async fn access_token(&self) -> Option<String> {
623        self.auth_state
624            .read()
625            .await
626            .as_ref()
627            .map(|s| s.access_token.clone())
628    }
629
630    // ------------------------------------------------------------------------------------------------
631    // Subscription Methods
632    // ------------------------------------------------------------------------------------------------
633
634    async fn send_subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
635        let mut channels_to_subscribe = Vec::new();
636
637        for channel in channels {
638            if self.subscriptions_state.add_reference(&channel) {
639                self.subscriptions_state.mark_subscribe(&channel);
640                channels_to_subscribe.push(channel);
641            } else {
642                log::debug!("Already subscribed to {channel}, skipping duplicate subscription");
643            }
644        }
645
646        if channels_to_subscribe.is_empty() {
647            return Ok(());
648        }
649
650        self.cmd_tx
651            .read()
652            .await
653            .send(HandlerCommand::Subscribe {
654                channels: channels_to_subscribe.clone(),
655            })
656            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
657
658        log::debug!(
659            "Sent subscribe for {} channels",
660            channels_to_subscribe.len()
661        );
662        Ok(())
663    }
664
665    async fn send_unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
666        let mut channels_to_unsubscribe = Vec::new();
667
668        for channel in channels {
669            if self.subscriptions_state.remove_reference(&channel) {
670                self.subscriptions_state.mark_unsubscribe(&channel);
671                channels_to_unsubscribe.push(channel);
672            } else {
673                log::debug!("Still has references to {channel}, skipping unsubscription");
674            }
675        }
676
677        if channels_to_unsubscribe.is_empty() {
678            return Ok(());
679        }
680
681        self.cmd_tx
682            .read()
683            .await
684            .send(HandlerCommand::Unsubscribe {
685                channels: channels_to_unsubscribe.clone(),
686            })
687            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
688
689        log::debug!(
690            "Sent unsubscribe for {} channels",
691            channels_to_unsubscribe.len()
692        );
693        Ok(())
694    }
695
696    /// Subscribes to trade updates for an instrument.
697    ///
698    /// # Arguments
699    ///
700    /// * `instrument_id` - The instrument to subscribe to
701    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
702    ///
703    /// # Errors
704    ///
705    /// Returns an error if subscription fails or raw is requested without authentication.
706    pub async fn subscribe_trades(
707        &self,
708        instrument_id: InstrumentId,
709        interval: Option<DeribitUpdateInterval>,
710    ) -> DeribitWsResult<()> {
711        let interval = interval.unwrap_or_default();
712        self.check_auth_requirement(interval)?;
713        let channel =
714            DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
715        self.send_subscribe(vec![channel]).await
716    }
717
718    /// Subscribes to raw trade updates (requires authentication).
719    ///
720    /// Convenience method equivalent to `subscribe_trades(id, Some(DeribitUpdateInterval::Raw))`.
721    ///
722    /// # Errors
723    ///
724    /// Returns an error if not authenticated or subscription fails.
725    pub async fn subscribe_trades_raw(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
726        self.subscribe_trades(instrument_id, Some(DeribitUpdateInterval::Raw))
727            .await
728    }
729
730    /// Unsubscribes from trade updates for an instrument.
731    ///
732    /// # Errors
733    ///
734    /// Returns an error if unsubscription fails.
735    pub async fn unsubscribe_trades(
736        &self,
737        instrument_id: InstrumentId,
738        interval: Option<DeribitUpdateInterval>,
739    ) -> DeribitWsResult<()> {
740        let interval = interval.unwrap_or_default();
741        let channel =
742            DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
743        self.send_unsubscribe(vec![channel]).await
744    }
745
746    /// Subscribes to order book updates for an instrument.
747    ///
748    /// # Arguments
749    ///
750    /// * `instrument_id` - The instrument to subscribe to
751    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
752    ///
753    /// # Errors
754    ///
755    /// Returns an error if subscription fails or raw is requested without authentication.
756    pub async fn subscribe_book(
757        &self,
758        instrument_id: InstrumentId,
759        interval: Option<DeribitUpdateInterval>,
760    ) -> DeribitWsResult<()> {
761        let interval = interval.unwrap_or_default();
762        self.check_auth_requirement(interval)?;
763        let channel =
764            DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
765        self.send_subscribe(vec![channel]).await
766    }
767
768    /// Subscribes to raw order book updates (requires authentication).
769    ///
770    /// Convenience method equivalent to `subscribe_book(id, Some(DeribitUpdateInterval::Raw))`.
771    ///
772    /// # Errors
773    ///
774    /// Returns an error if not authenticated or subscription fails.
775    pub async fn subscribe_book_raw(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
776        self.subscribe_book(instrument_id, Some(DeribitUpdateInterval::Raw))
777            .await
778    }
779
780    /// Unsubscribes from order book updates for an instrument.
781    ///
782    /// # Errors
783    ///
784    /// Returns an error if unsubscription fails.
785    pub async fn unsubscribe_book(
786        &self,
787        instrument_id: InstrumentId,
788        interval: Option<DeribitUpdateInterval>,
789    ) -> DeribitWsResult<()> {
790        let interval = interval.unwrap_or_default();
791        let channel =
792            DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
793        self.send_unsubscribe(vec![channel]).await
794    }
795
796    /// Subscribes to grouped (depth-limited) order book updates for an instrument.
797    ///
798    /// Uses the Deribit grouped book channel format: `book.{instrument}.{group}.{depth}.{interval}`
799    ///
800    /// # Errors
801    ///
802    /// Returns an error if subscription fails or raw is requested without authentication.
803    pub async fn subscribe_book_grouped(
804        &self,
805        instrument_id: InstrumentId,
806        group: &str,
807        depth: u32,
808        interval: Option<DeribitUpdateInterval>,
809    ) -> DeribitWsResult<()> {
810        let interval = interval.unwrap_or_default();
811        self.check_auth_requirement(interval)?;
812        let channel = format!(
813            "book.{}.{}.{}.{}",
814            instrument_id.symbol,
815            group,
816            depth,
817            interval.as_str()
818        );
819        self.send_subscribe(vec![channel]).await
820    }
821
822    /// Unsubscribes from grouped (depth-limited) order book updates for an instrument.
823    ///
824    /// # Errors
825    ///
826    /// Returns an error if unsubscription fails.
827    pub async fn unsubscribe_book_grouped(
828        &self,
829        instrument_id: InstrumentId,
830        group: &str,
831        depth: u32,
832        interval: Option<DeribitUpdateInterval>,
833    ) -> DeribitWsResult<()> {
834        let interval = interval.unwrap_or_default();
835        let channel = format!(
836            "book.{}.{}.{}.{}",
837            instrument_id.symbol,
838            group,
839            depth,
840            interval.as_str()
841        );
842        self.send_unsubscribe(vec![channel]).await
843    }
844
845    /// Subscribes to ticker updates for an instrument.
846    ///
847    /// # Arguments
848    ///
849    /// * `instrument_id` - The instrument to subscribe to
850    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
851    ///
852    /// # Errors
853    ///
854    /// Returns an error if subscription fails or raw is requested without authentication.
855    pub async fn subscribe_ticker(
856        &self,
857        instrument_id: InstrumentId,
858        interval: Option<DeribitUpdateInterval>,
859    ) -> DeribitWsResult<()> {
860        let interval = interval.unwrap_or_default();
861        self.check_auth_requirement(interval)?;
862        let channel =
863            DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
864        self.send_subscribe(vec![channel]).await
865    }
866
867    /// Subscribes to raw ticker updates (requires authentication).
868    ///
869    /// Convenience method equivalent to `subscribe_ticker(id, Some(DeribitUpdateInterval::Raw))`.
870    ///
871    /// # Errors
872    ///
873    /// Returns an error if not authenticated or subscription fails.
874    pub async fn subscribe_ticker_raw(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
875        self.subscribe_ticker(instrument_id, Some(DeribitUpdateInterval::Raw))
876            .await
877    }
878
879    /// Unsubscribes from ticker updates for an instrument.
880    ///
881    /// # Errors
882    ///
883    /// Returns an error if unsubscription fails.
884    pub async fn unsubscribe_ticker(
885        &self,
886        instrument_id: InstrumentId,
887        interval: Option<DeribitUpdateInterval>,
888    ) -> DeribitWsResult<()> {
889        let interval = interval.unwrap_or_default();
890        let channel =
891            DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
892        self.send_unsubscribe(vec![channel]).await
893    }
894
895    /// Subscribes to quote (best bid/ask) updates for an instrument.
896    ///
897    /// Note: Quote channel does not support interval parameter.
898    ///
899    /// # Errors
900    ///
901    /// Returns an error if subscription fails.
902    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
903        let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
904        self.send_subscribe(vec![channel]).await
905    }
906
907    /// Unsubscribes from quote updates for an instrument.
908    ///
909    /// # Errors
910    ///
911    /// Returns an error if unsubscription fails.
912    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
913        let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
914        self.send_unsubscribe(vec![channel]).await
915    }
916
917    /// Subscribes to instrument state changes for lifecycle notifications.
918    ///
919    /// Channel format: `instrument.state.{kind}.{currency}`
920    ///
921    /// # Errors
922    ///
923    /// Returns an error if subscription fails.
924    pub async fn subscribe_instrument_state(
925        &self,
926        kind: &str,
927        currency: &str,
928    ) -> DeribitWsResult<()> {
929        let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
930        self.send_subscribe(vec![channel]).await
931    }
932
933    /// Unsubscribes from instrument state changes.
934    ///
935    /// # Errors
936    ///
937    /// Returns an error if unsubscription fails.
938    pub async fn unsubscribe_instrument_state(
939        &self,
940        kind: &str,
941        currency: &str,
942    ) -> DeribitWsResult<()> {
943        let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
944        self.send_unsubscribe(vec![channel]).await
945    }
946
947    /// Subscribes to perpetual interest rates updates.
948    ///
949    /// Channel format: `perpetual.{instrument_name}.{interval}`
950    ///
951    /// # Errors
952    ///
953    /// Returns an error if subscription fails.
954    pub async fn subscribe_perpetual_interests_rates_updates(
955        &self,
956        instrument_id: InstrumentId,
957        interval: Option<DeribitUpdateInterval>,
958    ) -> DeribitWsResult<()> {
959        let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
960        let channel = DeribitWsChannel::Perpetual
961            .format_channel(instrument_id.symbol.as_str(), Some(interval));
962
963        self.send_subscribe(vec![channel]).await
964    }
965
966    /// Unsubscribes from perpetual interest rates updates.
967    ///
968    /// # Errors
969    ///
970    /// Returns an error if subscription fails.
971    pub async fn unsubscribe_perpetual_interest_rates_updates(
972        &self,
973        instrument_id: InstrumentId,
974        interval: Option<DeribitUpdateInterval>,
975    ) -> DeribitWsResult<()> {
976        let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
977        let channel = DeribitWsChannel::Perpetual
978            .format_channel(instrument_id.symbol.as_str(), Some(interval));
979
980        self.send_unsubscribe(vec![channel]).await
981    }
982
983    /// Subscribes to chart/OHLC bar updates for an instrument.
984    ///
985    /// # Arguments
986    ///
987    /// * `instrument_id` - The instrument to subscribe to
988    /// * `resolution` - Bar resolution: "1", "3", "5", "10", "15", "30", "60", "120", "180",
989    ///   "360", "720", "1D" (minutes or 1D for daily)
990    ///
991    /// # Errors
992    ///
993    /// Returns an error if subscription fails.
994    pub async fn subscribe_chart(
995        &self,
996        instrument_id: InstrumentId,
997        resolution: &str,
998    ) -> DeribitWsResult<()> {
999        // Chart channel format: chart.trades.{instrument}.{resolution}
1000        let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1001        self.send_subscribe(vec![channel]).await
1002    }
1003
1004    /// Unsubscribes from chart/OHLC bar updates.
1005    ///
1006    /// # Errors
1007    ///
1008    /// Returns an error if unsubscription fails.
1009    pub async fn unsubscribe_chart(
1010        &self,
1011        instrument_id: InstrumentId,
1012        resolution: &str,
1013    ) -> DeribitWsResult<()> {
1014        let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1015        self.send_unsubscribe(vec![channel]).await
1016    }
1017
1018    /// Checks if authentication is required for the given interval.
1019    ///
1020    /// # Errors
1021    ///
1022    /// Returns an error if raw interval is requested but client is not authenticated.
1023    fn check_auth_requirement(&self, interval: DeribitUpdateInterval) -> DeribitWsResult<()> {
1024        if interval.requires_auth() && !self.is_authenticated() {
1025            return Err(DeribitWsError::Authentication(
1026                "Raw streams require authentication. Call authenticate() first.".to_string(),
1027            ));
1028        }
1029        Ok(())
1030    }
1031
1032    /// Subscribes to multiple channels at once.
1033    ///
1034    /// # Errors
1035    ///
1036    /// Returns an error if subscription fails.
1037    pub async fn subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
1038        self.send_subscribe(channels).await
1039    }
1040
1041    /// Unsubscribes from multiple channels at once.
1042    ///
1043    /// # Errors
1044    ///
1045    /// Returns an error if unsubscription fails.
1046    pub async fn unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
1047        self.send_unsubscribe(channels).await
1048    }
1049}