1use std::{
23 fmt::Debug,
24 num::NonZeroU32,
25 sync::{
26 Arc, LazyLock,
27 atomic::{AtomicBool, AtomicU8, Ordering},
28 },
29 time::Duration,
30};
31
32use arc_swap::ArcSwap;
33use dashmap::DashMap;
34use futures_util::Stream;
35use nautilus_common::live::get_runtime;
36use nautilus_core::{
37 consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt, time::get_atomic_clock_realtime,
38};
39use nautilus_model::{
40 identifiers::InstrumentId,
41 instruments::{Instrument, InstrumentAny},
42};
43use nautilus_network::{
44 http::USER_AGENT,
45 mode::ConnectionMode,
46 ratelimiter::quota::Quota,
47 websocket::{
48 AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
49 channel_message_handler,
50 },
51};
52use tokio_util::sync::CancellationToken;
53use ustr::Ustr;
54
55use super::{
56 auth::{AuthState, send_auth_request, spawn_token_refresh_task},
57 enums::{DeribitUpdateInterval, DeribitWsChannel},
58 error::{DeribitWsError, DeribitWsResult},
59 handler::{DeribitWsFeedHandler, HandlerCommand},
60 messages::NautilusWsMessage,
61};
62use crate::common::{
63 consts::{DERIBIT_TESTNET_WS_URL, DERIBIT_WS_URL},
64 credential::Credential,
65};
66
67pub static DERIBIT_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
69 LazyLock::new(|| Quota::per_second(NonZeroU32::new(20).unwrap()));
70
71const AUTHENTICATION_TIMEOUT_SECS: u64 = 30;
73
74#[derive(Clone)]
76#[cfg_attr(
77 feature = "python",
78 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit")
79)]
80pub struct DeribitWebSocketClient {
81 url: String,
82 is_testnet: bool,
83 heartbeat_interval: Option<u64>,
84 credential: Option<Credential>,
85 is_authenticated: Arc<AtomicBool>,
86 auth_state: Arc<tokio::sync::RwLock<Option<AuthState>>>,
87 signal: Arc<AtomicBool>,
88 connection_mode: Arc<ArcSwap<AtomicU8>>,
89 auth_tracker: AuthTracker,
90 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
91 out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
92 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
93 subscriptions_state: SubscriptionState,
94 instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
95 cancellation_token: CancellationToken,
96}
97
98impl Debug for DeribitWebSocketClient {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct(stringify!(DeribitWebSocketClient))
101 .field("url", &self.url)
102 .field("is_testnet", &self.is_testnet)
103 .field("has_credentials", &self.credential.is_some())
104 .field(
105 "is_authenticated",
106 &self.is_authenticated.load(Ordering::Relaxed),
107 )
108 .field(
109 "has_auth_state",
110 &self
111 .auth_state
112 .try_read()
113 .map(|s| s.is_some())
114 .unwrap_or(false),
115 )
116 .field("heartbeat_interval", &self.heartbeat_interval)
117 .finish_non_exhaustive()
118 }
119}
120
121impl DeribitWebSocketClient {
122 pub fn new(
128 url: Option<String>,
129 api_key: Option<String>,
130 api_secret: Option<String>,
131 heartbeat_interval: Option<u64>,
132 is_testnet: bool,
133 ) -> anyhow::Result<Self> {
134 let url = url.unwrap_or_else(|| {
135 if is_testnet {
136 DERIBIT_TESTNET_WS_URL.to_string()
137 } else {
138 DERIBIT_WS_URL.to_string()
139 }
140 });
141
142 let credential = Credential::resolve(api_key, api_secret, is_testnet);
144 if credential.is_some() {
145 log::info!("Deribit credentials loaded (testnet={is_testnet})");
146 } else {
147 log::debug!("No Deribit credentials configured - unauthenticated mode");
148 }
149
150 let signal = Arc::new(AtomicBool::new(false));
151 let subscriptions_state = SubscriptionState::new('.');
152
153 Ok(Self {
154 url,
155 is_testnet,
156 heartbeat_interval,
157 credential,
158 is_authenticated: Arc::new(AtomicBool::new(false)),
159 auth_state: Arc::new(tokio::sync::RwLock::new(None)),
160 signal,
161 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
162 ConnectionMode::Closed.as_u8(),
163 ))),
164 auth_tracker: AuthTracker::new(),
165 cmd_tx: {
166 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
167 Arc::new(tokio::sync::RwLock::new(tx))
168 },
169 out_rx: None,
170 task_handle: None,
171 subscriptions_state,
172 instruments_cache: Arc::new(DashMap::new()),
173 cancellation_token: CancellationToken::new(),
174 })
175 }
176
177 pub fn new_public(is_testnet: bool) -> anyhow::Result<Self> {
183 let heartbeat_interval = 10;
184 Self::new(None, None, None, Some(heartbeat_interval), is_testnet)
185 }
186
187 pub fn with_credentials(is_testnet: bool) -> anyhow::Result<Self> {
197 let (key_env, secret_env) = if is_testnet {
198 ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
199 } else {
200 ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
201 };
202
203 let api_key = get_or_env_var_opt(None, key_env)
204 .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {key_env}"))?;
205 let api_secret = get_or_env_var_opt(None, secret_env)
206 .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {secret_env}"))?;
207
208 let heartbeat_interval = 10;
209 Self::new(
210 None,
211 Some(api_key),
212 Some(api_secret),
213 Some(heartbeat_interval),
214 is_testnet,
215 )
216 }
217
218 fn connection_mode(&self) -> ConnectionMode {
220 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
221 ConnectionMode::from_u8(mode_u8)
222 }
223
224 #[must_use]
226 pub fn is_active(&self) -> bool {
227 self.connection_mode() == ConnectionMode::Active
228 }
229
230 #[must_use]
232 pub fn url(&self) -> &str {
233 &self.url
234 }
235
236 #[must_use]
238 pub fn is_closed(&self) -> bool {
239 self.connection_mode() == ConnectionMode::Disconnect
240 }
241
242 pub fn cancel_all_requests(&self) {
244 self.cancellation_token.cancel();
245 }
246
247 #[must_use]
249 pub fn cancellation_token(&self) -> &CancellationToken {
250 &self.cancellation_token
251 }
252
253 pub async fn wait_until_active(&self, timeout_secs: f64) -> DeribitWsResult<()> {
259 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
260
261 tokio::time::timeout(timeout, async {
262 while !self.is_active() {
263 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
264 }
265 })
266 .await
267 .map_err(|_| {
268 DeribitWsError::Timeout(format!(
269 "WebSocket connection timeout after {timeout_secs} seconds"
270 ))
271 })?;
272
273 Ok(())
274 }
275
276 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
278 self.instruments_cache.clear();
279 for inst in instruments {
280 self.instruments_cache
281 .insert(inst.raw_symbol().inner(), inst);
282 }
283 log::debug!("Cached {} instruments", self.instruments_cache.len());
284 }
285
286 pub fn cache_instrument(&self, instrument: InstrumentAny) {
288 let symbol = instrument.raw_symbol().inner();
289 self.instruments_cache.insert(symbol, instrument);
290
291 if self.is_active() {
293 let tx = self.cmd_tx.clone();
294 let inst = self.instruments_cache.get(&symbol).map(|r| r.clone());
295 if let Some(inst) = inst {
296 get_runtime().spawn(async move {
297 let _ = tx
298 .read()
299 .await
300 .send(HandlerCommand::UpdateInstrument(Box::new(inst)));
301 });
302 }
303 }
304 }
305
306 pub async fn connect(&mut self) -> anyhow::Result<()> {
312 log::info!("Connecting to Deribit WebSocket: {}", self.url);
313
314 self.signal.store(false, Ordering::Relaxed);
316
317 let (message_handler, raw_rx) = channel_message_handler();
319
320 let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
322 });
324
325 let config = WebSocketConfig {
327 url: self.url.clone(),
328 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
329 heartbeat: self.heartbeat_interval,
330 heartbeat_msg: None, reconnect_timeout_ms: Some(5_000),
332 reconnect_delay_initial_ms: None,
333 reconnect_delay_max_ms: None,
334 reconnect_backoff_factor: None,
335 reconnect_jitter_ms: None,
336 reconnect_max_attempts: None,
337 };
338
339 let keyed_quotas = vec![("subscription".to_string(), *DERIBIT_WS_SUBSCRIPTION_QUOTA)];
341
342 let ws_client = WebSocketClient::connect(
344 config,
345 Some(message_handler),
346 Some(ping_handler),
347 None, keyed_quotas,
349 Some(*DERIBIT_WS_SUBSCRIPTION_QUOTA), )
351 .await?;
352
353 self.connection_mode
355 .store(ws_client.connection_mode_atomic());
356
357 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
359 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
360
361 *self.cmd_tx.write().await = cmd_tx.clone();
363 self.out_rx = Some(Arc::new(out_rx));
364
365 let mut handler = DeribitWsFeedHandler::new(
367 self.signal.clone(),
368 cmd_rx,
369 raw_rx,
370 out_tx,
371 self.auth_tracker.clone(),
372 self.subscriptions_state.clone(),
373 );
374
375 let _ = cmd_tx.send(HandlerCommand::SetClient(ws_client));
377
378 let instruments: Vec<InstrumentAny> =
380 self.instruments_cache.iter().map(|r| r.clone()).collect();
381 if !instruments.is_empty() {
382 let _ = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments));
383 }
384
385 if let Some(interval) = self.heartbeat_interval {
387 let _ = cmd_tx.send(HandlerCommand::SetHeartbeat { interval });
388 }
389
390 let subscriptions_state = self.subscriptions_state.clone();
392 let credential = self.credential.clone();
393 let is_authenticated = self.is_authenticated.clone();
394 let auth_state = self.auth_state.clone();
395
396 let task_handle = get_runtime().spawn(async move {
397 let mut pending_reauth = false;
399
400 loop {
401 match handler.next().await {
402 Some(msg) => match msg {
403 NautilusWsMessage::Reconnected => {
404 log::info!("Reconnected to Deribit WebSocket");
405
406 let channels = subscriptions_state.all_topics();
409
410 for channel in &channels {
412 subscriptions_state.mark_failure(channel);
413 }
414
415 if let Some(cred) = &credential {
417 log::info!("Re-authenticating after reconnection...");
418
419 is_authenticated.store(false, Ordering::Release);
421 pending_reauth = true;
422
423 let previous_scope = auth_state
425 .read()
426 .await
427 .as_ref()
428 .map(|s| s.scope.clone());
429
430 send_auth_request(cred, previous_scope, &cmd_tx);
432 } else {
433 if !channels.is_empty() {
435 let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
436 }
437 }
438 }
439 NautilusWsMessage::Authenticated(result) => {
440 let timestamp = get_atomic_clock_realtime().get_time_ms();
441 let new_auth_state = AuthState::from_auth_result(&result, timestamp);
442 *auth_state.write().await = Some(new_auth_state);
443
444 spawn_token_refresh_task(
446 result.expires_in,
447 result.refresh_token.clone(),
448 cmd_tx.clone(),
449 );
450
451 if pending_reauth {
452 pending_reauth = false;
453 is_authenticated.store(true, Ordering::Release);
454 log::info!(
455 "Re-authentication successful (scope: {}), resubscribing to channels",
456 result.scope
457 );
458
459 let channels = subscriptions_state.all_topics();
461
462 if !channels.is_empty() {
463 let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
464 }
465 } else {
466 is_authenticated.store(true, Ordering::Release);
468 log::debug!(
469 "Auth state stored: scope={}, expires_in={}s",
470 result.scope,
471 result.expires_in
472 );
473 }
474 }
475 _ => {}
476 },
477 None => {
478 log::debug!("Handler returned None, stopping task");
479 break;
480 }
481 }
482 }
483 });
484
485 self.task_handle = Some(Arc::new(task_handle));
486 log::info!("Connected to Deribit WebSocket");
487
488 Ok(())
489 }
490
491 pub async fn close(&self) -> DeribitWsResult<()> {
497 log::info!("Closing Deribit WebSocket connection");
498 self.signal.store(true, Ordering::Relaxed);
499
500 let _ = self.cmd_tx.read().await.send(HandlerCommand::Disconnect);
501
502 if let Some(_handle) = &self.task_handle {
504 let _ = tokio::time::timeout(Duration::from_secs(5), async {
505 tokio::time::sleep(Duration::from_millis(100)).await;
507 })
508 .await;
509 }
510
511 Ok(())
512 }
513
514 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + 'static {
520 let rx = self
521 .out_rx
522 .take()
523 .expect("Data stream receiver already taken or not connected");
524 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
525
526 async_stream::stream! {
527 while let Some(msg) = rx.recv().await {
528 yield msg;
529 }
530 }
531 }
532
533 #[must_use]
535 pub fn has_credentials(&self) -> bool {
536 self.credential.is_some()
537 }
538
539 #[must_use]
541 pub fn is_authenticated(&self) -> bool {
542 self.is_authenticated.load(Ordering::Acquire)
543 }
544
545 pub async fn authenticate(&self, session_name: Option<&str>) -> DeribitWsResult<()> {
564 let credential = self.credential.as_ref().ok_or_else(|| {
565 DeribitWsError::Authentication("API credentials not configured".to_string())
566 })?;
567
568 let scope = session_name.map(|name| format!("session:{name}"));
570
571 log::info!(
572 "Authenticating WebSocket with API key: {}, scope: {}",
573 credential.api_key_masked(),
574 scope.as_deref().unwrap_or("connection (default)")
575 );
576
577 let rx = self.auth_tracker.begin();
578
579 let cmd_tx = self.cmd_tx.read().await;
581 send_auth_request(credential, scope, &cmd_tx);
582 drop(cmd_tx);
583
584 match self
586 .auth_tracker
587 .wait_for_result::<DeribitWsError>(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
588 .await
589 {
590 Ok(()) => {
591 self.is_authenticated.store(true, Ordering::Release);
592 log::info!("WebSocket authenticated successfully");
593 Ok(())
594 }
595 Err(e) => {
596 log::error!("WebSocket authentication failed: error={e}");
597 Err(e)
598 }
599 }
600 }
601
602 pub async fn authenticate_session(&self, session_name: &str) -> DeribitWsResult<()> {
611 self.authenticate(Some(session_name)).await
612 }
613
614 pub async fn auth_state(&self) -> Option<AuthState> {
618 self.auth_state.read().await.clone()
619 }
620
621 pub async fn access_token(&self) -> Option<String> {
623 self.auth_state
624 .read()
625 .await
626 .as_ref()
627 .map(|s| s.access_token.clone())
628 }
629
630 async fn send_subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
635 let mut channels_to_subscribe = Vec::new();
636
637 for channel in channels {
638 if self.subscriptions_state.add_reference(&channel) {
639 self.subscriptions_state.mark_subscribe(&channel);
640 channels_to_subscribe.push(channel);
641 } else {
642 log::debug!("Already subscribed to {channel}, skipping duplicate subscription");
643 }
644 }
645
646 if channels_to_subscribe.is_empty() {
647 return Ok(());
648 }
649
650 self.cmd_tx
651 .read()
652 .await
653 .send(HandlerCommand::Subscribe {
654 channels: channels_to_subscribe.clone(),
655 })
656 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
657
658 log::debug!(
659 "Sent subscribe for {} channels",
660 channels_to_subscribe.len()
661 );
662 Ok(())
663 }
664
665 async fn send_unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
666 let mut channels_to_unsubscribe = Vec::new();
667
668 for channel in channels {
669 if self.subscriptions_state.remove_reference(&channel) {
670 self.subscriptions_state.mark_unsubscribe(&channel);
671 channels_to_unsubscribe.push(channel);
672 } else {
673 log::debug!("Still has references to {channel}, skipping unsubscription");
674 }
675 }
676
677 if channels_to_unsubscribe.is_empty() {
678 return Ok(());
679 }
680
681 self.cmd_tx
682 .read()
683 .await
684 .send(HandlerCommand::Unsubscribe {
685 channels: channels_to_unsubscribe.clone(),
686 })
687 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
688
689 log::debug!(
690 "Sent unsubscribe for {} channels",
691 channels_to_unsubscribe.len()
692 );
693 Ok(())
694 }
695
696 pub async fn subscribe_trades(
707 &self,
708 instrument_id: InstrumentId,
709 interval: Option<DeribitUpdateInterval>,
710 ) -> DeribitWsResult<()> {
711 let interval = interval.unwrap_or_default();
712 self.check_auth_requirement(interval)?;
713 let channel =
714 DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
715 self.send_subscribe(vec![channel]).await
716 }
717
718 pub async fn subscribe_trades_raw(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
726 self.subscribe_trades(instrument_id, Some(DeribitUpdateInterval::Raw))
727 .await
728 }
729
730 pub async fn unsubscribe_trades(
736 &self,
737 instrument_id: InstrumentId,
738 interval: Option<DeribitUpdateInterval>,
739 ) -> DeribitWsResult<()> {
740 let interval = interval.unwrap_or_default();
741 let channel =
742 DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
743 self.send_unsubscribe(vec![channel]).await
744 }
745
746 pub async fn subscribe_book(
757 &self,
758 instrument_id: InstrumentId,
759 interval: Option<DeribitUpdateInterval>,
760 ) -> DeribitWsResult<()> {
761 let interval = interval.unwrap_or_default();
762 self.check_auth_requirement(interval)?;
763 let channel =
764 DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
765 self.send_subscribe(vec![channel]).await
766 }
767
768 pub async fn subscribe_book_raw(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
776 self.subscribe_book(instrument_id, Some(DeribitUpdateInterval::Raw))
777 .await
778 }
779
780 pub async fn unsubscribe_book(
786 &self,
787 instrument_id: InstrumentId,
788 interval: Option<DeribitUpdateInterval>,
789 ) -> DeribitWsResult<()> {
790 let interval = interval.unwrap_or_default();
791 let channel =
792 DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
793 self.send_unsubscribe(vec![channel]).await
794 }
795
796 pub async fn subscribe_book_grouped(
804 &self,
805 instrument_id: InstrumentId,
806 group: &str,
807 depth: u32,
808 interval: Option<DeribitUpdateInterval>,
809 ) -> DeribitWsResult<()> {
810 let interval = interval.unwrap_or_default();
811 self.check_auth_requirement(interval)?;
812 let channel = format!(
813 "book.{}.{}.{}.{}",
814 instrument_id.symbol,
815 group,
816 depth,
817 interval.as_str()
818 );
819 self.send_subscribe(vec![channel]).await
820 }
821
822 pub async fn unsubscribe_book_grouped(
828 &self,
829 instrument_id: InstrumentId,
830 group: &str,
831 depth: u32,
832 interval: Option<DeribitUpdateInterval>,
833 ) -> DeribitWsResult<()> {
834 let interval = interval.unwrap_or_default();
835 let channel = format!(
836 "book.{}.{}.{}.{}",
837 instrument_id.symbol,
838 group,
839 depth,
840 interval.as_str()
841 );
842 self.send_unsubscribe(vec![channel]).await
843 }
844
845 pub async fn subscribe_ticker(
856 &self,
857 instrument_id: InstrumentId,
858 interval: Option<DeribitUpdateInterval>,
859 ) -> DeribitWsResult<()> {
860 let interval = interval.unwrap_or_default();
861 self.check_auth_requirement(interval)?;
862 let channel =
863 DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
864 self.send_subscribe(vec![channel]).await
865 }
866
867 pub async fn subscribe_ticker_raw(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
875 self.subscribe_ticker(instrument_id, Some(DeribitUpdateInterval::Raw))
876 .await
877 }
878
879 pub async fn unsubscribe_ticker(
885 &self,
886 instrument_id: InstrumentId,
887 interval: Option<DeribitUpdateInterval>,
888 ) -> DeribitWsResult<()> {
889 let interval = interval.unwrap_or_default();
890 let channel =
891 DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
892 self.send_unsubscribe(vec![channel]).await
893 }
894
895 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
903 let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
904 self.send_subscribe(vec![channel]).await
905 }
906
907 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
913 let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
914 self.send_unsubscribe(vec![channel]).await
915 }
916
917 pub async fn subscribe_instrument_state(
925 &self,
926 kind: &str,
927 currency: &str,
928 ) -> DeribitWsResult<()> {
929 let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
930 self.send_subscribe(vec![channel]).await
931 }
932
933 pub async fn unsubscribe_instrument_state(
939 &self,
940 kind: &str,
941 currency: &str,
942 ) -> DeribitWsResult<()> {
943 let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
944 self.send_unsubscribe(vec![channel]).await
945 }
946
947 pub async fn subscribe_perpetual_interests_rates_updates(
955 &self,
956 instrument_id: InstrumentId,
957 interval: Option<DeribitUpdateInterval>,
958 ) -> DeribitWsResult<()> {
959 let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
960 let channel = DeribitWsChannel::Perpetual
961 .format_channel(instrument_id.symbol.as_str(), Some(interval));
962
963 self.send_subscribe(vec![channel]).await
964 }
965
966 pub async fn unsubscribe_perpetual_interest_rates_updates(
972 &self,
973 instrument_id: InstrumentId,
974 interval: Option<DeribitUpdateInterval>,
975 ) -> DeribitWsResult<()> {
976 let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
977 let channel = DeribitWsChannel::Perpetual
978 .format_channel(instrument_id.symbol.as_str(), Some(interval));
979
980 self.send_unsubscribe(vec![channel]).await
981 }
982
983 pub async fn subscribe_chart(
995 &self,
996 instrument_id: InstrumentId,
997 resolution: &str,
998 ) -> DeribitWsResult<()> {
999 let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1001 self.send_subscribe(vec![channel]).await
1002 }
1003
1004 pub async fn unsubscribe_chart(
1010 &self,
1011 instrument_id: InstrumentId,
1012 resolution: &str,
1013 ) -> DeribitWsResult<()> {
1014 let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1015 self.send_unsubscribe(vec![channel]).await
1016 }
1017
1018 fn check_auth_requirement(&self, interval: DeribitUpdateInterval) -> DeribitWsResult<()> {
1024 if interval.requires_auth() && !self.is_authenticated() {
1025 return Err(DeribitWsError::Authentication(
1026 "Raw streams require authentication. Call authenticate() first.".to_string(),
1027 ));
1028 }
1029 Ok(())
1030 }
1031
1032 pub async fn subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
1038 self.send_subscribe(channels).await
1039 }
1040
1041 pub async fn unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
1047 self.send_unsubscribe(channels).await
1048 }
1049}