Skip to main content

nautilus_deribit/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Deribit adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::StreamExt;
27use nautilus_common::{
28    clients::DataClient,
29    live::{runner::get_data_event_sender, runtime::get_runtime},
30    log_info,
31    messages::{
32        DataEvent, DataResponse,
33        data::{
34            BarsResponse, BookResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35            RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
36            SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates,
37            SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices,
38            SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
39            UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
40            UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstruments,
41            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
42        },
43    },
44};
45use nautilus_core::{
46    datetime::datetime_to_unix_nanos,
47    time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_model::{
50    data::{Data, OrderBookDeltas_API},
51    enums::BookType,
52    identifiers::{ClientId, InstrumentId, Venue},
53    instruments::{Instrument, InstrumentAny},
54};
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57
58use crate::{
59    common::{
60        consts::{
61            DERIBIT_BOOK_DEFAULT_DEPTH, DERIBIT_BOOK_DEFAULT_GROUP, DERIBIT_BOOK_VALID_DEPTHS,
62            DERIBIT_VENUE,
63        },
64        parse::{bar_spec_to_resolution, parse_instrument_kind_currency},
65    },
66    config::DeribitDataClientConfig,
67    http::{
68        client::DeribitHttpClient,
69        models::{DeribitCurrency, DeribitProductType},
70    },
71    websocket::{
72        auth::DERIBIT_DATA_SESSION_NAME, client::DeribitWebSocketClient,
73        enums::DeribitUpdateInterval, messages::NautilusWsMessage,
74    },
75};
76
77/// Deribit live data client.
78#[derive(Debug)]
79pub struct DeribitDataClient {
80    client_id: ClientId,
81    config: DeribitDataClientConfig,
82    http_client: DeribitHttpClient,
83    ws_client: Option<DeribitWebSocketClient>,
84    is_connected: AtomicBool,
85    cancellation_token: CancellationToken,
86    tasks: Vec<JoinHandle<()>>,
87    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
88    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
89    clock: &'static AtomicTime,
90}
91
92impl DeribitDataClient {
93    /// Creates a new [`DeribitDataClient`] instance.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the client fails to initialize.
98    pub fn new(client_id: ClientId, config: DeribitDataClientConfig) -> anyhow::Result<Self> {
99        let clock = get_atomic_clock_realtime();
100        let data_sender = get_data_event_sender();
101
102        let http_client = if config.has_api_credentials() {
103            DeribitHttpClient::new_with_env(
104                config.api_key.clone(),
105                config.api_secret.clone(),
106                config.use_testnet,
107                config.http_timeout_secs,
108                config.max_retries,
109                config.retry_delay_initial_ms,
110                config.retry_delay_max_ms,
111                None, // proxy_url
112            )?
113        } else {
114            DeribitHttpClient::new(
115                config.base_url_http.clone(),
116                config.use_testnet,
117                config.http_timeout_secs,
118                config.max_retries,
119                config.retry_delay_initial_ms,
120                config.retry_delay_max_ms,
121                None, // proxy_url
122            )?
123        };
124
125        let ws_client = DeribitWebSocketClient::new(
126            Some(config.ws_url()),
127            config.api_key.clone(),
128            config.api_secret.clone(),
129            config.heartbeat_interval_secs,
130            config.use_testnet,
131        )?;
132
133        Ok(Self {
134            client_id,
135            config,
136            http_client,
137            ws_client: Some(ws_client),
138            is_connected: AtomicBool::new(false),
139            cancellation_token: CancellationToken::new(),
140            tasks: Vec::new(),
141            data_sender,
142            instruments: Arc::new(RwLock::new(AHashMap::new())),
143            clock,
144        })
145    }
146
147    /// Returns a mutable reference to the WebSocket client.
148    fn ws_client_mut(&mut self) -> anyhow::Result<&mut DeribitWebSocketClient> {
149        self.ws_client
150            .as_mut()
151            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))
152    }
153
154    /// Gets the interval from params, defaulting to Raw if authenticated.
155    ///
156    /// If authenticated, we prefer Raw interval for best data quality.
157    /// Users can still override via params if they want 100ms or agg2.
158    fn get_interval(
159        &self,
160        params: &Option<indexmap::IndexMap<String, String>>,
161    ) -> Option<DeribitUpdateInterval> {
162        if let Some(interval) = params
163            .as_ref()
164            .and_then(|p| p.get("interval"))
165            .and_then(|v| v.parse::<DeribitUpdateInterval>().ok())
166        {
167            return Some(interval);
168        }
169
170        // Default to Raw if authenticated, otherwise None (100ms default)
171        if let Some(ws) = self.ws_client.as_ref()
172            && ws.is_authenticated()
173        {
174            return Some(DeribitUpdateInterval::Raw);
175        }
176        None
177    }
178
179    /// Spawns a task to process WebSocket messages.
180    fn spawn_stream_task(
181        &mut self,
182        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
183    ) -> anyhow::Result<()> {
184        let data_sender = self.data_sender.clone();
185        let instruments = Arc::clone(&self.instruments);
186        let cancellation = self.cancellation_token.clone();
187
188        let handle = get_runtime().spawn(async move {
189            tokio::pin!(stream);
190
191            loop {
192                tokio::select! {
193                    maybe_msg = stream.next() => {
194                        match maybe_msg {
195                            Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
196                            None => {
197                                log::debug!("WebSocket stream ended");
198                                break;
199                            }
200                        }
201                    }
202                    () = cancellation.cancelled() => {
203                        log::debug!("WebSocket stream task cancelled");
204                        break;
205                    }
206                }
207            }
208        });
209
210        self.tasks.push(handle);
211        Ok(())
212    }
213
214    /// Handles incoming WebSocket messages.
215    fn handle_ws_message(
216        message: NautilusWsMessage,
217        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
218        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
219    ) {
220        match message {
221            NautilusWsMessage::Data(payloads) => {
222                for data in payloads {
223                    Self::send_data(sender, data);
224                }
225            }
226            NautilusWsMessage::Deltas(deltas) => {
227                Self::send_data(sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
228            }
229            NautilusWsMessage::Instrument(instrument) => {
230                let instrument_any = *instrument;
231                if let Ok(mut guard) = instruments.write() {
232                    let instrument_id = instrument_any.id();
233                    guard.insert(instrument_id, instrument_any.clone());
234                    drop(guard);
235
236                    if let Err(e) = sender.send(DataEvent::Instrument(instrument_any)) {
237                        log::warn!("Failed to send instrument update: {e}");
238                    }
239                } else {
240                    log::error!("Instrument cache lock poisoned, skipping instrument update");
241                }
242            }
243            NautilusWsMessage::Error(e) => {
244                log::error!("WebSocket error: {e:?}");
245            }
246            NautilusWsMessage::Raw(value) => {
247                log::debug!("Unhandled raw message: {value}");
248            }
249            NautilusWsMessage::Reconnected => {
250                log::info!("WebSocket reconnected");
251            }
252            NautilusWsMessage::Authenticated(auth) => {
253                log::debug!("WebSocket authenticated: expires_in={}s", auth.expires_in);
254            }
255            NautilusWsMessage::FundingRates(funding_rates) => {
256                log::info!(
257                    "Received {} funding rate update(s) from WebSocket",
258                    funding_rates.len()
259                );
260                for funding_rate in funding_rates {
261                    log::debug!("Sending funding rate: {funding_rate:?}");
262                    if let Err(e) = sender.send(DataEvent::FundingRate(funding_rate)) {
263                        log::error!("Failed to send funding rate: {e}");
264                    }
265                }
266            }
267            NautilusWsMessage::OrderStatusReports(reports) => {
268                log::warn!(
269                    "Data client received OrderStatusReports message (should be handled by execution client): {} reports",
270                    reports.len()
271                );
272            }
273            NautilusWsMessage::FillReports(reports) => {
274                log::warn!(
275                    "Data client received FillReports message (should be handled by execution client): {} reports",
276                    reports.len()
277                );
278            }
279            NautilusWsMessage::OrderRejected(order) => {
280                log::warn!(
281                    "Data client received OrderRejected message (should be handled by execution client): {order:?}"
282                );
283            }
284            NautilusWsMessage::OrderAccepted(order) => {
285                log::warn!(
286                    "Data client received OrderAccepted message (should be handled by execution client): {order:?}"
287                );
288            }
289            NautilusWsMessage::OrderCanceled(order) => {
290                log::warn!(
291                    "Data client received OrderCanceled message (should be handled by execution client): {order:?}"
292                );
293            }
294            NautilusWsMessage::OrderExpired(order) => {
295                log::warn!(
296                    "Data client received OrderExpired message (should be handled by execution client): {order:?}"
297                );
298            }
299            NautilusWsMessage::OrderUpdated(order) => {
300                log::warn!(
301                    "Data client received OrderUpdated message (should be handled by execution client): {order:?}"
302                );
303            }
304            NautilusWsMessage::OrderCancelRejected(order) => {
305                log::warn!(
306                    "Data client received OrderCancelRejected message (should be handled by execution client): {order:?}"
307                );
308            }
309            NautilusWsMessage::OrderModifyRejected(order) => {
310                log::warn!(
311                    "Data client received OrderModifyRejected message (should be handled by execution client): {order:?}"
312                );
313            }
314            NautilusWsMessage::AccountState(state) => {
315                log::warn!(
316                    "Data client received AccountState message (should be handled by execution client): {state:?}"
317                );
318            }
319        }
320    }
321
322    /// Sends data to the data channel.
323    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
324        if let Err(e) = sender.send(DataEvent::Data(data)) {
325            log::error!("Failed to send data: {e}");
326        }
327    }
328}
329
330#[async_trait(?Send)]
331impl DataClient for DeribitDataClient {
332    fn client_id(&self) -> ClientId {
333        self.client_id
334    }
335
336    fn venue(&self) -> Option<Venue> {
337        Some(*DERIBIT_VENUE)
338    }
339
340    fn start(&mut self) -> anyhow::Result<()> {
341        log::info!(
342            "Starting data client: client_id={}, use_testnet={}",
343            self.client_id,
344            self.config.use_testnet
345        );
346        Ok(())
347    }
348
349    fn stop(&mut self) -> anyhow::Result<()> {
350        log::info!("Stopping data client: {}", self.client_id);
351        self.cancellation_token.cancel();
352        self.is_connected.store(false, Ordering::Relaxed);
353        Ok(())
354    }
355
356    fn reset(&mut self) -> anyhow::Result<()> {
357        log::info!("Resetting data client: {}", self.client_id);
358        self.is_connected.store(false, Ordering::Relaxed);
359        self.cancellation_token = CancellationToken::new();
360        self.tasks.clear();
361        if let Ok(mut instruments) = self.instruments.write() {
362            instruments.clear();
363        }
364        Ok(())
365    }
366
367    fn dispose(&mut self) -> anyhow::Result<()> {
368        log::info!("Disposing data client: {}", self.client_id);
369        self.stop()
370    }
371
372    fn is_connected(&self) -> bool {
373        self.is_connected.load(Ordering::SeqCst)
374    }
375
376    fn is_disconnected(&self) -> bool {
377        !self.is_connected()
378    }
379
380    async fn connect(&mut self) -> anyhow::Result<()> {
381        if self.is_connected() {
382            return Ok(());
383        }
384
385        // Fetch instruments for each configured product type
386        let product_types = if self.config.product_types.is_empty() {
387            vec![DeribitProductType::Future]
388        } else {
389            self.config.product_types.clone()
390        };
391
392        let mut all_instruments = Vec::new();
393        for product_type in &product_types {
394            let fetched = self
395                .http_client
396                .request_instruments(DeribitCurrency::ANY, Some(*product_type))
397                .await
398                .with_context(|| format!("failed to request instruments for {product_type:?}"))?;
399
400            // Cache in http client
401            self.http_client.cache_instruments(fetched.clone());
402
403            // Cache locally
404            let mut guard = self
405                .instruments
406                .write()
407                .map_err(|e| anyhow::anyhow!("{e}"))?;
408            for instrument in &fetched {
409                guard.insert(instrument.id(), instrument.clone());
410            }
411            drop(guard);
412
413            all_instruments.extend(fetched);
414        }
415
416        log::info!(
417            "Cached instruments: client_id={}, total={}",
418            self.client_id,
419            all_instruments.len()
420        );
421
422        for instrument in &all_instruments {
423            if let Err(e) = self
424                .data_sender
425                .send(DataEvent::Instrument(instrument.clone()))
426            {
427                log::warn!("Failed to send instrument: {e}");
428            }
429        }
430
431        // Cache instruments in WebSocket client before connecting
432        let ws = self.ws_client_mut()?;
433        ws.cache_instruments(all_instruments);
434
435        // Connect WebSocket and wait until active
436        ws.connect().await.context("failed to connect WebSocket")?;
437        ws.wait_until_active(10.0)
438            .await
439            .context("WebSocket failed to become active")?;
440
441        // Authenticate if credentials are configured (required for raw streams)
442        if ws.has_credentials() {
443            ws.authenticate_session(DERIBIT_DATA_SESSION_NAME)
444                .await
445                .context("failed to authenticate WebSocket")?;
446            log_info!("WebSocket authenticated");
447        }
448
449        // Get the stream and spawn processing task
450        let stream = self.ws_client_mut()?.stream();
451        self.spawn_stream_task(stream)?;
452
453        self.is_connected.store(true, Ordering::Release);
454        let network = if self.config.use_testnet {
455            "testnet"
456        } else {
457            "mainnet"
458        };
459        log_info!("Connected ({})", network);
460        Ok(())
461    }
462
463    async fn disconnect(&mut self) -> anyhow::Result<()> {
464        if self.is_disconnected() {
465            return Ok(());
466        }
467
468        // Cancel all tasks
469        self.cancellation_token.cancel();
470
471        // Close WebSocket connection
472        if let Some(ws) = self.ws_client.as_ref()
473            && let Err(e) = ws.close().await
474        {
475            log::warn!("Error while closing WebSocket: {e:?}");
476        }
477
478        // Wait for all tasks to complete
479        for handle in self.tasks.drain(..) {
480            if let Err(e) = handle.await {
481                log::error!("Error joining WebSocket task: {e:?}");
482            }
483        }
484
485        // Reset cancellation token for potential reconnection
486        self.cancellation_token = CancellationToken::new();
487        self.is_connected.store(false, Ordering::Relaxed);
488
489        log_info!("Disconnected");
490        Ok(())
491    }
492
493    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
494        // Extract kind and currency from params, defaulting to "any.any" (all instruments)
495        let kind = cmd
496            .params
497            .as_ref()
498            .and_then(|p| p.get("kind"))
499            .map_or("any", |s| s.as_str())
500            .to_string();
501        let currency = cmd
502            .params
503            .as_ref()
504            .and_then(|p| p.get("currency"))
505            .map_or("any", |s| s.as_str())
506            .to_string();
507
508        let ws = self
509            .ws_client
510            .as_ref()
511            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
512            .clone();
513
514        log::info!("Subscribing to instrument state changes for {kind}.{currency}");
515
516        get_runtime().spawn(async move {
517            if let Err(e) = ws.subscribe_instrument_state(&kind, &currency).await {
518                log::error!("Failed to subscribe to instrument state for {kind}.{currency}: {e}");
519            }
520        });
521
522        Ok(())
523    }
524
525    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
526        let instrument_id = cmd.instrument_id;
527
528        // Check if instrument is in cache (should be from connect())
529        let guard = self
530            .instruments
531            .read()
532            .map_err(|e| anyhow::anyhow!("{e}"))?;
533        if !guard.contains_key(&instrument_id) {
534            log::warn!(
535                "Instrument {instrument_id} not in cache - it may have been created after connect()"
536            );
537        }
538        drop(guard);
539
540        // Determine kind and currency from instrument_id
541        let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
542
543        let ws = self
544            .ws_client
545            .as_ref()
546            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
547            .clone();
548
549        log::info!(
550            "Subscribing to instrument state for {instrument_id} (channel: {kind}.{currency})"
551        );
552
553        // Subscribe to broader kind/currency channel (filter in handler)
554        get_runtime().spawn(async move {
555            if let Err(e) = ws.subscribe_instrument_state(&kind, &currency).await {
556                log::error!("Failed to subscribe to instrument state for {instrument_id}: {e}");
557            }
558        });
559
560        Ok(())
561    }
562
563    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
564        if cmd.book_type != BookType::L2_MBP {
565            anyhow::bail!("Deribit only supports L2_MBP order book deltas");
566        }
567
568        let ws = self
569            .ws_client
570            .as_ref()
571            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
572            .clone();
573        let instrument_id = cmd.instrument_id;
574        let interval = self.get_interval(&cmd.params);
575
576        let depth = cmd
577            .depth
578            .map(|d| d.get() as u32)
579            .or_else(|| {
580                cmd.params
581                    .as_ref()
582                    .and_then(|p| p.get("depth"))
583                    .and_then(|v| v.parse::<u32>().ok())
584            })
585            .unwrap_or(DERIBIT_BOOK_DEFAULT_DEPTH);
586
587        if !DERIBIT_BOOK_VALID_DEPTHS.contains(&depth) {
588            anyhow::bail!("invalid depth {depth}; supported depths: {DERIBIT_BOOK_VALID_DEPTHS:?}");
589        }
590
591        let group = cmd
592            .params
593            .as_ref()
594            .and_then(|p| p.get("group"))
595            .map_or(DERIBIT_BOOK_DEFAULT_GROUP, String::as_str)
596            .to_string();
597
598        log::info!(
599            "Subscribing to book deltas for {} (group: {}, depth: {}, interval: {}, book_type: {:?})",
600            instrument_id,
601            group,
602            depth,
603            interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
604            cmd.book_type
605        );
606
607        get_runtime().spawn(async move {
608            let result = if interval == Some(DeribitUpdateInterval::Raw) {
609                ws.subscribe_book(instrument_id, interval).await
610            } else {
611                ws.subscribe_book_grouped(instrument_id, &group, depth, interval)
612                    .await
613            };
614
615            if let Err(e) = result {
616                log::error!("Failed to subscribe to book deltas for {instrument_id}: {e}");
617            }
618        });
619
620        Ok(())
621    }
622
623    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
624        if cmd.book_type != BookType::L2_MBP {
625            anyhow::bail!("Deribit only supports L2_MBP order book depth");
626        }
627
628        let ws = self
629            .ws_client
630            .as_ref()
631            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
632            .clone();
633        let instrument_id = cmd.instrument_id;
634        let interval = self.get_interval(&cmd.params);
635        let group = cmd
636            .params
637            .as_ref()
638            .and_then(|p| p.get("group"))
639            .map_or(DERIBIT_BOOK_DEFAULT_GROUP, String::as_str)
640            .to_string();
641
642        log::info!(
643            "Subscribing to book depth10 for {} (group: {}, interval: {}, book_type: {:?})",
644            instrument_id,
645            group,
646            interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
647            cmd.book_type
648        );
649
650        get_runtime().spawn(async move {
651            if let Err(e) = ws
652                .subscribe_book_grouped(instrument_id, &group, 10, interval)
653                .await
654            {
655                log::error!("Failed to subscribe to book depth10 for {instrument_id}: {e}");
656            }
657        });
658
659        Ok(())
660    }
661
662    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
663        let ws = self
664            .ws_client
665            .as_ref()
666            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
667            .clone();
668        let instrument_id = cmd.instrument_id;
669
670        log::info!("Subscribing to quotes for {instrument_id}");
671
672        get_runtime().spawn(async move {
673            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
674                log::error!("Failed to subscribe to quotes for {instrument_id}: {e}");
675            }
676        });
677
678        Ok(())
679    }
680
681    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
682        let ws = self
683            .ws_client
684            .as_ref()
685            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
686            .clone();
687        let instrument_id = cmd.instrument_id;
688        let interval = self.get_interval(&cmd.params);
689
690        log::info!(
691            "Subscribing to trades for {} (interval: {})",
692            instrument_id,
693            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
694        );
695
696        get_runtime().spawn(async move {
697            if let Err(e) = ws.subscribe_trades(instrument_id, interval).await {
698                log::error!("Failed to subscribe to trades for {instrument_id}: {e}");
699            }
700        });
701
702        Ok(())
703    }
704
705    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
706        let ws = self
707            .ws_client
708            .as_ref()
709            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
710            .clone();
711        let instrument_id = cmd.instrument_id;
712        let interval = self.get_interval(&cmd.params);
713
714        log::info!(
715            "Subscribing to mark prices for {} (via ticker channel, interval: {})",
716            instrument_id,
717            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
718        );
719
720        get_runtime().spawn(async move {
721            if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
722                log::error!("Failed to subscribe to mark prices for {instrument_id}: {e}");
723            }
724        });
725
726        Ok(())
727    }
728
729    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
730        let ws = self
731            .ws_client
732            .as_ref()
733            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
734            .clone();
735        let instrument_id = cmd.instrument_id;
736        let interval = self.get_interval(&cmd.params);
737
738        log::info!(
739            "Subscribing to index prices for {} (via ticker channel, interval: {})",
740            instrument_id,
741            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
742        );
743
744        get_runtime().spawn(async move {
745            if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
746                log::error!("Failed to subscribe to index prices for {instrument_id}: {e}");
747            }
748        });
749
750        Ok(())
751    }
752
753    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
754        let ws = self
755            .ws_client
756            .as_ref()
757            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
758            .clone();
759        let instrument_id = cmd.bar_type.instrument_id();
760        let resolution = bar_spec_to_resolution(&cmd.bar_type);
761
762        get_runtime().spawn(async move {
763            if let Err(e) = ws.subscribe_chart(instrument_id, &resolution).await {
764                log::error!("Failed to subscribe to bars for {instrument_id}: {e}");
765            }
766        });
767
768        Ok(())
769    }
770
771    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
772        let instrument_id = cmd.instrument_id;
773
774        // Validate instrument is a perpetual - funding rates only apply to perpetual contracts
775        let is_perpetual = self
776            .instruments
777            .read()
778            .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
779            .get(&instrument_id)
780            .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
781
782        if !is_perpetual {
783            log::warn!(
784                "Funding rates subscription rejected for {instrument_id}: only available for perpetual instruments"
785            );
786            return Ok(());
787        }
788
789        let ws = self
790            .ws_client
791            .as_ref()
792            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
793            .clone();
794        let interval = self.get_interval(&cmd.params);
795
796        log::info!(
797            "Subscribing to funding rates for {} (perpetual channel, interval: {})",
798            instrument_id,
799            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
800        );
801
802        get_runtime().spawn(async move {
803            if let Err(e) = ws
804                .subscribe_perpetual_interests_rates_updates(instrument_id, interval)
805                .await
806            {
807                log::error!("Failed to subscribe to funding rates for {instrument_id}: {e}");
808            }
809        });
810
811        Ok(())
812    }
813
814    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
815        let kind = cmd
816            .params
817            .as_ref()
818            .and_then(|p| p.get("kind"))
819            .map_or("any", |s| s.as_str())
820            .to_string();
821        let currency = cmd
822            .params
823            .as_ref()
824            .and_then(|p| p.get("currency"))
825            .map_or("any", |s| s.as_str())
826            .to_string();
827
828        let ws = self
829            .ws_client
830            .as_ref()
831            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
832            .clone();
833
834        log::info!("Unsubscribing from instrument state changes for {kind}.{currency}");
835
836        get_runtime().spawn(async move {
837            if let Err(e) = ws.unsubscribe_instrument_state(&kind, &currency).await {
838                log::error!(
839                    "Failed to unsubscribe from instrument state for {kind}.{currency}: {e}"
840                );
841            }
842        });
843
844        Ok(())
845    }
846
847    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
848        let instrument_id = cmd.instrument_id;
849
850        // Determine kind and currency from instrument_id
851        let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
852
853        let ws = self
854            .ws_client
855            .as_ref()
856            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
857            .clone();
858
859        log::info!(
860            "Unsubscribing from instrument state for {instrument_id} (channel: {kind}.{currency})"
861        );
862
863        get_runtime().spawn(async move {
864            if let Err(e) = ws.unsubscribe_instrument_state(&kind, &currency).await {
865                log::error!("Failed to unsubscribe from instrument state for {instrument_id}: {e}");
866            }
867        });
868
869        Ok(())
870    }
871
872    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
873        let ws = self
874            .ws_client
875            .as_ref()
876            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
877            .clone();
878        let instrument_id = cmd.instrument_id;
879        let interval = self.get_interval(&cmd.params);
880
881        let depth = cmd
882            .params
883            .as_ref()
884            .and_then(|p| p.get("depth"))
885            .and_then(|v| v.parse::<u32>().ok())
886            .unwrap_or(DERIBIT_BOOK_DEFAULT_DEPTH);
887
888        if !DERIBIT_BOOK_VALID_DEPTHS.contains(&depth) {
889            anyhow::bail!("invalid depth {depth}; supported depths: {DERIBIT_BOOK_VALID_DEPTHS:?}");
890        }
891
892        let group = cmd
893            .params
894            .as_ref()
895            .and_then(|p| p.get("group"))
896            .map_or(DERIBIT_BOOK_DEFAULT_GROUP, String::as_str)
897            .to_string();
898
899        log::info!(
900            "Unsubscribing from book deltas for {} (group: {}, depth: {}, interval: {})",
901            instrument_id,
902            group,
903            depth,
904            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
905        );
906
907        get_runtime().spawn(async move {
908            let result = if interval == Some(DeribitUpdateInterval::Raw) {
909                ws.unsubscribe_book(instrument_id, interval).await
910            } else {
911                ws.unsubscribe_book_grouped(instrument_id, &group, depth, interval)
912                    .await
913            };
914
915            if let Err(e) = result {
916                log::error!("Failed to unsubscribe from book deltas for {instrument_id}: {e}");
917            }
918        });
919
920        Ok(())
921    }
922
923    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
924        let ws = self
925            .ws_client
926            .as_ref()
927            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
928            .clone();
929        let instrument_id = cmd.instrument_id;
930        let interval = self.get_interval(&cmd.params);
931        let group = cmd
932            .params
933            .as_ref()
934            .and_then(|p| p.get("group"))
935            .map_or(DERIBIT_BOOK_DEFAULT_GROUP, String::as_str)
936            .to_string();
937
938        log::info!(
939            "Unsubscribing from book depth10 for {} (group: {}, interval: {})",
940            instrument_id,
941            group,
942            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
943        );
944
945        get_runtime().spawn(async move {
946            if let Err(e) = ws
947                .unsubscribe_book_grouped(instrument_id, &group, 10, interval)
948                .await
949            {
950                log::error!("Failed to unsubscribe from book depth10 for {instrument_id}: {e}");
951            }
952        });
953
954        Ok(())
955    }
956
957    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
958        let ws = self
959            .ws_client
960            .as_ref()
961            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
962            .clone();
963        let instrument_id = cmd.instrument_id;
964
965        log::info!("Unsubscribing from quotes for {instrument_id}");
966
967        get_runtime().spawn(async move {
968            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
969                log::error!("Failed to unsubscribe from quotes for {instrument_id}: {e}");
970            }
971        });
972
973        Ok(())
974    }
975
976    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
977        let ws = self
978            .ws_client
979            .as_ref()
980            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
981            .clone();
982        let instrument_id = cmd.instrument_id;
983        let interval = self.get_interval(&cmd.params);
984
985        log::info!(
986            "Unsubscribing from trades for {} (interval: {})",
987            instrument_id,
988            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
989        );
990
991        get_runtime().spawn(async move {
992            if let Err(e) = ws.unsubscribe_trades(instrument_id, interval).await {
993                log::error!("Failed to unsubscribe from trades for {instrument_id}: {e}");
994            }
995        });
996
997        Ok(())
998    }
999
1000    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1001        let ws = self
1002            .ws_client
1003            .as_ref()
1004            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1005            .clone();
1006        let instrument_id = cmd.instrument_id;
1007        let interval = self.get_interval(&cmd.params);
1008
1009        log::info!(
1010            "Unsubscribing from mark prices for {} (via ticker channel, interval: {})",
1011            instrument_id,
1012            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1013        );
1014
1015        get_runtime().spawn(async move {
1016            if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
1017                log::error!("Failed to unsubscribe from mark prices for {instrument_id}: {e}");
1018            }
1019        });
1020
1021        Ok(())
1022    }
1023
1024    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1025        let ws = self
1026            .ws_client
1027            .as_ref()
1028            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1029            .clone();
1030        let instrument_id = cmd.instrument_id;
1031        let interval = self.get_interval(&cmd.params);
1032
1033        log::info!(
1034            "Unsubscribing from index prices for {} (via ticker channel, interval: {})",
1035            instrument_id,
1036            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1037        );
1038
1039        get_runtime().spawn(async move {
1040            if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
1041                log::error!("Failed to unsubscribe from index prices for {instrument_id}: {e}");
1042            }
1043        });
1044
1045        Ok(())
1046    }
1047
1048    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1049        let ws = self
1050            .ws_client
1051            .as_ref()
1052            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1053            .clone();
1054        let instrument_id = cmd.bar_type.instrument_id();
1055        let resolution = bar_spec_to_resolution(&cmd.bar_type);
1056
1057        get_runtime().spawn(async move {
1058            if let Err(e) = ws.unsubscribe_chart(instrument_id, &resolution).await {
1059                log::error!("Failed to unsubscribe from bars for {instrument_id}: {e}");
1060            }
1061        });
1062
1063        Ok(())
1064    }
1065
1066    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1067        let instrument_id = cmd.instrument_id;
1068
1069        // Validate instrument is a perpetual - funding rates only apply to perpetual contracts
1070        let is_perpetual = self
1071            .instruments
1072            .read()
1073            .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
1074            .get(&instrument_id)
1075            .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
1076
1077        if !is_perpetual {
1078            log::warn!(
1079                "Funding rates unsubscription rejected for {instrument_id}: only available for perpetual instruments"
1080            );
1081            return Ok(());
1082        }
1083
1084        let ws = self
1085            .ws_client
1086            .as_ref()
1087            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1088            .clone();
1089        let interval = self.get_interval(&cmd.params);
1090
1091        log::info!(
1092            "Unsubscribing from funding rates for {} (perpetual channel, interval: {})",
1093            instrument_id,
1094            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1095        );
1096
1097        get_runtime().spawn(async move {
1098            if let Err(e) = ws
1099                .unsubscribe_perpetual_interest_rates_updates(instrument_id, interval)
1100                .await
1101            {
1102                log::error!("Failed to unsubscribe from funding rates for {instrument_id}: {e}");
1103            }
1104        });
1105
1106        Ok(())
1107    }
1108
1109    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1110        if request.start.is_some() {
1111            log::warn!(
1112                "Requesting instruments for {:?} with specified `start` which has no effect",
1113                request.venue
1114            );
1115        }
1116        if request.end.is_some() {
1117            log::warn!(
1118                "Requesting instruments for {:?} with specified `end` which has no effect",
1119                request.venue
1120            );
1121        }
1122
1123        let http_client = self.http_client.clone();
1124        let instruments_cache = Arc::clone(&self.instruments);
1125        let sender = self.data_sender.clone();
1126        let request_id = request.request_id;
1127        let client_id = request.client_id.unwrap_or(self.client_id);
1128        let start_nanos = datetime_to_unix_nanos(request.start);
1129        let end_nanos = datetime_to_unix_nanos(request.end);
1130        let params = request.params;
1131        let clock = self.clock;
1132        let venue = *DERIBIT_VENUE;
1133
1134        // Get product types from config, default to Future if empty
1135        let product_types = if self.config.product_types.is_empty() {
1136            vec![crate::http::models::DeribitProductType::Future]
1137        } else {
1138            self.config.product_types.clone()
1139        };
1140
1141        get_runtime().spawn(async move {
1142            let mut all_instruments = Vec::new();
1143            for product_type in &product_types {
1144                log::debug!("Requesting instruments for currency=ANY, product_type={product_type:?}");
1145
1146                match http_client
1147                    .request_instruments(DeribitCurrency::ANY, Some(*product_type))
1148                    .await
1149                {
1150                    Ok(instruments) => {
1151                        log::info!(
1152                            "Fetched {} instruments for ANY/{:?}",
1153                            instruments.len(),
1154                            product_type
1155                        );
1156
1157                        for instrument in instruments {
1158                            // Cache the instrument
1159                            {
1160                                match instruments_cache.write() {
1161                                    Ok(mut guard) => {
1162                                        guard.insert(instrument.id(), instrument.clone());
1163                                    }
1164                                    Err(e) => {
1165                                        log::error!(
1166                                            "Instrument cache lock poisoned: {e}, skipping cache update"
1167                                        );
1168                                    }
1169                                }
1170                            }
1171
1172                            all_instruments.push(instrument);
1173                        }
1174                    }
1175                    Err(e) => {
1176                        log::error!("Failed to fetch instruments for ANY/{product_type:?}: {e:?}");
1177                    }
1178                }
1179            }
1180
1181            // Send response with all collected instruments
1182            let response = DataResponse::Instruments(InstrumentsResponse::new(
1183                request_id,
1184                client_id,
1185                venue,
1186                all_instruments,
1187                start_nanos,
1188                end_nanos,
1189                clock.get_time_ns(),
1190                params,
1191            ));
1192
1193            if let Err(e) = sender.send(DataEvent::Response(response)) {
1194                log::error!("Failed to send instruments response: {e}");
1195            }
1196        });
1197
1198        Ok(())
1199    }
1200
1201    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1202        if request.start.is_some() {
1203            log::warn!(
1204                "Requesting instrument {} with specified `start` which has no effect",
1205                request.instrument_id
1206            );
1207        }
1208        if request.end.is_some() {
1209            log::warn!(
1210                "Requesting instrument {} with specified `end` which has no effect",
1211                request.instrument_id
1212            );
1213        }
1214
1215        // First, check if instrument exists in cache
1216        if let Some(instrument) = self
1217            .instruments
1218            .read()
1219            .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
1220            .get(&request.instrument_id)
1221            .cloned()
1222        {
1223            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1224                request.request_id,
1225                request.client_id.unwrap_or(self.client_id),
1226                instrument.id(),
1227                instrument,
1228                datetime_to_unix_nanos(request.start),
1229                datetime_to_unix_nanos(request.end),
1230                self.clock.get_time_ns(),
1231                request.params,
1232            )));
1233
1234            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
1235                log::error!("Failed to send instrument response: {e}");
1236            }
1237            return Ok(());
1238        }
1239
1240        log::debug!(
1241            "Instrument {} not in cache, fetching from API",
1242            request.instrument_id
1243        );
1244
1245        let http_client = self.http_client.clone();
1246        let instruments_cache = Arc::clone(&self.instruments);
1247        let sender = self.data_sender.clone();
1248        let instrument_id = request.instrument_id;
1249        let request_id = request.request_id;
1250        let client_id = request.client_id.unwrap_or(self.client_id);
1251        let start_nanos = datetime_to_unix_nanos(request.start);
1252        let end_nanos = datetime_to_unix_nanos(request.end);
1253        let params = request.params;
1254        let clock = self.clock;
1255
1256        get_runtime().spawn(async move {
1257            match http_client
1258                .request_instrument(instrument_id)
1259                .await
1260                .context("failed to request instrument from Deribit")
1261            {
1262                Ok(instrument) => {
1263                    log::info!("Successfully fetched instrument: {instrument_id}");
1264
1265                    // Cache the instrument
1266                    {
1267                        let mut guard = instruments_cache
1268                            .write()
1269                            .expect("instrument cache lock poisoned");
1270                        guard.insert(instrument.id(), instrument.clone());
1271                    }
1272
1273                    // Send response
1274                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1275                        request_id,
1276                        client_id,
1277                        instrument.id(),
1278                        instrument,
1279                        start_nanos,
1280                        end_nanos,
1281                        clock.get_time_ns(),
1282                        params,
1283                    )));
1284
1285                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1286                        log::error!("Failed to send instrument response: {e}");
1287                    }
1288                }
1289                Err(e) => {
1290                    log::error!("Instrument request failed for {instrument_id}: {e:?}");
1291                }
1292            }
1293        });
1294
1295        Ok(())
1296    }
1297
1298    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1299        let http_client = self.http_client.clone();
1300        let sender = self.data_sender.clone();
1301        let instrument_id = request.instrument_id;
1302        let start = request.start;
1303        let end = request.end;
1304        let limit = request.limit.map(|n| n.get() as u32);
1305        let request_id = request.request_id;
1306        let client_id = request.client_id.unwrap_or(self.client_id);
1307        let params = request.params;
1308        let clock = self.clock;
1309        let start_nanos = datetime_to_unix_nanos(start);
1310        let end_nanos = datetime_to_unix_nanos(end);
1311
1312        get_runtime().spawn(async move {
1313            match http_client
1314                .request_trades(instrument_id, start, end, limit)
1315                .await
1316                .context("failed to request trades from Deribit")
1317            {
1318                Ok(trades) => {
1319                    let response = DataResponse::Trades(TradesResponse::new(
1320                        request_id,
1321                        client_id,
1322                        instrument_id,
1323                        trades,
1324                        start_nanos,
1325                        end_nanos,
1326                        clock.get_time_ns(),
1327                        params,
1328                    ));
1329                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1330                        log::error!("Failed to send trades response: {e}");
1331                    }
1332                }
1333                Err(e) => log::error!("Trades request failed for {instrument_id}: {e:?}"),
1334            }
1335        });
1336
1337        Ok(())
1338    }
1339
1340    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1341        let http_client = self.http_client.clone();
1342        let sender = self.data_sender.clone();
1343        let bar_type = request.bar_type;
1344        let start = request.start;
1345        let end = request.end;
1346        let limit = request.limit.map(|n| n.get() as u32);
1347        let request_id = request.request_id;
1348        let client_id = request.client_id.unwrap_or(self.client_id);
1349        let params = request.params;
1350        let clock = self.clock;
1351        let start_nanos = datetime_to_unix_nanos(start);
1352        let end_nanos = datetime_to_unix_nanos(end);
1353
1354        get_runtime().spawn(async move {
1355            match http_client
1356                .request_bars(bar_type, start, end, limit)
1357                .await
1358                .context("failed to request bars from Deribit")
1359            {
1360                Ok(bars) => {
1361                    let response = DataResponse::Bars(BarsResponse::new(
1362                        request_id,
1363                        client_id,
1364                        bar_type,
1365                        bars,
1366                        start_nanos,
1367                        end_nanos,
1368                        clock.get_time_ns(),
1369                        params,
1370                    ));
1371                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1372                        log::error!("Failed to send bars response: {e}");
1373                    }
1374                }
1375                Err(e) => log::error!("Bars request failed for {bar_type}: {e:?}"),
1376            }
1377        });
1378
1379        Ok(())
1380    }
1381
1382    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1383        let http_client = self.http_client.clone();
1384        let sender = self.data_sender.clone();
1385        let instrument_id = request.instrument_id;
1386        let depth = request.depth.map(|n| n.get() as u32);
1387        let request_id = request.request_id;
1388        let client_id = request.client_id.unwrap_or(self.client_id);
1389        let params = request.params;
1390        let clock = self.clock;
1391
1392        get_runtime().spawn(async move {
1393            match http_client
1394                .request_book_snapshot(instrument_id, depth)
1395                .await
1396                .context("failed to request book snapshot from Deribit")
1397            {
1398                Ok(book) => {
1399                    let response = DataResponse::Book(BookResponse::new(
1400                        request_id,
1401                        client_id,
1402                        instrument_id,
1403                        book,
1404                        None,
1405                        None,
1406                        clock.get_time_ns(),
1407                        params,
1408                    ));
1409                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1410                        log::error!("Failed to send book snapshot response: {e}");
1411                    }
1412                }
1413                Err(e) => {
1414                    log::error!("Book snapshot request failed for {instrument_id}: {e:?}");
1415                }
1416            }
1417        });
1418
1419        Ok(())
1420    }
1421}