nautilus_deribit/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Deribit adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::StreamExt;
27use nautilus_common::{
28    live::{runner::get_data_event_sender, runtime::get_runtime},
29    messages::{
30        DataEvent, DataResponse,
31        data::{
32            BarsResponse, BookResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33            RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
34            SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
35            SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments,
36            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
37            UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
38            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
39            UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
40        },
41    },
42};
43use nautilus_core::{
44    datetime::datetime_to_unix_nanos,
45    time::{AtomicTime, get_atomic_clock_realtime},
46};
47use nautilus_data::client::DataClient;
48use nautilus_model::{
49    data::{Data, OrderBookDeltas_API},
50    identifiers::{ClientId, InstrumentId, Venue},
51    instruments::{Instrument, InstrumentAny},
52};
53use tokio::task::JoinHandle;
54use tokio_util::sync::CancellationToken;
55
56use crate::{
57    common::consts::DERIBIT_VENUE,
58    config::DeribitDataClientConfig,
59    http::{
60        client::DeribitHttpClient,
61        models::{DeribitCurrency, DeribitInstrumentKind},
62    },
63    websocket::{client::DeribitWebSocketClient, messages::NautilusWsMessage},
64};
65
/// Deribit live data client.
///
/// Bundles the HTTP and WebSocket clients with the shared instrument cache and
/// the channel on which data events and request responses are published.
#[derive(Debug)]
pub struct DeribitDataClient {
    /// Identifier returned by `client_id()`; also the fallback client ID on responses.
    client_id: ClientId,
    /// Configuration supplied at construction.
    config: DeribitDataClientConfig,
    /// HTTP client used for instrument, trade, bar and book snapshot requests.
    http_client: DeribitHttpClient,
    /// WebSocket client; populated with `Some` by `new`.
    ws_client: Option<DeribitWebSocketClient>,
    /// Set to `true` at the end of `connect`, `false` by `stop`/`reset`/`disconnect`.
    is_connected: AtomicBool,
    /// Token the stream task watches to know when to exit.
    cancellation_token: CancellationToken,
    /// Handles of spawned stream tasks, awaited in `disconnect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and responses are sent.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, shared with spawned tasks.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Clock used to timestamp responses.
    clock: &'static AtomicTime,
}
80
81impl DeribitDataClient {
82    /// Creates a new [`DeribitDataClient`] instance.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if the client fails to initialize.
87    pub fn new(client_id: ClientId, config: DeribitDataClientConfig) -> anyhow::Result<Self> {
88        let clock = get_atomic_clock_realtime();
89        let data_sender = get_data_event_sender();
90
91        let http_client = if config.has_api_credentials() {
92            DeribitHttpClient::new_with_env(
93                config.api_key.clone(),
94                config.api_secret.clone(),
95                config.use_testnet,
96                config.http_timeout_secs,
97                config.max_retries,
98                config.retry_delay_initial_ms,
99                config.retry_delay_max_ms,
100                None, // proxy_url
101            )?
102        } else {
103            DeribitHttpClient::new(
104                config.base_url_http.clone(),
105                config.use_testnet,
106                config.http_timeout_secs,
107                config.max_retries,
108                config.retry_delay_initial_ms,
109                config.retry_delay_max_ms,
110                None, // proxy_url
111            )?
112        };
113
114        let ws_client = DeribitWebSocketClient::new(
115            Some(config.ws_url()),
116            config.api_key.clone(),
117            config.api_secret.clone(),
118            config.heartbeat_interval_secs,
119            config.use_testnet,
120        )?;
121
122        Ok(Self {
123            client_id,
124            config,
125            http_client,
126            ws_client: Some(ws_client),
127            is_connected: AtomicBool::new(false),
128            cancellation_token: CancellationToken::new(),
129            tasks: Vec::new(),
130            data_sender,
131            instruments: Arc::new(RwLock::new(AHashMap::new())),
132            clock,
133        })
134    }
135
136    /// Returns a mutable reference to the WebSocket client.
137    fn ws_client_mut(&mut self) -> anyhow::Result<&mut DeribitWebSocketClient> {
138        self.ws_client
139            .as_mut()
140            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))
141    }
142
143    /// Spawns a task to process WebSocket messages.
144    fn spawn_stream_task(
145        &mut self,
146        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
147    ) -> anyhow::Result<()> {
148        let data_sender = self.data_sender.clone();
149        let instruments = Arc::clone(&self.instruments);
150        let cancellation = self.cancellation_token.clone();
151
152        let handle = get_runtime().spawn(async move {
153            tokio::pin!(stream);
154
155            loop {
156                tokio::select! {
157                    maybe_msg = stream.next() => {
158                        match maybe_msg {
159                            Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
160                            None => {
161                                tracing::debug!("Deribit websocket stream ended");
162                                break;
163                            }
164                        }
165                    }
166                    _ = cancellation.cancelled() => {
167                        tracing::debug!("Deribit websocket stream task cancelled");
168                        break;
169                    }
170                }
171            }
172        });
173
174        self.tasks.push(handle);
175        Ok(())
176    }
177
178    /// Handles incoming WebSocket messages.
179    fn handle_ws_message(
180        message: NautilusWsMessage,
181        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
182        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
183    ) {
184        match message {
185            NautilusWsMessage::Data(payloads) => {
186                for data in payloads {
187                    Self::send_data(sender, data);
188                }
189            }
190            NautilusWsMessage::Deltas(deltas) => {
191                Self::send_data(sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
192            }
193            NautilusWsMessage::Instrument(instrument) => {
194                let instrument_any = *instrument;
195                if let Ok(mut guard) = instruments.write() {
196                    let instrument_id = instrument_any.id();
197                    guard.insert(instrument_id, instrument_any.clone());
198                    drop(guard);
199
200                    if let Err(e) = sender.send(DataEvent::Instrument(instrument_any)) {
201                        tracing::warn!("Failed to send instrument update: {e}");
202                    }
203                } else {
204                    tracing::error!("Instrument cache lock poisoned, skipping instrument update");
205                }
206            }
207            NautilusWsMessage::Error(e) => {
208                tracing::error!("Deribit WebSocket error: {e:?}");
209            }
210            NautilusWsMessage::Raw(value) => {
211                tracing::debug!("Unhandled raw message: {value}");
212            }
213            NautilusWsMessage::Reconnected => {
214                tracing::info!("Deribit websocket reconnected");
215            }
216            NautilusWsMessage::Authenticated(auth) => {
217                tracing::debug!(
218                    "Deribit websocket authenticated: expires_in={}s",
219                    auth.expires_in
220                );
221            }
222        }
223    }
224
225    /// Sends data to the data channel.
226    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
227        if let Err(e) = sender.send(DataEvent::Data(data)) {
228            tracing::error!("Failed to send data: {e}");
229        }
230    }
231}
232
233#[async_trait(?Send)]
234impl DataClient for DeribitDataClient {
    /// Returns the client identifier this instance was constructed with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
238
    /// Returns the Deribit venue (`DERIBIT_VENUE`); never `None`.
    fn venue(&self) -> Option<Venue> {
        Some(*DERIBIT_VENUE)
    }
242
    /// Logs that the client is starting; performs no other work.
    fn start(&mut self) -> anyhow::Result<()> {
        tracing::info!(
            client_id = %self.client_id,
            use_testnet = %self.config.use_testnet,
            "Starting Deribit data client"
        );
        Ok(())
    }
251
    /// Cancels the cancellation token and marks the client as disconnected.
    ///
    /// This does not close the WebSocket, await spawned tasks, or replace the
    /// token; those steps are performed in `disconnect`.
    fn stop(&mut self) -> anyhow::Result<()> {
        tracing::info!("Stopping Deribit data client: {}", self.client_id);
        // Signals the stream task (which selects on this token) to exit
        self.cancellation_token.cancel();
        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }
258
259    fn reset(&mut self) -> anyhow::Result<()> {
260        tracing::info!("Resetting Deribit data client: {}", self.client_id);
261        self.is_connected.store(false, Ordering::Relaxed);
262        self.cancellation_token = CancellationToken::new();
263        self.tasks.clear();
264        if let Ok(mut instruments) = self.instruments.write() {
265            instruments.clear();
266        }
267        Ok(())
268    }
269
    /// Logs the disposal and delegates to `stop`.
    fn dispose(&mut self) -> anyhow::Result<()> {
        tracing::info!("Disposing Deribit data client: {}", self.client_id);
        self.stop()
    }
274
    /// Returns the current value of the connected flag.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::SeqCst)
    }
