nautilus_deribit/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Deribit adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::StreamExt;
27use nautilus_common::{
28    clients::DataClient,
29    live::{runner::get_data_event_sender, runtime::get_runtime},
30    log_info,
31    messages::{
32        DataEvent, DataResponse,
33        data::{
34            BarsResponse, BookResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35            RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
36            SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates,
37            SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices,
38            SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
39            UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
40            UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstruments,
41            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
42        },
43    },
44};
45use nautilus_core::{
46    datetime::datetime_to_unix_nanos,
47    time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_model::{
50    data::{Data, OrderBookDeltas_API},
51    identifiers::{ClientId, InstrumentId, Venue},
52    instruments::{Instrument, InstrumentAny},
53};
54use tokio::task::JoinHandle;
55use tokio_util::sync::CancellationToken;
56
57use crate::{
58    common::{
59        consts::DERIBIT_VENUE,
60        parse::{bar_spec_to_resolution, parse_instrument_kind_currency},
61    },
62    config::DeribitDataClientConfig,
63    http::{
64        client::DeribitHttpClient,
65        models::{DeribitCurrency, DeribitInstrumentKind},
66    },
67    websocket::{
68        auth::DERIBIT_DATA_SESSION_NAME, client::DeribitWebSocketClient,
69        enums::DeribitUpdateInterval, messages::NautilusWsMessage,
70    },
71};
72
/// Deribit live data client.
///
/// Owns the HTTP and WebSocket clients built from the configuration, a shared instrument
/// cache, and the bookkeeping (connection flag, cancellation token, task handles) for the
/// spawned WebSocket stream task.
#[derive(Debug)]
pub struct DeribitDataClient {
    /// Identifier returned by `client_id()`.
    client_id: ClientId,
    /// Configuration supplied to `new`.
    config: DeribitDataClientConfig,
    /// HTTP client used to request instruments during `connect`.
    http_client: DeribitHttpClient,
    /// WebSocket client; set to `Some` by `new`.
    ws_client: Option<DeribitWebSocketClient>,
    /// Set once `connect` completes; cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token observed by the spawned stream task to know when to exit.
    cancellation_token: CancellationToken,
    /// Handles of spawned stream tasks; joined in `disconnect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events are published.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache shared with the stream task.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Realtime atomic clock obtained in `new`.
    clock: &'static AtomicTime,
}
87
88impl DeribitDataClient {
89    /// Creates a new [`DeribitDataClient`] instance.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the client fails to initialize.
94    pub fn new(client_id: ClientId, config: DeribitDataClientConfig) -> anyhow::Result<Self> {
95        let clock = get_atomic_clock_realtime();
96        let data_sender = get_data_event_sender();
97
98        let http_client = if config.has_api_credentials() {
99            DeribitHttpClient::new_with_env(
100                config.api_key.clone(),
101                config.api_secret.clone(),
102                config.use_testnet,
103                config.http_timeout_secs,
104                config.max_retries,
105                config.retry_delay_initial_ms,
106                config.retry_delay_max_ms,
107                None, // proxy_url
108            )?
109        } else {
110            DeribitHttpClient::new(
111                config.base_url_http.clone(),
112                config.use_testnet,
113                config.http_timeout_secs,
114                config.max_retries,
115                config.retry_delay_initial_ms,
116                config.retry_delay_max_ms,
117                None, // proxy_url
118            )?
119        };
120
121        let ws_client = DeribitWebSocketClient::new(
122            Some(config.ws_url()),
123            config.api_key.clone(),
124            config.api_secret.clone(),
125            config.heartbeat_interval_secs,
126            config.use_testnet,
127        )?;
128
129        Ok(Self {
130            client_id,
131            config,
132            http_client,
133            ws_client: Some(ws_client),
134            is_connected: AtomicBool::new(false),
135            cancellation_token: CancellationToken::new(),
136            tasks: Vec::new(),
137            data_sender,
138            instruments: Arc::new(RwLock::new(AHashMap::new())),
139            clock,
140        })
141    }
142
143    /// Returns a mutable reference to the WebSocket client.
144    fn ws_client_mut(&mut self) -> anyhow::Result<&mut DeribitWebSocketClient> {
145        self.ws_client
146            .as_mut()
147            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))
148    }
149
150    /// Spawns a task to process WebSocket messages.
151    fn spawn_stream_task(
152        &mut self,
153        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
154    ) -> anyhow::Result<()> {
155        let data_sender = self.data_sender.clone();
156        let instruments = Arc::clone(&self.instruments);
157        let cancellation = self.cancellation_token.clone();
158
159        let handle = get_runtime().spawn(async move {
160            tokio::pin!(stream);
161
162            loop {
163                tokio::select! {
164                    maybe_msg = stream.next() => {
165                        match maybe_msg {
166                            Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
167                            None => {
168                                log::debug!("Deribit websocket stream ended");
169                                break;
170                            }
171                        }
172                    }
173                    () = cancellation.cancelled() => {
174                        log::debug!("Deribit websocket stream task cancelled");
175                        break;
176                    }
177                }
178            }
179        });
180
181        self.tasks.push(handle);
182        Ok(())
183    }
184
185    /// Handles incoming WebSocket messages.
186    fn handle_ws_message(
187        message: NautilusWsMessage,
188        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
189        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
190    ) {
191        match message {
192            NautilusWsMessage::Data(payloads) => {
193                for data in payloads {
194                    Self::send_data(sender, data);
195                }
196            }
197            NautilusWsMessage::Deltas(deltas) => {
198                Self::send_data(sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
199            }
200            NautilusWsMessage::Instrument(instrument) => {
201                let instrument_any = *instrument;
202                if let Ok(mut guard) = instruments.write() {
203                    let instrument_id = instrument_any.id();
204                    guard.insert(instrument_id, instrument_any.clone());
205                    drop(guard);
206
207                    if let Err(e) = sender.send(DataEvent::Instrument(instrument_any)) {
208                        log::warn!("Failed to send instrument update: {e}");
209                    }
210                } else {
211                    log::error!("Instrument cache lock poisoned, skipping instrument update");
212                }
213            }
214            NautilusWsMessage::Error(e) => {
215                log::error!("Deribit WebSocket error: {e:?}");
216            }
217            NautilusWsMessage::Raw(value) => {
218                log::debug!("Unhandled raw message: {value}");
219            }
220            NautilusWsMessage::Reconnected => {
221                log::info!("Deribit websocket reconnected");
222            }
223            NautilusWsMessage::Authenticated(auth) => {
224                log::debug!(
225                    "Deribit websocket authenticated: expires_in={}s",
226                    auth.expires_in
227                );
228            }
229            NautilusWsMessage::FundingRates(funding_rates) => {
230                log::info!(
231                    "Received {} funding rate update(s) from WebSocket",
232                    funding_rates.len()
233                );
234                for funding_rate in funding_rates {
235                    log::debug!("Sending funding rate: {funding_rate:?}");
236                    if let Err(e) = sender.send(DataEvent::FundingRate(funding_rate)) {
237                        log::error!("Failed to send funding rate: {e}");
238                    }
239                }
240            }
241        }
242    }
243
244    /// Sends data to the data channel.
245    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
246        if let Err(e) = sender.send(DataEvent::Data(data)) {
247            log::error!("Failed to send data: {e}");
248        }
249    }
250}
251
252#[async_trait(?Send)]
253impl DataClient for DeribitDataClient {
    /// Returns the client identifier supplied at construction.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
257
    /// Returns the Deribit venue (`DERIBIT_VENUE`); always `Some`.
    fn venue(&self) -> Option<Venue> {
        Some(*DERIBIT_VENUE)
    }
261
    /// Logs that the client is starting; no other work is done here.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`.
    fn start(&mut self) -> anyhow::Result<()> {
        log::info!(
            "Starting Deribit data client: client_id={}, use_testnet={}",
            self.client_id,
            self.config.use_testnet
        );
        Ok(())
    }
270
    /// Stops the client: cancels the cancellation token and clears the connected flag.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`.
    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Stopping Deribit data client: {}", self.client_id);
        // Signals the spawned stream task (which selects on this token) to exit
        self.cancellation_token.cancel();
        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }
