1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::StreamExt;
27use nautilus_common::{
28 live::{runner::get_data_event_sender, runtime::get_runtime},
29 messages::{
30 DataEvent, DataResponse,
31 data::{
32 BarsResponse, BookResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33 RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
34 SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
35 SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments,
36 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
37 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
38 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
39 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
40 },
41 },
42};
43use nautilus_core::{
44 datetime::datetime_to_unix_nanos,
45 time::{AtomicTime, get_atomic_clock_realtime},
46};
47use nautilus_data::client::DataClient;
48use nautilus_model::{
49 data::{Data, OrderBookDeltas_API},
50 identifiers::{ClientId, InstrumentId, Venue},
51 instruments::{Instrument, InstrumentAny},
52};
53use tokio::task::JoinHandle;
54use tokio_util::sync::CancellationToken;
55
56use crate::{
57 common::consts::DERIBIT_VENUE,
58 config::DeribitDataClientConfig,
59 http::{
60 client::DeribitHttpClient,
61 models::{DeribitCurrency, DeribitInstrumentKind},
62 },
63 websocket::{client::DeribitWebSocketClient, messages::NautilusWsMessage},
64};
65
/// Market data client for the Deribit venue.
///
/// Combines an HTTP client (instrument, trade, bar and book requests) with an optional
/// WebSocket client (streaming), and keeps a shared instrument cache that both the
/// request paths and the WebSocket stream task write to.
#[derive(Debug)]
pub struct DeribitDataClient {
    /// Identifier reported by `client_id()`; also the fallback client ID on responses.
    client_id: ClientId,
    /// Configuration supplied at construction.
    config: DeribitDataClientConfig,
    /// HTTP client used for all request/response calls.
    http_client: DeribitHttpClient,
    /// WebSocket client; `new` always populates it with `Some`.
    ws_client: Option<DeribitWebSocketClient>,
    /// Connection flag: set at the end of `connect`, cleared by `stop`, `reset` and
    /// `disconnect`.
    is_connected: AtomicBool,
    /// Token the spawned stream task waits on to know when to exit.
    cancellation_token: CancellationToken,
    /// Join handles of spawned stream tasks; awaited in `disconnect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel over which data events and request responses are sent.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, shared with spawned tasks.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Clock used to timestamp responses.
    clock: &'static AtomicTime,
}
80
81impl DeribitDataClient {
82 pub fn new(client_id: ClientId, config: DeribitDataClientConfig) -> anyhow::Result<Self> {
88 let clock = get_atomic_clock_realtime();
89 let data_sender = get_data_event_sender();
90
91 let http_client = if config.has_api_credentials() {
92 DeribitHttpClient::new_with_env(
93 config.api_key.clone(),
94 config.api_secret.clone(),
95 config.use_testnet,
96 config.http_timeout_secs,
97 config.max_retries,
98 config.retry_delay_initial_ms,
99 config.retry_delay_max_ms,
100 None, )?
102 } else {
103 DeribitHttpClient::new(
104 config.base_url_http.clone(),
105 config.use_testnet,
106 config.http_timeout_secs,
107 config.max_retries,
108 config.retry_delay_initial_ms,
109 config.retry_delay_max_ms,
110 None, )?
112 };
113
114 let ws_client = DeribitWebSocketClient::new(
115 Some(config.ws_url()),
116 config.api_key.clone(),
117 config.api_secret.clone(),
118 config.heartbeat_interval_secs,
119 config.use_testnet,
120 )?;
121
122 Ok(Self {
123 client_id,
124 config,
125 http_client,
126 ws_client: Some(ws_client),
127 is_connected: AtomicBool::new(false),
128 cancellation_token: CancellationToken::new(),
129 tasks: Vec::new(),
130 data_sender,
131 instruments: Arc::new(RwLock::new(AHashMap::new())),
132 clock,
133 })
134 }
135
136 fn ws_client_mut(&mut self) -> anyhow::Result<&mut DeribitWebSocketClient> {
138 self.ws_client
139 .as_mut()
140 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))
141 }
142
143 fn spawn_stream_task(
145 &mut self,
146 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
147 ) -> anyhow::Result<()> {
148 let data_sender = self.data_sender.clone();
149 let instruments = Arc::clone(&self.instruments);
150 let cancellation = self.cancellation_token.clone();
151
152 let handle = get_runtime().spawn(async move {
153 tokio::pin!(stream);
154
155 loop {
156 tokio::select! {
157 maybe_msg = stream.next() => {
158 match maybe_msg {
159 Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
160 None => {
161 tracing::debug!("Deribit websocket stream ended");
162 break;
163 }
164 }
165 }
166 _ = cancellation.cancelled() => {
167 tracing::debug!("Deribit websocket stream task cancelled");
168 break;
169 }
170 }
171 }
172 });
173
174 self.tasks.push(handle);
175 Ok(())
176 }
177
178 fn handle_ws_message(
180 message: NautilusWsMessage,
181 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
182 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
183 ) {
184 match message {
185 NautilusWsMessage::Data(payloads) => {
186 for data in payloads {
187 Self::send_data(sender, data);
188 }
189 }
190 NautilusWsMessage::Deltas(deltas) => {
191 Self::send_data(sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
192 }
193 NautilusWsMessage::Instrument(instrument) => {
194 let instrument_any = *instrument;
195 if let Ok(mut guard) = instruments.write() {
196 let instrument_id = instrument_any.id();
197 guard.insert(instrument_id, instrument_any.clone());
198 drop(guard);
199
200 if let Err(e) = sender.send(DataEvent::Instrument(instrument_any)) {
201 tracing::warn!("Failed to send instrument update: {e}");
202 }
203 } else {
204 tracing::error!("Instrument cache lock poisoned, skipping instrument update");
205 }
206 }
207 NautilusWsMessage::Error(e) => {
208 tracing::error!("Deribit WebSocket error: {e:?}");
209 }
210 NautilusWsMessage::Raw(value) => {
211 tracing::debug!("Unhandled raw message: {value}");
212 }
213 NautilusWsMessage::Reconnected => {
214 tracing::info!("Deribit websocket reconnected");
215 }
216 NautilusWsMessage::Authenticated(auth) => {
217 tracing::debug!(
218 "Deribit websocket authenticated: expires_in={}s",
219 auth.expires_in
220 );
221 }
222 }
223 }
224
225 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
227 if let Err(e) = sender.send(DataEvent::Data(data)) {
228 tracing::error!("Failed to send data: {e}");
229 }
230 }
231}
232
233#[async_trait(?Send)]
234impl DataClient for DeribitDataClient {
235 fn client_id(&self) -> ClientId {
236 self.client_id
237 }
238
239 fn venue(&self) -> Option<Venue> {
240 Some(*DERIBIT_VENUE)
241 }
242
243 fn start(&mut self) -> anyhow::Result<()> {
244 tracing::info!(
245 client_id = %self.client_id,
246 use_testnet = %self.config.use_testnet,
247 "Starting Deribit data client"
248 );
249 Ok(())
250 }
251
252 fn stop(&mut self) -> anyhow::Result<()> {
253 tracing::info!("Stopping Deribit data client: {}", self.client_id);
254 self.cancellation_token.cancel();
255 self.is_connected.store(false, Ordering::Relaxed);
256 Ok(())
257 }
258
259 fn reset(&mut self) -> anyhow::Result<()> {
260 tracing::info!("Resetting Deribit data client: {}", self.client_id);
261 self.is_connected.store(false, Ordering::Relaxed);
262 self.cancellation_token = CancellationToken::new();
263 self.tasks.clear();
264 if let Ok(mut instruments) = self.instruments.write() {
265 instruments.clear();
266 }
267 Ok(())
268 }
269
270 fn dispose(&mut self) -> anyhow::Result<()> {
271 tracing::info!("Disposing Deribit data client: {}", self.client_id);
272 self.stop()
273 }
274
275 fn is_connected(&self) -> bool {
276 self.is_connected.load(Ordering::SeqCst)
277 }
278
279 fn is_disconnected(&self) -> bool {
280 !self.is_connected()
281 }
282
283 async fn connect(&mut self) -> anyhow::Result<()> {
284 if self.is_connected() {
285 return Ok(());
286 }
287
288 let instrument_kinds = if self.config.instrument_kinds.is_empty() {
290 vec![DeribitInstrumentKind::Future]
291 } else {
292 self.config.instrument_kinds.clone()
293 };
294
295 let mut all_instruments = Vec::new();
296 for kind in &instrument_kinds {
297 let fetched = self
298 .http_client
299 .request_instruments(DeribitCurrency::ANY, Some(*kind))
300 .await
301 .with_context(|| format!("failed to request Deribit instruments for {kind:?}"))?;
302
303 self.http_client.cache_instruments(fetched.clone());
305
306 let mut guard = self
308 .instruments
309 .write()
310 .map_err(|e| anyhow::anyhow!("{e}"))?;
311 for instrument in &fetched {
312 guard.insert(instrument.id(), instrument.clone());
313 }
314 drop(guard);
315
316 all_instruments.extend(fetched);
317 }
318
319 tracing::info!(
320 client_id = %self.client_id,
321 total = all_instruments.len(),
322 "Cached instruments"
323 );
324
325 for instrument in &all_instruments {
326 if let Err(e) = self
327 .data_sender
328 .send(DataEvent::Instrument(instrument.clone()))
329 {
330 tracing::warn!("Failed to send instrument: {e}");
331 }
332 }
333
334 let ws = self.ws_client_mut()?;
336 ws.cache_instruments(all_instruments);
337
338 ws.connect()
340 .await
341 .context("failed to connect Deribit websocket")?;
342 ws.wait_until_active(10.0)
343 .await
344 .context("websocket failed to become active")?;
345
346 let stream = self.ws_client_mut()?.stream();
348 self.spawn_stream_task(stream)?;
349
350 self.is_connected.store(true, Ordering::Release);
351 tracing::info!(client_id = %self.client_id, "Connected");
352 Ok(())
353 }
354
355 async fn disconnect(&mut self) -> anyhow::Result<()> {
356 if self.is_disconnected() {
357 return Ok(());
358 }
359
360 self.cancellation_token.cancel();
362
363 if let Some(ws) = self.ws_client.as_ref()
365 && let Err(e) = ws.close().await
366 {
367 tracing::warn!("Error while closing Deribit websocket: {e:?}");
368 }
369
370 for handle in self.tasks.drain(..) {
372 if let Err(e) = handle.await {
373 tracing::error!("Error joining websocket task: {e:?}");
374 }
375 }
376
377 self.cancellation_token = CancellationToken::new();
379 self.is_connected.store(false, Ordering::Relaxed);
380
381 tracing::info!(client_id = %self.client_id, "Disconnected");
382 Ok(())
383 }
384
385 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
386 todo!("Implement subscribe_instruments");
387 }
388
389 fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
390 todo!("Implement subscribe_instrument");
391 }
392
393 fn subscribe_book_deltas(&mut self, _cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
394 todo!("Implement subscribe_book_deltas");
395 }
396
397 fn subscribe_book_depth10(&mut self, _cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
398 todo!("Implement subscribe_book_depth10")
399 }
400
401 fn subscribe_book_snapshots(&mut self, _cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
402 todo!("Implement subscribe_book_snapshots");
403 }
404
405 fn subscribe_quotes(&mut self, _cmd: &SubscribeQuotes) -> anyhow::Result<()> {
406 todo!("Implement subscribe_quotes")
407 }
408
409 fn subscribe_trades(&mut self, _cmd: &SubscribeTrades) -> anyhow::Result<()> {
410 todo!("Implement subscribe_trades")
411 }
412
413 fn subscribe_mark_prices(&mut self, _cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
414 todo!("Implement subscribe_mark_prices")
415 }
416
417 fn subscribe_index_prices(&mut self, _cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
418 todo!("Implement subscribe_index_prices")
419 }
420
421 fn subscribe_funding_rates(&mut self, _cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
422 todo!("Implement subscribe_funding_rates")
423 }
424
425 fn subscribe_bars(&mut self, _cmd: &SubscribeBars) -> anyhow::Result<()> {
426 todo!("Implement subscribe_bars");
427 }
428
429 fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
430 todo!("Implement unsubscribe_instruments");
431 }
432
433 fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
434 todo!("Implement unsubscribe_instrument");
435 }
436
437 fn unsubscribe_book_deltas(&mut self, _cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
438 todo!("Implement unsubscribe_book_deltas");
439 }
440
441 fn unsubscribe_book_depth10(&mut self, _cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
442 todo!("Implement unsubscribe_book_depth10");
443 }
444
445 fn unsubscribe_book_snapshots(
446 &mut self,
447 _cmd: &UnsubscribeBookSnapshots,
448 ) -> anyhow::Result<()> {
449 todo!("Implement unsubscribe_book_snapshots");
450 }
451
452 fn unsubscribe_quotes(&mut self, _cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
453 todo!("Implement unsubscribe_quotes");
454 }
455
456 fn unsubscribe_trades(&mut self, _cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
457 todo!("Implement unsubscribe_trades");
458 }
459
460 fn unsubscribe_mark_prices(&mut self, _cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
461 todo!("Implement unsubscribe_mark_prices");
462 }
463
464 fn unsubscribe_index_prices(&mut self, _cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
465 todo!("Implement unsubscribe_index_prices");
466 }
467
468 fn unsubscribe_funding_rates(&mut self, _cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
469 todo!("Implement unsubscribe_funding_rates")
470 }
471
472 fn unsubscribe_bars(&mut self, _cmd: &UnsubscribeBars) -> anyhow::Result<()> {
473 todo!("Implement unsubscribe_bars");
474 }
475
476 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
477 if request.start.is_some() {
478 tracing::warn!(
479 "Requesting instruments for {:?} with specified `start` which has no effect",
480 request.venue
481 );
482 }
483 if request.end.is_some() {
484 tracing::warn!(
485 "Requesting instruments for {:?} with specified `end` which has no effect",
486 request.venue
487 );
488 }
489
490 let http_client = self.http_client.clone();
491 let instruments_cache = Arc::clone(&self.instruments);
492 let sender = self.data_sender.clone();
493 let request_id = request.request_id;
494 let client_id = request.client_id.unwrap_or(self.client_id);
495 let start_nanos = datetime_to_unix_nanos(request.start);
496 let end_nanos = datetime_to_unix_nanos(request.end);
497 let params = request.params.clone();
498 let clock = self.clock;
499 let venue = *DERIBIT_VENUE;
500
501 let instrument_kinds = if self.config.instrument_kinds.is_empty() {
503 vec![crate::http::models::DeribitInstrumentKind::Future]
504 } else {
505 self.config.instrument_kinds.clone()
506 };
507
508 get_runtime().spawn(async move {
509 let mut all_instruments = Vec::new();
510 for kind in &instrument_kinds {
511 tracing::debug!("Requesting instruments for currency=ANY, kind={:?}", kind);
512
513 match http_client
514 .request_instruments(DeribitCurrency::ANY, Some(*kind))
515 .await
516 {
517 Ok(instruments) => {
518 tracing::info!(
519 "Fetched {} instruments for ANY/{:?}",
520 instruments.len(),
521 kind
522 );
523
524 for instrument in instruments {
525 {
527 match instruments_cache.write() {
528 Ok(mut guard) => {
529 guard.insert(instrument.id(), instrument.clone());
530 }
531 Err(e) => {
532 tracing::error!(
533 "Instrument cache lock poisoned: {e}, skipping cache update"
534 );
535 }
536 }
537 }
538
539 all_instruments.push(instrument);
540 }
541 }
542 Err(e) => {
543 tracing::error!("Failed to fetch instruments for ANY/{:?}: {:?}", kind, e);
544 }
545 }
546 }
547
548 let response = DataResponse::Instruments(InstrumentsResponse::new(
550 request_id,
551 client_id,
552 venue,
553 all_instruments,
554 start_nanos,
555 end_nanos,
556 clock.get_time_ns(),
557 params,
558 ));
559
560 if let Err(e) = sender.send(DataEvent::Response(response)) {
561 tracing::error!("Failed to send instruments response: {}", e);
562 }
563 });
564
565 Ok(())
566 }
567
568 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
569 if request.start.is_some() {
570 tracing::warn!(
571 "Requesting instrument {} with specified `start` which has no effect",
572 request.instrument_id
573 );
574 }
575 if request.end.is_some() {
576 tracing::warn!(
577 "Requesting instrument {} with specified `end` which has no effect",
578 request.instrument_id
579 );
580 }
581
582 if let Some(instrument) = self
584 .instruments
585 .read()
586 .map_err(|e| anyhow::anyhow!("Instrument cache lock poisoned: {e}"))?
587 .get(&request.instrument_id)
588 .cloned()
589 {
590 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
591 request.request_id,
592 request.client_id.unwrap_or(self.client_id),
593 instrument.id(),
594 instrument,
595 datetime_to_unix_nanos(request.start),
596 datetime_to_unix_nanos(request.end),
597 self.clock.get_time_ns(),
598 request.params.clone(),
599 )));
600
601 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
602 tracing::error!("Failed to send instrument response: {}", e);
603 }
604 return Ok(());
605 }
606
607 tracing::debug!(
608 "Instrument {} not in cache, fetching from API",
609 request.instrument_id
610 );
611
612 let http_client = self.http_client.clone();
613 let instruments_cache = Arc::clone(&self.instruments);
614 let sender = self.data_sender.clone();
615 let instrument_id = request.instrument_id;
616 let request_id = request.request_id;
617 let client_id = request.client_id.unwrap_or(self.client_id);
618 let start_nanos = datetime_to_unix_nanos(request.start);
619 let end_nanos = datetime_to_unix_nanos(request.end);
620 let params = request.params.clone();
621 let clock = self.clock;
622
623 get_runtime().spawn(async move {
624 match http_client
625 .request_instrument(instrument_id)
626 .await
627 .context("failed to request instrument from Deribit")
628 {
629 Ok(instrument) => {
630 tracing::info!("Successfully fetched instrument: {}", instrument_id);
631
632 {
634 let mut guard = instruments_cache
635 .write()
636 .expect("instrument cache lock poisoned");
637 guard.insert(instrument.id(), instrument.clone());
638 }
639
640 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
642 request_id,
643 client_id,
644 instrument.id(),
645 instrument,
646 start_nanos,
647 end_nanos,
648 clock.get_time_ns(),
649 params,
650 )));
651
652 if let Err(e) = sender.send(DataEvent::Response(response)) {
653 tracing::error!("Failed to send instrument response: {}", e);
654 }
655 }
656 Err(e) => {
657 tracing::error!("Instrument request failed for {}: {:?}", instrument_id, e);
658 }
659 }
660 });
661
662 Ok(())
663 }
664
665 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
666 let http_client = self.http_client.clone();
667 let sender = self.data_sender.clone();
668 let instrument_id = request.instrument_id;
669 let start = request.start;
670 let end = request.end;
671 let limit = request.limit.map(|n| n.get() as u32);
672 let request_id = request.request_id;
673 let client_id = request.client_id.unwrap_or(self.client_id);
674 let params = request.params.clone();
675 let clock = self.clock;
676 let start_nanos = datetime_to_unix_nanos(start);
677 let end_nanos = datetime_to_unix_nanos(end);
678
679 get_runtime().spawn(async move {
680 match http_client
681 .request_trades(instrument_id, start, end, limit)
682 .await
683 .context("failed to request trades from Deribit")
684 {
685 Ok(trades) => {
686 let response = DataResponse::Trades(TradesResponse::new(
687 request_id,
688 client_id,
689 instrument_id,
690 trades,
691 start_nanos,
692 end_nanos,
693 clock.get_time_ns(),
694 params,
695 ));
696 if let Err(e) = sender.send(DataEvent::Response(response)) {
697 tracing::error!("Failed to send trades response: {e}");
698 }
699 }
700 Err(e) => tracing::error!("Trades request failed for {}: {:?}", instrument_id, e),
701 }
702 });
703
704 Ok(())
705 }
706
707 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
708 let http_client = self.http_client.clone();
709 let sender = self.data_sender.clone();
710 let bar_type = request.bar_type;
711 let start = request.start;
712 let end = request.end;
713 let limit = request.limit.map(|n| n.get() as u32);
714 let request_id = request.request_id;
715 let client_id = request.client_id.unwrap_or(self.client_id);
716 let params = request.params.clone();
717 let clock = self.clock;
718 let start_nanos = datetime_to_unix_nanos(start);
719 let end_nanos = datetime_to_unix_nanos(end);
720
721 get_runtime().spawn(async move {
722 match http_client
723 .request_bars(bar_type, start, end, limit)
724 .await
725 .context("failed to request bars from Deribit")
726 {
727 Ok(bars) => {
728 let response = DataResponse::Bars(BarsResponse::new(
729 request_id,
730 client_id,
731 bar_type,
732 bars,
733 start_nanos,
734 end_nanos,
735 clock.get_time_ns(),
736 params,
737 ));
738 if let Err(e) = sender.send(DataEvent::Response(response)) {
739 tracing::error!("Failed to send bars response: {e}");
740 }
741 }
742 Err(e) => tracing::error!("Bars request failed for {}: {:?}", bar_type, e),
743 }
744 });
745
746 Ok(())
747 }
748
749 fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
750 let http_client = self.http_client.clone();
751 let sender = self.data_sender.clone();
752 let instrument_id = request.instrument_id;
753 let depth = request.depth.map(|n| n.get() as u32);
754 let request_id = request.request_id;
755 let client_id = request.client_id.unwrap_or(self.client_id);
756 let params = request.params.clone();
757 let clock = self.clock;
758
759 get_runtime().spawn(async move {
760 match http_client
761 .request_book_snapshot(instrument_id, depth)
762 .await
763 .context("failed to request book snapshot from Deribit")
764 {
765 Ok(book) => {
766 let response = DataResponse::Book(BookResponse::new(
767 request_id,
768 client_id,
769 instrument_id,
770 book,
771 None,
772 None,
773 clock.get_time_ns(),
774 params,
775 ));
776 if let Err(e) = sender.send(DataEvent::Response(response)) {
777 tracing::error!("Failed to send book snapshot response: {e}");
778 }
779 }
780 Err(e) => {
781 tracing::error!(
782 "Book snapshot request failed for {}: {:?}",
783 instrument_id,
784 e
785 );
786 }
787 }
788 });
789
790 Ok(())
791 }
792}