278
    /// Returns the negation of `is_connected`.
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }
282
283    async fn connect(&mut self) -> anyhow::Result<()> {
284        if self.is_connected() {
285            return Ok(());
286        }
287
288        // Fetch instruments for each configured instrument kind
289        let instrument_kinds = if self.config.instrument_kinds.is_empty() {
290            vec![DeribitInstrumentKind::Future]
291        } else {
292            self.config.instrument_kinds.clone()
293        };
294
295        let mut all_instruments = Vec::new();
296        for kind in &instrument_kinds {
297            let fetched = self
298                .http_client
299                .request_instruments(DeribitCurrency::ANY, Some(*kind))
300                .await
301                .with_context(|| format!("failed to request Deribit instruments for {kind:?}"))?;
302
303            // Cache in http client
304            self.http_client.cache_instruments(fetched.clone());
305
306            // Cache locally
307            let mut guard = self
308                .instruments
309                .write()
310                .map_err(|e| anyhow::anyhow!("{e}"))?;
311            for instrument in &fetched {
312                guard.insert(instrument.id(), instrument.clone());
313            }
314            drop(guard);
315
316            all_instruments.extend(fetched);
317        }
318
319        tracing::info!(
320            client_id = %self.client_id,
321            total = all_instruments.len(),
322            "Cached instruments"
323        );
324
325        for instrument in &all_instruments {
326            if let Err(e) = self
327                .data_sender
328                .send(DataEvent::Instrument(instrument.clone()))
329            {
330                tracing::warn!("Failed to send instrument: {e}");
331            }
332        }
333
334        // Cache instruments in WebSocket client before connecting
335        let ws = self.ws_client_mut()?;
336        ws.cache_instruments(all_instruments);
337
338        // Connect WebSocket and wait until active
339        ws.connect()
340            .await
341            .context("failed to connect Deribit websocket")?;
342        ws.wait_until_active(10.0)
343            .await
344            .context("websocket failed to become active")?;
345
346        // Get the stream and spawn processing task
347        let stream = self.ws_client_mut()?.stream();
348        self.spawn_stream_task(stream)?;
349
350        self.is_connected.store(true, Ordering::Release);
351        tracing::info!(client_id = %self.client_id, "Connected");
352        Ok(())
353    }
354
355    async fn disconnect(&mut self) -> anyhow::Result<()> {
356        if self.is_disconnected() {
357            return Ok(());
358        }
359
360        // Cancel all tasks
361        self.cancellation_token.cancel();
362
363        // Close WebSocket connection
364        if let Some(ws) = self.ws_client.as_ref()
365            && let Err(e) = ws.close().await
366        {
367            tracing::warn!("Error while closing Deribit websocket: {e:?}");
368        }
369
370        // Wait for all tasks to complete
371        for handle in self.tasks.drain(..) {
372            if let Err(e) = handle.await {
373                tracing::error!("Error joining websocket task: {e:?}");
374            }
375        }
376
377        // Reset cancellation token for potential reconnection
378        self.cancellation_token = CancellationToken::new();
379        self.is_connected.store(false, Ordering::Relaxed);
380
381        tracing::info!(client_id = %self.client_id, "Disconnected");
382        Ok(())
383    }
384
385    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
386        todo!("Implement subscribe_instruments");
387    }
388
389    fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
390        todo!("Implement subscribe_instrument");
391    }
392
393    fn subscribe_book_deltas(&mut self, _cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
394        todo!("Implement subscribe_book_deltas");
395    }
396
397    fn subscribe_book_depth10(&mut self, _cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
398        todo!("Implement subscribe_book_depth10")
399    }
400
401    fn subscribe_book_snapshots(&mut self, _cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
402        todo!("Implement subscribe_book_snapshots");
403    }
404
405    fn subscribe_quotes(&mut self, _cmd: &SubscribeQuotes) -> anyhow::Result<()> {
406        todo!("Implement subscribe_quotes")
407    }
408
409    fn subscribe_trades(&mut self, _cmd: &SubscribeTrades) -> anyhow::Result<()> {
410        todo!("Implement subscribe_trades")
411    }
412
413    fn subscribe_mark_prices(&mut self, _cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
414        todo!("Implement subscribe_mark_prices")
415    }
416
417    fn subscribe_index_prices(&mut self, _cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
418        todo!("Implement subscribe_index_prices")
419    }
420
421    fn subscribe_funding_rates(&mut self, _cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
422        todo!("Implement subscribe_funding_rates")
423    }
424
425    fn subscribe_bars(&mut self, _cmd: &SubscribeBars) -> anyhow::Result<()> {
426        todo!("Implement subscribe_bars");
427    }
428
429    fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
430        todo!("Implement unsubscribe_instruments");
431    }
432
433    fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
434        todo!("Implement unsubscribe_instrument");
435    }
436
437    fn unsubscribe_book_deltas(&mut self, _cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
438        todo!("Implement unsubscribe_book_deltas");
439    }
440
441    fn unsubscribe_book_depth10(&mut self, _cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
442        todo!("Implement unsubscribe_book_depth10");
443    }
444
445    fn unsubscribe_book_snapshots(
446        &mut self,
447        _cmd: &UnsubscribeBookSnapshots,
448    ) -> anyhow::Result<()> {
449        todo!("Implement unsubscribe_book_snapshots");
450    }
451
452    fn unsubscribe_quotes(&mut self, _cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
453        todo!("Implement unsubscribe_quotes");
454    }
455
456    fn unsubscribe_trades(&mut self, _cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
457        todo!("Implement unsubscribe_trades");
458    }
459
460    fn unsubscribe_mark_prices(&mut self, _cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
461        todo!("Implement unsubscribe_mark_prices");
462    }
463
464    fn unsubscribe_index_prices(&mut self, _cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
465        todo!("Implement unsubscribe_index_prices");
466    }
467
468    fn unsubscribe_funding_rates(&mut self, _cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
469        todo!("Implement unsubscribe_funding_rates")
470    }
471
472    fn unsubscribe_bars(&mut self, _cmd: &UnsubscribeBars) -> anyhow::Result<()> {
473        todo!("Implement unsubscribe_bars");
474    }
475
476    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
477        if request.start.is_some() {
478            tracing::warn!(
479                "Requesting instruments for {:?} with specified `start` which has no effect",
480                request.venue
481            );
482        }
483        if request.end.is_some() {
484            tracing::warn!(
485                "Requesting instruments for {:?} with specified `end` which has no effect",
486                request.venue
487            );
488        }
489
490        let http_client = self.http_client.clone();
491        let instruments_cache = Arc::clone(&self.instruments);
492        let sender = self.data_sender.clone();
493        let request_id = request.request_id;
494        let client_id = request.client_id.unwrap_or(self.client_id);
495        let start_nanos = datetime_to_unix_nanos(request.start);
496        let end_nanos = datetime_to_unix_nanos(request.end);
497        let params = request.params.clone();
498        let clock = self.clock;
499        let venue = *DERIBIT_VENUE;
500
501        // Get instrument kinds from config, default to Future if empty
502        let instrument_kinds = if self.config.instrument_kinds.is_empty() {
503            vec![crate::http::models::DeribitInstrumentKind::Future]
504        } else {
505            self.config.instrument_kinds.clone()
506        };
507
508        get_runtime().spawn(async move {
509            let mut all_instruments = Vec::new();
510            for kind in &instrument_kinds {
511                tracing::debug!("Requesting instruments for currency=ANY, kind={:?}", kind);
512
513                match http_client
514                    .request_instruments(DeribitCurrency::ANY, Some(*kind))
515                    .await
516                {
517                    Ok(instruments) => {
518                        tracing::info!(
519                            "Fetched {} instruments for ANY/{:?}",
520                            instruments.len(),
521                            kind
522                        );
523
524                        for instrument in instruments {
525                            // Cache the instrument
526                            {
527                                match instruments_cache.write() {
528                                    Ok(mut guard) => {
529                                        guard.insert(instrument.id(), instrument.clone());
530                                    }
531                                    Err(e) => {
532                                        tracing::error!(
533                                            "Instrument cache lock poisoned: {e}, skipping cache update"
534                                        );
535                                    }
536                                }
537                            }
538
539                            all_instruments.push(instrument);
540                        }
541                    }
542                    Err(e) => {
543                        tracing::error!("Failed to fetch instruments for ANY/{:?}: {:?}", kind, e);
544                    }
545                }
546            }
547
548            // Send response with all collected instruments
549            let response = DataResponse::Instruments(InstrumentsResponse::new(
550                request_id,
551                client_id,
552                venue,
553                all_instruments,
554                start_nanos,
555                end_nanos,
556                clock.get_time_ns(),
557                params,
558            ));
559
560            if let Err(e) = sender.send(DataEvent::Response(response)) {
561                tracing::error!("Failed to send instruments response: {}", e);
562            }
563        });
564
565        Ok(())
566    }
567
568    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
569        if request.start.is_some() {
570            tracing::warn!(
571                "Requesting instrument {} with specified `start` which has no effect",
572                request.instrument_id
573            );
574        }
575        if request.end.is_some() {
576            tracing::warn!(
577                "Requesting instrument {} with specified `end` which has no effect",
578                request.instrument_id
579            );
580        }
581
582        // First, check if instrument exists in cache
583        if let Some(instrument) = self
584            .instruments
585            .read()
586            .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
587            .get(&request.instrument_id)
588            .cloned()
589        {
590            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
591                request.request_id,
592                request.client_id.unwrap_or(self.client_id),
593                instrument.id(),
594                instrument,
595                datetime_to_unix_nanos(request.start),
596                datetime_to_unix_nanos(request.end),
597                self.clock.get_time_ns(),
598                request.params.clone(),
599            )));
600
601            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
602                tracing::error!("Failed to send instrument response: {}", e);
603            }
604            return Ok(());
605        }
606
607        tracing::debug!(
608            "Instrument {} not in cache, fetching from API",
609            request.instrument_id
610        );
611
612        let http_client = self.http_client.clone();
613        let instruments_cache = Arc::clone(&self.instruments);
614        let sender = self.data_sender.clone();
615        let instrument_id = request.instrument_id;
616        let request_id = request.request_id;
617        let client_id = request.client_id.unwrap_or(self.client_id);
618        let start_nanos = datetime_to_unix_nanos(request.start);
619        let end_nanos = datetime_to_unix_nanos(request.end);
620        let params = request.params.clone();
621        let clock = self.clock;
622
623        get_runtime().spawn(async move {
624            match http_client
625                .request_instrument(instrument_id)
626                .await
627                .context("failed to request instrument from Deribit")
628            {
629                Ok(instrument) => {
630                    tracing::info!("Successfully fetched instrument: {}", instrument_id);
631
632                    // Cache the instrument
633                    {
634                        let mut guard = instruments_cache
635                            .write()
636                            .expect("instrument cache lock poisoned");
637                        guard.insert(instrument.id(), instrument.clone());
638                    }
639
640                    // Send response
641                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
642                        request_id,
643                        client_id,
644                        instrument.id(),
645                        instrument,
646                        start_nanos,
647                        end_nanos,
648                        clock.get_time_ns(),
649                        params,
650                    )));
651
652                    if let Err(e) = sender.send(DataEvent::Response(response)) {
653                        tracing::error!("Failed to send instrument response: {}", e);
654                    }
655                }
656                Err(e) => {
657                    tracing::error!("Instrument request failed for {}: {:?}", instrument_id, e);
658                }
659            }
660        });
661
662        Ok(())
663    }
664
665    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
666        let http_client = self.http_client.clone();
667        let sender = self.data_sender.clone();
668        let instrument_id = request.instrument_id;
669        let start = request.start;
670        let end = request.end;
671        let limit = request.limit.map(|n| n.get() as u32);
672        let request_id = request.request_id;
673        let client_id = request.client_id.unwrap_or(self.client_id);
674        let params = request.params.clone();
675        let clock = self.clock;
676        let start_nanos = datetime_to_unix_nanos(start);
677        let end_nanos = datetime_to_unix_nanos(end);
678
679        get_runtime().spawn(async move {
680            match http_client
681                .request_trades(instrument_id, start, end, limit)
682                .await
683                .context("failed to request trades from Deribit")
684            {
685                Ok(trades) => {
686                    let response = DataResponse::Trades(TradesResponse::new(
687                        request_id,
688                        client_id,
689                        instrument_id,
690                        trades,
691                        start_nanos,
692                        end_nanos,
693                        clock.get_time_ns(),
694                        params,
695                    ));
696                    if let Err(e) = sender.send(DataEvent::Response(response)) {
697                        tracing::error!("Failed to send trades response: {e}");
698                    }
699                }
700                Err(e) => tracing::error!("Trades request failed for {}: {:?}", instrument_id, e),
701            }
702        });
703
704        Ok(())
705    }
706
707    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
708        let http_client = self.http_client.clone();
709        let sender = self.data_sender.clone();
710        let bar_type = request.bar_type;
711        let start = request.start;
712        let end = request.end;
713        let limit = request.limit.map(|n| n.get() as u32);
714        let request_id = request.request_id;
715        let client_id = request.client_id.unwrap_or(self.client_id);
716        let params = request.params.clone();
717        let clock = self.clock;
718        let start_nanos = datetime_to_unix_nanos(start);
719        let end_nanos = datetime_to_unix_nanos(end);
720
721        get_runtime().spawn(async move {
722            match http_client
723                .request_bars(bar_type, start, end, limit)
724                .await
725                .context("failed to request bars from Deribit")
726            {
727                Ok(bars) => {
728                    let response = DataResponse::Bars(BarsResponse::new(
729                        request_id,
730                        client_id,
731                        bar_type,
732                        bars,
733                        start_nanos,
734                        end_nanos,
735                        clock.get_time_ns(),
736                        params,
737                    ));
738                    if let Err(e) = sender.send(DataEvent::Response(response)) {
739                        tracing::error!("Failed to send bars response: {e}");
740                    }
741                }
742                Err(e) => tracing::error!("Bars request failed for {}: {:?}", bar_type, e),
743            }
744        });
745
746        Ok(())
747    }
748
749    fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
750        let http_client = self.http_client.clone();
751        let sender = self.data_sender.clone();
752        let instrument_id = request.instrument_id;
753        let depth = request.depth.map(|n| n.get() as u32);
754        let request_id = request.request_id;
755        let client_id = request.client_id.unwrap_or(self.client_id);
756        let params = request.params.clone();
757        let clock = self.clock;
758
759        get_runtime().spawn(async move {
760            match http_client
761                .request_book_snapshot(instrument_id, depth)
762                .await
763                .context("failed to request book snapshot from Deribit")
764            {
765                Ok(book) => {
766                    let response = DataResponse::Book(BookResponse::new(
767                        request_id,
768                        client_id,
769                        instrument_id,
770                        book,
771                        None,
772                        None,
773                        clock.get_time_ns(),
774                        params,
775                    ));
776                    if let Err(e) = sender.send(DataEvent::Response(response)) {
777                        tracing::error!("Failed to send book snapshot response: {e}");
778                    }
779                }
780                Err(e) => {
781                    tracing::error!(
782                        "Book snapshot request failed for {}: {:?}",
783                        instrument_id,
784                        e
785                    );
786                }
787            }
788        });
789
790        Ok(())
791    }
792}