nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17//!
18//! Defines the `DataClient` trait, the `DataClientAdapter` for managing subscriptions and requests,
19//! and utilities for constructing data responses.
20
21use std::{
22    any::Any,
23    fmt::{Debug, Display},
24    ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28use nautilus_common::messages::data::{
29    RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData, RequestInstrument,
30    RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
31    SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
32    SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
33    SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
34    SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
35    UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
36    UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
37    UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
38    UnsubscribeTrades,
39};
40#[cfg(feature = "defi")]
41use nautilus_common::messages::defi::{
42    RequestPoolSnapshot, SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects,
43    SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks,
44    UnsubscribePool, UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
45    UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
46};
47#[cfg(feature = "defi")]
48use nautilus_model::defi::Blockchain;
49use nautilus_model::{
50    data::{BarType, DataType},
51    identifiers::{ClientId, InstrumentId, Venue},
52};
53
54#[cfg(feature = "defi")]
55#[allow(unused_imports)] // Brings DeFi impl blocks into scope
56use crate::defi::client as _;
57
58/// Defines the interface for a data client, managing connections, subscriptions, and requests.
59#[async_trait::async_trait]
60pub trait DataClient: Any + Sync + Send {
61    /// Returns the unique identifier for this data client.
62    fn client_id(&self) -> ClientId;
63
64    /// Returns the optional venue this client is associated with.
65    fn venue(&self) -> Option<Venue>;
66
67    /// Starts the data client.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if the operation fails.
72    fn start(&mut self) -> anyhow::Result<()>;
73
74    /// Stops the data client.
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if the operation fails.
79    fn stop(&mut self) -> anyhow::Result<()>;
80
81    /// Resets the data client to its initial state.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the operation fails.
86    fn reset(&mut self) -> anyhow::Result<()>;
87
88    /// Disposes of client resources and cleans up.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if the operation fails.
93    fn dispose(&mut self) -> anyhow::Result<()>;
94
95    /// Connects external API's if needed.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the operation fails.
100    async fn connect(&mut self) -> anyhow::Result<()> {
101        Ok(())
102    }
103
104    /// Disconnects external API's if needed.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if the operation fails.
109    async fn disconnect(&mut self) -> anyhow::Result<()> {
110        Ok(())
111    }
112
113    /// Returns `true` if the client is currently connected.
114    fn is_connected(&self) -> bool;
115
116    /// Returns `true` if the client is currently disconnected.
117    fn is_disconnected(&self) -> bool;
118
119    /// Subscribes to custom data types according to the command.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the subscribe operation fails.
124    fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
125        log_not_implemented(&cmd);
126        Ok(())
127    }
128
129    /// Subscribes to instruments list for the specified venue.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the subscribe operation fails.
134    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
135        log_not_implemented(&cmd);
136        Ok(())
137    }
138
139    /// Subscribes to data for a single instrument.
140    ///
141    /// # Errors
142    ///
143    /// Returns an error if the subscribe operation fails.
144    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
145        log_not_implemented(&cmd);
146        Ok(())
147    }
148
149    /// Subscribes to order book delta updates for the specified instrument.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the subscribe operation fails.
154    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
155        log_not_implemented(&cmd);
156        Ok(())
157    }
158
159    /// Subscribes to top 10 order book depth updates for the specified instrument.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the subscribe operation fails.
164    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
165        log_not_implemented(&cmd);
166        Ok(())
167    }
168
169    /// Subscribes to periodic order book snapshots for the specified instrument.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the subscribe operation fails.
174    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
175        log_not_implemented(&cmd);
176        Ok(())
177    }
178
179    /// Subscribes to quote updates for the specified instrument.
180    ///
181    /// # Errors
182    ///
183    /// Returns an error if the subscribe operation fails.
184    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
185        log_not_implemented(&cmd);
186        Ok(())
187    }
188
189    /// Subscribes to trade updates for the specified instrument.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if the subscribe operation fails.
194    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
195        log_not_implemented(&cmd);
196        Ok(())
197    }
198
199    /// Subscribes to mark price updates for the specified instrument.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if the subscribe operation fails.
204    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
205        log_not_implemented(&cmd);
206        Ok(())
207    }
208
209    /// Subscribes to index price updates for the specified instrument.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the subscribe operation fails.
214    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
215        log_not_implemented(&cmd);
216        Ok(())
217    }
218
219    /// Subscribes to funding rate updates for the specified instrument.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if the subscribe operation fails.
224    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
225        log_not_implemented(&cmd);
226        Ok(())
227    }
228
229    /// Subscribes to bar updates of the specified bar type.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if the subscribe operation fails.
234    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
235        log_not_implemented(&cmd);
236        Ok(())
237    }
238
239    /// Subscribes to status updates for the specified instrument.
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if the subscribe operation fails.
244    fn subscribe_instrument_status(
245        &mut self,
246        cmd: &SubscribeInstrumentStatus,
247    ) -> anyhow::Result<()> {
248        log_not_implemented(&cmd);
249        Ok(())
250    }
251
252    /// Subscribes to instrument close events for the specified instrument.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the subscription operation fails.
257    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
258        log_not_implemented(&cmd);
259        Ok(())
260    }
261
262    #[cfg(feature = "defi")]
263    /// Subscribes to blocks for a specified blockchain.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the subscription operation fails.
268    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
269        log_not_implemented(&cmd);
270        Ok(())
271    }
272
273    #[cfg(feature = "defi")]
274    /// Subscribes to pool definition updates for a specified AMM pool.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the subscription operation fails.
279    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
280        log_not_implemented(&cmd);
281        Ok(())
282    }
283
284    #[cfg(feature = "defi")]
285    /// Subscribes to pool swaps for a specified AMM pool.
286    ///
287    /// # Errors
288    ///
289    /// Returns an error if the subscription operation fails.
290    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
291        log_not_implemented(&cmd);
292        Ok(())
293    }
294
295    #[cfg(feature = "defi")]
296    /// Subscribes to pool liquidity updates for a specified AMM pool.
297    ///
298    /// # Errors
299    ///
300    /// Returns an error if the subscription operation fails.
301    fn subscribe_pool_liquidity_updates(
302        &mut self,
303        cmd: &SubscribePoolLiquidityUpdates,
304    ) -> anyhow::Result<()> {
305        log_not_implemented(&cmd);
306        Ok(())
307    }
308
309    #[cfg(feature = "defi")]
310    /// Subscribes to pool fee collects for a specified AMM pool.
311    ///
312    /// # Errors
313    ///
314    /// Returns an error if the subscription operation fails.
315    fn subscribe_pool_fee_collects(
316        &mut self,
317        cmd: &SubscribePoolFeeCollects,
318    ) -> anyhow::Result<()> {
319        log_not_implemented(&cmd);
320        Ok(())
321    }
322
323    #[cfg(feature = "defi")]
324    /// Subscribes to pool flash loan events for a specified AMM pool.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if the subscription operation fails.
329    fn subscribe_pool_flash_events(
330        &mut self,
331        cmd: &SubscribePoolFlashEvents,
332    ) -> anyhow::Result<()> {
333        log_not_implemented(&cmd);
334        Ok(())
335    }
336
337    /// Unsubscribes from custom data types according to the command.
338    ///
339    /// # Errors
340    ///
341    /// Returns an error if the unsubscribe operation fails.
342    fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
343        log_not_implemented(&cmd);
344        Ok(())
345    }
346
347    /// Unsubscribes from instruments list for the specified venue.
348    ///
349    /// # Errors
350    ///
351    /// Returns an error if the unsubscribe operation fails.
352    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
353        log_not_implemented(&cmd);
354        Ok(())
355    }
356
357    /// Unsubscribes from data for the specified instrument.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if the unsubscribe operation fails.
362    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
363        log_not_implemented(&cmd);
364        Ok(())
365    }
366
367    /// Unsubscribes from order book delta updates for the specified instrument.
368    ///
369    /// # Errors
370    ///
371    /// Returns an error if the unsubscribe operation fails.
372    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
373        log_not_implemented(&cmd);
374        Ok(())
375    }
376
377    /// Unsubscribes from top 10 order book depth updates for the specified instrument.
378    ///
379    /// # Errors
380    ///
381    /// Returns an error if the unsubscribe operation fails.
382    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
383        log_not_implemented(&cmd);
384        Ok(())
385    }
386
387    /// Unsubscribes from periodic order book snapshots for the specified instrument.
388    ///
389    /// # Errors
390    ///
391    /// Returns an error if the unsubscribe operation fails.
392    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
393        log_not_implemented(&cmd);
394        Ok(())
395    }
396
397    /// Unsubscribes from quote updates for the specified instrument.
398    ///
399    /// # Errors
400    ///
401    /// Returns an error if the unsubscribe operation fails.
402    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
403        log_not_implemented(&cmd);
404        Ok(())
405    }
406
407    /// Unsubscribes from trade updates for the specified instrument.
408    ///
409    /// # Errors
410    ///
411    /// Returns an error if the unsubscribe operation fails.
412    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
413        log_not_implemented(&cmd);
414        Ok(())
415    }
416
417    /// Unsubscribes from mark price updates for the specified instrument.
418    ///
419    /// # Errors
420    ///
421    /// Returns an error if the unsubscribe operation fails.
422    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
423        log_not_implemented(&cmd);
424        Ok(())
425    }
426
427    /// Unsubscribes from index price updates for the specified instrument.
428    ///
429    /// # Errors
430    ///
431    /// Returns an error if the unsubscribe operation fails.
432    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
433        log_not_implemented(&cmd);
434        Ok(())
435    }
436
437    /// Unsubscribes from funding rate updates for the specified instrument.
438    ///
439    /// # Errors
440    ///
441    /// Returns an error if the unsubscribe operation fails.
442    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
443        log_not_implemented(&cmd);
444        Ok(())
445    }
446
447    /// Unsubscribes from bar updates of the specified bar type.
448    ///
449    /// # Errors
450    ///
451    /// Returns an error if the unsubscribe operation fails.
452    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
453        log_not_implemented(&cmd);
454        Ok(())
455    }
456
457    /// Unsubscribes from instrument status updates for the specified instrument.
458    ///
459    /// # Errors
460    ///
461    /// Returns an error if the unsubscribe operation fails.
462    fn unsubscribe_instrument_status(
463        &mut self,
464        cmd: &UnsubscribeInstrumentStatus,
465    ) -> anyhow::Result<()> {
466        log_not_implemented(&cmd);
467        Ok(())
468    }
469
470    /// Unsubscribes from instrument close events for the specified instrument.
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if the unsubscribe operation fails.
475    fn unsubscribe_instrument_close(
476        &mut self,
477        cmd: &UnsubscribeInstrumentClose,
478    ) -> anyhow::Result<()> {
479        log_not_implemented(&cmd);
480        Ok(())
481    }
482
483    #[cfg(feature = "defi")]
484    /// Unsubscribes from blocks for a specified blockchain.
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if the subscription operation fails.
489    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
490        log_not_implemented(&cmd);
491        Ok(())
492    }
493
494    #[cfg(feature = "defi")]
495    /// Unsubscribes from pool definition updates for a specified AMM pool.
496    ///
497    /// # Errors
498    ///
499    /// Returns an error if the subscription operation fails.
500    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
501        log_not_implemented(&cmd);
502        Ok(())
503    }
504
505    #[cfg(feature = "defi")]
506    /// Unsubscribes from swaps for a specified AMM pool.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if the subscription operation fails.
511    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
512        log_not_implemented(&cmd);
513        Ok(())
514    }
515
516    #[cfg(feature = "defi")]
517    /// Unsubscribes from pool liquidity updates for a specified AMM pool.
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if the subscription operation fails.
522    fn unsubscribe_pool_liquidity_updates(
523        &mut self,
524        cmd: &UnsubscribePoolLiquidityUpdates,
525    ) -> anyhow::Result<()> {
526        log_not_implemented(&cmd);
527        Ok(())
528    }
529
530    #[cfg(feature = "defi")]
531    /// Unsubscribes from pool fee collects for a specified AMM pool.
532    ///
533    /// # Errors
534    ///
535    /// Returns an error if the subscription operation fails.
536    fn unsubscribe_pool_fee_collects(
537        &mut self,
538        cmd: &UnsubscribePoolFeeCollects,
539    ) -> anyhow::Result<()> {
540        log_not_implemented(&cmd);
541        Ok(())
542    }
543
544    #[cfg(feature = "defi")]
545    /// Unsubscribes from pool flash loan events for a specified AMM pool.
546    ///
547    /// # Errors
548    ///
549    /// Returns an error if the subscription operation fails.
550    fn unsubscribe_pool_flash_events(
551        &mut self,
552        cmd: &UnsubscribePoolFlashEvents,
553    ) -> anyhow::Result<()> {
554        log_not_implemented(&cmd);
555        Ok(())
556    }
557
558    /// Sends a custom data request to the provider.
559    ///
560    /// # Errors
561    ///
562    /// Returns an error if the data request fails.
563    fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
564        log_not_implemented(&request);
565        Ok(())
566    }
567
568    /// Requests a list of instruments from the provider for a given venue.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if the instruments request fails.
573    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
574        log_not_implemented(&request);
575        Ok(())
576    }
577
578    /// Requests detailed data for a single instrument.
579    ///
580    /// # Errors
581    ///
582    /// Returns an error if the instrument request fails.
583    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
584        log_not_implemented(&request);
585        Ok(())
586    }
587
588    /// Requests a snapshot of the order book for a specified instrument.
589    ///
590    /// # Errors
591    ///
592    /// Returns an error if the book snapshot request fails.
593    fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
594        log_not_implemented(&request);
595        Ok(())
596    }
597
598    /// Requests historical or streaming quote data for a specified instrument.
599    ///
600    /// # Errors
601    ///
602    /// Returns an error if the quotes request fails.
603    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
604        log_not_implemented(&request);
605        Ok(())
606    }
607
608    /// Requests historical or streaming trade data for a specified instrument.
609    ///
610    /// # Errors
611    ///
612    /// Returns an error if the trades request fails.
613    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
614        log_not_implemented(&request);
615        Ok(())
616    }
617
618    /// Requests historical or streaming bar data for a specified instrument and bar type.
619    ///
620    /// # Errors
621    ///
622    /// Returns an error if the bars request fails.
623    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
624        log_not_implemented(&request);
625        Ok(())
626    }
627
628    /// Requests historical order book depth data for a specified instrument.
629    ///
630    /// # Errors
631    ///
632    /// Returns an error if the order book depths request fails.
633    fn request_book_depth(&self, request: &RequestBookDepth) -> anyhow::Result<()> {
634        log_not_implemented(&request);
635        Ok(())
636    }
637
638    #[cfg(feature = "defi")]
639    /// Requests a snapshot of a specific AMM pool.
640    ///
641    /// # Errors
642    ///
643    /// Returns an error if the pool snapshot request fails.
644    fn request_pool_snapshot(&self, request: &RequestPoolSnapshot) -> anyhow::Result<()> {
645        log_not_implemented(&request);
646        Ok(())
647    }
648}
649
650/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
651pub struct DataClientAdapter {
652    pub(crate) client: Box<dyn DataClient>,
653    pub client_id: ClientId,
654    pub venue: Option<Venue>,
655    pub handles_book_deltas: bool,
656    pub handles_book_snapshots: bool,
657    pub subscriptions_custom: AHashSet<DataType>,
658    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
659    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
660    pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
661    pub subscriptions_quotes: AHashSet<InstrumentId>,
662    pub subscriptions_trades: AHashSet<InstrumentId>,
663    pub subscriptions_bars: AHashSet<BarType>,
664    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
665    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
666    pub subscriptions_instrument: AHashSet<InstrumentId>,
667    pub subscriptions_instrument_venue: AHashSet<Venue>,
668    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
669    pub subscriptions_index_prices: AHashSet<InstrumentId>,
670    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
671    #[cfg(feature = "defi")]
672    pub subscriptions_blocks: AHashSet<Blockchain>,
673    #[cfg(feature = "defi")]
674    pub subscriptions_pools: AHashSet<InstrumentId>,
675    #[cfg(feature = "defi")]
676    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
677    #[cfg(feature = "defi")]
678    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
679    #[cfg(feature = "defi")]
680    pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
681    #[cfg(feature = "defi")]
682    pub subscriptions_pool_flash: AHashSet<InstrumentId>,
683}
684
685impl Deref for DataClientAdapter {
686    type Target = Box<dyn DataClient>;
687
688    fn deref(&self) -> &Self::Target {
689        &self.client
690    }
691}
692
693impl DerefMut for DataClientAdapter {
694    fn deref_mut(&mut self) -> &mut Self::Target {
695        &mut self.client
696    }
697}
698
699impl Debug for DataClientAdapter {
700    #[rustfmt::skip]
701    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
702        f.debug_struct(stringify!(DataClientAdapter))
703            .field("client_id", &self.client_id)
704            .field("venue", &self.venue)
705            .field("handles_book_deltas", &self.handles_book_deltas)
706            .field("handles_book_snapshots", &self.handles_book_snapshots)
707            .field("subscriptions_custom", &self.subscriptions_custom)
708            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
709            .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
710            .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
711            .field("subscriptions_quotes", &self.subscriptions_quotes)
712            .field("subscriptions_trades", &self.subscriptions_trades)
713            .field("subscriptions_bars", &self.subscriptions_bars)
714            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
715            .field("subscriptions_index_prices", &self.subscriptions_index_prices)
716            .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
717            .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
718            .field("subscriptions_instrument", &self.subscriptions_instrument)
719            .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
720            .finish()
721    }
722}
723
724impl DataClientAdapter {
725    /// Creates a new [`DataClientAdapter`] with the given client and clock.
726    #[must_use]
727    pub fn new(
728        client_id: ClientId,
729        venue: Option<Venue>,
730        handles_order_book_deltas: bool,
731        handles_order_book_snapshots: bool,
732        client: Box<dyn DataClient>,
733    ) -> Self {
734        Self {
735            client,
736            client_id,
737            venue,
738            handles_book_deltas: handles_order_book_deltas,
739            handles_book_snapshots: handles_order_book_snapshots,
740            subscriptions_custom: AHashSet::new(),
741            subscriptions_book_deltas: AHashSet::new(),
742            subscriptions_book_depth10: AHashSet::new(),
743            subscriptions_book_snapshots: AHashSet::new(),
744            subscriptions_quotes: AHashSet::new(),
745            subscriptions_trades: AHashSet::new(),
746            subscriptions_mark_prices: AHashSet::new(),
747            subscriptions_index_prices: AHashSet::new(),
748            subscriptions_funding_rates: AHashSet::new(),
749            subscriptions_bars: AHashSet::new(),
750            subscriptions_instrument_status: AHashSet::new(),
751            subscriptions_instrument_close: AHashSet::new(),
752            subscriptions_instrument: AHashSet::new(),
753            subscriptions_instrument_venue: AHashSet::new(),
754            #[cfg(feature = "defi")]
755            subscriptions_blocks: AHashSet::new(),
756            #[cfg(feature = "defi")]
757            subscriptions_pools: AHashSet::new(),
758            #[cfg(feature = "defi")]
759            subscriptions_pool_swaps: AHashSet::new(),
760            #[cfg(feature = "defi")]
761            subscriptions_pool_liquidity_updates: AHashSet::new(),
762            #[cfg(feature = "defi")]
763            subscriptions_pool_fee_collects: AHashSet::new(),
764            #[cfg(feature = "defi")]
765            subscriptions_pool_flash: AHashSet::new(),
766        }
767    }
768
769    #[allow(clippy::borrowed_box)]
770    #[must_use]
771    pub fn get_client(&self) -> &Box<dyn DataClient> {
772        &self.client
773    }
774
775    #[inline]
776    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
777        if let Err(e) = match cmd {
778            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
779            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
780            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
781            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
782            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
783            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
784            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
785            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
786            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
787            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
788            SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
789            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
790            SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
791            SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
792        } {
793            log_command_error(&cmd, &e);
794        }
795    }
796
797    #[inline]
798    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
799        if let Err(e) = match cmd {
800            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
801            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
802            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
803            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
804            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
805            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
806            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
807            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
808            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
809            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
810            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
811            UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
812            UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
813            UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
814        } {
815            log_command_error(&cmd, &e);
816        }
817    }
818
819    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
820
821    /// Subscribes to a custom data type, updating internal state and forwarding to the client.
822    ///
823    /// # Errors
824    ///
825    /// Returns an error if the underlying client subscribe operation fails.
826    pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
827        if !self.subscriptions_custom.contains(&cmd.data_type) {
828            self.subscriptions_custom.insert(cmd.data_type.clone());
829            self.client.subscribe(cmd)?;
830        }
831        Ok(())
832    }
833
834    /// Unsubscribes from a custom data type, updating internal state and forwarding to the client.
835    ///
836    /// # Errors
837    ///
838    /// Returns an error if the underlying client unsubscribe operation fails.
839    pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
840        if self.subscriptions_custom.contains(&cmd.data_type) {
841            self.subscriptions_custom.remove(&cmd.data_type);
842            self.client.unsubscribe(cmd)?;
843        }
844        Ok(())
845    }
846
847    /// Subscribes to instrument definitions for a venue, updating internal state and forwarding to the client.
848    ///
849    /// # Errors
850    ///
851    /// Returns an error if the underlying client subscribe operation fails.
852    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
853        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
854            self.subscriptions_instrument_venue.insert(cmd.venue);
855            self.client.subscribe_instruments(cmd)?;
856        }
857
858        Ok(())
859    }
860
861    /// Unsubscribes from instrument definition updates for a venue, updating internal state and forwarding to the client.
862    ///
863    /// # Errors
864    ///
865    /// Returns an error if the underlying client unsubscribe operation fails.
866    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
867        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
868            self.subscriptions_instrument_venue.remove(&cmd.venue);
869            self.client.unsubscribe_instruments(cmd)?;
870        }
871
872        Ok(())
873    }
874
875    /// Subscribes to instrument definitions for a single instrument, updating internal state and forwarding to the client.
876    ///
877    /// # Errors
878    ///
879    /// Returns an error if the underlying client subscribe operation fails.
880    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
881        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
882            self.subscriptions_instrument.insert(cmd.instrument_id);
883            self.client.subscribe_instrument(cmd)?;
884        }
885
886        Ok(())
887    }
888
889    /// Unsubscribes from instrument definition updates for a single instrument, updating internal state and forwarding to the client.
890    ///
891    /// # Errors
892    ///
893    /// Returns an error if the underlying client unsubscribe operation fails.
894    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
895        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
896            self.subscriptions_instrument.remove(&cmd.instrument_id);
897            self.client.unsubscribe_instrument(cmd)?;
898        }
899
900        Ok(())
901    }
902
903    /// Subscribes to book deltas updates for an instrument, updating internal state and forwarding to the client.
904    ///
905    /// # Errors
906    ///
907    /// Returns an error if the underlying client subscribe operation fails.
908    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
909        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
910            self.subscriptions_book_deltas.insert(cmd.instrument_id);
911            self.client.subscribe_book_deltas(cmd)?;
912        }
913
914        Ok(())
915    }
916
917    /// Unsubscribes from book deltas for an instrument, updating internal state and forwarding to the client.
918    ///
919    /// # Errors
920    ///
921    /// Returns an error if the underlying client unsubscribe operation fails.
922    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
923        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
924            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
925            self.client.unsubscribe_book_deltas(cmd)?;
926        }
927
928        Ok(())
929    }
930
931    /// Subscribes to book depth updates for an instrument, updating internal state and forwarding to the client.
932    ///
933    /// # Errors
934    ///
935    /// Returns an error if the underlying client subscribe operation fails.
936    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
937        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
938            self.subscriptions_book_depth10.insert(cmd.instrument_id);
939            self.client.subscribe_book_depth10(cmd)?;
940        }
941
942        Ok(())
943    }
944
945    /// Unsubscribes from book depth updates for an instrument, updating internal state and forwarding to the client.
946    ///
947    /// # Errors
948    ///
949    /// Returns an error if the underlying client unsubscribe operation fails.
950    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
951        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
952            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
953            self.client.unsubscribe_book_depth10(cmd)?;
954        }
955
956        Ok(())
957    }
958
959    /// Subscribes to book snapshots for an instrument, updating internal state and forwarding to the client.
960    ///
961    /// # Errors
962    ///
963    /// Returns an error if the underlying client subscribe operation fails.
964    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
965        if !self
966            .subscriptions_book_snapshots
967            .contains(&cmd.instrument_id)
968        {
969            self.subscriptions_book_snapshots.insert(cmd.instrument_id);
970            self.client.subscribe_book_snapshots(cmd)?;
971        }
972
973        Ok(())
974    }
975
976    /// Unsubscribes from book snapshots for an instrument, updating internal state and forwarding to the client.
977    ///
978    /// # Errors
979    ///
980    /// Returns an error if the underlying client unsubscribe operation fails.
981    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
982        if self
983            .subscriptions_book_snapshots
984            .contains(&cmd.instrument_id)
985        {
986            self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
987            self.client.unsubscribe_book_snapshots(cmd)?;
988        }
989
990        Ok(())
991    }
992
993    /// Subscribes to quotes for an instrument, updating internal state and forwarding to the client.
994    ///
995    /// # Errors
996    ///
997    /// Returns an error if the underlying client subscribe operation fails.
998    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
999        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
1000            self.subscriptions_quotes.insert(cmd.instrument_id);
1001            self.client.subscribe_quotes(cmd)?;
1002        }
1003        Ok(())
1004    }
1005
1006    /// Unsubscribes from quotes for an instrument, updating internal state and forwarding to the client.
1007    ///
1008    /// # Errors
1009    ///
1010    /// Returns an error if the underlying client unsubscribe operation fails.
1011    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1012        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
1013            self.subscriptions_quotes.remove(&cmd.instrument_id);
1014            self.client.unsubscribe_quotes(cmd)?;
1015        }
1016        Ok(())
1017    }
1018
1019    /// Subscribes to trades for an instrument, updating internal state and forwarding to the client.
1020    ///
1021    /// # Errors
1022    ///
1023    /// Returns an error if the underlying client subscribe operation fails.
1024    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
1025        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
1026            self.subscriptions_trades.insert(cmd.instrument_id);
1027            self.client.subscribe_trades(cmd)?;
1028        }
1029        Ok(())
1030    }
1031
1032    /// Unsubscribes from trades for an instrument, updating internal state and forwarding to the client.
1033    ///
1034    /// # Errors
1035    ///
1036    /// Returns an error if the underlying client unsubscribe operation fails.
1037    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1038        if self.subscriptions_trades.contains(&cmd.instrument_id) {
1039            self.subscriptions_trades.remove(&cmd.instrument_id);
1040            self.client.unsubscribe_trades(cmd)?;
1041        }
1042        Ok(())
1043    }
1044
1045    /// Subscribes to bars for a bar type, updating internal state and forwarding to the client.
1046    ///
1047    /// # Errors
1048    ///
1049    /// Returns an error if the underlying client subscribe operation fails.
1050    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1051        if !self.subscriptions_bars.contains(&cmd.bar_type) {
1052            self.subscriptions_bars.insert(cmd.bar_type);
1053            self.client.subscribe_bars(cmd)?;
1054        }
1055        Ok(())
1056    }
1057
1058    /// Unsubscribes from bars for a bar type, updating internal state and forwarding to the client.
1059    ///
1060    /// # Errors
1061    ///
1062    /// Returns an error if the underlying client unsubscribe operation fails.
1063    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1064        if self.subscriptions_bars.contains(&cmd.bar_type) {
1065            self.subscriptions_bars.remove(&cmd.bar_type);
1066            self.client.unsubscribe_bars(cmd)?;
1067        }
1068        Ok(())
1069    }
1070
1071    /// Subscribes to mark price updates for an instrument, updating internal state and forwarding to the client.
1072    ///
1073    /// # Errors
1074    ///
1075    /// Returns an error if the underlying client subscribe operation fails.
1076    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1077        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1078            self.subscriptions_mark_prices.insert(cmd.instrument_id);
1079            self.client.subscribe_mark_prices(cmd)?;
1080        }
1081        Ok(())
1082    }
1083
1084    /// Unsubscribes from mark price updates for an instrument, updating internal state and forwarding to the client.
1085    ///
1086    /// # Errors
1087    ///
1088    /// Returns an error if the underlying client unsubscribe operation fails.
1089    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1090        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1091            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1092            self.client.unsubscribe_mark_prices(cmd)?;
1093        }
1094        Ok(())
1095    }
1096
1097    /// Subscribes to index price updates for an instrument, updating internal state and forwarding to the client.
1098    ///
1099    /// # Errors
1100    ///
1101    /// Returns an error if the underlying client subscribe operation fails.
1102    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1103        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1104            self.subscriptions_index_prices.insert(cmd.instrument_id);
1105            self.client.subscribe_index_prices(cmd)?;
1106        }
1107        Ok(())
1108    }
1109
1110    /// Unsubscribes from index price updates for an instrument, updating internal state and forwarding to the client.
1111    ///
1112    /// # Errors
1113    ///
1114    /// Returns an error if the underlying client unsubscribe operation fails.
1115    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1116        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1117            self.subscriptions_index_prices.remove(&cmd.instrument_id);
1118            self.client.unsubscribe_index_prices(cmd)?;
1119        }
1120        Ok(())
1121    }
1122
1123    /// Subscribes to funding rate updates for an instrument, updating internal state and forwarding to the client.
1124    ///
1125    /// # Errors
1126    ///
1127    /// Returns an error if the underlying client subscribe operation fails.
1128    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1129        if !self
1130            .subscriptions_funding_rates
1131            .contains(&cmd.instrument_id)
1132        {
1133            self.subscriptions_funding_rates.insert(cmd.instrument_id);
1134            self.client.subscribe_funding_rates(cmd)?;
1135        }
1136        Ok(())
1137    }
1138
1139    /// Unsubscribes from funding rate updates for an instrument, updating internal state and forwarding to the client.
1140    ///
1141    /// # Errors
1142    ///
1143    /// Returns an error if the underlying client unsubscribe operation fails.
1144    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1145        if self
1146            .subscriptions_funding_rates
1147            .contains(&cmd.instrument_id)
1148        {
1149            self.subscriptions_funding_rates.remove(&cmd.instrument_id);
1150            self.client.unsubscribe_funding_rates(cmd)?;
1151        }
1152        Ok(())
1153    }
1154
1155    /// Subscribes to instrument status updates for the specified instrument.
1156    ///
1157    /// # Errors
1158    ///
1159    /// Returns an error if the underlying client subscribe operation fails.
1160    fn subscribe_instrument_status(
1161        &mut self,
1162        cmd: &SubscribeInstrumentStatus,
1163    ) -> anyhow::Result<()> {
1164        if !self
1165            .subscriptions_instrument_status
1166            .contains(&cmd.instrument_id)
1167        {
1168            self.subscriptions_instrument_status
1169                .insert(cmd.instrument_id);
1170            self.client.subscribe_instrument_status(cmd)?;
1171        }
1172        Ok(())
1173    }
1174
1175    /// Unsubscribes from instrument status updates for the specified instrument.
1176    ///
1177    /// # Errors
1178    ///
1179    /// Returns an error if the underlying client unsubscribe operation fails.
1180    fn unsubscribe_instrument_status(
1181        &mut self,
1182        cmd: &UnsubscribeInstrumentStatus,
1183    ) -> anyhow::Result<()> {
1184        if self
1185            .subscriptions_instrument_status
1186            .contains(&cmd.instrument_id)
1187        {
1188            self.subscriptions_instrument_status
1189                .remove(&cmd.instrument_id);
1190            self.client.unsubscribe_instrument_status(cmd)?;
1191        }
1192        Ok(())
1193    }
1194
1195    /// Subscribes to instrument close events for the specified instrument.
1196    ///
1197    /// # Errors
1198    ///
1199    /// Returns an error if the underlying client subscribe operation fails.
1200    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1201        if !self
1202            .subscriptions_instrument_close
1203            .contains(&cmd.instrument_id)
1204        {
1205            self.subscriptions_instrument_close
1206                .insert(cmd.instrument_id);
1207            self.client.subscribe_instrument_close(cmd)?;
1208        }
1209        Ok(())
1210    }
1211
1212    /// Unsubscribes from instrument close events for the specified instrument.
1213    ///
1214    /// # Errors
1215    ///
1216    /// Returns an error if the underlying client unsubscribe operation fails.
1217    fn unsubscribe_instrument_close(
1218        &mut self,
1219        cmd: &UnsubscribeInstrumentClose,
1220    ) -> anyhow::Result<()> {
1221        if self
1222            .subscriptions_instrument_close
1223            .contains(&cmd.instrument_id)
1224        {
1225            self.subscriptions_instrument_close
1226                .remove(&cmd.instrument_id);
1227            self.client.unsubscribe_instrument_close(cmd)?;
1228        }
1229        Ok(())
1230    }
1231
1232    // -- REQUEST HANDLERS ------------------------------------------------------------------------
1233
1234    /// Sends a data request to the underlying client.
1235    ///
1236    /// # Errors
1237    ///
1238    /// Returns an error if the client request fails.
1239    pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1240        self.client.request_data(req)
1241    }
1242
1243    /// Sends a single instrument request to the client.
1244    ///
1245    /// # Errors
1246    ///
1247    /// Returns an error if the client fails to process the request.
1248    pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1249        self.client.request_instrument(req)
1250    }
1251
1252    /// Sends a batch instruments request to the client.
1253    ///
1254    /// # Errors
1255    ///
1256    /// Returns an error if the client fails to process the request.
1257    pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1258        self.client.request_instruments(req)
1259    }
1260
1261    /// Sends a quotes request for a given instrument.
1262    ///
1263    /// # Errors
1264    ///
1265    /// Returns an error if the client fails to process the quotes request.
1266    pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1267        self.client.request_quotes(req)
1268    }
1269
1270    /// Sends a trades request for a given instrument.
1271    ///
1272    /// # Errors
1273    ///
1274    /// Returns an error if the client fails to process the trades request.
1275    pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1276        self.client.request_trades(req)
1277    }
1278
1279    /// Sends a bars request for a given instrument and bar type.
1280    ///
1281    /// # Errors
1282    ///
1283    /// Returns an error if the client fails to process the bars request.
1284    pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1285        self.client.request_bars(req)
1286    }
1287
1288    /// Sends an order book depths request for a given instrument.
1289    ///
1290    /// # Errors
1291    ///
1292    /// Returns an error if the client fails to process the order book depths request.
1293    pub fn request_book_depth(&self, req: &RequestBookDepth) -> anyhow::Result<()> {
1294        self.client.request_book_depth(req)
1295    }
1296}
1297
1298#[inline(always)]
1299fn log_not_implemented<T: Debug>(msg: &T) {
1300    log::warn!("{msg:?} – handler not implemented");
1301}
1302
1303#[inline(always)]
1304fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1305    log::error!("Error on {cmd:?}: {e}");
1306}