277
278    fn reset(&mut self) -> anyhow::Result<()> {
279        log::info!("Resetting Deribit data client: {}", self.client_id);
280        self.is_connected.store(false, Ordering::Relaxed);
281        self.cancellation_token = CancellationToken::new();
282        self.tasks.clear();
283        if let Ok(mut instruments) = self.instruments.write() {
284            instruments.clear();
285        }
286        Ok(())
287    }
288
    /// Disposes the client by logging and delegating to `stop`.
    ///
    /// # Errors
    ///
    /// Returns whatever `stop` returns.
    fn dispose(&mut self) -> anyhow::Result<()> {
        log::info!("Disposing Deribit data client: {}", self.client_id);
        self.stop()
    }
293
    /// Returns the current value of the connected flag.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::SeqCst)
    }
297
298    fn is_disconnected(&self) -> bool {
299        !self.is_connected()
300    }
301
302    async fn connect(&mut self) -> anyhow::Result<()> {
303        if self.is_connected() {
304            return Ok(());
305        }
306
307        // Fetch instruments for each configured instrument kind
308        let instrument_kinds = if self.config.instrument_kinds.is_empty() {
309            vec![DeribitInstrumentKind::Future]
310        } else {
311            self.config.instrument_kinds.clone()
312        };
313
314        let mut all_instruments = Vec::new();
315        for kind in &instrument_kinds {
316            let fetched = self
317                .http_client
318                .request_instruments(DeribitCurrency::ANY, Some(*kind))
319                .await
320                .with_context(|| format!("failed to request Deribit instruments for {kind:?}"))?;
321
322            // Cache in http client
323            self.http_client.cache_instruments(fetched.clone());
324
325            // Cache locally
326            let mut guard = self
327                .instruments
328                .write()
329                .map_err(|e| anyhow::anyhow!("{e}"))?;
330            for instrument in &fetched {
331                guard.insert(instrument.id(), instrument.clone());
332            }
333            drop(guard);
334
335            all_instruments.extend(fetched);
336        }
337
338        log::info!(
339            "Cached instruments: client_id={}, total={}",
340            self.client_id,
341            all_instruments.len()
342        );
343
344        for instrument in &all_instruments {
345            if let Err(e) = self
346                .data_sender
347                .send(DataEvent::Instrument(instrument.clone()))
348            {
349                log::warn!("Failed to send instrument: {e}");
350            }
351        }
352
353        // Cache instruments in WebSocket client before connecting
354        let ws = self.ws_client_mut()?;
355        ws.cache_instruments(all_instruments);
356
357        // Connect WebSocket and wait until active
358        ws.connect()
359            .await
360            .context("failed to connect Deribit websocket")?;
361        ws.wait_until_active(10.0)
362            .await
363            .context("websocket failed to become active")?;
364
365        // Authenticate if credentials are configured (required for raw streams)
366        if ws.has_credentials() {
367            ws.authenticate_session(DERIBIT_DATA_SESSION_NAME)
368                .await
369                .context("failed to authenticate Deribit websocket")?;
370            log_info!("Deribit WebSocket authenticated");
371        }
372
373        // Get the stream and spawn processing task
374        let stream = self.ws_client_mut()?.stream();
375        self.spawn_stream_task(stream)?;
376
377        self.is_connected.store(true, Ordering::Release);
378        let network = if self.config.use_testnet {
379            "testnet"
380        } else {
381            "mainnet"
382        };
383        log_info!("Deribit data client connected ({})", network);
384        Ok(())
385    }
386
387    async fn disconnect(&mut self) -> anyhow::Result<()> {
388        if self.is_disconnected() {
389            return Ok(());
390        }
391
392        // Cancel all tasks
393        self.cancellation_token.cancel();
394
395        // Close WebSocket connection
396        if let Some(ws) = self.ws_client.as_ref()
397            && let Err(e) = ws.close().await
398        {
399            log::warn!("Error while closing Deribit websocket: {e:?}");
400        }
401
402        // Wait for all tasks to complete
403        for handle in self.tasks.drain(..) {
404            if let Err(e) = handle.await {
405                log::error!("Error joining websocket task: {e:?}");
406            }
407        }
408
409        // Reset cancellation token for potential reconnection
410        self.cancellation_token = CancellationToken::new();
411        self.is_connected.store(false, Ordering::Relaxed);
412
413        log_info!("Deribit data client disconnected");
414        Ok(())
415    }
416
417    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
418        // Extract kind and currency from params, defaulting to "any.any" (all instruments)
419        let kind = cmd
420            .params
421            .as_ref()
422            .and_then(|p| p.get("kind"))
423            .map_or("any", |s| s.as_str())
424            .to_string();
425        let currency = cmd
426            .params
427            .as_ref()
428            .and_then(|p| p.get("currency"))
429            .map_or("any", |s| s.as_str())
430            .to_string();
431
432        let ws = self
433            .ws_client
434            .as_ref()
435            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
436            .clone();
437
438        log::info!("Subscribing to instrument state changes for {kind}.{currency}");
439
440        get_runtime().spawn(async move {
441            if let Err(e) = ws.subscribe_instrument_state(&kind, &currency).await {
442                log::error!("Failed to subscribe to instrument state for {kind}.{currency}: {e}");
443            }
444        });
445
446        Ok(())
447    }
448
449    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
450        let instrument_id = cmd.instrument_id;
451
452        // Check if instrument is in cache (should be from connect())
453        let guard = self
454            .instruments
455            .read()
456            .map_err(|e| anyhow::anyhow!("{e}"))?;
457        if !guard.contains_key(&instrument_id) {
458            log::warn!(
459                "Instrument {instrument_id} not in cache - it may have been created after connect()"
460            );
461        }
462        drop(guard);
463
464        // Determine kind and currency from instrument_id
465        let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
466
467        let ws = self
468            .ws_client
469            .as_ref()
470            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
471            .clone();
472
473        log::info!(
474            "Subscribing to instrument state for {instrument_id} (channel: {kind}.{currency})"
475        );
476
477        // Subscribe to broader kind/currency channel (filter in handler)
478        get_runtime().spawn(async move {
479            if let Err(e) = ws.subscribe_instrument_state(&kind, &currency).await {
480                log::error!("Failed to subscribe to instrument state for {instrument_id}: {e}");
481            }
482        });
483
484        Ok(())
485    }
486
487    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
488        let ws = self
489            .ws_client
490            .as_ref()
491            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
492            .clone();
493        let instrument_id = cmd.instrument_id;
494
495        // Get interval from params, default to 100ms (public)
496        let interval = cmd
497            .params
498            .as_ref()
499            .and_then(|p| p.get("interval"))
500            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
501
502        log::info!(
503            "Subscribing to book deltas for {} (interval: {}, book_type: {:?})",
504            instrument_id,
505            interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
506            cmd.book_type
507        );
508
509        get_runtime().spawn(async move {
510            if let Err(e) = ws.subscribe_book(instrument_id, interval).await {
511                log::error!("Failed to subscribe to book deltas for {instrument_id}: {e}");
512            }
513        });
514
515        Ok(())
516    }
517
518    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
519        let ws = self
520            .ws_client
521            .as_ref()
522            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
523            .clone();
524        let instrument_id = cmd.instrument_id;
525
526        // Get interval from params, default to 100ms (public)
527        let interval = cmd
528            .params
529            .as_ref()
530            .and_then(|p| p.get("interval"))
531            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
532
533        // Get price grouping from params, default to "none" (no grouping)
534        let group = cmd
535            .params
536            .as_ref()
537            .and_then(|p| p.get("group"))
538            .map_or("none", String::as_str)
539            .to_string();
540
541        log::info!(
542            "Subscribing to book depth10 for {} (group: {}, interval: {}, book_type: {:?})",
543            instrument_id,
544            group,
545            interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
546            cmd.book_type
547        );
548
549        get_runtime().spawn(async move {
550            if let Err(e) = ws
551                .subscribe_book_grouped(instrument_id, &group, 10, interval)
552                .await
553            {
554                log::error!("Failed to subscribe to book depth10 for {instrument_id}: {e}");
555            }
556        });
557
558        Ok(())
559    }
560
561    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
562        let ws = self
563            .ws_client
564            .as_ref()
565            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
566            .clone();
567        let instrument_id = cmd.instrument_id;
568
569        log::info!("Subscribing to quotes for {instrument_id}");
570
571        get_runtime().spawn(async move {
572            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
573                log::error!("Failed to subscribe to quotes for {instrument_id}: {e}");
574            }
575        });
576
577        Ok(())
578    }
579
580    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
581        let ws = self
582            .ws_client
583            .as_ref()
584            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
585            .clone();
586        let instrument_id = cmd.instrument_id;
587
588        let interval = cmd
589            .params
590            .as_ref()
591            .and_then(|p| p.get("interval"))
592            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
593
594        log::info!(
595            "Subscribing to trades for {} (interval: {})",
596            instrument_id,
597            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
598        );
599
600        get_runtime().spawn(async move {
601            if let Err(e) = ws.subscribe_trades(instrument_id, interval).await {
602                log::error!("Failed to subscribe to trades for {instrument_id}: {e}");
603            }
604        });
605
606        Ok(())
607    }
608
609    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
610        let ws = self
611            .ws_client
612            .as_ref()
613            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
614            .clone();
615        let instrument_id = cmd.instrument_id;
616
617        let interval = cmd
618            .params
619            .as_ref()
620            .and_then(|p| p.get("interval"))
621            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
622
623        log::info!(
624            "Subscribing to mark prices for {} (via ticker channel, interval: {})",
625            instrument_id,
626            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
627        );
628
629        get_runtime().spawn(async move {
630            if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
631                log::error!("Failed to subscribe to mark prices for {instrument_id}: {e}");
632            }
633        });
634
635        Ok(())
636    }
637
638    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
639        let ws = self
640            .ws_client
641            .as_ref()
642            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
643            .clone();
644        let instrument_id = cmd.instrument_id;
645
646        let interval = cmd
647            .params
648            .as_ref()
649            .and_then(|p| p.get("interval"))
650            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
651
652        log::info!(
653            "Subscribing to index prices for {} (via ticker channel, interval: {})",
654            instrument_id,
655            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
656        );
657
658        get_runtime().spawn(async move {
659            if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
660                log::error!("Failed to subscribe to index prices for {instrument_id}: {e}");
661            }
662        });
663
664        Ok(())
665    }
666
667    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
668        let instrument_id = cmd.instrument_id;
669
670        // Validate instrument is a perpetual - funding rates only apply to perpetual contracts
671        let is_perpetual = self
672            .instruments
673            .read()
674            .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
675            .get(&instrument_id)
676            .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
677
678        if !is_perpetual {
679            log::warn!(
680                "Funding rates subscription rejected for {instrument_id}: only available for perpetual instruments."
681            );
682            return Ok(());
683        }
684
685        let ws = self
686            .ws_client
687            .as_ref()
688            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
689            .clone();
690
691        let interval = cmd
692            .params
693            .as_ref()
694            .and_then(|p| p.get("interval"))
695            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
696
697        // Funding rates use the dedicated perpetual channel
698        log::info!(
699            "Subscribing to funding rates for {} (perpetual channel, interval: {})",
700            instrument_id,
701            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
702        );
703
704        get_runtime().spawn(async move {
705            if let Err(e) = ws
706                .subscribe_perpetual_interests_rates_updates(instrument_id, interval)
707                .await
708            {
709                log::error!("Failed to subscribe to funding rates for {instrument_id}: {e}");
710            }
711        });
712
713        Ok(())
714    }
715
716    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
717        let ws = self
718            .ws_client
719            .as_ref()
720            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
721            .clone();
722        let instrument_id = cmd.bar_type.instrument_id();
723        // Convert bar spec to Deribit resolution
724        let resolution = bar_spec_to_resolution(&cmd.bar_type);
725
726        get_runtime().spawn(async move {
727            if let Err(e) = ws.subscribe_chart(instrument_id, &resolution).await {
728                log::error!("Failed to subscribe to bars for {instrument_id}: {e}");
729            }
730        });
731
732        Ok(())
733    }
734
735    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
736        let kind = cmd
737            .params
738            .as_ref()
739            .and_then(|p| p.get("kind"))
740            .map_or("any", |s| s.as_str())
741            .to_string();
742        let currency = cmd
743            .params
744            .as_ref()
745            .and_then(|p| p.get("currency"))
746            .map_or("any", |s| s.as_str())
747            .to_string();
748
749        let ws = self
750            .ws_client
751            .as_ref()
752            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
753            .clone();
754
755        log::info!("Unsubscribing from instrument state changes for {kind}.{currency}");
756
757        get_runtime().spawn(async move {
758            if let Err(e) = ws.unsubscribe_instrument_state(&kind, &currency).await {
759                log::error!(
760                    "Failed to unsubscribe from instrument state for {kind}.{currency}: {e}"
761                );
762            }
763        });
764
765        Ok(())
766    }
767
768    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
769        let instrument_id = cmd.instrument_id;
770
771        // Determine kind and currency from instrument_id
772        let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
773
774        let ws = self
775            .ws_client
776            .as_ref()
777            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
778            .clone();
779
780        log::info!(
781            "Unsubscribing from instrument state for {instrument_id} (channel: {kind}.{currency})"
782        );
783
784        get_runtime().spawn(async move {
785            if let Err(e) = ws.unsubscribe_instrument_state(&kind, &currency).await {
786                log::error!("Failed to unsubscribe from instrument state for {instrument_id}: {e}");
787            }
788        });
789
790        Ok(())
791    }
792
793    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
794        let ws = self
795            .ws_client
796            .as_ref()
797            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
798            .clone();
799        let instrument_id = cmd.instrument_id;
800
801        // Get interval from params to match the subscribed channel
802        let interval = cmd
803            .params
804            .as_ref()
805            .and_then(|p| p.get("interval"))
806            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
807
808        log::info!(
809            "Unsubscribing from book deltas for {} (interval: {})",
810            instrument_id,
811            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
812        );
813
814        get_runtime().spawn(async move {
815            if let Err(e) = ws.unsubscribe_book(instrument_id, interval).await {
816                log::error!("Failed to unsubscribe from book deltas for {instrument_id}: {e}");
817            }
818        });
819
820        Ok(())
821    }
822
823    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
824        let ws = self
825            .ws_client
826            .as_ref()
827            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
828            .clone();
829        let instrument_id = cmd.instrument_id;
830
831        // Get interval from params to match the subscribed channel
832        let interval = cmd
833            .params
834            .as_ref()
835            .and_then(|p| p.get("interval"))
836            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
837
838        // Get price grouping from params to match the subscribed channel
839        let group = cmd
840            .params
841            .as_ref()
842            .and_then(|p| p.get("group"))
843            .map_or("none", String::as_str)
844            .to_string();
845
846        log::info!(
847            "Unsubscribing from book depth10 for {} (group: {}, interval: {})",
848            instrument_id,
849            group,
850            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
851        );
852
853        get_runtime().spawn(async move {
854            if let Err(e) = ws
855                .unsubscribe_book_grouped(instrument_id, &group, 10, interval)
856                .await
857            {
858                log::error!("Failed to unsubscribe from book depth10 for {instrument_id}: {e}");
859            }
860        });
861
862        Ok(())
863    }
864
865    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
866        let ws = self
867            .ws_client
868            .as_ref()
869            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
870            .clone();
871        let instrument_id = cmd.instrument_id;
872
873        log::info!("Unsubscribing from quotes for {instrument_id}");
874
875        get_runtime().spawn(async move {
876            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
877                log::error!("Failed to unsubscribe from quotes for {instrument_id}: {e}");
878            }
879        });
880
881        Ok(())
882    }
883
884    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
885        let ws = self
886            .ws_client
887            .as_ref()
888            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
889            .clone();
890        let instrument_id = cmd.instrument_id;
891
892        let interval = cmd
893            .params
894            .as_ref()
895            .and_then(|p| p.get("interval"))
896            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
897
898        log::info!(
899            "Unsubscribing from trades for {} (interval: {})",
900            instrument_id,
901            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
902        );
903
904        get_runtime().spawn(async move {
905            if let Err(e) = ws.unsubscribe_trades(instrument_id, interval).await {
906                log::error!("Failed to unsubscribe from trades for {instrument_id}: {e}");
907            }
908        });
909
910        Ok(())
911    }
912
913    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
914        let ws = self
915            .ws_client
916            .as_ref()
917            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
918            .clone();
919        let instrument_id = cmd.instrument_id;
920
921        let interval = cmd
922            .params
923            .as_ref()
924            .and_then(|p| p.get("interval"))
925            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
926
927        log::info!(
928            "Unsubscribing from mark prices for {} (via ticker channel, interval: {})",
929            instrument_id,
930            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
931        );
932
933        get_runtime().spawn(async move {
934            if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
935                log::error!("Failed to unsubscribe from mark prices for {instrument_id}: {e}");
936            }
937        });
938
939        Ok(())
940    }
941
942    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
943        let ws = self
944            .ws_client
945            .as_ref()
946            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
947            .clone();
948        let instrument_id = cmd.instrument_id;
949
950        let interval = cmd
951            .params
952            .as_ref()
953            .and_then(|p| p.get("interval"))
954            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
955
956        log::info!(
957            "Unsubscribing from index prices for {} (via ticker channel, interval: {})",
958            instrument_id,
959            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
960        );
961
962        get_runtime().spawn(async move {
963            if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
964                log::error!("Failed to unsubscribe from index prices for {instrument_id}: {e}");
965            }
966        });
967
968        Ok(())
969    }
970
971    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
972        let instrument_id = cmd.instrument_id;
973
974        // Validate instrument is a perpetual - funding rates only apply to perpetual contracts
975        let is_perpetual = self
976            .instruments
977            .read()
978            .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
979            .get(&instrument_id)
980            .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
981
982        if !is_perpetual {
983            log::warn!(
984                "Funding rates unsubscription rejected for {instrument_id}: only available for perpetual instruments."
985            );
986            return Ok(());
987        }
988
989        let ws = self
990            .ws_client
991            .as_ref()
992            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
993            .clone();
994
995        let interval = cmd
996            .params
997            .as_ref()
998            .and_then(|p| p.get("interval"))
999            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok());
1000
1001        log::info!(
1002            "Unsubscribing from funding rates for {} (perpetual channel, interval: {})",
1003            instrument_id,
1004            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1005        );
1006
1007        get_runtime().spawn(async move {
1008            if let Err(e) = ws
1009                .unsubscribe_perpetual_interest_rates_updates(instrument_id, interval)
1010                .await
1011            {
1012                log::error!("Failed to unsubscribe from funding rates for {instrument_id}: {e}");
1013            }
1014        });
1015
1016        Ok(())
1017    }
1018
1019    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1020        let ws = self
1021            .ws_client
1022            .as_ref()
1023            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1024            .clone();
1025        let instrument_id = cmd.bar_type.instrument_id();
1026        let resolution = bar_spec_to_resolution(&cmd.bar_type);
1027
1028        get_runtime().spawn(async move {
1029            if let Err(e) = ws.unsubscribe_chart(instrument_id, &resolution).await {
1030                log::error!("Failed to unsubscribe from bars for {instrument_id}: {e}");
1031            }
1032        });
1033
1034        Ok(())
1035    }
1036
1037    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
1038        if request.start.is_some() {
1039            log::warn!(
1040                "Requesting instruments for {:?} with specified `start` which has no effect",
1041                request.venue
1042            );
1043        }
1044        if request.end.is_some() {
1045            log::warn!(
1046                "Requesting instruments for {:?} with specified `end` which has no effect",
1047                request.venue
1048            );
1049        }
1050
1051        let http_client = self.http_client.clone();
1052        let instruments_cache = Arc::clone(&self.instruments);
1053        let sender = self.data_sender.clone();
1054        let request_id = request.request_id;
1055        let client_id = request.client_id.unwrap_or(self.client_id);
1056        let start_nanos = datetime_to_unix_nanos(request.start);
1057        let end_nanos = datetime_to_unix_nanos(request.end);
1058        let params = request.params.clone();
1059        let clock = self.clock;
1060        let venue = *DERIBIT_VENUE;
1061
1062        // Get instrument kinds from config, default to Future if empty
1063        let instrument_kinds = if self.config.instrument_kinds.is_empty() {
1064            vec![crate::http::models::DeribitInstrumentKind::Future]
1065        } else {
1066            self.config.instrument_kinds.clone()
1067        };
1068
1069        get_runtime().spawn(async move {
1070            let mut all_instruments = Vec::new();
1071            for kind in &instrument_kinds {
1072                log::debug!("Requesting instruments for currency=ANY, kind={kind:?}");
1073
1074                match http_client
1075                    .request_instruments(DeribitCurrency::ANY, Some(*kind))
1076                    .await
1077                {
1078                    Ok(instruments) => {
1079                        log::info!(
1080                            "Fetched {} instruments for ANY/{:?}",
1081                            instruments.len(),
1082                            kind
1083                        );
1084
1085                        for instrument in instruments {
1086                            // Cache the instrument
1087                            {
1088                                match instruments_cache.write() {
1089                                    Ok(mut guard) => {
1090                                        guard.insert(instrument.id(), instrument.clone());
1091                                    }
1092                                    Err(e) => {
1093                                        log::error!(
1094                                            "Instrument cache lock poisoned: {e}, skipping cache update"
1095                                        );
1096                                    }
1097                                }
1098                            }
1099
1100                            all_instruments.push(instrument);
1101                        }
1102                    }
1103                    Err(e) => {
1104                        log::error!("Failed to fetch instruments for ANY/{kind:?}: {e:?}");
1105                    }
1106                }
1107            }
1108
1109            // Send response with all collected instruments
1110            let response = DataResponse::Instruments(InstrumentsResponse::new(
1111                request_id,
1112                client_id,
1113                venue,
1114                all_instruments,
1115                start_nanos,
1116                end_nanos,
1117                clock.get_time_ns(),
1118                params,
1119            ));
1120
1121            if let Err(e) = sender.send(DataEvent::Response(response)) {
1122                log::error!("Failed to send instruments response: {e}");
1123            }
1124        });
1125
1126        Ok(())
1127    }
1128
1129    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
1130        if request.start.is_some() {
1131            log::warn!(
1132                "Requesting instrument {} with specified `start` which has no effect",
1133                request.instrument_id
1134            );
1135        }
1136        if request.end.is_some() {
1137            log::warn!(
1138                "Requesting instrument {} with specified `end` which has no effect",
1139                request.instrument_id
1140            );
1141        }
1142
1143        // First, check if instrument exists in cache
1144        if let Some(instrument) = self
1145            .instruments
1146            .read()
1147            .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
1148            .get(&request.instrument_id)
1149            .cloned()
1150        {
1151            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1152                request.request_id,
1153                request.client_id.unwrap_or(self.client_id),
1154                instrument.id(),
1155                instrument,
1156                datetime_to_unix_nanos(request.start),
1157                datetime_to_unix_nanos(request.end),
1158                self.clock.get_time_ns(),
1159                request.params.clone(),
1160            )));
1161
1162            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
1163                log::error!("Failed to send instrument response: {e}");
1164            }
1165            return Ok(());
1166        }
1167
1168        log::debug!(
1169            "Instrument {} not in cache, fetching from API",
1170            request.instrument_id
1171        );
1172
1173        let http_client = self.http_client.clone();
1174        let instruments_cache = Arc::clone(&self.instruments);
1175        let sender = self.data_sender.clone();
1176        let instrument_id = request.instrument_id;
1177        let request_id = request.request_id;
1178        let client_id = request.client_id.unwrap_or(self.client_id);
1179        let start_nanos = datetime_to_unix_nanos(request.start);
1180        let end_nanos = datetime_to_unix_nanos(request.end);
1181        let params = request.params.clone();
1182        let clock = self.clock;
1183
1184        get_runtime().spawn(async move {
1185            match http_client
1186                .request_instrument(instrument_id)
1187                .await
1188                .context("failed to request instrument from Deribit")
1189            {
1190                Ok(instrument) => {
1191                    log::info!("Successfully fetched instrument: {instrument_id}");
1192
1193                    // Cache the instrument
1194                    {
1195                        let mut guard = instruments_cache
1196                            .write()
1197                            .expect("instrument cache lock poisoned");
1198                        guard.insert(instrument.id(), instrument.clone());
1199                    }
1200
1201                    // Send response
1202                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1203                        request_id,
1204                        client_id,
1205                        instrument.id(),
1206                        instrument,
1207                        start_nanos,
1208                        end_nanos,
1209                        clock.get_time_ns(),
1210                        params,
1211                    )));
1212
1213                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1214                        log::error!("Failed to send instrument response: {e}");
1215                    }
1216                }
1217                Err(e) => {
1218                    log::error!("Instrument request failed for {instrument_id}: {e:?}");
1219                }
1220            }
1221        });
1222
1223        Ok(())
1224    }
1225
1226    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1227        let http_client = self.http_client.clone();
1228        let sender = self.data_sender.clone();
1229        let instrument_id = request.instrument_id;
1230        let start = request.start;
1231        let end = request.end;
1232        let limit = request.limit.map(|n| n.get() as u32);
1233        let request_id = request.request_id;
1234        let client_id = request.client_id.unwrap_or(self.client_id);
1235        let params = request.params.clone();
1236        let clock = self.clock;
1237        let start_nanos = datetime_to_unix_nanos(start);
1238        let end_nanos = datetime_to_unix_nanos(end);
1239
1240        get_runtime().spawn(async move {
1241            match http_client
1242                .request_trades(instrument_id, start, end, limit)
1243                .await
1244                .context("failed to request trades from Deribit")
1245            {
1246                Ok(trades) => {
1247                    let response = DataResponse::Trades(TradesResponse::new(
1248                        request_id,
1249                        client_id,
1250                        instrument_id,
1251                        trades,
1252                        start_nanos,
1253                        end_nanos,
1254                        clock.get_time_ns(),
1255                        params,
1256                    ));
1257                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1258                        log::error!("Failed to send trades response: {e}");
1259                    }
1260                }
1261                Err(e) => log::error!("Trades request failed for {instrument_id}: {e:?}"),
1262            }
1263        });
1264
1265        Ok(())
1266    }
1267
1268    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1269        let http_client = self.http_client.clone();
1270        let sender = self.data_sender.clone();
1271        let bar_type = request.bar_type;
1272        let start = request.start;
1273        let end = request.end;
1274        let limit = request.limit.map(|n| n.get() as u32);
1275        let request_id = request.request_id;
1276        let client_id = request.client_id.unwrap_or(self.client_id);
1277        let params = request.params.clone();
1278        let clock = self.clock;
1279        let start_nanos = datetime_to_unix_nanos(start);
1280        let end_nanos = datetime_to_unix_nanos(end);
1281
1282        get_runtime().spawn(async move {
1283            match http_client
1284                .request_bars(bar_type, start, end, limit)
1285                .await
1286                .context("failed to request bars from Deribit")
1287            {
1288                Ok(bars) => {
1289                    let response = DataResponse::Bars(BarsResponse::new(
1290                        request_id,
1291                        client_id,
1292                        bar_type,
1293                        bars,
1294                        start_nanos,
1295                        end_nanos,
1296                        clock.get_time_ns(),
1297                        params,
1298                    ));
1299                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1300                        log::error!("Failed to send bars response: {e}");
1301                    }
1302                }
1303                Err(e) => log::error!("Bars request failed for {bar_type}: {e:?}"),
1304            }
1305        });
1306
1307        Ok(())
1308    }
1309
1310    fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
1311        let http_client = self.http_client.clone();
1312        let sender = self.data_sender.clone();
1313        let instrument_id = request.instrument_id;
1314        let depth = request.depth.map(|n| n.get() as u32);
1315        let request_id = request.request_id;
1316        let client_id = request.client_id.unwrap_or(self.client_id);
1317        let params = request.params.clone();
1318        let clock = self.clock;
1319
1320        get_runtime().spawn(async move {
1321            match http_client
1322                .request_book_snapshot(instrument_id, depth)
1323                .await
1324                .context("failed to request book snapshot from Deribit")
1325            {
1326                Ok(book) => {
1327                    let response = DataResponse::Book(BookResponse::new(
1328                        request_id,
1329                        client_id,
1330                        instrument_id,
1331                        book,
1332                        None,
1333                        None,
1334                        clock.get_time_ns(),
1335                        params,
1336                    ));
1337                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1338                        log::error!("Failed to send book snapshot response: {e}");
1339                    }
1340                }
1341                Err(e) => {
1342                    log::error!("Book snapshot request failed for {instrument_id}: {e:?}");
1343                }
1344            }
1345        });
1346
1347        Ok(())
1348    }
1349}