1use std::{
23 fmt::Debug,
24 sync::{
25 Arc,
26 atomic::{AtomicBool, AtomicU8, Ordering},
27 },
28 time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::{enums::LogColor, live::get_runtime, log_info};
35use nautilus_core::{
36 consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt, time::get_atomic_clock_realtime,
37};
38use nautilus_model::{
39 data::BarType,
40 enums::OrderSide,
41 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
42 instruments::{Instrument, InstrumentAny},
43 types::{Price, Quantity},
44};
45use nautilus_network::{
46 http::USER_AGENT,
47 mode::ConnectionMode,
48 websocket::{
49 AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
50 channel_message_handler,
51 },
52};
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use super::{
57 auth::{AuthState, send_auth_request, spawn_token_refresh_task},
58 enums::{DeribitUpdateInterval, DeribitWsChannel},
59 error::{DeribitWsError, DeribitWsResult},
60 handler::{DeribitWsFeedHandler, HandlerCommand},
61 messages::{
62 DeribitCancelAllByInstrumentParams, DeribitCancelParams, DeribitEditParams,
63 DeribitOrderParams, NautilusWsMessage,
64 },
65};
66use crate::common::{
67 consts::{
68 DERIBIT_TESTNET_WS_URL, DERIBIT_WS_HEARTBEAT_SECS, DERIBIT_WS_ORDER_KEY,
69 DERIBIT_WS_ORDER_QUOTA, DERIBIT_WS_SUBSCRIPTION_KEY, DERIBIT_WS_SUBSCRIPTION_QUOTA,
70 DERIBIT_WS_URL,
71 },
72 credential::Credential,
73 parse::bar_spec_to_resolution,
74};
75
/// Maximum time, in seconds, that `authenticate` waits for an authentication result.
const AUTHENTICATION_TIMEOUT_SECS: u64 = 30;
78
/// WebSocket client for Deribit market data, user streams and order commands.
///
/// Cloning is cheap: shared state lives behind `Arc`s, so clones observe the
/// same connection mode, command channel and instrument cache.
#[derive(Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit")
)]
pub struct DeribitWebSocketClient {
    /// WebSocket endpoint URL (explicit, or the testnet/mainnet default).
    url: String,
    /// Whether the client was created for the testnet environment.
    is_testnet: bool,
    /// Optional heartbeat interval forwarded to the socket config and handler
    /// (presumably seconds, matching `DERIBIT_WS_HEARTBEAT_SECS` - TODO confirm).
    heartbeat_interval: Option<u64>,
    /// API credential, if one was resolved at construction time.
    credential: Option<Credential>,
    /// Most recent authentication state, written when an `Authenticated` message arrives.
    auth_state: Arc<tokio::sync::RwLock<Option<AuthState>>>,
    /// Stop flag shared with the feed handler: cleared by `connect`, set by `close`.
    signal: Arc<AtomicBool>,
    /// Connection mode; swapped for the underlying socket's atomic on `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Tracks whether the session is currently authenticated.
    auth_tracker: AuthTracker,
    /// Sender for commands to the feed handler; replaced with a live channel on `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver of handler output; taken (once) by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Handle of the background task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Reference-counted subscription bookkeeping ('.'-delimited topics).
    subscriptions_state: SubscriptionState,
    /// Instruments keyed by raw symbol, replayed to the handler on `connect`.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Token cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
    /// Account identifier passed to the feed handler, if set.
    account_id: Option<AccountId>,
    /// Flag passed to the feed handler controlling bar timestamping (defaults to `true`).
    bars_timestamp_on_close: bool,
}
103
104impl Debug for DeribitWebSocketClient {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct(stringify!(DeribitWebSocketClient))
107 .field("url", &self.url)
108 .field("is_testnet", &self.is_testnet)
109 .field("has_credentials", &self.credential.is_some())
110 .field("is_authenticated", &self.auth_tracker.is_authenticated())
111 .field(
112 "has_auth_state",
113 &self.auth_state.try_read().is_ok_and(|s| s.is_some()),
114 )
115 .field("heartbeat_interval", &self.heartbeat_interval)
116 .finish_non_exhaustive()
117 }
118}
119
120impl DeribitWebSocketClient {
121 pub fn new(
129 url: Option<String>,
130 api_key: Option<String>,
131 api_secret: Option<String>,
132 heartbeat_interval: Option<u64>,
133 is_testnet: bool,
134 ) -> anyhow::Result<Self> {
135 Self::new_inner(
136 url,
137 api_key,
138 api_secret,
139 heartbeat_interval,
140 is_testnet,
141 true,
142 )
143 }
144
145 fn new_inner(
147 url: Option<String>,
148 api_key: Option<String>,
149 api_secret: Option<String>,
150 heartbeat_interval: Option<u64>,
151 is_testnet: bool,
152 env_fallback: bool,
153 ) -> anyhow::Result<Self> {
154 let url = url.unwrap_or_else(|| {
155 if is_testnet {
156 DERIBIT_TESTNET_WS_URL.to_string()
157 } else {
158 DERIBIT_WS_URL.to_string()
159 }
160 });
161
162 let credential =
164 Credential::resolve_with_env_fallback(api_key, api_secret, is_testnet, env_fallback)?;
165 if credential.is_some() {
166 log::info!("Credentials loaded (testnet={is_testnet})");
167 } else {
168 log::debug!("No credentials configured - unauthenticated mode");
169 }
170
171 let signal = Arc::new(AtomicBool::new(false));
172 let subscriptions_state = SubscriptionState::new('.');
173
174 Ok(Self {
175 url,
176 is_testnet,
177 heartbeat_interval,
178 credential,
179 auth_state: Arc::new(tokio::sync::RwLock::new(None)),
180 signal,
181 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
182 ConnectionMode::Closed.as_u8(),
183 ))),
184 auth_tracker: AuthTracker::new(),
185 cmd_tx: {
186 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
187 Arc::new(tokio::sync::RwLock::new(tx))
188 },
189 out_rx: None,
190 task_handle: None,
191 subscriptions_state,
192 instruments_cache: Arc::new(DashMap::new()),
193 cancellation_token: CancellationToken::new(),
194 account_id: None,
195 bars_timestamp_on_close: true,
196 })
197 }
198
199 pub fn new_public(is_testnet: bool) -> anyhow::Result<Self> {
207 let heartbeat_interval = DERIBIT_WS_HEARTBEAT_SECS;
208 Self::new_inner(
209 None,
210 None,
211 None,
212 Some(heartbeat_interval),
213 is_testnet,
214 false,
215 )
216 }
217
218 pub fn new_unauthenticated(
227 url: Option<String>,
228 heartbeat_interval: Option<u64>,
229 is_testnet: bool,
230 ) -> anyhow::Result<Self> {
231 Self::new_inner(url, None, None, heartbeat_interval, is_testnet, false)
232 }
233
234 pub fn with_credentials(is_testnet: bool) -> anyhow::Result<Self> {
244 let (key_env, secret_env) = if is_testnet {
245 ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
246 } else {
247 ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
248 };
249
250 let api_key = get_or_env_var_opt(None, key_env)
251 .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {key_env}"))?;
252 let api_secret = get_or_env_var_opt(None, secret_env)
253 .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {secret_env}"))?;
254
255 let heartbeat_interval = DERIBIT_WS_HEARTBEAT_SECS;
256 Self::new(
257 None,
258 Some(api_key),
259 Some(api_secret),
260 Some(heartbeat_interval),
261 is_testnet,
262 )
263 }
264
265 fn connection_mode(&self) -> ConnectionMode {
267 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
268 ConnectionMode::from_u8(mode_u8)
269 }
270
271 #[must_use]
273 pub fn is_active(&self) -> bool {
274 self.connection_mode() == ConnectionMode::Active
275 }
276
277 #[must_use]
279 pub fn url(&self) -> &str {
280 &self.url
281 }
282
283 #[must_use]
285 pub fn is_closed(&self) -> bool {
286 self.connection_mode() == ConnectionMode::Disconnect
287 }
288
289 pub fn cancel_all_requests(&self) {
291 self.cancellation_token.cancel();
292 }
293
/// Returns a reference to the client's cancellation token
/// (the one cancelled by `cancel_all_requests`).
#[must_use]
pub fn cancellation_token(&self) -> &CancellationToken {
    &self.cancellation_token
}
299
300 pub async fn wait_until_active(&self, timeout_secs: f64) -> DeribitWsResult<()> {
306 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
307
308 tokio::time::timeout(timeout, async {
309 while !self.is_active() {
310 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
311 }
312 })
313 .await
314 .map_err(|_| {
315 DeribitWsError::Timeout(format!(
316 "WebSocket connection timeout after {timeout_secs} seconds"
317 ))
318 })?;
319
320 Ok(())
321 }
322
323 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
325 for inst in instruments {
326 self.instruments_cache
327 .insert(inst.raw_symbol().inner(), inst);
328 }
329 log::debug!("Cached {} instruments", self.instruments_cache.len());
330 }
331
332 pub fn cache_instrument(&self, instrument: InstrumentAny) {
334 let symbol = instrument.raw_symbol().inner();
335 self.instruments_cache.insert(symbol, instrument);
336
337 if self.is_active() {
339 let tx = self.cmd_tx.clone();
340 let inst = self.instruments_cache.get(&symbol).map(|r| r.clone());
341 if let Some(inst) = inst {
342 get_runtime().spawn(async move {
343 let _ = tx
344 .read()
345 .await
346 .send(HandlerCommand::UpdateInstrument(Box::new(inst)));
347 });
348 }
349 }
350 }
351
/// Connects to the WebSocket endpoint and spawns the background message task.
///
/// Steps, in order: clear the stop flag, open the socket with keyed rate-limit
/// quotas, adopt the socket's connection-mode atomic, create fresh command and
/// output channels, hand the socket plus cached instruments and heartbeat to the
/// feed handler, then spawn a task that drives the handler and reacts to
/// `Reconnected` and `Authenticated` messages.
///
/// # Errors
///
/// Returns an error if `WebSocketClient::connect` fails.
pub async fn connect(&mut self) -> anyhow::Result<()> {
    log_info!(
        "Connecting to WebSocket: {}",
        self.url,
        color = LogColor::Blue
    );

    // Clear the stop flag that `close` sets.
    self.signal.store(false, Ordering::Relaxed);

    // Raw socket messages are delivered to the feed handler through `raw_rx`.
    let (message_handler, raw_rx) = channel_message_handler();

    // Ping callback that intentionally does nothing with the payload.
    let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
    });

    let config = WebSocketConfig {
        url: self.url.clone(),
        headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
        heartbeat: self.heartbeat_interval,
        heartbeat_msg: None,
        reconnect_timeout_ms: Some(5_000),
        reconnect_delay_initial_ms: None,
        reconnect_delay_max_ms: None,
        reconnect_backoff_factor: None,
        reconnect_jitter_ms: None,
        reconnect_max_attempts: None,
    };

    // Separate rate-limit quotas for subscription and order traffic.
    let keyed_quotas = vec![
        (
            DERIBIT_WS_SUBSCRIPTION_KEY.to_string(),
            *DERIBIT_WS_SUBSCRIPTION_QUOTA,
        ),
        (DERIBIT_WS_ORDER_KEY.to_string(), *DERIBIT_WS_ORDER_QUOTA),
    ];

    let ws_client = WebSocketClient::connect(
        config,
        Some(message_handler),
        Some(ping_handler),
        None,
        keyed_quotas,
        // The subscription quota is also passed as the final (default) quota argument.
        Some(*DERIBIT_WS_SUBSCRIPTION_QUOTA),
    )
    .await?;

    // Track the socket's own connection-mode atomic from now on.
    self.connection_mode
        .store(ws_client.connection_mode_atomic());

    let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
    let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();

    // Replace the shared command sender so existing clones reach the new handler.
    *self.cmd_tx.write().await = cmd_tx.clone();
    self.out_rx = Some(Arc::new(out_rx));

    let mut handler = DeribitWsFeedHandler::new(
        self.signal.clone(),
        cmd_rx,
        raw_rx,
        out_tx,
        self.auth_tracker.clone(),
        self.subscriptions_state.clone(),
        self.account_id,
        self.bars_timestamp_on_close,
    );

    // Hand the connected socket over to the handler (send errors are ignored).
    let _ = cmd_tx.send(HandlerCommand::SetClient(ws_client));

    // Replay instruments cached before the connection existed.
    let instruments: Vec<InstrumentAny> =
        self.instruments_cache.iter().map(|r| r.clone()).collect();
    if !instruments.is_empty() {
        log::debug!(
            "Sending {} cached instruments to handler",
            instruments.len()
        );
        let _ = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments));
    }

    if let Some(interval) = self.heartbeat_interval {
        let _ = cmd_tx.send(HandlerCommand::SetHeartbeat { interval });
    }

    // State moved into the background task.
    let subscriptions_state = self.subscriptions_state.clone();
    let credential = self.credential.clone();
    let auth_tracker = self.auth_tracker.clone();
    let auth_state = self.auth_state.clone();

    let task_handle = get_runtime().spawn(async move {
        // True between sending a re-auth request after reconnect and its success.
        let mut pending_reauth = false;

        loop {
            match handler.next().await {
                Some(msg) => match msg {
                    NautilusWsMessage::Reconnected => {
                        log::info!("Reconnected to WebSocket");

                        // Snapshot the topics before marking them failed.
                        let channels = subscriptions_state.all_topics();

                        for channel in &channels {
                            subscriptions_state.mark_failure(channel);
                        }

                        if let Some(cred) = &credential {
                            log::info!("Re-authenticating after reconnection...");

                            // Resubscription is deferred until `Authenticated` arrives.
                            let _rx = auth_tracker.begin();
                            pending_reauth = true;

                            // Reuse the scope of the previous session, if any.
                            let previous_scope = auth_state
                                .read()
                                .await
                                .as_ref()
                                .map(|s| s.scope.clone());

                            send_auth_request(cred, previous_scope, &cmd_tx);
                        } else {
                            // No credentials: resubscribe immediately.
                            if !channels.is_empty() {
                                let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
                            }
                        }
                    }
                    NautilusWsMessage::Authenticated(result) => {
                        let timestamp = get_atomic_clock_realtime().get_time_ms();
                        let new_auth_state = AuthState::from_auth_result(&result, timestamp);
                        *auth_state.write().await = Some(new_auth_state);

                        spawn_token_refresh_task(
                            result.expires_in,
                            result.refresh_token.clone(),
                            cmd_tx.clone(),
                        );

                        if pending_reauth {
                            pending_reauth = false;
                            log::info!(
                                "Re-authentication successful (scope: {}), resubscribing to channels",
                                result.scope
                            );

                            let channels = subscriptions_state.all_topics();

                            if !channels.is_empty() {
                                let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
                            }
                        } else {
                            log::debug!(
                                "Auth state stored: scope={}, expires_in={}s",
                                result.scope,
                                result.expires_in
                            );
                        }
                    }
                    // All other messages need no handling in this task.
                    _ => {}
                },
                None => {
                    log::debug!("Handler returned None, stopping task");
                    break;
                }
            }
        }
    });

    self.task_handle = Some(Arc::new(task_handle));
    log::info!("Connected to WebSocket");

    Ok(())
}
550
551 pub async fn close(&self) -> DeribitWsResult<()> {
557 log::info!("Closing WebSocket connection");
558 self.signal.store(true, Ordering::Relaxed);
559
560 let _ = self.cmd_tx.read().await.send(HandlerCommand::Disconnect);
561
562 if let Some(_handle) = &self.task_handle {
564 let _ = tokio::time::timeout(Duration::from_secs(5), async {
565 tokio::time::sleep(Duration::from_millis(100)).await;
567 })
568 .await;
569 }
570
571 self.auth_tracker.invalidate();
572
573 Ok(())
574 }
575
576 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + 'static {
582 let rx = self
583 .out_rx
584 .take()
585 .expect("Data stream receiver already taken or not connected");
586 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
587
588 async_stream::stream! {
589 while let Some(msg) = rx.recv().await {
590 yield msg;
591 }
592 }
593 }
594
/// Returns `true` if the client holds an API credential.
#[must_use]
pub fn has_credentials(&self) -> bool {
    self.credential.is_some()
}
600
/// Returns `true` if the auth tracker reports the session as authenticated.
#[must_use]
pub fn is_authenticated(&self) -> bool {
    self.auth_tracker.is_authenticated()
}
606
607 pub async fn authenticate(&self, session_name: Option<&str>) -> DeribitWsResult<()> {
626 let credential = self.credential.as_ref().ok_or_else(|| {
627 DeribitWsError::Authentication("API credentials not configured".to_string())
628 })?;
629
630 let scope = session_name.map(|name| format!("session:{name}"));
632
633 log::info!("Authenticating WebSocket...");
634
635 let rx = self.auth_tracker.begin();
636
637 let cmd_tx = self.cmd_tx.read().await;
639 send_auth_request(credential, scope, &cmd_tx);
640 drop(cmd_tx);
641
642 match self
644 .auth_tracker
645 .wait_for_result::<DeribitWsError>(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
646 .await
647 {
648 Ok(()) => {
649 log::info!("WebSocket authenticated successfully");
650 Ok(())
651 }
652 Err(e) => {
653 log::error!("WebSocket authentication failed: error={e}");
654 Err(e)
655 }
656 }
657 }
658
/// Authenticates with a session scope; shorthand for
/// `authenticate(Some(session_name))`.
///
/// # Errors
///
/// Returns the same errors as `authenticate`.
pub async fn authenticate_session(&self, session_name: &str) -> DeribitWsResult<()> {
    self.authenticate(Some(session_name)).await
}
670
671 pub async fn auth_state(&self) -> Option<AuthState> {
675 self.auth_state.read().await.clone()
676 }
677
678 pub async fn access_token(&self) -> Option<String> {
680 self.auth_state
681 .read()
682 .await
683 .as_ref()
684 .map(|s| s.access_token.clone())
685 }
686
/// Sets the account ID. It is passed to the feed handler when `connect`
/// builds it, so it must be set before connecting to take effect there.
pub fn set_account_id(&mut self, account_id: AccountId) {
    self.account_id = Some(account_id);
}
691
/// Sets the `bars_timestamp_on_close` flag. It is passed to the feed handler
/// when `connect` builds it, so it must be set before connecting to take
/// effect there.
pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
    self.bars_timestamp_on_close = value;
}
698
699 async fn send_subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
700 let mut channels_to_subscribe = Vec::new();
701
702 for channel in channels {
703 if self.subscriptions_state.add_reference(&channel) {
704 self.subscriptions_state.mark_subscribe(&channel);
705 channels_to_subscribe.push(channel);
706 } else {
707 log::debug!("Already subscribed to {channel}, skipping duplicate subscription");
708 }
709 }
710
711 if channels_to_subscribe.is_empty() {
712 return Ok(());
713 }
714
715 self.cmd_tx
716 .read()
717 .await
718 .send(HandlerCommand::Subscribe {
719 channels: channels_to_subscribe.clone(),
720 })
721 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
722
723 log::debug!(
724 "Sent subscribe for {} channels",
725 channels_to_subscribe.len()
726 );
727 Ok(())
728 }
729
730 async fn send_unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
731 let mut channels_to_unsubscribe = Vec::new();
732
733 for channel in channels {
734 if self.subscriptions_state.remove_reference(&channel) {
735 self.subscriptions_state.mark_unsubscribe(&channel);
736 channels_to_unsubscribe.push(channel);
737 } else {
738 log::debug!("Still has references to {channel}, skipping unsubscription");
739 }
740 }
741
742 if channels_to_unsubscribe.is_empty() {
743 return Ok(());
744 }
745
746 self.cmd_tx
747 .read()
748 .await
749 .send(HandlerCommand::Unsubscribe {
750 channels: channels_to_unsubscribe.clone(),
751 })
752 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
753
754 log::debug!(
755 "Sent unsubscribe for {} channels",
756 channels_to_unsubscribe.len()
757 );
758 Ok(())
759 }
760
761 pub async fn subscribe_trades(
772 &self,
773 instrument_id: InstrumentId,
774 interval: Option<DeribitUpdateInterval>,
775 ) -> DeribitWsResult<()> {
776 let interval = interval.unwrap_or_default();
777 self.check_auth_requirement(interval)?;
778 let channel =
779 DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
780 self.send_subscribe(vec![channel]).await
781 }
782
783 pub async fn unsubscribe_trades(
789 &self,
790 instrument_id: InstrumentId,
791 interval: Option<DeribitUpdateInterval>,
792 ) -> DeribitWsResult<()> {
793 let interval = interval.unwrap_or_default();
794 let channel =
795 DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
796 self.send_unsubscribe(vec![channel]).await
797 }
798
799 pub async fn subscribe_book(
810 &self,
811 instrument_id: InstrumentId,
812 interval: Option<DeribitUpdateInterval>,
813 ) -> DeribitWsResult<()> {
814 let interval = interval.unwrap_or_default();
815 self.check_auth_requirement(interval)?;
816 let channel =
817 DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
818 self.send_subscribe(vec![channel]).await
819 }
820
821 pub async fn unsubscribe_book(
827 &self,
828 instrument_id: InstrumentId,
829 interval: Option<DeribitUpdateInterval>,
830 ) -> DeribitWsResult<()> {
831 let interval = interval.unwrap_or_default();
832 let channel =
833 DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
834 self.send_unsubscribe(vec![channel]).await
835 }
836
837 pub async fn subscribe_book_grouped(
847 &self,
848 instrument_id: InstrumentId,
849 group: &str,
850 depth: u32,
851 interval: Option<DeribitUpdateInterval>,
852 ) -> DeribitWsResult<()> {
853 let interval = match interval {
855 Some(DeribitUpdateInterval::Raw) | None => DeribitUpdateInterval::Ms100,
856 Some(i) => i,
857 };
858
859 let normalized_depth = if depth < 5 {
860 1
861 } else if depth < 15 {
862 10
863 } else {
864 20
865 };
866
867 let channel = format!(
868 "book.{}.{}.{}.{}",
869 instrument_id.symbol,
870 group,
871 normalized_depth,
872 interval.as_str()
873 );
874 log::debug!("Subscribing to grouped book channel: {channel}");
875 self.send_subscribe(vec![channel]).await
876 }
877
878 pub async fn unsubscribe_book_grouped(
886 &self,
887 instrument_id: InstrumentId,
888 group: &str,
889 depth: u32,
890 interval: Option<DeribitUpdateInterval>,
891 ) -> DeribitWsResult<()> {
892 let interval = match interval {
894 Some(DeribitUpdateInterval::Raw) | None => DeribitUpdateInterval::Ms100,
895 Some(i) => i,
896 };
897
898 let normalized_depth = if depth < 5 {
899 1
900 } else if depth < 15 {
901 10
902 } else {
903 20
904 };
905
906 let channel = format!(
907 "book.{}.{}.{}.{}",
908 instrument_id.symbol,
909 group,
910 normalized_depth,
911 interval.as_str()
912 );
913 self.send_unsubscribe(vec![channel]).await
914 }
915
916 pub async fn subscribe_ticker(
927 &self,
928 instrument_id: InstrumentId,
929 interval: Option<DeribitUpdateInterval>,
930 ) -> DeribitWsResult<()> {
931 let interval = interval.unwrap_or_default();
932 self.check_auth_requirement(interval)?;
933 let channel =
934 DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
935 self.send_subscribe(vec![channel]).await
936 }
937
938 pub async fn unsubscribe_ticker(
944 &self,
945 instrument_id: InstrumentId,
946 interval: Option<DeribitUpdateInterval>,
947 ) -> DeribitWsResult<()> {
948 let interval = interval.unwrap_or_default();
949 let channel =
950 DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
951 self.send_unsubscribe(vec![channel]).await
952 }
953
954 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
962 let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
963 self.send_subscribe(vec![channel]).await
964 }
965
966 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
972 let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
973 self.send_unsubscribe(vec![channel]).await
974 }
975
976 pub async fn subscribe_instrument_state(
984 &self,
985 kind: &str,
986 currency: &str,
987 ) -> DeribitWsResult<()> {
988 let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
989 self.send_subscribe(vec![channel]).await
990 }
991
992 pub async fn unsubscribe_instrument_state(
998 &self,
999 kind: &str,
1000 currency: &str,
1001 ) -> DeribitWsResult<()> {
1002 let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
1003 self.send_unsubscribe(vec![channel]).await
1004 }
1005
1006 pub async fn subscribe_perpetual_interests_rates_updates(
1014 &self,
1015 instrument_id: InstrumentId,
1016 interval: Option<DeribitUpdateInterval>,
1017 ) -> DeribitWsResult<()> {
1018 let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
1019 let channel = DeribitWsChannel::Perpetual
1020 .format_channel(instrument_id.symbol.as_str(), Some(interval));
1021
1022 self.send_subscribe(vec![channel]).await
1023 }
1024
1025 pub async fn unsubscribe_perpetual_interest_rates_updates(
1031 &self,
1032 instrument_id: InstrumentId,
1033 interval: Option<DeribitUpdateInterval>,
1034 ) -> DeribitWsResult<()> {
1035 let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
1036 let channel = DeribitWsChannel::Perpetual
1037 .format_channel(instrument_id.symbol.as_str(), Some(interval));
1038
1039 self.send_unsubscribe(vec![channel]).await
1040 }
1041
1042 pub async fn subscribe_chart(
1054 &self,
1055 instrument_id: InstrumentId,
1056 resolution: &str,
1057 ) -> DeribitWsResult<()> {
1058 let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1060 self.send_subscribe(vec![channel]).await
1061 }
1062
1063 pub async fn unsubscribe_chart(
1069 &self,
1070 instrument_id: InstrumentId,
1071 resolution: &str,
1072 ) -> DeribitWsResult<()> {
1073 let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1074 self.send_unsubscribe(vec![channel]).await
1075 }
1076
1077 pub async fn subscribe_bars(&self, bar_type: BarType) -> DeribitWsResult<()> {
1086 let resolution = bar_spec_to_resolution(&bar_type);
1087 self.subscribe_chart(bar_type.instrument_id(), &resolution)
1088 .await
1089 }
1090
1091 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> DeribitWsResult<()> {
1097 let resolution = bar_spec_to_resolution(&bar_type);
1098 self.unsubscribe_chart(bar_type.instrument_id(), &resolution)
1099 .await
1100 }
1101
1102 fn check_auth_requirement(&self, interval: DeribitUpdateInterval) -> DeribitWsResult<()> {
1108 if interval.requires_auth() && !self.is_authenticated() {
1109 return Err(DeribitWsError::Authentication(
1110 "Raw streams require authentication. Call authenticate() first.".to_string(),
1111 ));
1112 }
1113 Ok(())
1114 }
1115
1116 pub async fn subscribe_user_orders(&self) -> DeribitWsResult<()> {
1124 if !self.is_authenticated() {
1125 return Err(DeribitWsError::Authentication(
1126 "User orders subscription requires authentication".to_string(),
1127 ));
1128 }
1129 self.send_subscribe(vec!["user.orders.any.any.raw".to_string()])
1130 .await
1131 }
1132
1133 pub async fn unsubscribe_user_orders(&self) -> DeribitWsResult<()> {
1139 self.send_unsubscribe(vec!["user.orders.any.any.raw".to_string()])
1140 .await
1141 }
1142
1143 pub async fn subscribe_user_trades(&self) -> DeribitWsResult<()> {
1151 if !self.is_authenticated() {
1152 return Err(DeribitWsError::Authentication(
1153 "User trades subscription requires authentication".to_string(),
1154 ));
1155 }
1156 self.send_subscribe(vec!["user.trades.any.any.raw".to_string()])
1157 .await
1158 }
1159
1160 pub async fn unsubscribe_user_trades(&self) -> DeribitWsResult<()> {
1166 self.send_unsubscribe(vec!["user.trades.any.any.raw".to_string()])
1167 .await
1168 }
1169
1170 pub async fn subscribe_user_portfolio(&self) -> DeribitWsResult<()> {
1180 if !self.is_authenticated() {
1181 return Err(DeribitWsError::Authentication(
1182 "User portfolio subscription requires authentication".to_string(),
1183 ));
1184 }
1185 self.send_subscribe(vec!["user.portfolio.any".to_string()])
1186 .await
1187 }
1188
1189 pub async fn unsubscribe_user_portfolio(&self) -> DeribitWsResult<()> {
1195 self.send_unsubscribe(vec!["user.portfolio.any".to_string()])
1196 .await
1197 }
1198
/// Subscribes to arbitrary, already-formatted channel names.
///
/// No authentication check is performed here; the channels are passed
/// straight to `send_subscribe`.
///
/// # Errors
///
/// Returns `DeribitWsError::Send` if the command cannot be sent.
pub async fn subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
    self.send_subscribe(channels).await
}
1207
/// Unsubscribes from arbitrary, already-formatted channel names by passing
/// them straight to `send_unsubscribe`.
///
/// # Errors
///
/// Returns `DeribitWsError::Send` if the command cannot be sent.
pub async fn unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
    self.send_unsubscribe(channels).await
}
1216
1217 pub async fn submit_order(
1228 &self,
1229 order_side: OrderSide,
1230 params: DeribitOrderParams,
1231 client_order_id: ClientOrderId,
1232 trader_id: TraderId,
1233 strategy_id: StrategyId,
1234 instrument_id: InstrumentId,
1235 ) -> DeribitWsResult<()> {
1236 if !self.is_authenticated() {
1237 return Err(DeribitWsError::Authentication(
1238 "Submit order requires authentication. Call authenticate_session() first."
1239 .to_string(),
1240 ));
1241 }
1242
1243 log::debug!(
1244 "Sending {} order: instrument={}, amount={}, price={:?}, client_order_id={}",
1245 order_side,
1246 params.instrument_name,
1247 params.amount,
1248 params.price,
1249 client_order_id
1250 );
1251
1252 let cmd = match order_side {
1253 OrderSide::Buy => HandlerCommand::Buy {
1254 params,
1255 client_order_id,
1256 trader_id,
1257 strategy_id,
1258 instrument_id,
1259 },
1260 OrderSide::Sell => HandlerCommand::Sell {
1261 params,
1262 client_order_id,
1263 trader_id,
1264 strategy_id,
1265 instrument_id,
1266 },
1267 _ => {
1268 return Err(DeribitWsError::ClientError(format!(
1269 "Invalid order side: {order_side}"
1270 )));
1271 }
1272 };
1273
1274 self.cmd_tx
1275 .read()
1276 .await
1277 .send(cmd)
1278 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1279
1280 Ok(())
1281 }
1282
1283 #[allow(clippy::too_many_arguments)]
1294 pub async fn modify_order(
1295 &self,
1296 order_id: &str,
1297 quantity: Quantity,
1298 price: Price,
1299 client_order_id: ClientOrderId,
1300 trader_id: TraderId,
1301 strategy_id: StrategyId,
1302 instrument_id: InstrumentId,
1303 ) -> DeribitWsResult<()> {
1304 if !self.is_authenticated() {
1305 return Err(DeribitWsError::Authentication(
1306 "Modify order requires authentication. Call authenticate_session() first."
1307 .to_string(),
1308 ));
1309 }
1310
1311 let params = DeribitEditParams {
1312 order_id: order_id.to_string(),
1313 amount: quantity.as_decimal(),
1314 price: Some(price.as_decimal()),
1315 post_only: None,
1316 reject_post_only: None,
1317 reduce_only: None,
1318 trigger_price: None,
1319 };
1320
1321 log::debug!(
1322 "Sending modify order: order_id={order_id}, quantity={quantity}, price={price}, client_order_id={client_order_id}"
1323 );
1324
1325 self.cmd_tx
1326 .read()
1327 .await
1328 .send(HandlerCommand::Edit {
1329 params,
1330 client_order_id,
1331 trader_id,
1332 strategy_id,
1333 instrument_id,
1334 })
1335 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1336
1337 Ok(())
1338 }
1339
1340 pub async fn cancel_order(
1351 &self,
1352 order_id: &str,
1353 client_order_id: ClientOrderId,
1354 trader_id: TraderId,
1355 strategy_id: StrategyId,
1356 instrument_id: InstrumentId,
1357 ) -> DeribitWsResult<()> {
1358 if !self.is_authenticated() {
1359 return Err(DeribitWsError::Authentication(
1360 "Cancel order requires authentication. Call authenticate_session() first."
1361 .to_string(),
1362 ));
1363 }
1364
1365 let params = DeribitCancelParams {
1366 order_id: order_id.to_string(),
1367 };
1368
1369 log::debug!("Sending cancel order: order_id={order_id}, client_order_id={client_order_id}");
1370
1371 self.cmd_tx
1372 .read()
1373 .await
1374 .send(HandlerCommand::Cancel {
1375 params,
1376 client_order_id,
1377 trader_id,
1378 strategy_id,
1379 instrument_id,
1380 })
1381 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1382
1383 Ok(())
1384 }
1385
1386 pub async fn cancel_all_orders(
1397 &self,
1398 instrument_id: InstrumentId,
1399 order_type: Option<String>,
1400 ) -> DeribitWsResult<()> {
1401 if !self.is_authenticated() {
1402 return Err(DeribitWsError::Authentication(
1403 "Cancel all orders requires authentication. Call authenticate_session() first."
1404 .to_string(),
1405 ));
1406 }
1407
1408 let instrument_name = instrument_id.symbol.to_string();
1409 let params = DeribitCancelAllByInstrumentParams {
1410 instrument_name: instrument_name.clone(),
1411 order_type,
1412 };
1413
1414 log::debug!("Sending cancel_all_orders: instrument={instrument_name}");
1415
1416 self.cmd_tx
1417 .read()
1418 .await
1419 .send(HandlerCommand::CancelAllByInstrument {
1420 params,
1421 instrument_id,
1422 })
1423 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1424
1425 Ok(())
1426 }
1427
1428 pub async fn query_order(
1439 &self,
1440 order_id: &str,
1441 client_order_id: ClientOrderId,
1442 trader_id: TraderId,
1443 strategy_id: StrategyId,
1444 instrument_id: InstrumentId,
1445 ) -> DeribitWsResult<()> {
1446 if !self.is_authenticated() {
1447 return Err(DeribitWsError::Authentication(
1448 "Query order state requires authentication. Call authenticate_session() first."
1449 .to_string(),
1450 ));
1451 }
1452
1453 log::debug!("Sending query_order: order_id={order_id}, client_order_id={client_order_id}");
1454
1455 self.cmd_tx
1456 .read()
1457 .await
1458 .send(HandlerCommand::GetOrderState {
1459 order_id: order_id.to_string(),
1460 client_order_id,
1461 trader_id,
1462 strategy_id,
1463 instrument_id,
1464 })
1465 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1466
1467 Ok(())
1468 }
1469}