1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::StreamExt;
27use nautilus_common::{
28 clients::DataClient,
29 live::{runner::get_data_event_sender, runtime::get_runtime},
30 log_info,
31 messages::{
32 DataEvent, DataResponse,
33 data::{
34 BarsResponse, BookResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35 RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
36 SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates,
37 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices,
38 SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
39 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
40 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstruments,
41 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
42 },
43 },
44};
45use nautilus_core::{
46 datetime::datetime_to_unix_nanos,
47 time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_model::{
50 data::{Data, OrderBookDeltas_API},
51 identifiers::{ClientId, InstrumentId, Venue},
52 instruments::{Instrument, InstrumentAny},
53};
54use tokio::task::JoinHandle;
55use tokio_util::sync::CancellationToken;
56
57use crate::{
58 common::{
59 consts::DERIBIT_VENUE,
60 parse::{bar_spec_to_resolution, parse_instrument_kind_currency},
61 },
62 config::DeribitDataClientConfig,
63 http::{
64 client::DeribitHttpClient,
65 models::{DeribitCurrency, DeribitInstrumentKind},
66 },
67 websocket::{
68 auth::DERIBIT_DATA_SESSION_NAME, client::DeribitWebSocketClient,
69 enums::DeribitUpdateInterval, messages::NautilusWsMessage,
70 },
71};
72
73#[derive(Debug)]
75pub struct DeribitDataClient {
76 client_id: ClientId,
77 config: DeribitDataClientConfig,
78 http_client: DeribitHttpClient,
79 ws_client: Option<DeribitWebSocketClient>,
80 is_connected: AtomicBool,
81 cancellation_token: CancellationToken,
82 tasks: Vec<JoinHandle<()>>,
83 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
84 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
85 clock: &'static AtomicTime,
86}
87
88impl DeribitDataClient {
89 pub fn new(client_id: ClientId, config: DeribitDataClientConfig) -> anyhow::Result<Self> {
95 let clock = get_atomic_clock_realtime();
96 let data_sender = get_data_event_sender();
97
98 let http_client = if config.has_api_credentials() {
99 DeribitHttpClient::new_with_env(
100 config.api_key.clone(),
101 config.api_secret.clone(),
102 config.use_testnet,
103 config.http_timeout_secs,
104 config.max_retries,
105 config.retry_delay_initial_ms,
106 config.retry_delay_max_ms,
107 None, )?
109 } else {
110 DeribitHttpClient::new(
111 config.base_url_http.clone(),
112 config.use_testnet,
113 config.http_timeout_secs,
114 config.max_retries,
115 config.retry_delay_initial_ms,
116 config.retry_delay_max_ms,
117 None, )?
119 };
120
121 let ws_client = DeribitWebSocketClient::new(
122 Some(config.ws_url()),
123 config.api_key.clone(),
124 config.api_secret.clone(),
125 config.heartbeat_interval_secs,
126 config.use_testnet,
127 )?;
128
129 Ok(Self {
130 client_id,
131 config,
132 http_client,
133 ws_client: Some(ws_client),
134 is_connected: AtomicBool::new(false),
135 cancellation_token: CancellationToken::new(),
136 tasks: Vec::new(),
137 data_sender,
138 instruments: Arc::new(RwLock::new(AHashMap::new())),
139 clock,
140 })
141 }
142
143 fn ws_client_mut(&mut self) -> anyhow::Result<&mut DeribitWebSocketClient> {
145 self.ws_client
146 .as_mut()
147 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))
148 }
149
150 fn spawn_stream_task(
152 &mut self,
153 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
154 ) -> anyhow::Result<()> {
155 let data_sender = self.data_sender.clone();
156 let instruments = Arc::clone(&self.instruments);
157 let cancellation = self.cancellation_token.clone();
158
159 let handle = get_runtime().spawn(async move {
160 tokio::pin!(stream);
161
162 loop {
163 tokio::select! {
164 maybe_msg = stream.next() => {
165 match maybe_msg {
166 Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
167 None => {
168 log::debug!("Deribit websocket stream ended");
169 break;
170 }
171 }
172 }
173 () = cancellation.cancelled() => {
174 log::debug!("Deribit websocket stream task cancelled");
175 break;
176 }
177 }
178 }
179 });
180
181 self.tasks.push(handle);
182 Ok(())
183 }
184
185 fn handle_ws_message(
187 message: NautilusWsMessage,
188 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
189 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
190 ) {
191 match message {
192 NautilusWsMessage::Data(payloads) => {
193 for data in payloads {
194 Self::send_data(sender, data);
195 }
196 }
197 NautilusWsMessage::Deltas(deltas) => {
198 Self::send_data(sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
199 }
200 NautilusWsMessage::Instrument(instrument) => {
201 let instrument_any = *instrument;
202 if let Ok(mut guard) = instruments.write() {
203 let instrument_id = instrument_any.id();
204 guard.insert(instrument_id, instrument_any.clone());
205 drop(guard);
206
207 if let Err(e) = sender.send(DataEvent::Instrument(instrument_any)) {
208 log::warn!("Failed to send instrument update: {e}");
209 }
210 } else {
211 log::error!("Instrument cache lock poisoned, skipping instrument update");
212 }
213 }
214 NautilusWsMessage::Error(e) => {
215 log::error!("Deribit WebSocket error: {e:?}");
216 }
217 NautilusWsMessage::Raw(value) => {
218 log::debug!("Unhandled raw message: {value}");
219 }
220 NautilusWsMessage::Reconnected => {
221 log::info!("Deribit websocket reconnected");
222 }
223 NautilusWsMessage::Authenticated(auth) => {
224 log::debug!(
225 "Deribit websocket authenticated: expires_in={}s",
226 auth.expires_in
227 );
228 }
229 NautilusWsMessage::FundingRates(funding_rates) => {
230 log::info!(
231 "Received {} funding rate update(s) from WebSocket",
232 funding_rates.len()
233 );
234 for funding_rate in funding_rates {
235 log::debug!("Sending funding rate: {funding_rate:?}");
236 if let Err(e) = sender.send(DataEvent::FundingRate(funding_rate)) {
237 log::error!("Failed to send funding rate: {e}");
238 }
239 }
240 }
241 }
242 }
243
244 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
246 if let Err(e) = sender.send(DataEvent::Data(data)) {
247 log::error!("Failed to send data: {e}");
248 }
249 }
250}
251
252#[async_trait(?Send)]
253impl DataClient for DeribitDataClient {
254 fn client_id(&self) -> ClientId {
255 self.client_id
256 }
257
258 fn venue(&self) -> Option<Venue> {
259 Some(*DERIBIT_VENUE)
260 }
261
262 fn start(&mut self) -> anyhow::Result<()> {
263 log::info!(
264 "Starting Deribit data client: client_id={}, use_testnet={}",
265 self.client_id,
266 self.config.use_testnet
267 );
268 Ok(())
269 }
270
271 fn stop(&mut self) -> anyhow::Result<()> {
272 log::info!("Stopping Deribit data client: {}", self.client_id);
273 self.cancellation_token.cancel();
274 self.is_connected.store(false, Ordering::Relaxed);
275 Ok(())
276 }
277
278 fn reset(&mut self) -> anyhow::Result<()> {
279 log::info!("Resetting Deribit data client: {}", self.client_id);
280 self.is_connected.store(false, Ordering::Relaxed);
281 self.cancellation_token = CancellationToken::new();
282 self.tasks.clear();
283 if let Ok(mut instruments) = self.instruments.write() {
284 instruments.clear();
285 }
286 Ok(())
287 }
288
289 fn dispose(&mut self) -> anyhow::Result<()> {
290 log::info!("Disposing Deribit data client: {}", self.client_id);
291 self.stop()
292 }
293
294 fn is_connected(&self) -> bool {
295 self.is_connected.load(Ordering::SeqCst)
296 }
297
298 fn is_disconnected(&self) -> bool {
299 !self.is_connected()
300 }
301
302 async fn connect(&mut self) -> anyhow::Result<()> {
303 if self.is_connected() {
304 return Ok(());
305 }
306
307 let instrument_kinds = if self.config.instrument_kinds.is_empty() {
309 vec![DeribitInstrumentKind::Future]
310 } else {
311 self.config.instrument_kinds.clone()
312 };
313
314 let mut all_instruments = Vec::new();
315 for kind in &instrument_kinds {
316 let fetched = self
317 .http_client
318 .request_instruments(DeribitCurrency::ANY, Some(*kind))
319 .await
320 .with_context(|| format!("failed to request Deribit instruments for {kind:?}"))?;
321
322 self.http_client.cache_instruments(fetched.clone());
324
325 let mut guard = self
327 .instruments
328 .write()
329 .map_err(|e| anyhow::anyhow!("{e}"))?;
330 for instrument in &fetched {
331 guard.insert(instrument.id(), instrument.clone());
332 }
333 drop(guard);
334
335 all_instruments.extend(fetched);
336 }
337
338 log::info!(
339 "Cached instruments: client_id={}, total={}",
340 self.client_id,
341 all_instruments.len()
342 );
343
344 for instrument in &all_instruments {
345 if let Err(e) = self
346 .data_sender
347 .send(DataEvent::Instrument(instrument.clone()))
348 {
349 log::warn!("Failed to send instrument: {e}");
350 }
351 }
352
353 let ws = self.ws_client_mut()?;
355 ws.cache_instruments(all_instruments);
356
357 ws.connect()
359 .await
360 .context("failed to connect Deribit websocket")?;
361 ws.wait_until_active(10.0)
362 .await
363 .context("websocket failed to become active")?;
364
365 if ws.has_credentials() {
367 ws.authenticate_session(DERIBIT_DATA_SESSION_NAME)
368 .await
369 .context("failed to authenticate Deribit websocket")?;
370 log_info!("Deribit WebSocket authenticated");
371 }
372
373 let stream = self.ws_client_mut()?.stream();
375 self.spawn_stream_task(stream)?;
376
377 self.is_connected.store(true, Ordering::Release);
378 let network = if self.config.use_testnet {
379 "testnet"
380 } else {
381 "mainnet"
382 };
383 log_info!("Deribit data client connected ({})", network);
384 Ok(())
385 }
386
387 async fn disconnect(&mut self) -> anyhow::Result<()> {
388 if self.is_disconnected() {
389 return Ok(());
390 }
391
392 self.cancellation_token.cancel();
394
395 if let Some(ws) = self.ws_client.as_ref()
397 && let Err(e) = ws.close().await
398 {
399 log::warn!("Error while closing Deribit websocket: {e:?}");
400 }
401
402 for handle in self.tasks.drain(..) {
404 if let Err(e) = handle.await {
405 log::error!("Error joining websocket task: {e:?}");
406 }
407 }
408
409 self.cancellation_token = CancellationToken::new();
411 self.is_connected.store(false, Ordering::Relaxed);
412
413 log_info!("Deribit data client disconnected");
414 Ok(())
415 }
416
417 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
418 let kind = cmd
420 .params
421 .as_ref()
422 .and_then(|p| p.get("kind"))
423 .map_or("any", |s| s.as_str())
424 .to_string();
425 let currency = cmd
426 .params
427 .as_ref()
428 .and_then(|p| p.get("currency"))
429 .map_or("any", |s| s.as_str())
430 .to_string();
431
432 let ws = self
433 .ws_client
434 .as_ref()
435 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
436 .clone();
437
438 log::info!("Subscribing to instrument state changes for {kind}.{currency}");
439
440 get_runtime().spawn(async move {
441 if let Err(e) = ws.subscribe_instrument_state(&kind, ¤cy).await {
442 log::error!("Failed to subscribe to instrument state for {kind}.{currency}: {e}");
443 }
444 });
445
446 Ok(())
447 }
448
449 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
450 let instrument_id = cmd.instrument_id;
451
452 let guard = self
454 .instruments
455 .read()
456 .map_err(|e| anyhow::anyhow!("{e}"))?;
457 if !guard.contains_key(&instrument_id) {
458 log::warn!(
459 "Instrument {instrument_id} not in cache - it may have been created after connect()"
460 );
461 }
462 drop(guard);
463
464 let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
466
467 let ws = self
468 .ws_client
469 .as_ref()
470 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
471 .clone();
472
473 log::info!(
474 "Subscribing to instrument state for {instrument_id} (channel: {kind}.{currency})"
475 );
476
477 get_runtime().spawn(async move {
479 if let Err(e) = ws.subscribe_instrument_state(&kind, ¤cy).await {
480 log::error!("Failed to subscribe to instrument state for {instrument_id}: {e}");
481 }
482 });
483
484 Ok(())
485 }
486
487 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
488 let ws = self
489 .ws_client
490 .as_ref()
491 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
492 .clone();
493 let instrument_id = cmd.instrument_id;
494
495 let interval = cmd
497 .params
498 .as_ref()
499 .and_then(|p| p.get("interval"))
500 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
501
502 log::info!(
503 "Subscribing to book deltas for {} (interval: {}, book_type: {:?})",
504 instrument_id,
505 interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
506 cmd.book_type
507 );
508
509 get_runtime().spawn(async move {
510 if let Err(e) = ws.subscribe_book(instrument_id, interval).await {
511 log::error!("Failed to subscribe to book deltas for {instrument_id}: {e}");
512 }
513 });
514
515 Ok(())
516 }
517
518 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
519 let ws = self
520 .ws_client
521 .as_ref()
522 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
523 .clone();
524 let instrument_id = cmd.instrument_id;
525
526 let interval = cmd
528 .params
529 .as_ref()
530 .and_then(|p| p.get("interval"))
531 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
532
533 let group = cmd
535 .params
536 .as_ref()
537 .and_then(|p| p.get("group"))
538 .map_or("none", String::as_str)
539 .to_string();
540
541 log::info!(
542 "Subscribing to book depth10 for {} (group: {}, interval: {}, book_type: {:?})",
543 instrument_id,
544 group,
545 interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
546 cmd.book_type
547 );
548
549 get_runtime().spawn(async move {
550 if let Err(e) = ws
551 .subscribe_book_grouped(instrument_id, &group, 10, interval)
552 .await
553 {
554 log::error!("Failed to subscribe to book depth10 for {instrument_id}: {e}");
555 }
556 });
557
558 Ok(())
559 }
560
561 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
562 let ws = self
563 .ws_client
564 .as_ref()
565 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
566 .clone();
567 let instrument_id = cmd.instrument_id;
568
569 log::info!("Subscribing to quotes for {instrument_id}");
570
571 get_runtime().spawn(async move {
572 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
573 log::error!("Failed to subscribe to quotes for {instrument_id}: {e}");
574 }
575 });
576
577 Ok(())
578 }
579
580 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
581 let ws = self
582 .ws_client
583 .as_ref()
584 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
585 .clone();
586 let instrument_id = cmd.instrument_id;
587
588 let interval = cmd
589 .params
590 .as_ref()
591 .and_then(|p| p.get("interval"))
592 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
593
594 log::info!(
595 "Subscribing to trades for {} (interval: {})",
596 instrument_id,
597 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
598 );
599
600 get_runtime().spawn(async move {
601 if let Err(e) = ws.subscribe_trades(instrument_id, interval).await {
602 log::error!("Failed to subscribe to trades for {instrument_id}: {e}");
603 }
604 });
605
606 Ok(())
607 }
608
609 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
610 let ws = self
611 .ws_client
612 .as_ref()
613 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
614 .clone();
615 let instrument_id = cmd.instrument_id;
616
617 let interval = cmd
618 .params
619 .as_ref()
620 .and_then(|p| p.get("interval"))
621 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
622
623 log::info!(
624 "Subscribing to mark prices for {} (via ticker channel, interval: {})",
625 instrument_id,
626 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
627 );
628
629 get_runtime().spawn(async move {
630 if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
631 log::error!("Failed to subscribe to mark prices for {instrument_id}: {e}");
632 }
633 });
634
635 Ok(())
636 }
637
638 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
639 let ws = self
640 .ws_client
641 .as_ref()
642 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
643 .clone();
644 let instrument_id = cmd.instrument_id;
645
646 let interval = cmd
647 .params
648 .as_ref()
649 .and_then(|p| p.get("interval"))
650 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
651
652 log::info!(
653 "Subscribing to index prices for {} (via ticker channel, interval: {})",
654 instrument_id,
655 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
656 );
657
658 get_runtime().spawn(async move {
659 if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
660 log::error!("Failed to subscribe to index prices for {instrument_id}: {e}");
661 }
662 });
663
664 Ok(())
665 }
666
667 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
668 let instrument_id = cmd.instrument_id;
669
670 let is_perpetual = self
672 .instruments
673 .read()
674 .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
675 .get(&instrument_id)
676 .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
677
678 if !is_perpetual {
679 log::warn!(
680 "Funding rates subscription rejected for {instrument_id}: only available for perpetual instruments."
681 );
682 return Ok(());
683 }
684
685 let ws = self
686 .ws_client
687 .as_ref()
688 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
689 .clone();
690
691 let interval = cmd
692 .params
693 .as_ref()
694 .and_then(|p| p.get("interval"))
695 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
696
697 log::info!(
699 "Subscribing to funding rates for {} (perpetual channel, interval: {})",
700 instrument_id,
701 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
702 );
703
704 get_runtime().spawn(async move {
705 if let Err(e) = ws
706 .subscribe_perpetual_interests_rates_updates(instrument_id, interval)
707 .await
708 {
709 log::error!("Failed to subscribe to funding rates for {instrument_id}: {e}");
710 }
711 });
712
713 Ok(())
714 }
715
716 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
717 let ws = self
718 .ws_client
719 .as_ref()
720 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
721 .clone();
722 let instrument_id = cmd.bar_type.instrument_id();
723 let resolution = bar_spec_to_resolution(&cmd.bar_type);
725
726 get_runtime().spawn(async move {
727 if let Err(e) = ws.subscribe_chart(instrument_id, &resolution).await {
728 log::error!("Failed to subscribe to bars for {instrument_id}: {e}");
729 }
730 });
731
732 Ok(())
733 }
734
735 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
736 let kind = cmd
737 .params
738 .as_ref()
739 .and_then(|p| p.get("kind"))
740 .map_or("any", |s| s.as_str())
741 .to_string();
742 let currency = cmd
743 .params
744 .as_ref()
745 .and_then(|p| p.get("currency"))
746 .map_or("any", |s| s.as_str())
747 .to_string();
748
749 let ws = self
750 .ws_client
751 .as_ref()
752 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
753 .clone();
754
755 log::info!("Unsubscribing from instrument state changes for {kind}.{currency}");
756
757 get_runtime().spawn(async move {
758 if let Err(e) = ws.unsubscribe_instrument_state(&kind, ¤cy).await {
759 log::error!(
760 "Failed to unsubscribe from instrument state for {kind}.{currency}: {e}"
761 );
762 }
763 });
764
765 Ok(())
766 }
767
768 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
769 let instrument_id = cmd.instrument_id;
770
771 let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
773
774 let ws = self
775 .ws_client
776 .as_ref()
777 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
778 .clone();
779
780 log::info!(
781 "Unsubscribing from instrument state for {instrument_id} (channel: {kind}.{currency})"
782 );
783
784 get_runtime().spawn(async move {
785 if let Err(e) = ws.unsubscribe_instrument_state(&kind, ¤cy).await {
786 log::error!("Failed to unsubscribe from instrument state for {instrument_id}: {e}");
787 }
788 });
789
790 Ok(())
791 }
792
793 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
794 let ws = self
795 .ws_client
796 .as_ref()
797 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
798 .clone();
799 let instrument_id = cmd.instrument_id;
800
801 let interval = cmd
803 .params
804 .as_ref()
805 .and_then(|p| p.get("interval"))
806 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
807
808 log::info!(
809 "Unsubscribing from book deltas for {} (interval: {})",
810 instrument_id,
811 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
812 );
813
814 get_runtime().spawn(async move {
815 if let Err(e) = ws.unsubscribe_book(instrument_id, interval).await {
816 log::error!("Failed to unsubscribe from book deltas for {instrument_id}: {e}");
817 }
818 });
819
820 Ok(())
821 }
822
823 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
824 let ws = self
825 .ws_client
826 .as_ref()
827 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
828 .clone();
829 let instrument_id = cmd.instrument_id;
830
831 let interval = cmd
833 .params
834 .as_ref()
835 .and_then(|p| p.get("interval"))
836 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
837
838 let group = cmd
840 .params
841 .as_ref()
842 .and_then(|p| p.get("group"))
843 .map_or("none", String::as_str)
844 .to_string();
845
846 log::info!(
847 "Unsubscribing from book depth10 for {} (group: {}, interval: {})",
848 instrument_id,
849 group,
850 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
851 );
852
853 get_runtime().spawn(async move {
854 if let Err(e) = ws
855 .unsubscribe_book_grouped(instrument_id, &group, 10, interval)
856 .await
857 {
858 log::error!("Failed to unsubscribe from book depth10 for {instrument_id}: {e}");
859 }
860 });
861
862 Ok(())
863 }
864
865 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
866 let ws = self
867 .ws_client
868 .as_ref()
869 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
870 .clone();
871 let instrument_id = cmd.instrument_id;
872
873 log::info!("Unsubscribing from quotes for {instrument_id}");
874
875 get_runtime().spawn(async move {
876 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
877 log::error!("Failed to unsubscribe from quotes for {instrument_id}: {e}");
878 }
879 });
880
881 Ok(())
882 }
883
884 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
885 let ws = self
886 .ws_client
887 .as_ref()
888 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
889 .clone();
890 let instrument_id = cmd.instrument_id;
891
892 let interval = cmd
893 .params
894 .as_ref()
895 .and_then(|p| p.get("interval"))
896 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
897
898 log::info!(
899 "Unsubscribing from trades for {} (interval: {})",
900 instrument_id,
901 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
902 );
903
904 get_runtime().spawn(async move {
905 if let Err(e) = ws.unsubscribe_trades(instrument_id, interval).await {
906 log::error!("Failed to unsubscribe from trades for {instrument_id}: {e}");
907 }
908 });
909
910 Ok(())
911 }
912
913 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
914 let ws = self
915 .ws_client
916 .as_ref()
917 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
918 .clone();
919 let instrument_id = cmd.instrument_id;
920
921 let interval = cmd
922 .params
923 .as_ref()
924 .and_then(|p| p.get("interval"))
925 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
926
927 log::info!(
928 "Unsubscribing from mark prices for {} (via ticker channel, interval: {})",
929 instrument_id,
930 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
931 );
932
933 get_runtime().spawn(async move {
934 if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
935 log::error!("Failed to unsubscribe from mark prices for {instrument_id}: {e}");
936 }
937 });
938
939 Ok(())
940 }
941
942 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
943 let ws = self
944 .ws_client
945 .as_ref()
946 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
947 .clone();
948 let instrument_id = cmd.instrument_id;
949
950 let interval = cmd
951 .params
952 .as_ref()
953 .and_then(|p| p.get("interval"))
954 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
955
956 log::info!(
957 "Unsubscribing from index prices for {} (via ticker channel, interval: {})",
958 instrument_id,
959 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
960 );
961
962 get_runtime().spawn(async move {
963 if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
964 log::error!("Failed to unsubscribe from index prices for {instrument_id}: {e}");
965 }
966 });
967
968 Ok(())
969 }
970
971 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
972 let instrument_id = cmd.instrument_id;
973
974 let is_perpetual = self
976 .instruments
977 .read()
978 .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
979 .get(&instrument_id)
980 .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
981
982 if !is_perpetual {
983 log::warn!(
984 "Funding rates unsubscription rejected for {instrument_id}: only available for perpetual instruments."
985 );
986 return Ok(());
987 }
988
989 let ws = self
990 .ws_client
991 .as_ref()
992 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
993 .clone();
994
995 let interval = cmd
996 .params
997 .as_ref()
998 .and_then(|p| p.get("interval"))
999 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
1000
1001 log::info!(
1002 "Unsubscribing from funding rates for {} (perpetual channel, interval: {})",
1003 instrument_id,
1004 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1005 );
1006
1007 get_runtime().spawn(async move {
1008 if let Err(e) = ws
1009 .unsubscribe_perpetual_interest_rates_updates(instrument_id, interval)
1010 .await
1011 {
1012 log::error!("Failed to unsubscribe from funding rates for {instrument_id}: {e}");
1013 }
1014 });
1015
1016 Ok(())
1017 }
1018
1019 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1020 let ws = self
1021 .ws_client
1022 .as_ref()
1023 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1024 .clone();
1025 let instrument_id = cmd.bar_type.instrument_id();
1026 let resolution = bar_spec_to_resolution(&cmd.bar_type);
1027
1028 get_runtime().spawn(async move {
1029 if let Err(e) = ws.unsubscribe_chart(instrument_id, &resolution).await {
1030 log::error!("Failed to unsubscribe from bars for {instrument_id}: {e}");
1031 }
1032 });
1033
1034 Ok(())
1035 }
1036
1037 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
1038 if request.start.is_some() {
1039 log::warn!(
1040 "Requesting instruments for {:?} with specified `start` which has no effect",
1041 request.venue
1042 );
1043 }
1044 if request.end.is_some() {
1045 log::warn!(
1046 "Requesting instruments for {:?} with specified `end` which has no effect",
1047 request.venue
1048 );
1049 }
1050
1051 let http_client = self.http_client.clone();
1052 let instruments_cache = Arc::clone(&self.instruments);
1053 let sender = self.data_sender.clone();
1054 let request_id = request.request_id;
1055 let client_id = request.client_id.unwrap_or(self.client_id);
1056 let start_nanos = datetime_to_unix_nanos(request.start);
1057 let end_nanos = datetime_to_unix_nanos(request.end);
1058 let params = request.params.clone();
1059 let clock = self.clock;
1060 let venue = *DERIBIT_VENUE;
1061
1062 let instrument_kinds = if self.config.instrument_kinds.is_empty() {
1064 vec![crate::http::models::DeribitInstrumentKind::Future]
1065 } else {
1066 self.config.instrument_kinds.clone()
1067 };
1068
1069 get_runtime().spawn(async move {
1070 let mut all_instruments = Vec::new();
1071 for kind in &instrument_kinds {
1072 log::debug!("Requesting instruments for currency=ANY, kind={kind:?}");
1073
1074 match http_client
1075 .request_instruments(DeribitCurrency::ANY, Some(*kind))
1076 .await
1077 {
1078 Ok(instruments) => {
1079 log::info!(
1080 "Fetched {} instruments for ANY/{:?}",
1081 instruments.len(),
1082 kind
1083 );
1084
1085 for instrument in instruments {
1086 {
1088 match instruments_cache.write() {
1089 Ok(mut guard) => {
1090 guard.insert(instrument.id(), instrument.clone());
1091 }
1092 Err(e) => {
1093 log::error!(
1094 "Instrument cache lock poisoned: {e}, skipping cache update"
1095 );
1096 }
1097 }
1098 }
1099
1100 all_instruments.push(instrument);
1101 }
1102 }
1103 Err(e) => {
1104 log::error!("Failed to fetch instruments for ANY/{kind:?}: {e:?}");
1105 }
1106 }
1107 }
1108
1109 let response = DataResponse::Instruments(InstrumentsResponse::new(
1111 request_id,
1112 client_id,
1113 venue,
1114 all_instruments,
1115 start_nanos,
1116 end_nanos,
1117 clock.get_time_ns(),
1118 params,
1119 ));
1120
1121 if let Err(e) = sender.send(DataEvent::Response(response)) {
1122 log::error!("Failed to send instruments response: {e}");
1123 }
1124 });
1125
1126 Ok(())
1127 }
1128
1129 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
1130 if request.start.is_some() {
1131 log::warn!(
1132 "Requesting instrument {} with specified `start` which has no effect",
1133 request.instrument_id
1134 );
1135 }
1136 if request.end.is_some() {
1137 log::warn!(
1138 "Requesting instrument {} with specified `end` which has no effect",
1139 request.instrument_id
1140 );
1141 }
1142
1143 if let Some(instrument) = self
1145 .instruments
1146 .read()
1147 .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
1148 .get(&request.instrument_id)
1149 .cloned()
1150 {
1151 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1152 request.request_id,
1153 request.client_id.unwrap_or(self.client_id),
1154 instrument.id(),
1155 instrument,
1156 datetime_to_unix_nanos(request.start),
1157 datetime_to_unix_nanos(request.end),
1158 self.clock.get_time_ns(),
1159 request.params.clone(),
1160 )));
1161
1162 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
1163 log::error!("Failed to send instrument response: {e}");
1164 }
1165 return Ok(());
1166 }
1167
1168 log::debug!(
1169 "Instrument {} not in cache, fetching from API",
1170 request.instrument_id
1171 );
1172
1173 let http_client = self.http_client.clone();
1174 let instruments_cache = Arc::clone(&self.instruments);
1175 let sender = self.data_sender.clone();
1176 let instrument_id = request.instrument_id;
1177 let request_id = request.request_id;
1178 let client_id = request.client_id.unwrap_or(self.client_id);
1179 let start_nanos = datetime_to_unix_nanos(request.start);
1180 let end_nanos = datetime_to_unix_nanos(request.end);
1181 let params = request.params.clone();
1182 let clock = self.clock;
1183
1184 get_runtime().spawn(async move {
1185 match http_client
1186 .request_instrument(instrument_id)
1187 .await
1188 .context("failed to request instrument from Deribit")
1189 {
1190 Ok(instrument) => {
1191 log::info!("Successfully fetched instrument: {instrument_id}");
1192
1193 {
1195 let mut guard = instruments_cache
1196 .write()
1197 .expect("instrument cache lock poisoned");
1198 guard.insert(instrument.id(), instrument.clone());
1199 }
1200
1201 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1203 request_id,
1204 client_id,
1205 instrument.id(),
1206 instrument,
1207 start_nanos,
1208 end_nanos,
1209 clock.get_time_ns(),
1210 params,
1211 )));
1212
1213 if let Err(e) = sender.send(DataEvent::Response(response)) {
1214 log::error!("Failed to send instrument response: {e}");
1215 }
1216 }
1217 Err(e) => {
1218 log::error!("Instrument request failed for {instrument_id}: {e:?}");
1219 }
1220 }
1221 });
1222
1223 Ok(())
1224 }
1225
1226 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1227 let http_client = self.http_client.clone();
1228 let sender = self.data_sender.clone();
1229 let instrument_id = request.instrument_id;
1230 let start = request.start;
1231 let end = request.end;
1232 let limit = request.limit.map(|n| n.get() as u32);
1233 let request_id = request.request_id;
1234 let client_id = request.client_id.unwrap_or(self.client_id);
1235 let params = request.params.clone();
1236 let clock = self.clock;
1237 let start_nanos = datetime_to_unix_nanos(start);
1238 let end_nanos = datetime_to_unix_nanos(end);
1239
1240 get_runtime().spawn(async move {
1241 match http_client
1242 .request_trades(instrument_id, start, end, limit)
1243 .await
1244 .context("failed to request trades from Deribit")
1245 {
1246 Ok(trades) => {
1247 let response = DataResponse::Trades(TradesResponse::new(
1248 request_id,
1249 client_id,
1250 instrument_id,
1251 trades,
1252 start_nanos,
1253 end_nanos,
1254 clock.get_time_ns(),
1255 params,
1256 ));
1257 if let Err(e) = sender.send(DataEvent::Response(response)) {
1258 log::error!("Failed to send trades response: {e}");
1259 }
1260 }
1261 Err(e) => log::error!("Trades request failed for {instrument_id}: {e:?}"),
1262 }
1263 });
1264
1265 Ok(())
1266 }
1267
1268 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1269 let http_client = self.http_client.clone();
1270 let sender = self.data_sender.clone();
1271 let bar_type = request.bar_type;
1272 let start = request.start;
1273 let end = request.end;
1274 let limit = request.limit.map(|n| n.get() as u32);
1275 let request_id = request.request_id;
1276 let client_id = request.client_id.unwrap_or(self.client_id);
1277 let params = request.params.clone();
1278 let clock = self.clock;
1279 let start_nanos = datetime_to_unix_nanos(start);
1280 let end_nanos = datetime_to_unix_nanos(end);
1281
1282 get_runtime().spawn(async move {
1283 match http_client
1284 .request_bars(bar_type, start, end, limit)
1285 .await
1286 .context("failed to request bars from Deribit")
1287 {
1288 Ok(bars) => {
1289 let response = DataResponse::Bars(BarsResponse::new(
1290 request_id,
1291 client_id,
1292 bar_type,
1293 bars,
1294 start_nanos,
1295 end_nanos,
1296 clock.get_time_ns(),
1297 params,
1298 ));
1299 if let Err(e) = sender.send(DataEvent::Response(response)) {
1300 log::error!("Failed to send bars response: {e}");
1301 }
1302 }
1303 Err(e) => log::error!("Bars request failed for {bar_type}: {e:?}"),
1304 }
1305 });
1306
1307 Ok(())
1308 }
1309
1310 fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
1311 let http_client = self.http_client.clone();
1312 let sender = self.data_sender.clone();
1313 let instrument_id = request.instrument_id;
1314 let depth = request.depth.map(|n| n.get() as u32);
1315 let request_id = request.request_id;
1316 let client_id = request.client_id.unwrap_or(self.client_id);
1317 let params = request.params.clone();
1318 let clock = self.clock;
1319
1320 get_runtime().spawn(async move {
1321 match http_client
1322 .request_book_snapshot(instrument_id, depth)
1323 .await
1324 .context("failed to request book snapshot from Deribit")
1325 {
1326 Ok(book) => {
1327 let response = DataResponse::Book(BookResponse::new(
1328 request_id,
1329 client_id,
1330 instrument_id,
1331 book,
1332 None,
1333 None,
1334 clock.get_time_ns(),
1335 params,
1336 ));
1337 if let Err(e) = sender.send(DataEvent::Response(response)) {
1338 log::error!("Failed to send book snapshot response: {e}");
1339 }
1340 }
1341 Err(e) => {
1342 log::error!("Book snapshot request failed for {instrument_id}: {e:?}");
1343 }
1344 }
1345 });
1346
1347 Ok(())
1348 }
1349}