nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17//!
18//! Defines the `DataClient` trait, the `DataClientAdapter` for managing subscriptions and requests,
19//! and utilities for constructing data responses.
20
21use std::{
22    any::Any,
23    fmt::{Debug, Display},
24    ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28use nautilus_common::messages::data::{
29    RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData, RequestInstrument,
30    RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
31    SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
32    SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
33    SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
34    SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
35    UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
36    UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
37    UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
38    UnsubscribeTrades,
39};
40#[cfg(feature = "defi")]
41use nautilus_common::messages::defi::{
42    RequestPoolSnapshot, SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects,
43    SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks,
44    UnsubscribePool, UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
45    UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
46};
47#[cfg(feature = "defi")]
48use nautilus_model::defi::Blockchain;
49use nautilus_model::{
50    data::{BarType, DataType},
51    identifiers::{ClientId, InstrumentId, Venue},
52};
53
54#[cfg(feature = "defi")]
55#[allow(unused_imports)] // Brings DeFi impl blocks into scope
56use crate::defi::client as _;
57
58/// Defines the interface for a data client, managing connections, subscriptions, and requests.
59#[async_trait::async_trait]
60pub trait DataClient: Any + Sync + Send {
61    /// Returns the unique identifier for this data client.
62    fn client_id(&self) -> ClientId;
63
64    /// Returns the optional venue this client is associated with.
65    fn venue(&self) -> Option<Venue>;
66
67    /// Starts the data client.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if the operation fails.
72    fn start(&mut self) -> anyhow::Result<()>;
73
74    /// Stops the data client.
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if the operation fails.
79    fn stop(&mut self) -> anyhow::Result<()>;
80
81    /// Resets the data client to its initial state.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the operation fails.
86    fn reset(&mut self) -> anyhow::Result<()>;
87
88    /// Disposes of client resources and cleans up.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if the operation fails.
93    fn dispose(&mut self) -> anyhow::Result<()>;
94
95    /// Connects external API's if needed.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the operation fails.
100    async fn connect(&mut self) -> anyhow::Result<()>;
101
102    /// Disconnects external API's if needed.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if the operation fails.
107    async fn disconnect(&mut self) -> anyhow::Result<()>;
108
109    /// Returns `true` if the client is currently connected.
110    fn is_connected(&self) -> bool;
111
112    /// Returns `true` if the client is currently disconnected.
113    fn is_disconnected(&self) -> bool;
114
115    /// Subscribes to custom data types according to the command.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the subscribe operation fails.
120    fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
121        log_not_implemented(&cmd);
122        Ok(())
123    }
124
125    /// Subscribes to instruments list for the specified venue.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if the subscribe operation fails.
130    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
131        log_not_implemented(&cmd);
132        Ok(())
133    }
134
135    /// Subscribes to data for a single instrument.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if the subscribe operation fails.
140    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
141        log_not_implemented(&cmd);
142        Ok(())
143    }
144
145    /// Subscribes to order book delta updates for the specified instrument.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the subscribe operation fails.
150    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
151        log_not_implemented(&cmd);
152        Ok(())
153    }
154
155    /// Subscribes to top 10 order book depth updates for the specified instrument.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the subscribe operation fails.
160    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
161        log_not_implemented(&cmd);
162        Ok(())
163    }
164
165    /// Subscribes to periodic order book snapshots for the specified instrument.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the subscribe operation fails.
170    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
171        log_not_implemented(&cmd);
172        Ok(())
173    }
174
175    /// Subscribes to quote updates for the specified instrument.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the subscribe operation fails.
180    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
181        log_not_implemented(&cmd);
182        Ok(())
183    }
184
185    /// Subscribes to trade updates for the specified instrument.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if the subscribe operation fails.
190    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
191        log_not_implemented(&cmd);
192        Ok(())
193    }
194
195    /// Subscribes to mark price updates for the specified instrument.
196    ///
197    /// # Errors
198    ///
199    /// Returns an error if the subscribe operation fails.
200    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
201        log_not_implemented(&cmd);
202        Ok(())
203    }
204
205    /// Subscribes to index price updates for the specified instrument.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if the subscribe operation fails.
210    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
211        log_not_implemented(&cmd);
212        Ok(())
213    }
214
215    /// Subscribes to funding rate updates for the specified instrument.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if the subscribe operation fails.
220    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
221        log_not_implemented(&cmd);
222        Ok(())
223    }
224
225    /// Subscribes to bar updates of the specified bar type.
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if the subscribe operation fails.
230    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
231        log_not_implemented(&cmd);
232        Ok(())
233    }
234
235    /// Subscribes to status updates for the specified instrument.
236    ///
237    /// # Errors
238    ///
239    /// Returns an error if the subscribe operation fails.
240    fn subscribe_instrument_status(
241        &mut self,
242        cmd: &SubscribeInstrumentStatus,
243    ) -> anyhow::Result<()> {
244        log_not_implemented(&cmd);
245        Ok(())
246    }
247
248    /// Subscribes to instrument close events for the specified instrument.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if the subscription operation fails.
253    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
254        log_not_implemented(&cmd);
255        Ok(())
256    }
257
258    #[cfg(feature = "defi")]
259    /// Subscribes to blocks for a specified blockchain.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the subscription operation fails.
264    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
265        log_not_implemented(&cmd);
266        Ok(())
267    }
268
269    #[cfg(feature = "defi")]
270    /// Subscribes to pool definition updates for a specified AMM pool.
271    ///
272    /// # Errors
273    ///
274    /// Returns an error if the subscription operation fails.
275    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
276        log_not_implemented(&cmd);
277        Ok(())
278    }
279
280    #[cfg(feature = "defi")]
281    /// Subscribes to pool swaps for a specified AMM pool.
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if the subscription operation fails.
286    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
287        log_not_implemented(&cmd);
288        Ok(())
289    }
290
291    #[cfg(feature = "defi")]
292    /// Subscribes to pool liquidity updates for a specified AMM pool.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if the subscription operation fails.
297    fn subscribe_pool_liquidity_updates(
298        &mut self,
299        cmd: &SubscribePoolLiquidityUpdates,
300    ) -> anyhow::Result<()> {
301        log_not_implemented(&cmd);
302        Ok(())
303    }
304
305    #[cfg(feature = "defi")]
306    /// Subscribes to pool fee collects for a specified AMM pool.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if the subscription operation fails.
311    fn subscribe_pool_fee_collects(
312        &mut self,
313        cmd: &SubscribePoolFeeCollects,
314    ) -> anyhow::Result<()> {
315        log_not_implemented(&cmd);
316        Ok(())
317    }
318
319    #[cfg(feature = "defi")]
320    /// Subscribes to pool flash loan events for a specified AMM pool.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if the subscription operation fails.
325    fn subscribe_pool_flash_events(
326        &mut self,
327        cmd: &SubscribePoolFlashEvents,
328    ) -> anyhow::Result<()> {
329        log_not_implemented(&cmd);
330        Ok(())
331    }
332
333    /// Unsubscribes from custom data types according to the command.
334    ///
335    /// # Errors
336    ///
337    /// Returns an error if the unsubscribe operation fails.
338    fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
339        log_not_implemented(&cmd);
340        Ok(())
341    }
342
343    /// Unsubscribes from instruments list for the specified venue.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if the unsubscribe operation fails.
348    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
349        log_not_implemented(&cmd);
350        Ok(())
351    }
352
353    /// Unsubscribes from data for the specified instrument.
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if the unsubscribe operation fails.
358    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
359        log_not_implemented(&cmd);
360        Ok(())
361    }
362
363    /// Unsubscribes from order book delta updates for the specified instrument.
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if the unsubscribe operation fails.
368    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
369        log_not_implemented(&cmd);
370        Ok(())
371    }
372
373    /// Unsubscribes from top 10 order book depth updates for the specified instrument.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if the unsubscribe operation fails.
378    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
379        log_not_implemented(&cmd);
380        Ok(())
381    }
382
383    /// Unsubscribes from periodic order book snapshots for the specified instrument.
384    ///
385    /// # Errors
386    ///
387    /// Returns an error if the unsubscribe operation fails.
388    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
389        log_not_implemented(&cmd);
390        Ok(())
391    }
392
393    /// Unsubscribes from quote updates for the specified instrument.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if the unsubscribe operation fails.
398    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
399        log_not_implemented(&cmd);
400        Ok(())
401    }
402
403    /// Unsubscribes from trade updates for the specified instrument.
404    ///
405    /// # Errors
406    ///
407    /// Returns an error if the unsubscribe operation fails.
408    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
409        log_not_implemented(&cmd);
410        Ok(())
411    }
412
413    /// Unsubscribes from mark price updates for the specified instrument.
414    ///
415    /// # Errors
416    ///
417    /// Returns an error if the unsubscribe operation fails.
418    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
419        log_not_implemented(&cmd);
420        Ok(())
421    }
422
423    /// Unsubscribes from index price updates for the specified instrument.
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if the unsubscribe operation fails.
428    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
429        log_not_implemented(&cmd);
430        Ok(())
431    }
432
433    /// Unsubscribes from funding rate updates for the specified instrument.
434    ///
435    /// # Errors
436    ///
437    /// Returns an error if the unsubscribe operation fails.
438    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
439        log_not_implemented(&cmd);
440        Ok(())
441    }
442
443    /// Unsubscribes from bar updates of the specified bar type.
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if the unsubscribe operation fails.
448    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
449        log_not_implemented(&cmd);
450        Ok(())
451    }
452
453    /// Unsubscribes from instrument status updates for the specified instrument.
454    ///
455    /// # Errors
456    ///
457    /// Returns an error if the unsubscribe operation fails.
458    fn unsubscribe_instrument_status(
459        &mut self,
460        cmd: &UnsubscribeInstrumentStatus,
461    ) -> anyhow::Result<()> {
462        log_not_implemented(&cmd);
463        Ok(())
464    }
465
466    /// Unsubscribes from instrument close events for the specified instrument.
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the unsubscribe operation fails.
471    fn unsubscribe_instrument_close(
472        &mut self,
473        cmd: &UnsubscribeInstrumentClose,
474    ) -> anyhow::Result<()> {
475        log_not_implemented(&cmd);
476        Ok(())
477    }
478
479    #[cfg(feature = "defi")]
480    /// Unsubscribes from blocks for a specified blockchain.
481    ///
482    /// # Errors
483    ///
484    /// Returns an error if the subscription operation fails.
485    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
486        log_not_implemented(&cmd);
487        Ok(())
488    }
489
490    #[cfg(feature = "defi")]
491    /// Unsubscribes from pool definition updates for a specified AMM pool.
492    ///
493    /// # Errors
494    ///
495    /// Returns an error if the subscription operation fails.
496    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
497        log_not_implemented(&cmd);
498        Ok(())
499    }
500
501    #[cfg(feature = "defi")]
502    /// Unsubscribes from swaps for a specified AMM pool.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the subscription operation fails.
507    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
508        log_not_implemented(&cmd);
509        Ok(())
510    }
511
512    #[cfg(feature = "defi")]
513    /// Unsubscribes from pool liquidity updates for a specified AMM pool.
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if the subscription operation fails.
518    fn unsubscribe_pool_liquidity_updates(
519        &mut self,
520        cmd: &UnsubscribePoolLiquidityUpdates,
521    ) -> anyhow::Result<()> {
522        log_not_implemented(&cmd);
523        Ok(())
524    }
525
526    #[cfg(feature = "defi")]
527    /// Unsubscribes from pool fee collects for a specified AMM pool.
528    ///
529    /// # Errors
530    ///
531    /// Returns an error if the subscription operation fails.
532    fn unsubscribe_pool_fee_collects(
533        &mut self,
534        cmd: &UnsubscribePoolFeeCollects,
535    ) -> anyhow::Result<()> {
536        log_not_implemented(&cmd);
537        Ok(())
538    }
539
540    #[cfg(feature = "defi")]
541    /// Unsubscribes from pool flash loan events for a specified AMM pool.
542    ///
543    /// # Errors
544    ///
545    /// Returns an error if the subscription operation fails.
546    fn unsubscribe_pool_flash_events(
547        &mut self,
548        cmd: &UnsubscribePoolFlashEvents,
549    ) -> anyhow::Result<()> {
550        log_not_implemented(&cmd);
551        Ok(())
552    }
553
554    /// Sends a custom data request to the provider.
555    ///
556    /// # Errors
557    ///
558    /// Returns an error if the data request fails.
559    fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
560        log_not_implemented(&request);
561        Ok(())
562    }
563
564    /// Requests a list of instruments from the provider for a given venue.
565    ///
566    /// # Errors
567    ///
568    /// Returns an error if the instruments request fails.
569    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
570        log_not_implemented(&request);
571        Ok(())
572    }
573
574    /// Requests detailed data for a single instrument.
575    ///
576    /// # Errors
577    ///
578    /// Returns an error if the instrument request fails.
579    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
580        log_not_implemented(&request);
581        Ok(())
582    }
583
584    /// Requests a snapshot of the order book for a specified instrument.
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if the book snapshot request fails.
589    fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
590        log_not_implemented(&request);
591        Ok(())
592    }
593
594    /// Requests historical or streaming quote data for a specified instrument.
595    ///
596    /// # Errors
597    ///
598    /// Returns an error if the quotes request fails.
599    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
600        log_not_implemented(&request);
601        Ok(())
602    }
603
604    /// Requests historical or streaming trade data for a specified instrument.
605    ///
606    /// # Errors
607    ///
608    /// Returns an error if the trades request fails.
609    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
610        log_not_implemented(&request);
611        Ok(())
612    }
613
614    /// Requests historical or streaming bar data for a specified instrument and bar type.
615    ///
616    /// # Errors
617    ///
618    /// Returns an error if the bars request fails.
619    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
620        log_not_implemented(&request);
621        Ok(())
622    }
623
624    /// Requests historical order book depth data for a specified instrument.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the order book depths request fails.
629    fn request_book_depth(&self, request: &RequestBookDepth) -> anyhow::Result<()> {
630        log_not_implemented(&request);
631        Ok(())
632    }
633
634    #[cfg(feature = "defi")]
635    /// Requests a snapshot of a specific AMM pool.
636    ///
637    /// # Errors
638    ///
639    /// Returns an error if the pool snapshot request fails.
640    fn request_pool_snapshot(&self, request: &RequestPoolSnapshot) -> anyhow::Result<()> {
641        log_not_implemented(&request);
642        Ok(())
643    }
644}
645
646/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
647pub struct DataClientAdapter {
648    pub(crate) client: Box<dyn DataClient>,
649    pub client_id: ClientId,
650    pub venue: Option<Venue>,
651    pub handles_book_deltas: bool,
652    pub handles_book_snapshots: bool,
653    pub subscriptions_custom: AHashSet<DataType>,
654    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
655    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
656    pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
657    pub subscriptions_quotes: AHashSet<InstrumentId>,
658    pub subscriptions_trades: AHashSet<InstrumentId>,
659    pub subscriptions_bars: AHashSet<BarType>,
660    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
661    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
662    pub subscriptions_instrument: AHashSet<InstrumentId>,
663    pub subscriptions_instrument_venue: AHashSet<Venue>,
664    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
665    pub subscriptions_index_prices: AHashSet<InstrumentId>,
666    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
667    #[cfg(feature = "defi")]
668    pub subscriptions_blocks: AHashSet<Blockchain>,
669    #[cfg(feature = "defi")]
670    pub subscriptions_pools: AHashSet<InstrumentId>,
671    #[cfg(feature = "defi")]
672    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
673    #[cfg(feature = "defi")]
674    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
675    #[cfg(feature = "defi")]
676    pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
677    #[cfg(feature = "defi")]
678    pub subscriptions_pool_flash: AHashSet<InstrumentId>,
679}
680
681impl Deref for DataClientAdapter {
682    type Target = Box<dyn DataClient>;
683
684    fn deref(&self) -> &Self::Target {
685        &self.client
686    }
687}
688
689impl DerefMut for DataClientAdapter {
690    fn deref_mut(&mut self) -> &mut Self::Target {
691        &mut self.client
692    }
693}
694
695impl Debug for DataClientAdapter {
696    #[rustfmt::skip]
697    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
698        f.debug_struct(stringify!(DataClientAdapter))
699            .field("client_id", &self.client_id)
700            .field("venue", &self.venue)
701            .field("handles_book_deltas", &self.handles_book_deltas)
702            .field("handles_book_snapshots", &self.handles_book_snapshots)
703            .field("subscriptions_custom", &self.subscriptions_custom)
704            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
705            .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
706            .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
707            .field("subscriptions_quotes", &self.subscriptions_quotes)
708            .field("subscriptions_trades", &self.subscriptions_trades)
709            .field("subscriptions_bars", &self.subscriptions_bars)
710            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
711            .field("subscriptions_index_prices", &self.subscriptions_index_prices)
712            .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
713            .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
714            .field("subscriptions_instrument", &self.subscriptions_instrument)
715            .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
716            .finish()
717    }
718}
719
720impl DataClientAdapter {
721    /// Creates a new [`DataClientAdapter`] with the given client and clock.
722    #[must_use]
723    pub fn new(
724        client_id: ClientId,
725        venue: Option<Venue>,
726        handles_order_book_deltas: bool,
727        handles_order_book_snapshots: bool,
728        client: Box<dyn DataClient>,
729    ) -> Self {
730        Self {
731            client,
732            client_id,
733            venue,
734            handles_book_deltas: handles_order_book_deltas,
735            handles_book_snapshots: handles_order_book_snapshots,
736            subscriptions_custom: AHashSet::new(),
737            subscriptions_book_deltas: AHashSet::new(),
738            subscriptions_book_depth10: AHashSet::new(),
739            subscriptions_book_snapshots: AHashSet::new(),
740            subscriptions_quotes: AHashSet::new(),
741            subscriptions_trades: AHashSet::new(),
742            subscriptions_mark_prices: AHashSet::new(),
743            subscriptions_index_prices: AHashSet::new(),
744            subscriptions_funding_rates: AHashSet::new(),
745            subscriptions_bars: AHashSet::new(),
746            subscriptions_instrument_status: AHashSet::new(),
747            subscriptions_instrument_close: AHashSet::new(),
748            subscriptions_instrument: AHashSet::new(),
749            subscriptions_instrument_venue: AHashSet::new(),
750            #[cfg(feature = "defi")]
751            subscriptions_blocks: AHashSet::new(),
752            #[cfg(feature = "defi")]
753            subscriptions_pools: AHashSet::new(),
754            #[cfg(feature = "defi")]
755            subscriptions_pool_swaps: AHashSet::new(),
756            #[cfg(feature = "defi")]
757            subscriptions_pool_liquidity_updates: AHashSet::new(),
758            #[cfg(feature = "defi")]
759            subscriptions_pool_fee_collects: AHashSet::new(),
760            #[cfg(feature = "defi")]
761            subscriptions_pool_flash: AHashSet::new(),
762        }
763    }
764
765    #[allow(clippy::borrowed_box)]
766    #[must_use]
767    pub fn get_client(&self) -> &Box<dyn DataClient> {
768        &self.client
769    }
770
771    #[inline]
772    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
773        if let Err(e) = match cmd {
774            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
775            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
776            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
777            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
778            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
779            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
780            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
781            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
782            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
783            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
784            SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
785            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
786            SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
787            SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
788        } {
789            log_command_error(&cmd, &e);
790        }
791    }
792
793    #[inline]
794    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
795        if let Err(e) = match cmd {
796            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
797            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
798            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
799            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
800            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
801            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
802            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
803            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
804            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
805            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
806            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
807            UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
808            UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
809            UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
810        } {
811            log_command_error(&cmd, &e);
812        }
813    }
814
815    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
816
817    /// Subscribes to a custom data type, updating internal state and forwarding to the client.
818    ///
819    /// # Errors
820    ///
821    /// Returns an error if the underlying client subscribe operation fails.
822    pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
823        if !self.subscriptions_custom.contains(&cmd.data_type) {
824            self.subscriptions_custom.insert(cmd.data_type.clone());
825            self.client.subscribe(cmd)?;
826        }
827        Ok(())
828    }
829
830    /// Unsubscribes from a custom data type, updating internal state and forwarding to the client.
831    ///
832    /// # Errors
833    ///
834    /// Returns an error if the underlying client unsubscribe operation fails.
835    pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
836        if self.subscriptions_custom.contains(&cmd.data_type) {
837            self.subscriptions_custom.remove(&cmd.data_type);
838            self.client.unsubscribe(cmd)?;
839        }
840        Ok(())
841    }
842
843    /// Subscribes to instrument definitions for a venue, updating internal state and forwarding to the client.
844    ///
845    /// # Errors
846    ///
847    /// Returns an error if the underlying client subscribe operation fails.
848    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
849        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
850            self.subscriptions_instrument_venue.insert(cmd.venue);
851            self.client.subscribe_instruments(cmd)?;
852        }
853
854        Ok(())
855    }
856
857    /// Unsubscribes from instrument definition updates for a venue, updating internal state and forwarding to the client.
858    ///
859    /// # Errors
860    ///
861    /// Returns an error if the underlying client unsubscribe operation fails.
862    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
863        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
864            self.subscriptions_instrument_venue.remove(&cmd.venue);
865            self.client.unsubscribe_instruments(cmd)?;
866        }
867
868        Ok(())
869    }
870
871    /// Subscribes to instrument definitions for a single instrument, updating internal state and forwarding to the client.
872    ///
873    /// # Errors
874    ///
875    /// Returns an error if the underlying client subscribe operation fails.
876    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
877        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
878            self.subscriptions_instrument.insert(cmd.instrument_id);
879            self.client.subscribe_instrument(cmd)?;
880        }
881
882        Ok(())
883    }
884
885    /// Unsubscribes from instrument definition updates for a single instrument, updating internal state and forwarding to the client.
886    ///
887    /// # Errors
888    ///
889    /// Returns an error if the underlying client unsubscribe operation fails.
890    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
891        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
892            self.subscriptions_instrument.remove(&cmd.instrument_id);
893            self.client.unsubscribe_instrument(cmd)?;
894        }
895
896        Ok(())
897    }
898
899    /// Subscribes to book deltas updates for an instrument, updating internal state and forwarding to the client.
900    ///
901    /// # Errors
902    ///
903    /// Returns an error if the underlying client subscribe operation fails.
904    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
905        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
906            self.subscriptions_book_deltas.insert(cmd.instrument_id);
907            self.client.subscribe_book_deltas(cmd)?;
908        }
909
910        Ok(())
911    }
912
913    /// Unsubscribes from book deltas for an instrument, updating internal state and forwarding to the client.
914    ///
915    /// # Errors
916    ///
917    /// Returns an error if the underlying client unsubscribe operation fails.
918    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
919        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
920            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
921            self.client.unsubscribe_book_deltas(cmd)?;
922        }
923
924        Ok(())
925    }
926
927    /// Subscribes to book depth updates for an instrument, updating internal state and forwarding to the client.
928    ///
929    /// # Errors
930    ///
931    /// Returns an error if the underlying client subscribe operation fails.
932    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
933        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
934            self.subscriptions_book_depth10.insert(cmd.instrument_id);
935            self.client.subscribe_book_depth10(cmd)?;
936        }
937
938        Ok(())
939    }
940
941    /// Unsubscribes from book depth updates for an instrument, updating internal state and forwarding to the client.
942    ///
943    /// # Errors
944    ///
945    /// Returns an error if the underlying client unsubscribe operation fails.
946    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
947        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
948            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
949            self.client.unsubscribe_book_depth10(cmd)?;
950        }
951
952        Ok(())
953    }
954
955    /// Subscribes to book snapshots for an instrument, updating internal state and forwarding to the client.
956    ///
957    /// # Errors
958    ///
959    /// Returns an error if the underlying client subscribe operation fails.
960    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
961        if !self
962            .subscriptions_book_snapshots
963            .contains(&cmd.instrument_id)
964        {
965            self.subscriptions_book_snapshots.insert(cmd.instrument_id);
966            self.client.subscribe_book_snapshots(cmd)?;
967        }
968
969        Ok(())
970    }
971
972    /// Unsubscribes from book snapshots for an instrument, updating internal state and forwarding to the client.
973    ///
974    /// # Errors
975    ///
976    /// Returns an error if the underlying client unsubscribe operation fails.
977    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
978        if self
979            .subscriptions_book_snapshots
980            .contains(&cmd.instrument_id)
981        {
982            self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
983            self.client.unsubscribe_book_snapshots(cmd)?;
984        }
985
986        Ok(())
987    }
988
989    /// Subscribes to quotes for an instrument, updating internal state and forwarding to the client.
990    ///
991    /// # Errors
992    ///
993    /// Returns an error if the underlying client subscribe operation fails.
994    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
995        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
996            self.subscriptions_quotes.insert(cmd.instrument_id);
997            self.client.subscribe_quotes(cmd)?;
998        }
999        Ok(())
1000    }
1001
1002    /// Unsubscribes from quotes for an instrument, updating internal state and forwarding to the client.
1003    ///
1004    /// # Errors
1005    ///
1006    /// Returns an error if the underlying client unsubscribe operation fails.
1007    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1008        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
1009            self.subscriptions_quotes.remove(&cmd.instrument_id);
1010            self.client.unsubscribe_quotes(cmd)?;
1011        }
1012        Ok(())
1013    }
1014
1015    /// Subscribes to trades for an instrument, updating internal state and forwarding to the client.
1016    ///
1017    /// # Errors
1018    ///
1019    /// Returns an error if the underlying client subscribe operation fails.
1020    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
1021        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
1022            self.subscriptions_trades.insert(cmd.instrument_id);
1023            self.client.subscribe_trades(cmd)?;
1024        }
1025        Ok(())
1026    }
1027
1028    /// Unsubscribes from trades for an instrument, updating internal state and forwarding to the client.
1029    ///
1030    /// # Errors
1031    ///
1032    /// Returns an error if the underlying client unsubscribe operation fails.
1033    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1034        if self.subscriptions_trades.contains(&cmd.instrument_id) {
1035            self.subscriptions_trades.remove(&cmd.instrument_id);
1036            self.client.unsubscribe_trades(cmd)?;
1037        }
1038        Ok(())
1039    }
1040
1041    /// Subscribes to bars for a bar type, updating internal state and forwarding to the client.
1042    ///
1043    /// # Errors
1044    ///
1045    /// Returns an error if the underlying client subscribe operation fails.
1046    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1047        if !self.subscriptions_bars.contains(&cmd.bar_type) {
1048            self.subscriptions_bars.insert(cmd.bar_type);
1049            self.client.subscribe_bars(cmd)?;
1050        }
1051        Ok(())
1052    }
1053
1054    /// Unsubscribes from bars for a bar type, updating internal state and forwarding to the client.
1055    ///
1056    /// # Errors
1057    ///
1058    /// Returns an error if the underlying client unsubscribe operation fails.
1059    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1060        if self.subscriptions_bars.contains(&cmd.bar_type) {
1061            self.subscriptions_bars.remove(&cmd.bar_type);
1062            self.client.unsubscribe_bars(cmd)?;
1063        }
1064        Ok(())
1065    }
1066
1067    /// Subscribes to mark price updates for an instrument, updating internal state and forwarding to the client.
1068    ///
1069    /// # Errors
1070    ///
1071    /// Returns an error if the underlying client subscribe operation fails.
1072    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1073        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1074            self.subscriptions_mark_prices.insert(cmd.instrument_id);
1075            self.client.subscribe_mark_prices(cmd)?;
1076        }
1077        Ok(())
1078    }
1079
1080    /// Unsubscribes from mark price updates for an instrument, updating internal state and forwarding to the client.
1081    ///
1082    /// # Errors
1083    ///
1084    /// Returns an error if the underlying client unsubscribe operation fails.
1085    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1086        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1087            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1088            self.client.unsubscribe_mark_prices(cmd)?;
1089        }
1090        Ok(())
1091    }
1092
1093    /// Subscribes to index price updates for an instrument, updating internal state and forwarding to the client.
1094    ///
1095    /// # Errors
1096    ///
1097    /// Returns an error if the underlying client subscribe operation fails.
1098    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1099        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1100            self.subscriptions_index_prices.insert(cmd.instrument_id);
1101            self.client.subscribe_index_prices(cmd)?;
1102        }
1103        Ok(())
1104    }
1105
1106    /// Unsubscribes from index price updates for an instrument, updating internal state and forwarding to the client.
1107    ///
1108    /// # Errors
1109    ///
1110    /// Returns an error if the underlying client unsubscribe operation fails.
1111    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1112        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1113            self.subscriptions_index_prices.remove(&cmd.instrument_id);
1114            self.client.unsubscribe_index_prices(cmd)?;
1115        }
1116        Ok(())
1117    }
1118
1119    /// Subscribes to funding rate updates for an instrument, updating internal state and forwarding to the client.
1120    ///
1121    /// # Errors
1122    ///
1123    /// Returns an error if the underlying client subscribe operation fails.
1124    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1125        if !self
1126            .subscriptions_funding_rates
1127            .contains(&cmd.instrument_id)
1128        {
1129            self.subscriptions_funding_rates.insert(cmd.instrument_id);
1130            self.client.subscribe_funding_rates(cmd)?;
1131        }
1132        Ok(())
1133    }
1134
1135    /// Unsubscribes from funding rate updates for an instrument, updating internal state and forwarding to the client.
1136    ///
1137    /// # Errors
1138    ///
1139    /// Returns an error if the underlying client unsubscribe operation fails.
1140    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1141        if self
1142            .subscriptions_funding_rates
1143            .contains(&cmd.instrument_id)
1144        {
1145            self.subscriptions_funding_rates.remove(&cmd.instrument_id);
1146            self.client.unsubscribe_funding_rates(cmd)?;
1147        }
1148        Ok(())
1149    }
1150
1151    /// Subscribes to instrument status updates for the specified instrument.
1152    ///
1153    /// # Errors
1154    ///
1155    /// Returns an error if the underlying client subscribe operation fails.
1156    fn subscribe_instrument_status(
1157        &mut self,
1158        cmd: &SubscribeInstrumentStatus,
1159    ) -> anyhow::Result<()> {
1160        if !self
1161            .subscriptions_instrument_status
1162            .contains(&cmd.instrument_id)
1163        {
1164            self.subscriptions_instrument_status
1165                .insert(cmd.instrument_id);
1166            self.client.subscribe_instrument_status(cmd)?;
1167        }
1168        Ok(())
1169    }
1170
1171    /// Unsubscribes from instrument status updates for the specified instrument.
1172    ///
1173    /// # Errors
1174    ///
1175    /// Returns an error if the underlying client unsubscribe operation fails.
1176    fn unsubscribe_instrument_status(
1177        &mut self,
1178        cmd: &UnsubscribeInstrumentStatus,
1179    ) -> anyhow::Result<()> {
1180        if self
1181            .subscriptions_instrument_status
1182            .contains(&cmd.instrument_id)
1183        {
1184            self.subscriptions_instrument_status
1185                .remove(&cmd.instrument_id);
1186            self.client.unsubscribe_instrument_status(cmd)?;
1187        }
1188        Ok(())
1189    }
1190
1191    /// Subscribes to instrument close events for the specified instrument.
1192    ///
1193    /// # Errors
1194    ///
1195    /// Returns an error if the underlying client subscribe operation fails.
1196    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1197        if !self
1198            .subscriptions_instrument_close
1199            .contains(&cmd.instrument_id)
1200        {
1201            self.subscriptions_instrument_close
1202                .insert(cmd.instrument_id);
1203            self.client.subscribe_instrument_close(cmd)?;
1204        }
1205        Ok(())
1206    }
1207
1208    /// Unsubscribes from instrument close events for the specified instrument.
1209    ///
1210    /// # Errors
1211    ///
1212    /// Returns an error if the underlying client unsubscribe operation fails.
1213    fn unsubscribe_instrument_close(
1214        &mut self,
1215        cmd: &UnsubscribeInstrumentClose,
1216    ) -> anyhow::Result<()> {
1217        if self
1218            .subscriptions_instrument_close
1219            .contains(&cmd.instrument_id)
1220        {
1221            self.subscriptions_instrument_close
1222                .remove(&cmd.instrument_id);
1223            self.client.unsubscribe_instrument_close(cmd)?;
1224        }
1225        Ok(())
1226    }
1227
1228    // -- REQUEST HANDLERS ------------------------------------------------------------------------
1229
1230    /// Sends a data request to the underlying client.
1231    ///
1232    /// # Errors
1233    ///
1234    /// Returns an error if the client request fails.
1235    pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1236        self.client.request_data(req)
1237    }
1238
1239    /// Sends a single instrument request to the client.
1240    ///
1241    /// # Errors
1242    ///
1243    /// Returns an error if the client fails to process the request.
1244    pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1245        self.client.request_instrument(req)
1246    }
1247
1248    /// Sends a batch instruments request to the client.
1249    ///
1250    /// # Errors
1251    ///
1252    /// Returns an error if the client fails to process the request.
1253    pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1254        self.client.request_instruments(req)
1255    }
1256
1257    /// Sends a quotes request for a given instrument.
1258    ///
1259    /// # Errors
1260    ///
1261    /// Returns an error if the client fails to process the quotes request.
1262    pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1263        self.client.request_quotes(req)
1264    }
1265
1266    /// Sends a trades request for a given instrument.
1267    ///
1268    /// # Errors
1269    ///
1270    /// Returns an error if the client fails to process the trades request.
1271    pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1272        self.client.request_trades(req)
1273    }
1274
1275    /// Sends a bars request for a given instrument and bar type.
1276    ///
1277    /// # Errors
1278    ///
1279    /// Returns an error if the client fails to process the bars request.
1280    pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1281        self.client.request_bars(req)
1282    }
1283
1284    /// Sends an order book depths request for a given instrument.
1285    ///
1286    /// # Errors
1287    ///
1288    /// Returns an error if the client fails to process the order book depths request.
1289    pub fn request_book_depth(&self, req: &RequestBookDepth) -> anyhow::Result<()> {
1290        self.client.request_book_depth(req)
1291    }
1292}
1293
1294#[inline(always)]
1295fn log_not_implemented<T: Debug>(msg: &T) {
1296    log::warn!("{msg:?} – handler not implemented");
1297}
1298
1299#[inline(always)]
1300fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1301    log::error!("Error on {cmd:?}: {e}");
1302}