1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::StreamExt;
27use nautilus_common::{
28 clients::DataClient,
29 live::{runner::get_data_event_sender, runtime::get_runtime},
30 log_info,
31 messages::{
32 DataEvent, DataResponse,
33 data::{
34 BarsResponse, BookResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35 RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
36 SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates,
37 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices,
38 SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
39 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
40 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstruments,
41 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
42 },
43 },
44};
45use nautilus_core::{
46 datetime::datetime_to_unix_nanos,
47 time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_model::{
50 data::{Data, OrderBookDeltas_API},
51 enums::BookType,
52 identifiers::{ClientId, InstrumentId, Venue},
53 instruments::{Instrument, InstrumentAny},
54};
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57
58use crate::{
59 common::{
60 consts::{
61 DERIBIT_BOOK_DEFAULT_DEPTH, DERIBIT_BOOK_DEFAULT_GROUP, DERIBIT_BOOK_VALID_DEPTHS,
62 DERIBIT_VENUE,
63 },
64 parse::{bar_spec_to_resolution, parse_instrument_kind_currency},
65 },
66 config::DeribitDataClientConfig,
67 http::{
68 client::DeribitHttpClient,
69 models::{DeribitCurrency, DeribitProductType},
70 },
71 websocket::{
72 auth::DERIBIT_DATA_SESSION_NAME, client::DeribitWebSocketClient,
73 enums::DeribitUpdateInterval, messages::NautilusWsMessage,
74 },
75};
76
77#[derive(Debug)]
79pub struct DeribitDataClient {
80 client_id: ClientId,
81 config: DeribitDataClientConfig,
82 http_client: DeribitHttpClient,
83 ws_client: Option<DeribitWebSocketClient>,
84 is_connected: AtomicBool,
85 cancellation_token: CancellationToken,
86 tasks: Vec<JoinHandle<()>>,
87 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
88 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
89 clock: &'static AtomicTime,
90}
91
92impl DeribitDataClient {
93 pub fn new(client_id: ClientId, config: DeribitDataClientConfig) -> anyhow::Result<Self> {
99 let clock = get_atomic_clock_realtime();
100 let data_sender = get_data_event_sender();
101
102 let http_client = if config.has_api_credentials() {
103 DeribitHttpClient::new_with_env(
104 config.api_key.clone(),
105 config.api_secret.clone(),
106 config.use_testnet,
107 config.http_timeout_secs,
108 config.max_retries,
109 config.retry_delay_initial_ms,
110 config.retry_delay_max_ms,
111 None, )?
113 } else {
114 DeribitHttpClient::new(
115 config.base_url_http.clone(),
116 config.use_testnet,
117 config.http_timeout_secs,
118 config.max_retries,
119 config.retry_delay_initial_ms,
120 config.retry_delay_max_ms,
121 None, )?
123 };
124
125 let ws_client = DeribitWebSocketClient::new(
126 Some(config.ws_url()),
127 config.api_key.clone(),
128 config.api_secret.clone(),
129 config.heartbeat_interval_secs,
130 config.use_testnet,
131 )?;
132
133 Ok(Self {
134 client_id,
135 config,
136 http_client,
137 ws_client: Some(ws_client),
138 is_connected: AtomicBool::new(false),
139 cancellation_token: CancellationToken::new(),
140 tasks: Vec::new(),
141 data_sender,
142 instruments: Arc::new(RwLock::new(AHashMap::new())),
143 clock,
144 })
145 }
146
147 fn ws_client_mut(&mut self) -> anyhow::Result<&mut DeribitWebSocketClient> {
149 self.ws_client
150 .as_mut()
151 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))
152 }
153
154 fn get_interval(
159 &self,
160 params: &Option<indexmap::IndexMap<String, String>>,
161 ) -> Option<DeribitUpdateInterval> {
162 if let Some(interval) = params
163 .as_ref()
164 .and_then(|p| p.get("interval"))
165 .and_then(|v| v.parse::<DeribitUpdateInterval>().ok())
166 {
167 return Some(interval);
168 }
169
170 if let Some(ws) = self.ws_client.as_ref()
172 && ws.is_authenticated()
173 {
174 return Some(DeribitUpdateInterval::Raw);
175 }
176 None
177 }
178
179 fn spawn_stream_task(
181 &mut self,
182 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
183 ) -> anyhow::Result<()> {
184 let data_sender = self.data_sender.clone();
185 let instruments = Arc::clone(&self.instruments);
186 let cancellation = self.cancellation_token.clone();
187
188 let handle = get_runtime().spawn(async move {
189 tokio::pin!(stream);
190
191 loop {
192 tokio::select! {
193 maybe_msg = stream.next() => {
194 match maybe_msg {
195 Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
196 None => {
197 log::debug!("WebSocket stream ended");
198 break;
199 }
200 }
201 }
202 () = cancellation.cancelled() => {
203 log::debug!("WebSocket stream task cancelled");
204 break;
205 }
206 }
207 }
208 });
209
210 self.tasks.push(handle);
211 Ok(())
212 }
213
214 fn handle_ws_message(
216 message: NautilusWsMessage,
217 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
218 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
219 ) {
220 match message {
221 NautilusWsMessage::Data(payloads) => {
222 for data in payloads {
223 Self::send_data(sender, data);
224 }
225 }
226 NautilusWsMessage::Deltas(deltas) => {
227 Self::send_data(sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
228 }
229 NautilusWsMessage::Instrument(instrument) => {
230 let instrument_any = *instrument;
231 if let Ok(mut guard) = instruments.write() {
232 let instrument_id = instrument_any.id();
233 guard.insert(instrument_id, instrument_any.clone());
234 drop(guard);
235
236 if let Err(e) = sender.send(DataEvent::Instrument(instrument_any)) {
237 log::warn!("Failed to send instrument update: {e}");
238 }
239 } else {
240 log::error!("Instrument cache lock poisoned, skipping instrument update");
241 }
242 }
243 NautilusWsMessage::Error(e) => {
244 log::error!("WebSocket error: {e:?}");
245 }
246 NautilusWsMessage::Raw(value) => {
247 log::debug!("Unhandled raw message: {value}");
248 }
249 NautilusWsMessage::Reconnected => {
250 log::info!("WebSocket reconnected");
251 }
252 NautilusWsMessage::Authenticated(auth) => {
253 log::debug!("WebSocket authenticated: expires_in={}s", auth.expires_in);
254 }
255 NautilusWsMessage::FundingRates(funding_rates) => {
256 log::info!(
257 "Received {} funding rate update(s) from WebSocket",
258 funding_rates.len()
259 );
260 for funding_rate in funding_rates {
261 log::debug!("Sending funding rate: {funding_rate:?}");
262 if let Err(e) = sender.send(DataEvent::FundingRate(funding_rate)) {
263 log::error!("Failed to send funding rate: {e}");
264 }
265 }
266 }
267 NautilusWsMessage::OrderStatusReports(reports) => {
268 log::warn!(
269 "Data client received OrderStatusReports message (should be handled by execution client): {} reports",
270 reports.len()
271 );
272 }
273 NautilusWsMessage::FillReports(reports) => {
274 log::warn!(
275 "Data client received FillReports message (should be handled by execution client): {} reports",
276 reports.len()
277 );
278 }
279 NautilusWsMessage::OrderRejected(order) => {
280 log::warn!(
281 "Data client received OrderRejected message (should be handled by execution client): {order:?}"
282 );
283 }
284 NautilusWsMessage::OrderAccepted(order) => {
285 log::warn!(
286 "Data client received OrderAccepted message (should be handled by execution client): {order:?}"
287 );
288 }
289 NautilusWsMessage::OrderCanceled(order) => {
290 log::warn!(
291 "Data client received OrderCanceled message (should be handled by execution client): {order:?}"
292 );
293 }
294 NautilusWsMessage::OrderExpired(order) => {
295 log::warn!(
296 "Data client received OrderExpired message (should be handled by execution client): {order:?}"
297 );
298 }
299 NautilusWsMessage::OrderUpdated(order) => {
300 log::warn!(
301 "Data client received OrderUpdated message (should be handled by execution client): {order:?}"
302 );
303 }
304 NautilusWsMessage::OrderCancelRejected(order) => {
305 log::warn!(
306 "Data client received OrderCancelRejected message (should be handled by execution client): {order:?}"
307 );
308 }
309 NautilusWsMessage::OrderModifyRejected(order) => {
310 log::warn!(
311 "Data client received OrderModifyRejected message (should be handled by execution client): {order:?}"
312 );
313 }
314 NautilusWsMessage::AccountState(state) => {
315 log::warn!(
316 "Data client received AccountState message (should be handled by execution client): {state:?}"
317 );
318 }
319 }
320 }
321
322 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
324 if let Err(e) = sender.send(DataEvent::Data(data)) {
325 log::error!("Failed to send data: {e}");
326 }
327 }
328}
329
330#[async_trait(?Send)]
331impl DataClient for DeribitDataClient {
332 fn client_id(&self) -> ClientId {
333 self.client_id
334 }
335
336 fn venue(&self) -> Option<Venue> {
337 Some(*DERIBIT_VENUE)
338 }
339
340 fn start(&mut self) -> anyhow::Result<()> {
341 log::info!(
342 "Starting data client: client_id={}, use_testnet={}",
343 self.client_id,
344 self.config.use_testnet
345 );
346 Ok(())
347 }
348
349 fn stop(&mut self) -> anyhow::Result<()> {
350 log::info!("Stopping data client: {}", self.client_id);
351 self.cancellation_token.cancel();
352 self.is_connected.store(false, Ordering::Relaxed);
353 Ok(())
354 }
355
356 fn reset(&mut self) -> anyhow::Result<()> {
357 log::info!("Resetting data client: {}", self.client_id);
358 self.is_connected.store(false, Ordering::Relaxed);
359 self.cancellation_token = CancellationToken::new();
360 self.tasks.clear();
361 if let Ok(mut instruments) = self.instruments.write() {
362 instruments.clear();
363 }
364 Ok(())
365 }
366
367 fn dispose(&mut self) -> anyhow::Result<()> {
368 log::info!("Disposing data client: {}", self.client_id);
369 self.stop()
370 }
371
372 fn is_connected(&self) -> bool {
373 self.is_connected.load(Ordering::SeqCst)
374 }
375
376 fn is_disconnected(&self) -> bool {
377 !self.is_connected()
378 }
379
380 async fn connect(&mut self) -> anyhow::Result<()> {
381 if self.is_connected() {
382 return Ok(());
383 }
384
385 let product_types = if self.config.product_types.is_empty() {
387 vec![DeribitProductType::Future]
388 } else {
389 self.config.product_types.clone()
390 };
391
392 let mut all_instruments = Vec::new();
393 for product_type in &product_types {
394 let fetched = self
395 .http_client
396 .request_instruments(DeribitCurrency::ANY, Some(*product_type))
397 .await
398 .with_context(|| format!("failed to request instruments for {product_type:?}"))?;
399
400 self.http_client.cache_instruments(fetched.clone());
402
403 let mut guard = self
405 .instruments
406 .write()
407 .map_err(|e| anyhow::anyhow!("{e}"))?;
408 for instrument in &fetched {
409 guard.insert(instrument.id(), instrument.clone());
410 }
411 drop(guard);
412
413 all_instruments.extend(fetched);
414 }
415
416 log::info!(
417 "Cached instruments: client_id={}, total={}",
418 self.client_id,
419 all_instruments.len()
420 );
421
422 for instrument in &all_instruments {
423 if let Err(e) = self
424 .data_sender
425 .send(DataEvent::Instrument(instrument.clone()))
426 {
427 log::warn!("Failed to send instrument: {e}");
428 }
429 }
430
431 let ws = self.ws_client_mut()?;
433 ws.cache_instruments(all_instruments);
434
435 ws.connect().await.context("failed to connect WebSocket")?;
437 ws.wait_until_active(10.0)
438 .await
439 .context("WebSocket failed to become active")?;
440
441 if ws.has_credentials() {
443 ws.authenticate_session(DERIBIT_DATA_SESSION_NAME)
444 .await
445 .context("failed to authenticate WebSocket")?;
446 log_info!("WebSocket authenticated");
447 }
448
449 let stream = self.ws_client_mut()?.stream();
451 self.spawn_stream_task(stream)?;
452
453 self.is_connected.store(true, Ordering::Release);
454 let network = if self.config.use_testnet {
455 "testnet"
456 } else {
457 "mainnet"
458 };
459 log_info!("Connected ({})", network);
460 Ok(())
461 }
462
463 async fn disconnect(&mut self) -> anyhow::Result<()> {
464 if self.is_disconnected() {
465 return Ok(());
466 }
467
468 self.cancellation_token.cancel();
470
471 if let Some(ws) = self.ws_client.as_ref()
473 && let Err(e) = ws.close().await
474 {
475 log::warn!("Error while closing WebSocket: {e:?}");
476 }
477
478 for handle in self.tasks.drain(..) {
480 if let Err(e) = handle.await {
481 log::error!("Error joining WebSocket task: {e:?}");
482 }
483 }
484
485 self.cancellation_token = CancellationToken::new();
487 self.is_connected.store(false, Ordering::Relaxed);
488
489 log_info!("Disconnected");
490 Ok(())
491 }
492
493 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
494 let kind = cmd
496 .params
497 .as_ref()
498 .and_then(|p| p.get("kind"))
499 .map_or("any", |s| s.as_str())
500 .to_string();
501 let currency = cmd
502 .params
503 .as_ref()
504 .and_then(|p| p.get("currency"))
505 .map_or("any", |s| s.as_str())
506 .to_string();
507
508 let ws = self
509 .ws_client
510 .as_ref()
511 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
512 .clone();
513
514 log::info!("Subscribing to instrument state changes for {kind}.{currency}");
515
516 get_runtime().spawn(async move {
517 if let Err(e) = ws.subscribe_instrument_state(&kind, ¤cy).await {
518 log::error!("Failed to subscribe to instrument state for {kind}.{currency}: {e}");
519 }
520 });
521
522 Ok(())
523 }
524
525 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
526 let instrument_id = cmd.instrument_id;
527
528 let guard = self
530 .instruments
531 .read()
532 .map_err(|e| anyhow::anyhow!("{e}"))?;
533 if !guard.contains_key(&instrument_id) {
534 log::warn!(
535 "Instrument {instrument_id} not in cache - it may have been created after connect()"
536 );
537 }
538 drop(guard);
539
540 let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
542
543 let ws = self
544 .ws_client
545 .as_ref()
546 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
547 .clone();
548
549 log::info!(
550 "Subscribing to instrument state for {instrument_id} (channel: {kind}.{currency})"
551 );
552
553 get_runtime().spawn(async move {
555 if let Err(e) = ws.subscribe_instrument_state(&kind, ¤cy).await {
556 log::error!("Failed to subscribe to instrument state for {instrument_id}: {e}");
557 }
558 });
559
560 Ok(())
561 }
562
563 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
564 if cmd.book_type != BookType::L2_MBP {
565 anyhow::bail!("Deribit only supports L2_MBP order book deltas");
566 }
567
568 let ws = self
569 .ws_client
570 .as_ref()
571 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
572 .clone();
573 let instrument_id = cmd.instrument_id;
574 let interval = self.get_interval(&cmd.params);
575
576 let depth = cmd
577 .depth
578 .map(|d| d.get() as u32)
579 .or_else(|| {
580 cmd.params
581 .as_ref()
582 .and_then(|p| p.get("depth"))
583 .and_then(|v| v.parse::<u32>().ok())
584 })
585 .unwrap_or(DERIBIT_BOOK_DEFAULT_DEPTH);
586
587 if !DERIBIT_BOOK_VALID_DEPTHS.contains(&depth) {
588 anyhow::bail!("invalid depth {depth}; supported depths: {DERIBIT_BOOK_VALID_DEPTHS:?}");
589 }
590
591 let group = cmd
592 .params
593 .as_ref()
594 .and_then(|p| p.get("group"))
595 .map_or(DERIBIT_BOOK_DEFAULT_GROUP, String::as_str)
596 .to_string();
597
598 log::info!(
599 "Subscribing to book deltas for {} (group: {}, depth: {}, interval: {}, book_type: {:?})",
600 instrument_id,
601 group,
602 depth,
603 interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
604 cmd.book_type
605 );
606
607 get_runtime().spawn(async move {
608 let result = if interval == Some(DeribitUpdateInterval::Raw) {
609 ws.subscribe_book(instrument_id, interval).await
610 } else {
611 ws.subscribe_book_grouped(instrument_id, &group, depth, interval)
612 .await
613 };
614
615 if let Err(e) = result {
616 log::error!("Failed to subscribe to book deltas for {instrument_id}: {e}");
617 }
618 });
619
620 Ok(())
621 }
622
623 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
624 if cmd.book_type != BookType::L2_MBP {
625 anyhow::bail!("Deribit only supports L2_MBP order book depth");
626 }
627
628 let ws = self
629 .ws_client
630 .as_ref()
631 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
632 .clone();
633 let instrument_id = cmd.instrument_id;
634 let interval = self.get_interval(&cmd.params);
635 let group = cmd
636 .params
637 .as_ref()
638 .and_then(|p| p.get("group"))
639 .map_or(DERIBIT_BOOK_DEFAULT_GROUP, String::as_str)
640 .to_string();
641
642 log::info!(
643 "Subscribing to book depth10 for {} (group: {}, interval: {}, book_type: {:?})",
644 instrument_id,
645 group,
646 interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
647 cmd.book_type
648 );
649
650 get_runtime().spawn(async move {
651 if let Err(e) = ws
652 .subscribe_book_grouped(instrument_id, &group, 10, interval)
653 .await
654 {
655 log::error!("Failed to subscribe to book depth10 for {instrument_id}: {e}");
656 }
657 });
658
659 Ok(())
660 }
661
662 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
663 let ws = self
664 .ws_client
665 .as_ref()
666 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
667 .clone();
668 let instrument_id = cmd.instrument_id;
669
670 log::info!("Subscribing to quotes for {instrument_id}");
671
672 get_runtime().spawn(async move {
673 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
674 log::error!("Failed to subscribe to quotes for {instrument_id}: {e}");
675 }
676 });
677
678 Ok(())
679 }
680
681 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
682 let ws = self
683 .ws_client
684 .as_ref()
685 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
686 .clone();
687 let instrument_id = cmd.instrument_id;
688 let interval = self.get_interval(&cmd.params);
689
690 log::info!(
691 "Subscribing to trades for {} (interval: {})",
692 instrument_id,
693 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
694 );
695
696 get_runtime().spawn(async move {
697 if let Err(e) = ws.subscribe_trades(instrument_id, interval).await {
698 log::error!("Failed to subscribe to trades for {instrument_id}: {e}");
699 }
700 });
701
702 Ok(())
703 }
704
705 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
706 let ws = self
707 .ws_client
708 .as_ref()
709 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
710 .clone();
711 let instrument_id = cmd.instrument_id;
712 let interval = self.get_interval(&cmd.params);
713
714 log::info!(
715 "Subscribing to mark prices for {} (via ticker channel, interval: {})",
716 instrument_id,
717 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
718 );
719
720 get_runtime().spawn(async move {
721 if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
722 log::error!("Failed to subscribe to mark prices for {instrument_id}: {e}");
723 }
724 });
725
726 Ok(())
727 }
728
729 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
730 let ws = self
731 .ws_client
732 .as_ref()
733 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
734 .clone();
735 let instrument_id = cmd.instrument_id;
736 let interval = self.get_interval(&cmd.params);
737
738 log::info!(
739 "Subscribing to index prices for {} (via ticker channel, interval: {})",
740 instrument_id,
741 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
742 );
743
744 get_runtime().spawn(async move {
745 if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
746 log::error!("Failed to subscribe to index prices for {instrument_id}: {e}");
747 }
748 });
749
750 Ok(())
751 }
752
753 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
754 let ws = self
755 .ws_client
756 .as_ref()
757 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
758 .clone();
759 let instrument_id = cmd.bar_type.instrument_id();
760 let resolution = bar_spec_to_resolution(&cmd.bar_type);
761
762 get_runtime().spawn(async move {
763 if let Err(e) = ws.subscribe_chart(instrument_id, &resolution).await {
764 log::error!("Failed to subscribe to bars for {instrument_id}: {e}");
765 }
766 });
767
768 Ok(())
769 }
770
771 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
772 let instrument_id = cmd.instrument_id;
773
774 let is_perpetual = self
776 .instruments
777 .read()
778 .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
779 .get(&instrument_id)
780 .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
781
782 if !is_perpetual {
783 log::warn!(
784 "Funding rates subscription rejected for {instrument_id}: only available for perpetual instruments"
785 );
786 return Ok(());
787 }
788
789 let ws = self
790 .ws_client
791 .as_ref()
792 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
793 .clone();
794 let interval = self.get_interval(&cmd.params);
795
796 log::info!(
797 "Subscribing to funding rates for {} (perpetual channel, interval: {})",
798 instrument_id,
799 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
800 );
801
802 get_runtime().spawn(async move {
803 if let Err(e) = ws
804 .subscribe_perpetual_interests_rates_updates(instrument_id, interval)
805 .await
806 {
807 log::error!("Failed to subscribe to funding rates for {instrument_id}: {e}");
808 }
809 });
810
811 Ok(())
812 }
813
814 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
815 let kind = cmd
816 .params
817 .as_ref()
818 .and_then(|p| p.get("kind"))
819 .map_or("any", |s| s.as_str())
820 .to_string();
821 let currency = cmd
822 .params
823 .as_ref()
824 .and_then(|p| p.get("currency"))
825 .map_or("any", |s| s.as_str())
826 .to_string();
827
828 let ws = self
829 .ws_client
830 .as_ref()
831 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
832 .clone();
833
834 log::info!("Unsubscribing from instrument state changes for {kind}.{currency}");
835
836 get_runtime().spawn(async move {
837 if let Err(e) = ws.unsubscribe_instrument_state(&kind, ¤cy).await {
838 log::error!(
839 "Failed to unsubscribe from instrument state for {kind}.{currency}: {e}"
840 );
841 }
842 });
843
844 Ok(())
845 }
846
847 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
848 let instrument_id = cmd.instrument_id;
849
850 let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
852
853 let ws = self
854 .ws_client
855 .as_ref()
856 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
857 .clone();
858
859 log::info!(
860 "Unsubscribing from instrument state for {instrument_id} (channel: {kind}.{currency})"
861 );
862
863 get_runtime().spawn(async move {
864 if let Err(e) = ws.unsubscribe_instrument_state(&kind, ¤cy).await {
865 log::error!("Failed to unsubscribe from instrument state for {instrument_id}: {e}");
866 }
867 });
868
869 Ok(())
870 }
871
872 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
873 let ws = self
874 .ws_client
875 .as_ref()
876 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
877 .clone();
878 let instrument_id = cmd.instrument_id;
879 let interval = self.get_interval(&cmd.params);
880
881 let depth = cmd
882 .params
883 .as_ref()
884 .and_then(|p| p.get("depth"))
885 .and_then(|v| v.parse::<u32>().ok())
886 .unwrap_or(DERIBIT_BOOK_DEFAULT_DEPTH);
887
888 if !DERIBIT_BOOK_VALID_DEPTHS.contains(&depth) {
889 anyhow::bail!("invalid depth {depth}; supported depths: {DERIBIT_BOOK_VALID_DEPTHS:?}");
890 }
891
892 let group = cmd
893 .params
894 .as_ref()
895 .and_then(|p| p.get("group"))
896 .map_or(DERIBIT_BOOK_DEFAULT_GROUP, String::as_str)
897 .to_string();
898
899 log::info!(
900 "Unsubscribing from book deltas for {} (group: {}, depth: {}, interval: {})",
901 instrument_id,
902 group,
903 depth,
904 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
905 );
906
907 get_runtime().spawn(async move {
908 let result = if interval == Some(DeribitUpdateInterval::Raw) {
909 ws.unsubscribe_book(instrument_id, interval).await
910 } else {
911 ws.unsubscribe_book_grouped(instrument_id, &group, depth, interval)
912 .await
913 };
914
915 if let Err(e) = result {
916 log::error!("Failed to unsubscribe from book deltas for {instrument_id}: {e}");
917 }
918 });
919
920 Ok(())
921 }
922
923 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
924 let ws = self
925 .ws_client
926 .as_ref()
927 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
928 .clone();
929 let instrument_id = cmd.instrument_id;
930 let interval = self.get_interval(&cmd.params);
931 let group = cmd
932 .params
933 .as_ref()
934 .and_then(|p| p.get("group"))
935 .map_or(DERIBIT_BOOK_DEFAULT_GROUP, String::as_str)
936 .to_string();
937
938 log::info!(
939 "Unsubscribing from book depth10 for {} (group: {}, interval: {})",
940 instrument_id,
941 group,
942 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
943 );
944
945 get_runtime().spawn(async move {
946 if let Err(e) = ws
947 .unsubscribe_book_grouped(instrument_id, &group, 10, interval)
948 .await
949 {
950 log::error!("Failed to unsubscribe from book depth10 for {instrument_id}: {e}");
951 }
952 });
953
954 Ok(())
955 }
956
957 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
958 let ws = self
959 .ws_client
960 .as_ref()
961 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
962 .clone();
963 let instrument_id = cmd.instrument_id;
964
965 log::info!("Unsubscribing from quotes for {instrument_id}");
966
967 get_runtime().spawn(async move {
968 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
969 log::error!("Failed to unsubscribe from quotes for {instrument_id}: {e}");
970 }
971 });
972
973 Ok(())
974 }
975
976 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
977 let ws = self
978 .ws_client
979 .as_ref()
980 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
981 .clone();
982 let instrument_id = cmd.instrument_id;
983 let interval = self.get_interval(&cmd.params);
984
985 log::info!(
986 "Unsubscribing from trades for {} (interval: {})",
987 instrument_id,
988 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
989 );
990
991 get_runtime().spawn(async move {
992 if let Err(e) = ws.unsubscribe_trades(instrument_id, interval).await {
993 log::error!("Failed to unsubscribe from trades for {instrument_id}: {e}");
994 }
995 });
996
997 Ok(())
998 }
999
1000 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1001 let ws = self
1002 .ws_client
1003 .as_ref()
1004 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1005 .clone();
1006 let instrument_id = cmd.instrument_id;
1007 let interval = self.get_interval(&cmd.params);
1008
1009 log::info!(
1010 "Unsubscribing from mark prices for {} (via ticker channel, interval: {})",
1011 instrument_id,
1012 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1013 );
1014
1015 get_runtime().spawn(async move {
1016 if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
1017 log::error!("Failed to unsubscribe from mark prices for {instrument_id}: {e}");
1018 }
1019 });
1020
1021 Ok(())
1022 }
1023
1024 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1025 let ws = self
1026 .ws_client
1027 .as_ref()
1028 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1029 .clone();
1030 let instrument_id = cmd.instrument_id;
1031 let interval = self.get_interval(&cmd.params);
1032
1033 log::info!(
1034 "Unsubscribing from index prices for {} (via ticker channel, interval: {})",
1035 instrument_id,
1036 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1037 );
1038
1039 get_runtime().spawn(async move {
1040 if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
1041 log::error!("Failed to unsubscribe from index prices for {instrument_id}: {e}");
1042 }
1043 });
1044
1045 Ok(())
1046 }
1047
1048 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1049 let ws = self
1050 .ws_client
1051 .as_ref()
1052 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1053 .clone();
1054 let instrument_id = cmd.bar_type.instrument_id();
1055 let resolution = bar_spec_to_resolution(&cmd.bar_type);
1056
1057 get_runtime().spawn(async move {
1058 if let Err(e) = ws.unsubscribe_chart(instrument_id, &resolution).await {
1059 log::error!("Failed to unsubscribe from bars for {instrument_id}: {e}");
1060 }
1061 });
1062
1063 Ok(())
1064 }
1065
1066 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1067 let instrument_id = cmd.instrument_id;
1068
1069 let is_perpetual = self
1071 .instruments
1072 .read()
1073 .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
1074 .get(&instrument_id)
1075 .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
1076
1077 if !is_perpetual {
1078 log::warn!(
1079 "Funding rates unsubscription rejected for {instrument_id}: only available for perpetual instruments"
1080 );
1081 return Ok(());
1082 }
1083
1084 let ws = self
1085 .ws_client
1086 .as_ref()
1087 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1088 .clone();
1089 let interval = self.get_interval(&cmd.params);
1090
1091 log::info!(
1092 "Unsubscribing from funding rates for {} (perpetual channel, interval: {})",
1093 instrument_id,
1094 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1095 );
1096
1097 get_runtime().spawn(async move {
1098 if let Err(e) = ws
1099 .unsubscribe_perpetual_interest_rates_updates(instrument_id, interval)
1100 .await
1101 {
1102 log::error!("Failed to unsubscribe from funding rates for {instrument_id}: {e}");
1103 }
1104 });
1105
1106 Ok(())
1107 }
1108
1109 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1110 if request.start.is_some() {
1111 log::warn!(
1112 "Requesting instruments for {:?} with specified `start` which has no effect",
1113 request.venue
1114 );
1115 }
1116 if request.end.is_some() {
1117 log::warn!(
1118 "Requesting instruments for {:?} with specified `end` which has no effect",
1119 request.venue
1120 );
1121 }
1122
1123 let http_client = self.http_client.clone();
1124 let instruments_cache = Arc::clone(&self.instruments);
1125 let sender = self.data_sender.clone();
1126 let request_id = request.request_id;
1127 let client_id = request.client_id.unwrap_or(self.client_id);
1128 let start_nanos = datetime_to_unix_nanos(request.start);
1129 let end_nanos = datetime_to_unix_nanos(request.end);
1130 let params = request.params;
1131 let clock = self.clock;
1132 let venue = *DERIBIT_VENUE;
1133
1134 let product_types = if self.config.product_types.is_empty() {
1136 vec![crate::http::models::DeribitProductType::Future]
1137 } else {
1138 self.config.product_types.clone()
1139 };
1140
1141 get_runtime().spawn(async move {
1142 let mut all_instruments = Vec::new();
1143 for product_type in &product_types {
1144 log::debug!("Requesting instruments for currency=ANY, product_type={product_type:?}");
1145
1146 match http_client
1147 .request_instruments(DeribitCurrency::ANY, Some(*product_type))
1148 .await
1149 {
1150 Ok(instruments) => {
1151 log::info!(
1152 "Fetched {} instruments for ANY/{:?}",
1153 instruments.len(),
1154 product_type
1155 );
1156
1157 for instrument in instruments {
1158 {
1160 match instruments_cache.write() {
1161 Ok(mut guard) => {
1162 guard.insert(instrument.id(), instrument.clone());
1163 }
1164 Err(e) => {
1165 log::error!(
1166 "Instrument cache lock poisoned: {e}, skipping cache update"
1167 );
1168 }
1169 }
1170 }
1171
1172 all_instruments.push(instrument);
1173 }
1174 }
1175 Err(e) => {
1176 log::error!("Failed to fetch instruments for ANY/{product_type:?}: {e:?}");
1177 }
1178 }
1179 }
1180
1181 let response = DataResponse::Instruments(InstrumentsResponse::new(
1183 request_id,
1184 client_id,
1185 venue,
1186 all_instruments,
1187 start_nanos,
1188 end_nanos,
1189 clock.get_time_ns(),
1190 params,
1191 ));
1192
1193 if let Err(e) = sender.send(DataEvent::Response(response)) {
1194 log::error!("Failed to send instruments response: {e}");
1195 }
1196 });
1197
1198 Ok(())
1199 }
1200
1201 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1202 if request.start.is_some() {
1203 log::warn!(
1204 "Requesting instrument {} with specified `start` which has no effect",
1205 request.instrument_id
1206 );
1207 }
1208 if request.end.is_some() {
1209 log::warn!(
1210 "Requesting instrument {} with specified `end` which has no effect",
1211 request.instrument_id
1212 );
1213 }
1214
1215 if let Some(instrument) = self
1217 .instruments
1218 .read()
1219 .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
1220 .get(&request.instrument_id)
1221 .cloned()
1222 {
1223 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1224 request.request_id,
1225 request.client_id.unwrap_or(self.client_id),
1226 instrument.id(),
1227 instrument,
1228 datetime_to_unix_nanos(request.start),
1229 datetime_to_unix_nanos(request.end),
1230 self.clock.get_time_ns(),
1231 request.params,
1232 )));
1233
1234 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
1235 log::error!("Failed to send instrument response: {e}");
1236 }
1237 return Ok(());
1238 }
1239
1240 log::debug!(
1241 "Instrument {} not in cache, fetching from API",
1242 request.instrument_id
1243 );
1244
1245 let http_client = self.http_client.clone();
1246 let instruments_cache = Arc::clone(&self.instruments);
1247 let sender = self.data_sender.clone();
1248 let instrument_id = request.instrument_id;
1249 let request_id = request.request_id;
1250 let client_id = request.client_id.unwrap_or(self.client_id);
1251 let start_nanos = datetime_to_unix_nanos(request.start);
1252 let end_nanos = datetime_to_unix_nanos(request.end);
1253 let params = request.params;
1254 let clock = self.clock;
1255
1256 get_runtime().spawn(async move {
1257 match http_client
1258 .request_instrument(instrument_id)
1259 .await
1260 .context("failed to request instrument from Deribit")
1261 {
1262 Ok(instrument) => {
1263 log::info!("Successfully fetched instrument: {instrument_id}");
1264
1265 {
1267 let mut guard = instruments_cache
1268 .write()
1269 .expect("instrument cache lock poisoned");
1270 guard.insert(instrument.id(), instrument.clone());
1271 }
1272
1273 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1275 request_id,
1276 client_id,
1277 instrument.id(),
1278 instrument,
1279 start_nanos,
1280 end_nanos,
1281 clock.get_time_ns(),
1282 params,
1283 )));
1284
1285 if let Err(e) = sender.send(DataEvent::Response(response)) {
1286 log::error!("Failed to send instrument response: {e}");
1287 }
1288 }
1289 Err(e) => {
1290 log::error!("Instrument request failed for {instrument_id}: {e:?}");
1291 }
1292 }
1293 });
1294
1295 Ok(())
1296 }
1297
1298 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1299 let http_client = self.http_client.clone();
1300 let sender = self.data_sender.clone();
1301 let instrument_id = request.instrument_id;
1302 let start = request.start;
1303 let end = request.end;
1304 let limit = request.limit.map(|n| n.get() as u32);
1305 let request_id = request.request_id;
1306 let client_id = request.client_id.unwrap_or(self.client_id);
1307 let params = request.params;
1308 let clock = self.clock;
1309 let start_nanos = datetime_to_unix_nanos(start);
1310 let end_nanos = datetime_to_unix_nanos(end);
1311
1312 get_runtime().spawn(async move {
1313 match http_client
1314 .request_trades(instrument_id, start, end, limit)
1315 .await
1316 .context("failed to request trades from Deribit")
1317 {
1318 Ok(trades) => {
1319 let response = DataResponse::Trades(TradesResponse::new(
1320 request_id,
1321 client_id,
1322 instrument_id,
1323 trades,
1324 start_nanos,
1325 end_nanos,
1326 clock.get_time_ns(),
1327 params,
1328 ));
1329 if let Err(e) = sender.send(DataEvent::Response(response)) {
1330 log::error!("Failed to send trades response: {e}");
1331 }
1332 }
1333 Err(e) => log::error!("Trades request failed for {instrument_id}: {e:?}"),
1334 }
1335 });
1336
1337 Ok(())
1338 }
1339
1340 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1341 let http_client = self.http_client.clone();
1342 let sender = self.data_sender.clone();
1343 let bar_type = request.bar_type;
1344 let start = request.start;
1345 let end = request.end;
1346 let limit = request.limit.map(|n| n.get() as u32);
1347 let request_id = request.request_id;
1348 let client_id = request.client_id.unwrap_or(self.client_id);
1349 let params = request.params;
1350 let clock = self.clock;
1351 let start_nanos = datetime_to_unix_nanos(start);
1352 let end_nanos = datetime_to_unix_nanos(end);
1353
1354 get_runtime().spawn(async move {
1355 match http_client
1356 .request_bars(bar_type, start, end, limit)
1357 .await
1358 .context("failed to request bars from Deribit")
1359 {
1360 Ok(bars) => {
1361 let response = DataResponse::Bars(BarsResponse::new(
1362 request_id,
1363 client_id,
1364 bar_type,
1365 bars,
1366 start_nanos,
1367 end_nanos,
1368 clock.get_time_ns(),
1369 params,
1370 ));
1371 if let Err(e) = sender.send(DataEvent::Response(response)) {
1372 log::error!("Failed to send bars response: {e}");
1373 }
1374 }
1375 Err(e) => log::error!("Bars request failed for {bar_type}: {e:?}"),
1376 }
1377 });
1378
1379 Ok(())
1380 }
1381
1382 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1383 let http_client = self.http_client.clone();
1384 let sender = self.data_sender.clone();
1385 let instrument_id = request.instrument_id;
1386 let depth = request.depth.map(|n| n.get() as u32);
1387 let request_id = request.request_id;
1388 let client_id = request.client_id.unwrap_or(self.client_id);
1389 let params = request.params;
1390 let clock = self.clock;
1391
1392 get_runtime().spawn(async move {
1393 match http_client
1394 .request_book_snapshot(instrument_id, depth)
1395 .await
1396 .context("failed to request book snapshot from Deribit")
1397 {
1398 Ok(book) => {
1399 let response = DataResponse::Book(BookResponse::new(
1400 request_id,
1401 client_id,
1402 instrument_id,
1403 book,
1404 None,
1405 None,
1406 clock.get_time_ns(),
1407 params,
1408 ));
1409 if let Err(e) = sender.send(DataEvent::Response(response)) {
1410 log::error!("Failed to send book snapshot response: {e}");
1411 }
1412 }
1413 Err(e) => {
1414 log::error!("Book snapshot request failed for {instrument_id}: {e:?}");
1415 }
1416 }
1417 });
1418
1419 Ok(())
1420 }
1421}