nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17//!
18//! Defines the `DataClient` trait, the `DataClientAdapter` for managing subscriptions and requests,
19//! and utilities for constructing data responses.
20
21use std::{
22    any::Any,
23    fmt::{Debug, Display},
24    ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28#[cfg(feature = "defi")]
29use alloy_primitives::Address;
30use nautilus_common::messages::data::{
31    RequestBars, RequestBookSnapshot, RequestCustomData, RequestInstrument, RequestInstruments,
32    RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
33    SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeIndexPrices,
34    SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
35    SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas,
36    UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
37    UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
38    UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
39    UnsubscribeTrades,
40};
41#[cfg(feature = "defi")]
42use nautilus_common::messages::defi::{
43    DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
44    SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
45    UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
46};
47#[cfg(feature = "defi")]
48use nautilus_model::defi::Blockchain;
49use nautilus_model::{
50    data::{BarType, DataType},
51    identifiers::{ClientId, InstrumentId, Venue},
52};
53
54/// Defines the interface for a data client, managing connections, subscriptions, and requests.
55#[async_trait::async_trait]
56pub trait DataClient: Any + Sync + Send {
57    /// Returns the unique identifier for this data client.
58    fn client_id(&self) -> ClientId;
59
60    /// Returns the optional venue this client is associated with.
61    fn venue(&self) -> Option<Venue>;
62
63    /// Starts the data client.
64    ///
65    /// # Errors
66    ///
67    /// Returns an error if the operation fails.
68    fn start(&mut self) -> anyhow::Result<()>;
69
70    /// Stops the data client.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if the operation fails.
75    fn stop(&mut self) -> anyhow::Result<()>;
76
77    /// Resets the data client to its initial state.
78    ///
79    /// # Errors
80    ///
81    /// Returns an error if the operation fails.
82    fn reset(&mut self) -> anyhow::Result<()>;
83
84    /// Disposes of client resources and cleans up.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the operation fails.
89    fn dispose(&mut self) -> anyhow::Result<()>;
90
91    /// Connects external API's if needed.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the operation fails.
96    async fn connect(&mut self) -> anyhow::Result<()>;
97
98    /// Disconnects external API's if needed.
99    ///
100    /// # Errors
101    ///
102    /// Returns an error if the operation fails.
103    async fn disconnect(&mut self) -> anyhow::Result<()>;
104
105    /// Returns `true` if the client is currently connected.
106    fn is_connected(&self) -> bool;
107
108    /// Returns `true` if the client is currently disconnected.
109    fn is_disconnected(&self) -> bool;
110
111    /// Subscribes to custom data types according to the command.
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if the subscribe operation fails.
116    fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
117        log_not_implemented(&cmd);
118        Ok(())
119    }
120
121    /// Subscribes to instruments list for the specified venue.
122    ///
123    /// # Errors
124    ///
125    /// Returns an error if the subscribe operation fails.
126    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
127        log_not_implemented(&cmd);
128        Ok(())
129    }
130
131    /// Subscribes to data for a single instrument.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the subscribe operation fails.
136    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
137        log_not_implemented(&cmd);
138        Ok(())
139    }
140
141    /// Subscribes to order book delta updates for the specified instrument.
142    ///
143    /// # Errors
144    ///
145    /// Returns an error if the subscribe operation fails.
146    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
147        log_not_implemented(&cmd);
148        Ok(())
149    }
150
151    /// Subscribes to top 10 order book depth updates for the specified instrument.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the subscribe operation fails.
156    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
157        log_not_implemented(&cmd);
158        Ok(())
159    }
160
161    /// Subscribes to periodic order book snapshots for the specified instrument.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the subscribe operation fails.
166    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
167        log_not_implemented(&cmd);
168        Ok(())
169    }
170
171    /// Subscribes to quote updates for the specified instrument.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if the subscribe operation fails.
176    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
177        log_not_implemented(&cmd);
178        Ok(())
179    }
180
181    /// Subscribes to trade updates for the specified instrument.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if the subscribe operation fails.
186    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
187        log_not_implemented(&cmd);
188        Ok(())
189    }
190
191    /// Subscribes to mark price updates for the specified instrument.
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if the subscribe operation fails.
196    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
197        log_not_implemented(&cmd);
198        Ok(())
199    }
200
201    /// Subscribes to index price updates for the specified instrument.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if the subscribe operation fails.
206    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
207        log_not_implemented(&cmd);
208        Ok(())
209    }
210
211    /// Subscribes to bar updates of the specified bar type.
212    ///
213    /// # Errors
214    ///
215    /// Returns an error if the subscribe operation fails.
216    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
217        log_not_implemented(&cmd);
218        Ok(())
219    }
220
221    /// Subscribes to status updates for the specified instrument.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the subscribe operation fails.
226    fn subscribe_instrument_status(
227        &mut self,
228        cmd: &SubscribeInstrumentStatus,
229    ) -> anyhow::Result<()> {
230        log_not_implemented(&cmd);
231        Ok(())
232    }
233
234    /// Subscribes to instrument close events for the specified instrument.
235    ///
236    /// # Errors
237    ///
238    /// Returns an error if the subscription operation fails.
239    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
240        log_not_implemented(&cmd);
241        Ok(())
242    }
243
244    #[cfg(feature = "defi")]
245    /// Subscribes to blocks for a specified blockchain.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if the subscription operation fails.
250    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
251        log_not_implemented(&cmd);
252        Ok(())
253    }
254
255    #[cfg(feature = "defi")]
256    /// Subscribes to pool definition updates for a specified AMM pool.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the subscription operation fails.
261    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
262        log_not_implemented(&cmd);
263        Ok(())
264    }
265
266    #[cfg(feature = "defi")]
267    /// Subscribes to pool swaps for a specified AMM pool.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if the subscription operation fails.
272    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
273        log_not_implemented(&cmd);
274        Ok(())
275    }
276
277    #[cfg(feature = "defi")]
278    /// Subscribes to pool liquidity updates for a specified AMM pool.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the subscription operation fails.
283    fn subscribe_pool_liquidity_updates(
284        &mut self,
285        cmd: &SubscribePoolLiquidityUpdates,
286    ) -> anyhow::Result<()> {
287        log_not_implemented(&cmd);
288        Ok(())
289    }
290
291    /// Unsubscribes from custom data types according to the command.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the unsubscribe operation fails.
296    fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
297        log_not_implemented(&cmd);
298        Ok(())
299    }
300
301    /// Unsubscribes from instruments list for the specified venue.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if the unsubscribe operation fails.
306    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
307        log_not_implemented(&cmd);
308        Ok(())
309    }
310
311    /// Unsubscribes from data for the specified instrument.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if the unsubscribe operation fails.
316    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
317        log_not_implemented(&cmd);
318        Ok(())
319    }
320
321    /// Unsubscribes from order book delta updates for the specified instrument.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if the unsubscribe operation fails.
326    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
327        log_not_implemented(&cmd);
328        Ok(())
329    }
330
331    /// Unsubscribes from top 10 order book depth updates for the specified instrument.
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if the unsubscribe operation fails.
336    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
337        log_not_implemented(&cmd);
338        Ok(())
339    }
340
341    /// Unsubscribes from periodic order book snapshots for the specified instrument.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if the unsubscribe operation fails.
346    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
347        log_not_implemented(&cmd);
348        Ok(())
349    }
350
351    /// Unsubscribes from quote updates for the specified instrument.
352    ///
353    /// # Errors
354    ///
355    /// Returns an error if the unsubscribe operation fails.
356    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
357        log_not_implemented(&cmd);
358        Ok(())
359    }
360
361    /// Unsubscribes from trade updates for the specified instrument.
362    ///
363    /// # Errors
364    ///
365    /// Returns an error if the unsubscribe operation fails.
366    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
367        log_not_implemented(&cmd);
368        Ok(())
369    }
370
371    /// Unsubscribes from mark price updates for the specified instrument.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if the unsubscribe operation fails.
376    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
377        log_not_implemented(&cmd);
378        Ok(())
379    }
380
381    /// Unsubscribes from index price updates for the specified instrument.
382    ///
383    /// # Errors
384    ///
385    /// Returns an error if the unsubscribe operation fails.
386    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
387        log_not_implemented(&cmd);
388        Ok(())
389    }
390
391    /// Unsubscribes from bar updates of the specified bar type.
392    ///
393    /// # Errors
394    ///
395    /// Returns an error if the unsubscribe operation fails.
396    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
397        log_not_implemented(&cmd);
398        Ok(())
399    }
400
401    /// Unsubscribes from instrument status updates for the specified instrument.
402    ///
403    /// # Errors
404    ///
405    /// Returns an error if the unsubscribe operation fails.
406    fn unsubscribe_instrument_status(
407        &mut self,
408        cmd: &UnsubscribeInstrumentStatus,
409    ) -> anyhow::Result<()> {
410        log_not_implemented(&cmd);
411        Ok(())
412    }
413
414    /// Unsubscribes from instrument close events for the specified instrument.
415    ///
416    /// # Errors
417    ///
418    /// Returns an error if the unsubscribe operation fails.
419    fn unsubscribe_instrument_close(
420        &mut self,
421        cmd: &UnsubscribeInstrumentClose,
422    ) -> anyhow::Result<()> {
423        log_not_implemented(&cmd);
424        Ok(())
425    }
426
427    #[cfg(feature = "defi")]
428    /// Unsubscribes from blocks for a specified blockchain.
429    ///
430    /// # Errors
431    ///
432    /// Returns an error if the subscription operation fails.
433    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
434        log_not_implemented(&cmd);
435        Ok(())
436    }
437
438    #[cfg(feature = "defi")]
439    /// Unsubscribes from pool definition updates for a specified AMM pool.
440    ///
441    /// # Errors
442    ///
443    /// Returns an error if the subscription operation fails.
444    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
445        log_not_implemented(&cmd);
446        Ok(())
447    }
448
449    #[cfg(feature = "defi")]
450    /// Unsubscribes from swaps for a specified AMM pool.
451    ///
452    /// # Errors
453    ///
454    /// Returns an error if the subscription operation fails.
455    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
456        log_not_implemented(&cmd);
457        Ok(())
458    }
459
460    #[cfg(feature = "defi")]
461    /// Unsubscribes from pool liquidity updates for a specified AMM pool.
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if the subscription operation fails.
466    fn unsubscribe_pool_liquidity_updates(
467        &mut self,
468        cmd: &UnsubscribePoolLiquidityUpdates,
469    ) -> anyhow::Result<()> {
470        log_not_implemented(&cmd);
471        Ok(())
472    }
473
474    /// Sends a custom data request to the provider.
475    ///
476    /// # Errors
477    ///
478    /// Returns an error if the data request fails.
479    fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
480        log_not_implemented(&request);
481        Ok(())
482    }
483
484    /// Requests a list of instruments from the provider for a given venue.
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if the instruments request fails.
489    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
490        log_not_implemented(&request);
491        Ok(())
492    }
493
494    /// Requests detailed data for a single instrument.
495    ///
496    /// # Errors
497    ///
498    /// Returns an error if the instrument request fails.
499    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
500        log_not_implemented(&request);
501        Ok(())
502    }
503
504    /// Requests a snapshot of the order book for a specified instrument.
505    ///
506    /// # Errors
507    ///
508    /// Returns an error if the book snapshot request fails.
509    fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
510        log_not_implemented(&request);
511        Ok(())
512    }
513
514    /// Requests historical or streaming quote data for a specified instrument.
515    ///
516    /// # Errors
517    ///
518    /// Returns an error if the quotes request fails.
519    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
520        log_not_implemented(&request);
521        Ok(())
522    }
523
524    /// Requests historical or streaming trade data for a specified instrument.
525    ///
526    /// # Errors
527    ///
528    /// Returns an error if the trades request fails.
529    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
530        log_not_implemented(&request);
531        Ok(())
532    }
533
534    /// Requests historical or streaming bar data for a specified instrument and bar type.
535    ///
536    /// # Errors
537    ///
538    /// Returns an error if the bars request fails.
539    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
540        log_not_implemented(&request);
541        Ok(())
542    }
543}
544
545/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
546pub struct DataClientAdapter {
547    client: Box<dyn DataClient>,
548    pub client_id: ClientId,
549    pub venue: Option<Venue>,
550    pub handles_book_deltas: bool,
551    pub handles_book_snapshots: bool,
552    pub subscriptions_custom: AHashSet<DataType>,
553    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
554    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
555    pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
556    pub subscriptions_quotes: AHashSet<InstrumentId>,
557    pub subscriptions_trades: AHashSet<InstrumentId>,
558    pub subscriptions_bars: AHashSet<BarType>,
559    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
560    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
561    pub subscriptions_instrument: AHashSet<InstrumentId>,
562    pub subscriptions_instrument_venue: AHashSet<Venue>,
563    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
564    pub subscriptions_index_prices: AHashSet<InstrumentId>,
565    #[cfg(feature = "defi")]
566    pub subscriptions_blocks: AHashSet<Blockchain>,
567    #[cfg(feature = "defi")]
568    pub subscriptions_pools: AHashSet<Address>,
569    #[cfg(feature = "defi")]
570    pub subscriptions_pool_swaps: AHashSet<Address>,
571    #[cfg(feature = "defi")]
572    pub subscriptions_pool_liquidity_updates: AHashSet<Address>,
573}
574
575impl Deref for DataClientAdapter {
576    type Target = Box<dyn DataClient>;
577
578    fn deref(&self) -> &Self::Target {
579        &self.client
580    }
581}
582
583impl DerefMut for DataClientAdapter {
584    fn deref_mut(&mut self) -> &mut Self::Target {
585        &mut self.client
586    }
587}
588
589impl Debug for DataClientAdapter {
590    #[rustfmt::skip]
591    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592        f.debug_struct(stringify!(DataClientAdapter))
593            .field("client_id", &self.client_id)
594            .field("venue", &self.venue)
595            .field("handles_book_deltas", &self.handles_book_deltas)
596            .field("handles_book_snapshots", &self.handles_book_snapshots)
597            .field("subscriptions_custom", &self.subscriptions_custom)
598            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
599            .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
600            .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
601            .field("subscriptions_quotes", &self.subscriptions_quotes)
602            .field("subscriptions_trades", &self.subscriptions_trades)
603            .field("subscriptions_bars", &self.subscriptions_bars)
604            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
605            .field("subscriptions_index_prices", &self.subscriptions_index_prices)
606            .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
607            .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
608            .field("subscriptions_instrument", &self.subscriptions_instrument)
609            .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
610            .finish()
611    }
612}
613
614impl DataClientAdapter {
615    /// Creates a new [`DataClientAdapter`] with the given client and clock.
616    #[must_use]
617    pub fn new(
618        client_id: ClientId,
619        venue: Option<Venue>,
620        handles_order_book_deltas: bool,
621        handles_order_book_snapshots: bool,
622        client: Box<dyn DataClient>,
623    ) -> Self {
624        Self {
625            client,
626            client_id,
627            venue,
628            handles_book_deltas: handles_order_book_deltas,
629            handles_book_snapshots: handles_order_book_snapshots,
630            subscriptions_custom: AHashSet::new(),
631            subscriptions_book_deltas: AHashSet::new(),
632            subscriptions_book_depth10: AHashSet::new(),
633            subscriptions_book_snapshots: AHashSet::new(),
634            subscriptions_quotes: AHashSet::new(),
635            subscriptions_trades: AHashSet::new(),
636            subscriptions_mark_prices: AHashSet::new(),
637            subscriptions_index_prices: AHashSet::new(),
638            subscriptions_bars: AHashSet::new(),
639            subscriptions_instrument_status: AHashSet::new(),
640            subscriptions_instrument_close: AHashSet::new(),
641            subscriptions_instrument: AHashSet::new(),
642            subscriptions_instrument_venue: AHashSet::new(),
643            #[cfg(feature = "defi")]
644            subscriptions_blocks: AHashSet::new(),
645            #[cfg(feature = "defi")]
646            subscriptions_pools: AHashSet::new(),
647            #[cfg(feature = "defi")]
648            subscriptions_pool_swaps: AHashSet::new(),
649            #[cfg(feature = "defi")]
650            subscriptions_pool_liquidity_updates: AHashSet::new(),
651        }
652    }
653
654    #[allow(clippy::borrowed_box)]
655    #[must_use]
656    pub fn get_client(&self) -> &Box<dyn DataClient> {
657        &self.client
658    }
659
660    #[inline]
661    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
662        if let Err(e) = match cmd {
663            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
664            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
665            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
666            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
667            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
668            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
669            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
670            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
671            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
672            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
673            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
674            SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
675            SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
676        } {
677            log_command_error(&cmd, &e);
678        }
679    }
680
681    #[cfg(feature = "defi")]
682    #[inline]
683    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
684        if let Err(e) = match cmd {
685            DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
686            DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
687            DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_swaps(cmd),
688            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
689                self.subscribe_pool_liquidity_updates(cmd)
690            }
691        } {
692            log_command_error(&cmd, &e);
693        }
694    }
695
696    #[inline]
697    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
698        if let Err(e) = match cmd {
699            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
700            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
701            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
702            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
703            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
704            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
705            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
706            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
707            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
708            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
709            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
710            UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
711            UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
712        } {
713            log_command_error(&cmd, &e);
714        }
715    }
716
717    #[cfg(feature = "defi")]
718    #[inline]
719    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
720        if let Err(e) = match cmd {
721            DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
722            DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
723            DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_swaps(cmd),
724            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
725                self.unsubscribe_pool_liquidity_updates(cmd)
726            }
727        } {
728            log_command_error(&cmd, &e);
729        }
730    }
731
732    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
733
734    /// Subscribes to a custom data type, updating internal state and forwarding to the client.
735    ///
736    /// # Errors
737    ///
738    /// Returns an error if the underlying client subscribe operation fails.
739    pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
740        if !self.subscriptions_custom.contains(&cmd.data_type) {
741            self.subscriptions_custom.insert(cmd.data_type.clone());
742            self.client.subscribe(cmd)?;
743        }
744        Ok(())
745    }
746
747    /// Unsubscribes from a custom data type, updating internal state and forwarding to the client.
748    ///
749    /// # Errors
750    ///
751    /// Returns an error if the underlying client unsubscribe operation fails.
752    pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
753        if self.subscriptions_custom.contains(&cmd.data_type) {
754            self.subscriptions_custom.remove(&cmd.data_type);
755            self.client.unsubscribe(cmd)?;
756        }
757        Ok(())
758    }
759
760    /// Subscribes to instrument definitions for a venue, updating internal state and forwarding to the client.
761    ///
762    /// # Errors
763    ///
764    /// Returns an error if the underlying client subscribe operation fails.
765    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
766        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
767            self.subscriptions_instrument_venue.insert(cmd.venue);
768            self.client.subscribe_instruments(cmd)?;
769        }
770
771        Ok(())
772    }
773
774    /// Unsubscribes from instrument definition updates for a venue, updating internal state and forwarding to the client.
775    ///
776    /// # Errors
777    ///
778    /// Returns an error if the underlying client unsubscribe operation fails.
779    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
780        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
781            self.subscriptions_instrument_venue.remove(&cmd.venue);
782            self.client.unsubscribe_instruments(cmd)?;
783        }
784
785        Ok(())
786    }
787
788    /// Subscribes to instrument definitions for a single instrument, updating internal state and forwarding to the client.
789    ///
790    /// # Errors
791    ///
792    /// Returns an error if the underlying client subscribe operation fails.
793    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
794        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
795            self.subscriptions_instrument.insert(cmd.instrument_id);
796            self.client.subscribe_instrument(cmd)?;
797        }
798
799        Ok(())
800    }
801
802    /// Unsubscribes from instrument definition updates for a single instrument, updating internal state and forwarding to the client.
803    ///
804    /// # Errors
805    ///
806    /// Returns an error if the underlying client unsubscribe operation fails.
807    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
808        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
809            self.subscriptions_instrument.remove(&cmd.instrument_id);
810            self.client.unsubscribe_instrument(cmd)?;
811        }
812
813        Ok(())
814    }
815
816    /// Subscribes to book deltas updates for an instrument, updating internal state and forwarding to the client.
817    ///
818    /// # Errors
819    ///
820    /// Returns an error if the underlying client subscribe operation fails.
821    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
822        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
823            self.subscriptions_book_deltas.insert(cmd.instrument_id);
824            self.client.subscribe_book_deltas(cmd)?;
825        }
826
827        Ok(())
828    }
829
830    /// Unsubscribes from book deltas for an instrument, updating internal state and forwarding to the client.
831    ///
832    /// # Errors
833    ///
834    /// Returns an error if the underlying client unsubscribe operation fails.
835    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
836        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
837            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
838            self.client.unsubscribe_book_deltas(cmd)?;
839        }
840
841        Ok(())
842    }
843
844    /// Subscribes to book depth updates for an instrument, updating internal state and forwarding to the client.
845    ///
846    /// # Errors
847    ///
848    /// Returns an error if the underlying client subscribe operation fails.
849    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
850        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
851            self.subscriptions_book_depth10.insert(cmd.instrument_id);
852            self.client.subscribe_book_depth10(cmd)?;
853        }
854
855        Ok(())
856    }
857
858    /// Unsubscribes from book depth updates for an instrument, updating internal state and forwarding to the client.
859    ///
860    /// # Errors
861    ///
862    /// Returns an error if the underlying client unsubscribe operation fails.
863    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
864        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
865            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
866            self.client.unsubscribe_book_depth10(cmd)?;
867        }
868
869        Ok(())
870    }
871
872    /// Subscribes to book snapshots for an instrument, updating internal state and forwarding to the client.
873    ///
874    /// # Errors
875    ///
876    /// Returns an error if the underlying client subscribe operation fails.
877    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
878        if !self
879            .subscriptions_book_snapshots
880            .contains(&cmd.instrument_id)
881        {
882            self.subscriptions_book_snapshots.insert(cmd.instrument_id);
883            self.client.subscribe_book_snapshots(cmd)?;
884        }
885
886        Ok(())
887    }
888
889    /// Unsubscribes from book snapshots for an instrument, updating internal state and forwarding to the client.
890    ///
891    /// # Errors
892    ///
893    /// Returns an error if the underlying client unsubscribe operation fails.
894    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
895        if self
896            .subscriptions_book_snapshots
897            .contains(&cmd.instrument_id)
898        {
899            self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
900            self.client.unsubscribe_book_snapshots(cmd)?;
901        }
902
903        Ok(())
904    }
905
906    /// Subscribes to quotes for an instrument, updating internal state and forwarding to the client.
907    ///
908    /// # Errors
909    ///
910    /// Returns an error if the underlying client subscribe operation fails.
911    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
912        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
913            self.subscriptions_quotes.insert(cmd.instrument_id);
914            self.client.subscribe_quotes(cmd)?;
915        }
916        Ok(())
917    }
918
919    /// Unsubscribes from quotes for an instrument, updating internal state and forwarding to the client.
920    ///
921    /// # Errors
922    ///
923    /// Returns an error if the underlying client unsubscribe operation fails.
924    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
925        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
926            self.subscriptions_quotes.remove(&cmd.instrument_id);
927            self.client.unsubscribe_quotes(cmd)?;
928        }
929        Ok(())
930    }
931
932    /// Subscribes to trades for an instrument, updating internal state and forwarding to the client.
933    ///
934    /// # Errors
935    ///
936    /// Returns an error if the underlying client subscribe operation fails.
937    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
938        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
939            self.subscriptions_trades.insert(cmd.instrument_id);
940            self.client.subscribe_trades(cmd)?;
941        }
942        Ok(())
943    }
944
945    /// Unsubscribes from trades for an instrument, updating internal state and forwarding to the client.
946    ///
947    /// # Errors
948    ///
949    /// Returns an error if the underlying client unsubscribe operation fails.
950    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
951        if self.subscriptions_trades.contains(&cmd.instrument_id) {
952            self.subscriptions_trades.remove(&cmd.instrument_id);
953            self.client.unsubscribe_trades(cmd)?;
954        }
955        Ok(())
956    }
957
958    /// Subscribes to bars for a bar type, updating internal state and forwarding to the client.
959    ///
960    /// # Errors
961    ///
962    /// Returns an error if the underlying client subscribe operation fails.
963    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
964        if !self.subscriptions_bars.contains(&cmd.bar_type) {
965            self.subscriptions_bars.insert(cmd.bar_type);
966            self.client.subscribe_bars(cmd)?;
967        }
968        Ok(())
969    }
970
971    /// Unsubscribes from bars for a bar type, updating internal state and forwarding to the client.
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if the underlying client unsubscribe operation fails.
976    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
977        if self.subscriptions_bars.contains(&cmd.bar_type) {
978            self.subscriptions_bars.remove(&cmd.bar_type);
979            self.client.unsubscribe_bars(cmd)?;
980        }
981        Ok(())
982    }
983
984    /// Subscribes to mark price updates for an instrument, updating internal state and forwarding to the client.
985    ///
986    /// # Errors
987    ///
988    /// Returns an error if the underlying client subscribe operation fails.
989    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
990        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
991            self.subscriptions_mark_prices.insert(cmd.instrument_id);
992            self.client.subscribe_mark_prices(cmd)?;
993        }
994        Ok(())
995    }
996
997    /// Unsubscribes from mark price updates for an instrument, updating internal state and forwarding to the client.
998    ///
999    /// # Errors
1000    ///
1001    /// Returns an error if the underlying client unsubscribe operation fails.
1002    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1003        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1004            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1005            self.client.unsubscribe_mark_prices(cmd)?;
1006        }
1007        Ok(())
1008    }
1009
1010    /// Subscribes to index price updates for an instrument, updating internal state and forwarding to the client.
1011    ///
1012    /// # Errors
1013    ///
1014    /// Returns an error if the underlying client subscribe operation fails.
1015    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1016        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1017            self.subscriptions_index_prices.insert(cmd.instrument_id);
1018            self.client.subscribe_index_prices(cmd)?;
1019        }
1020        Ok(())
1021    }
1022
1023    /// Unsubscribes from index price updates for an instrument, updating internal state and forwarding to the client.
1024    ///
1025    /// # Errors
1026    ///
1027    /// Returns an error if the underlying client unsubscribe operation fails.
1028    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1029        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1030            self.subscriptions_index_prices.remove(&cmd.instrument_id);
1031            self.client.unsubscribe_index_prices(cmd)?;
1032        }
1033        Ok(())
1034    }
1035
1036    /// Subscribes to instrument status updates for the specified instrument.
1037    ///
1038    /// # Errors
1039    ///
1040    /// Returns an error if the underlying client subscribe operation fails.
1041    fn subscribe_instrument_status(
1042        &mut self,
1043        cmd: &SubscribeInstrumentStatus,
1044    ) -> anyhow::Result<()> {
1045        if !self
1046            .subscriptions_instrument_status
1047            .contains(&cmd.instrument_id)
1048        {
1049            self.subscriptions_instrument_status
1050                .insert(cmd.instrument_id);
1051            self.client.subscribe_instrument_status(cmd)?;
1052        }
1053        Ok(())
1054    }
1055
1056    /// Unsubscribes from instrument status updates for the specified instrument.
1057    ///
1058    /// # Errors
1059    ///
1060    /// Returns an error if the underlying client unsubscribe operation fails.
1061    fn unsubscribe_instrument_status(
1062        &mut self,
1063        cmd: &UnsubscribeInstrumentStatus,
1064    ) -> anyhow::Result<()> {
1065        if self
1066            .subscriptions_instrument_status
1067            .contains(&cmd.instrument_id)
1068        {
1069            self.subscriptions_instrument_status
1070                .remove(&cmd.instrument_id);
1071            self.client.unsubscribe_instrument_status(cmd)?;
1072        }
1073        Ok(())
1074    }
1075
1076    /// Subscribes to instrument close events for the specified instrument.
1077    ///
1078    /// # Errors
1079    ///
1080    /// Returns an error if the underlying client subscribe operation fails.
1081    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1082        if !self
1083            .subscriptions_instrument_close
1084            .contains(&cmd.instrument_id)
1085        {
1086            self.subscriptions_instrument_close
1087                .insert(cmd.instrument_id);
1088            self.client.subscribe_instrument_close(cmd)?;
1089        }
1090        Ok(())
1091    }
1092
1093    /// Unsubscribes from instrument close events for the specified instrument.
1094    ///
1095    /// # Errors
1096    ///
1097    /// Returns an error if the underlying client unsubscribe operation fails.
1098    fn unsubscribe_instrument_close(
1099        &mut self,
1100        cmd: &UnsubscribeInstrumentClose,
1101    ) -> anyhow::Result<()> {
1102        if self
1103            .subscriptions_instrument_close
1104            .contains(&cmd.instrument_id)
1105        {
1106            self.subscriptions_instrument_close
1107                .remove(&cmd.instrument_id);
1108            self.client.unsubscribe_instrument_close(cmd)?;
1109        }
1110        Ok(())
1111    }
1112
1113    #[cfg(feature = "defi")]
1114    /// Subscribes to block events for the specified blockchain.
1115    ///
1116    /// # Errors
1117    ///
1118    /// Returns an error if the underlying client subscribe operation fails.
1119    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
1120        if !self.subscriptions_blocks.contains(&cmd.chain) {
1121            self.subscriptions_blocks.insert(cmd.chain);
1122            self.client.subscribe_blocks(cmd)?;
1123        }
1124        Ok(())
1125    }
1126
1127    #[cfg(feature = "defi")]
1128    /// Unsubscribes from block events for the specified blockchain.
1129    ///
1130    /// # Errors
1131    ///
1132    /// Returns an error if the underlying client unsubscribe operation fails.
1133    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
1134        if self.subscriptions_blocks.contains(&cmd.chain) {
1135            self.subscriptions_blocks.remove(&cmd.chain);
1136            self.client.unsubscribe_blocks(cmd)?;
1137        }
1138        Ok(())
1139    }
1140
1141    #[cfg(feature = "defi")]
1142    /// Subscribes to pool definition updates for the specified AMM pool.
1143    ///
1144    /// # Errors
1145    ///
1146    /// Returns an error if the underlying client subscribe operation fails.
1147    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
1148        if !self.subscriptions_pools.contains(&cmd.address) {
1149            self.subscriptions_pools.insert(cmd.address);
1150            self.client.subscribe_pool(cmd)?;
1151        }
1152        Ok(())
1153    }
1154
1155    #[cfg(feature = "defi")]
1156    /// Subscribes to pool swap events for the specified AMM pool.
1157    ///
1158    /// # Errors
1159    ///
1160    /// Returns an error if the underlying client subscribe operation fails.
1161    fn subscribe_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
1162        if !self.subscriptions_pool_swaps.contains(&cmd.address) {
1163            self.subscriptions_pool_swaps.insert(cmd.address);
1164            self.client.subscribe_pool_swaps(cmd)?;
1165        }
1166        Ok(())
1167    }
1168
1169    #[cfg(feature = "defi")]
1170    /// Subscribes to pool liquidity update events for the specified AMM pool.
1171    ///
1172    /// # Errors
1173    ///
1174    /// Returns an error if the underlying client subscribe operation fails.
1175    fn subscribe_pool_liquidity_updates(
1176        &mut self,
1177        cmd: &SubscribePoolLiquidityUpdates,
1178    ) -> anyhow::Result<()> {
1179        if !self
1180            .subscriptions_pool_liquidity_updates
1181            .contains(&cmd.address)
1182        {
1183            self.subscriptions_pool_liquidity_updates
1184                .insert(cmd.address);
1185            self.client.subscribe_pool_liquidity_updates(cmd)?;
1186        }
1187        Ok(())
1188    }
1189
1190    #[cfg(feature = "defi")]
1191    /// Unsubscribes from pool definition updates for the specified AMM pool.
1192    ///
1193    /// # Errors
1194    ///
1195    /// Returns an error if the underlying client unsubscribe operation fails.
1196    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
1197        if self.subscriptions_pools.contains(&cmd.address) {
1198            self.subscriptions_pools.remove(&cmd.address);
1199            self.client.unsubscribe_pool(cmd)?;
1200        }
1201        Ok(())
1202    }
1203
1204    #[cfg(feature = "defi")]
1205    /// Unsubscribes from swap events for the specified AMM pool.
1206    ///
1207    /// # Errors
1208    ///
1209    /// Returns an error if the underlying client unsubscribe operation fails.
1210    fn unsubscribe_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
1211        if self.subscriptions_pool_swaps.contains(&cmd.address) {
1212            self.subscriptions_pool_swaps.remove(&cmd.address);
1213            self.client.unsubscribe_pool_swaps(cmd)?;
1214        }
1215        Ok(())
1216    }
1217
1218    #[cfg(feature = "defi")]
1219    /// Unsubscribes from pool liquidity update events for the specified AMM pool.
1220    ///
1221    /// # Errors
1222    ///
1223    /// Returns an error if the underlying client unsubscribe operation fails.
1224    fn unsubscribe_pool_liquidity_updates(
1225        &mut self,
1226        cmd: &UnsubscribePoolLiquidityUpdates,
1227    ) -> anyhow::Result<()> {
1228        if self
1229            .subscriptions_pool_liquidity_updates
1230            .contains(&cmd.address)
1231        {
1232            self.subscriptions_pool_liquidity_updates
1233                .remove(&cmd.address);
1234            self.client.unsubscribe_pool_liquidity_updates(cmd)?;
1235        }
1236        Ok(())
1237    }
1238
1239    // -- REQUEST HANDLERS ------------------------------------------------------------------------
1240
1241    /// Sends a data request to the underlying client.
1242    ///
1243    /// # Errors
1244    ///
1245    /// Returns an error if the client request fails.
1246    pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1247        self.client.request_data(req)
1248    }
1249
1250    /// Sends a single instrument request to the client.
1251    ///
1252    /// # Errors
1253    ///
1254    /// Returns an error if the client fails to process the request.
1255    pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1256        self.client.request_instrument(req)
1257    }
1258
1259    /// Sends a batch instruments request to the client.
1260    ///
1261    /// # Errors
1262    ///
1263    /// Returns an error if the client fails to process the request.
1264    pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1265        self.client.request_instruments(req)
1266    }
1267
1268    /// Sends a quotes request for a given instrument.
1269    ///
1270    /// # Errors
1271    ///
1272    /// Returns an error if the client fails to process the quotes request.
1273    pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1274        self.client.request_quotes(req)
1275    }
1276
1277    /// Sends a trades request for a given instrument.
1278    ///
1279    /// # Errors
1280    ///
1281    /// Returns an error if the client fails to process the trades request.
1282    pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1283        self.client.request_trades(req)
1284    }
1285
1286    /// Sends a bars request for a given instrument and bar type.
1287    ///
1288    /// # Errors
1289    ///
1290    /// Returns an error if the client fails to process the bars request.
1291    pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1292        self.client.request_bars(req)
1293    }
1294}
1295
1296#[inline(always)]
1297fn log_not_implemented<T: Debug>(msg: &T) {
1298    log::warn!("{msg:?} – handler not implemented");
1299}
1300
1301#[inline(always)]
1302fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1303    log::error!("Error on {cmd:?}: {e}");
1304}