nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17//!
18//! Defines the `DataClient` trait, the `DataClientAdapter` for managing subscriptions and requests,
19//! and utilities for constructing data responses.
20
21use std::{
22    any::Any,
23    fmt::{Debug, Display},
24    ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28use nautilus_common::messages::data::{
29    RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData, RequestInstrument,
30    RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
31    SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
32    SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
33    SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
34    SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
35    UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
36    UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
37    UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
38    UnsubscribeTrades,
39};
40#[cfg(feature = "defi")]
41use nautilus_common::messages::defi::{
42    DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
43    SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
44    UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
45};
46#[cfg(feature = "defi")]
47use nautilus_model::defi::Blockchain;
48use nautilus_model::{
49    data::{BarType, DataType},
50    identifiers::{ClientId, InstrumentId, Venue},
51};
52
53/// Defines the interface for a data client, managing connections, subscriptions, and requests.
54#[async_trait::async_trait]
55pub trait DataClient: Any + Sync + Send {
56    /// Returns the unique identifier for this data client.
57    fn client_id(&self) -> ClientId;
58
59    /// Returns the optional venue this client is associated with.
60    fn venue(&self) -> Option<Venue>;
61
62    /// Starts the data client.
63    ///
64    /// # Errors
65    ///
66    /// Returns an error if the operation fails.
67    fn start(&mut self) -> anyhow::Result<()>;
68
69    /// Stops the data client.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the operation fails.
74    fn stop(&mut self) -> anyhow::Result<()>;
75
76    /// Resets the data client to its initial state.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the operation fails.
81    fn reset(&mut self) -> anyhow::Result<()>;
82
83    /// Disposes of client resources and cleans up.
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if the operation fails.
88    fn dispose(&mut self) -> anyhow::Result<()>;
89
90    /// Connects external API's if needed.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the operation fails.
95    async fn connect(&mut self) -> anyhow::Result<()>;
96
97    /// Disconnects external API's if needed.
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if the operation fails.
102    async fn disconnect(&mut self) -> anyhow::Result<()>;
103
104    /// Returns `true` if the client is currently connected.
105    fn is_connected(&self) -> bool;
106
107    /// Returns `true` if the client is currently disconnected.
108    fn is_disconnected(&self) -> bool;
109
110    /// Subscribes to custom data types according to the command.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the subscribe operation fails.
115    fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
116        log_not_implemented(&cmd);
117        Ok(())
118    }
119
120    /// Subscribes to instruments list for the specified venue.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if the subscribe operation fails.
125    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
126        log_not_implemented(&cmd);
127        Ok(())
128    }
129
130    /// Subscribes to data for a single instrument.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the subscribe operation fails.
135    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
136        log_not_implemented(&cmd);
137        Ok(())
138    }
139
140    /// Subscribes to order book delta updates for the specified instrument.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the subscribe operation fails.
145    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
146        log_not_implemented(&cmd);
147        Ok(())
148    }
149
150    /// Subscribes to top 10 order book depth updates for the specified instrument.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the subscribe operation fails.
155    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
156        log_not_implemented(&cmd);
157        Ok(())
158    }
159
160    /// Subscribes to periodic order book snapshots for the specified instrument.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if the subscribe operation fails.
165    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
166        log_not_implemented(&cmd);
167        Ok(())
168    }
169
170    /// Subscribes to quote updates for the specified instrument.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if the subscribe operation fails.
175    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
176        log_not_implemented(&cmd);
177        Ok(())
178    }
179
180    /// Subscribes to trade updates for the specified instrument.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the subscribe operation fails.
185    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
186        log_not_implemented(&cmd);
187        Ok(())
188    }
189
190    /// Subscribes to mark price updates for the specified instrument.
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the subscribe operation fails.
195    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
196        log_not_implemented(&cmd);
197        Ok(())
198    }
199
200    /// Subscribes to index price updates for the specified instrument.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if the subscribe operation fails.
205    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
206        log_not_implemented(&cmd);
207        Ok(())
208    }
209
210    /// Subscribes to funding rate updates for the specified instrument.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if the subscribe operation fails.
215    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
216        log_not_implemented(&cmd);
217        Ok(())
218    }
219
220    /// Subscribes to bar updates of the specified bar type.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the subscribe operation fails.
225    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
226        log_not_implemented(&cmd);
227        Ok(())
228    }
229
230    /// Subscribes to status updates for the specified instrument.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if the subscribe operation fails.
235    fn subscribe_instrument_status(
236        &mut self,
237        cmd: &SubscribeInstrumentStatus,
238    ) -> anyhow::Result<()> {
239        log_not_implemented(&cmd);
240        Ok(())
241    }
242
243    /// Subscribes to instrument close events for the specified instrument.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the subscription operation fails.
248    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
249        log_not_implemented(&cmd);
250        Ok(())
251    }
252
253    #[cfg(feature = "defi")]
254    /// Subscribes to blocks for a specified blockchain.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if the subscription operation fails.
259    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
260        log_not_implemented(&cmd);
261        Ok(())
262    }
263
264    #[cfg(feature = "defi")]
265    /// Subscribes to pool definition updates for a specified AMM pool.
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if the subscription operation fails.
270    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
271        log_not_implemented(&cmd);
272        Ok(())
273    }
274
275    #[cfg(feature = "defi")]
276    /// Subscribes to pool swaps for a specified AMM pool.
277    ///
278    /// # Errors
279    ///
280    /// Returns an error if the subscription operation fails.
281    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
282        log_not_implemented(&cmd);
283        Ok(())
284    }
285
286    #[cfg(feature = "defi")]
287    /// Subscribes to pool liquidity updates for a specified AMM pool.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the subscription operation fails.
292    fn subscribe_pool_liquidity_updates(
293        &mut self,
294        cmd: &SubscribePoolLiquidityUpdates,
295    ) -> anyhow::Result<()> {
296        log_not_implemented(&cmd);
297        Ok(())
298    }
299
300    /// Unsubscribes from custom data types according to the command.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the unsubscribe operation fails.
305    fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
306        log_not_implemented(&cmd);
307        Ok(())
308    }
309
310    /// Unsubscribes from instruments list for the specified venue.
311    ///
312    /// # Errors
313    ///
314    /// Returns an error if the unsubscribe operation fails.
315    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
316        log_not_implemented(&cmd);
317        Ok(())
318    }
319
320    /// Unsubscribes from data for the specified instrument.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if the unsubscribe operation fails.
325    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
326        log_not_implemented(&cmd);
327        Ok(())
328    }
329
330    /// Unsubscribes from order book delta updates for the specified instrument.
331    ///
332    /// # Errors
333    ///
334    /// Returns an error if the unsubscribe operation fails.
335    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
336        log_not_implemented(&cmd);
337        Ok(())
338    }
339
340    /// Unsubscribes from top 10 order book depth updates for the specified instrument.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if the unsubscribe operation fails.
345    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
346        log_not_implemented(&cmd);
347        Ok(())
348    }
349
350    /// Unsubscribes from periodic order book snapshots for the specified instrument.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if the unsubscribe operation fails.
355    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
356        log_not_implemented(&cmd);
357        Ok(())
358    }
359
360    /// Unsubscribes from quote updates for the specified instrument.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if the unsubscribe operation fails.
365    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
366        log_not_implemented(&cmd);
367        Ok(())
368    }
369
370    /// Unsubscribes from trade updates for the specified instrument.
371    ///
372    /// # Errors
373    ///
374    /// Returns an error if the unsubscribe operation fails.
375    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
376        log_not_implemented(&cmd);
377        Ok(())
378    }
379
380    /// Unsubscribes from mark price updates for the specified instrument.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the unsubscribe operation fails.
385    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
386        log_not_implemented(&cmd);
387        Ok(())
388    }
389
390    /// Unsubscribes from index price updates for the specified instrument.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the unsubscribe operation fails.
395    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
396        log_not_implemented(&cmd);
397        Ok(())
398    }
399
400    /// Unsubscribes from funding rate updates for the specified instrument.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if the unsubscribe operation fails.
405    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
406        log_not_implemented(&cmd);
407        Ok(())
408    }
409
410    /// Unsubscribes from bar updates of the specified bar type.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if the unsubscribe operation fails.
415    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
416        log_not_implemented(&cmd);
417        Ok(())
418    }
419
420    /// Unsubscribes from instrument status updates for the specified instrument.
421    ///
422    /// # Errors
423    ///
424    /// Returns an error if the unsubscribe operation fails.
425    fn unsubscribe_instrument_status(
426        &mut self,
427        cmd: &UnsubscribeInstrumentStatus,
428    ) -> anyhow::Result<()> {
429        log_not_implemented(&cmd);
430        Ok(())
431    }
432
433    /// Unsubscribes from instrument close events for the specified instrument.
434    ///
435    /// # Errors
436    ///
437    /// Returns an error if the unsubscribe operation fails.
438    fn unsubscribe_instrument_close(
439        &mut self,
440        cmd: &UnsubscribeInstrumentClose,
441    ) -> anyhow::Result<()> {
442        log_not_implemented(&cmd);
443        Ok(())
444    }
445
446    #[cfg(feature = "defi")]
447    /// Unsubscribes from blocks for a specified blockchain.
448    ///
449    /// # Errors
450    ///
451    /// Returns an error if the subscription operation fails.
452    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
453        log_not_implemented(&cmd);
454        Ok(())
455    }
456
457    #[cfg(feature = "defi")]
458    /// Unsubscribes from pool definition updates for a specified AMM pool.
459    ///
460    /// # Errors
461    ///
462    /// Returns an error if the subscription operation fails.
463    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
464        log_not_implemented(&cmd);
465        Ok(())
466    }
467
468    #[cfg(feature = "defi")]
469    /// Unsubscribes from swaps for a specified AMM pool.
470    ///
471    /// # Errors
472    ///
473    /// Returns an error if the subscription operation fails.
474    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
475        log_not_implemented(&cmd);
476        Ok(())
477    }
478
479    #[cfg(feature = "defi")]
480    /// Unsubscribes from pool liquidity updates for a specified AMM pool.
481    ///
482    /// # Errors
483    ///
484    /// Returns an error if the subscription operation fails.
485    fn unsubscribe_pool_liquidity_updates(
486        &mut self,
487        cmd: &UnsubscribePoolLiquidityUpdates,
488    ) -> anyhow::Result<()> {
489        log_not_implemented(&cmd);
490        Ok(())
491    }
492
493    /// Sends a custom data request to the provider.
494    ///
495    /// # Errors
496    ///
497    /// Returns an error if the data request fails.
498    fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
499        log_not_implemented(&request);
500        Ok(())
501    }
502
503    /// Requests a list of instruments from the provider for a given venue.
504    ///
505    /// # Errors
506    ///
507    /// Returns an error if the instruments request fails.
508    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
509        log_not_implemented(&request);
510        Ok(())
511    }
512
513    /// Requests detailed data for a single instrument.
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if the instrument request fails.
518    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
519        log_not_implemented(&request);
520        Ok(())
521    }
522
523    /// Requests a snapshot of the order book for a specified instrument.
524    ///
525    /// # Errors
526    ///
527    /// Returns an error if the book snapshot request fails.
528    fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
529        log_not_implemented(&request);
530        Ok(())
531    }
532
533    /// Requests historical or streaming quote data for a specified instrument.
534    ///
535    /// # Errors
536    ///
537    /// Returns an error if the quotes request fails.
538    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
539        log_not_implemented(&request);
540        Ok(())
541    }
542
543    /// Requests historical or streaming trade data for a specified instrument.
544    ///
545    /// # Errors
546    ///
547    /// Returns an error if the trades request fails.
548    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
549        log_not_implemented(&request);
550        Ok(())
551    }
552
553    /// Requests historical or streaming bar data for a specified instrument and bar type.
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if the bars request fails.
558    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
559        log_not_implemented(&request);
560        Ok(())
561    }
562
563    /// Requests historical order book depth data for a specified instrument.
564    ///
565    /// # Errors
566    ///
567    /// Returns an error if the order book depths request fails.
568    fn request_book_depth(&self, request: &RequestBookDepth) -> anyhow::Result<()> {
569        log_not_implemented(&request);
570        Ok(())
571    }
572}
573
574/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
575pub struct DataClientAdapter {
576    client: Box<dyn DataClient>,
577    pub client_id: ClientId,
578    pub venue: Option<Venue>,
579    pub handles_book_deltas: bool,
580    pub handles_book_snapshots: bool,
581    pub subscriptions_custom: AHashSet<DataType>,
582    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
583    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
584    pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
585    pub subscriptions_quotes: AHashSet<InstrumentId>,
586    pub subscriptions_trades: AHashSet<InstrumentId>,
587    pub subscriptions_bars: AHashSet<BarType>,
588    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
589    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
590    pub subscriptions_instrument: AHashSet<InstrumentId>,
591    pub subscriptions_instrument_venue: AHashSet<Venue>,
592    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
593    pub subscriptions_index_prices: AHashSet<InstrumentId>,
594    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
595    #[cfg(feature = "defi")]
596    pub subscriptions_blocks: AHashSet<Blockchain>,
597    #[cfg(feature = "defi")]
598    pub subscriptions_pools: AHashSet<InstrumentId>,
599    #[cfg(feature = "defi")]
600    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
601    #[cfg(feature = "defi")]
602    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
603}
604
605impl Deref for DataClientAdapter {
606    type Target = Box<dyn DataClient>;
607
608    fn deref(&self) -> &Self::Target {
609        &self.client
610    }
611}
612
613impl DerefMut for DataClientAdapter {
614    fn deref_mut(&mut self) -> &mut Self::Target {
615        &mut self.client
616    }
617}
618
619impl Debug for DataClientAdapter {
620    #[rustfmt::skip]
621    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
622        f.debug_struct(stringify!(DataClientAdapter))
623            .field("client_id", &self.client_id)
624            .field("venue", &self.venue)
625            .field("handles_book_deltas", &self.handles_book_deltas)
626            .field("handles_book_snapshots", &self.handles_book_snapshots)
627            .field("subscriptions_custom", &self.subscriptions_custom)
628            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
629            .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
630            .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
631            .field("subscriptions_quotes", &self.subscriptions_quotes)
632            .field("subscriptions_trades", &self.subscriptions_trades)
633            .field("subscriptions_bars", &self.subscriptions_bars)
634            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
635            .field("subscriptions_index_prices", &self.subscriptions_index_prices)
636            .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
637            .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
638            .field("subscriptions_instrument", &self.subscriptions_instrument)
639            .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
640            .finish()
641    }
642}
643
644impl DataClientAdapter {
645    /// Creates a new [`DataClientAdapter`] with the given client and clock.
646    #[must_use]
647    pub fn new(
648        client_id: ClientId,
649        venue: Option<Venue>,
650        handles_order_book_deltas: bool,
651        handles_order_book_snapshots: bool,
652        client: Box<dyn DataClient>,
653    ) -> Self {
654        Self {
655            client,
656            client_id,
657            venue,
658            handles_book_deltas: handles_order_book_deltas,
659            handles_book_snapshots: handles_order_book_snapshots,
660            subscriptions_custom: AHashSet::new(),
661            subscriptions_book_deltas: AHashSet::new(),
662            subscriptions_book_depth10: AHashSet::new(),
663            subscriptions_book_snapshots: AHashSet::new(),
664            subscriptions_quotes: AHashSet::new(),
665            subscriptions_trades: AHashSet::new(),
666            subscriptions_mark_prices: AHashSet::new(),
667            subscriptions_index_prices: AHashSet::new(),
668            subscriptions_funding_rates: AHashSet::new(),
669            subscriptions_bars: AHashSet::new(),
670            subscriptions_instrument_status: AHashSet::new(),
671            subscriptions_instrument_close: AHashSet::new(),
672            subscriptions_instrument: AHashSet::new(),
673            subscriptions_instrument_venue: AHashSet::new(),
674            #[cfg(feature = "defi")]
675            subscriptions_blocks: AHashSet::new(),
676            #[cfg(feature = "defi")]
677            subscriptions_pools: AHashSet::new(),
678            #[cfg(feature = "defi")]
679            subscriptions_pool_swaps: AHashSet::new(),
680            #[cfg(feature = "defi")]
681            subscriptions_pool_liquidity_updates: AHashSet::new(),
682        }
683    }
684
685    #[allow(clippy::borrowed_box)]
686    #[must_use]
687    pub fn get_client(&self) -> &Box<dyn DataClient> {
688        &self.client
689    }
690
691    #[inline]
692    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
693        if let Err(e) = match cmd {
694            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
695            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
696            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
697            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
698            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
699            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
700            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
701            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
702            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
703            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
704            SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
705            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
706            SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
707            SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
708        } {
709            log_command_error(&cmd, &e);
710        }
711    }
712
713    #[cfg(feature = "defi")]
714    #[inline]
715    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
716        if let Err(e) = match cmd {
717            DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
718            DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
719            DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_pool_swaps(cmd),
720            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
721                self.subscribe_pool_liquidity_updates(cmd)
722            }
723        } {
724            log_command_error(&cmd, &e);
725        }
726    }
727
728    #[inline]
729    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
730        if let Err(e) = match cmd {
731            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
732            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
733            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
734            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
735            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
736            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
737            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
738            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
739            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
740            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
741            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
742            UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
743            UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
744            UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
745        } {
746            log_command_error(&cmd, &e);
747        }
748    }
749
750    #[cfg(feature = "defi")]
751    #[inline]
752    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
753        if let Err(e) = match cmd {
754            DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
755            DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
756            DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_pool_swaps(cmd),
757            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
758                self.unsubscribe_pool_liquidity_updates(cmd)
759            }
760        } {
761            log_command_error(&cmd, &e);
762        }
763    }
764
765    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
766
767    /// Subscribes to a custom data type, updating internal state and forwarding to the client.
768    ///
769    /// # Errors
770    ///
771    /// Returns an error if the underlying client subscribe operation fails.
772    pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
773        if !self.subscriptions_custom.contains(&cmd.data_type) {
774            self.subscriptions_custom.insert(cmd.data_type.clone());
775            self.client.subscribe(cmd)?;
776        }
777        Ok(())
778    }
779
780    /// Unsubscribes from a custom data type, updating internal state and forwarding to the client.
781    ///
782    /// # Errors
783    ///
784    /// Returns an error if the underlying client unsubscribe operation fails.
785    pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
786        if self.subscriptions_custom.contains(&cmd.data_type) {
787            self.subscriptions_custom.remove(&cmd.data_type);
788            self.client.unsubscribe(cmd)?;
789        }
790        Ok(())
791    }
792
793    /// Subscribes to instrument definitions for a venue, updating internal state and forwarding to the client.
794    ///
795    /// # Errors
796    ///
797    /// Returns an error if the underlying client subscribe operation fails.
798    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
799        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
800            self.subscriptions_instrument_venue.insert(cmd.venue);
801            self.client.subscribe_instruments(cmd)?;
802        }
803
804        Ok(())
805    }
806
807    /// Unsubscribes from instrument definition updates for a venue, updating internal state and forwarding to the client.
808    ///
809    /// # Errors
810    ///
811    /// Returns an error if the underlying client unsubscribe operation fails.
812    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
813        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
814            self.subscriptions_instrument_venue.remove(&cmd.venue);
815            self.client.unsubscribe_instruments(cmd)?;
816        }
817
818        Ok(())
819    }
820
821    /// Subscribes to instrument definitions for a single instrument, updating internal state and forwarding to the client.
822    ///
823    /// # Errors
824    ///
825    /// Returns an error if the underlying client subscribe operation fails.
826    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
827        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
828            self.subscriptions_instrument.insert(cmd.instrument_id);
829            self.client.subscribe_instrument(cmd)?;
830        }
831
832        Ok(())
833    }
834
835    /// Unsubscribes from instrument definition updates for a single instrument, updating internal state and forwarding to the client.
836    ///
837    /// # Errors
838    ///
839    /// Returns an error if the underlying client unsubscribe operation fails.
840    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
841        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
842            self.subscriptions_instrument.remove(&cmd.instrument_id);
843            self.client.unsubscribe_instrument(cmd)?;
844        }
845
846        Ok(())
847    }
848
849    /// Subscribes to book deltas updates for an instrument, updating internal state and forwarding to the client.
850    ///
851    /// # Errors
852    ///
853    /// Returns an error if the underlying client subscribe operation fails.
854    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
855        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
856            self.subscriptions_book_deltas.insert(cmd.instrument_id);
857            self.client.subscribe_book_deltas(cmd)?;
858        }
859
860        Ok(())
861    }
862
863    /// Unsubscribes from book deltas for an instrument, updating internal state and forwarding to the client.
864    ///
865    /// # Errors
866    ///
867    /// Returns an error if the underlying client unsubscribe operation fails.
868    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
869        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
870            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
871            self.client.unsubscribe_book_deltas(cmd)?;
872        }
873
874        Ok(())
875    }
876
877    /// Subscribes to book depth updates for an instrument, updating internal state and forwarding to the client.
878    ///
879    /// # Errors
880    ///
881    /// Returns an error if the underlying client subscribe operation fails.
882    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
883        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
884            self.subscriptions_book_depth10.insert(cmd.instrument_id);
885            self.client.subscribe_book_depth10(cmd)?;
886        }
887
888        Ok(())
889    }
890
891    /// Unsubscribes from book depth updates for an instrument, updating internal state and forwarding to the client.
892    ///
893    /// # Errors
894    ///
895    /// Returns an error if the underlying client unsubscribe operation fails.
896    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
897        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
898            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
899            self.client.unsubscribe_book_depth10(cmd)?;
900        }
901
902        Ok(())
903    }
904
905    /// Subscribes to book snapshots for an instrument, updating internal state and forwarding to the client.
906    ///
907    /// # Errors
908    ///
909    /// Returns an error if the underlying client subscribe operation fails.
910    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
911        if !self
912            .subscriptions_book_snapshots
913            .contains(&cmd.instrument_id)
914        {
915            self.subscriptions_book_snapshots.insert(cmd.instrument_id);
916            self.client.subscribe_book_snapshots(cmd)?;
917        }
918
919        Ok(())
920    }
921
922    /// Unsubscribes from book snapshots for an instrument, updating internal state and forwarding to the client.
923    ///
924    /// # Errors
925    ///
926    /// Returns an error if the underlying client unsubscribe operation fails.
927    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
928        if self
929            .subscriptions_book_snapshots
930            .contains(&cmd.instrument_id)
931        {
932            self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
933            self.client.unsubscribe_book_snapshots(cmd)?;
934        }
935
936        Ok(())
937    }
938
939    /// Subscribes to quotes for an instrument, updating internal state and forwarding to the client.
940    ///
941    /// # Errors
942    ///
943    /// Returns an error if the underlying client subscribe operation fails.
944    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
945        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
946            self.subscriptions_quotes.insert(cmd.instrument_id);
947            self.client.subscribe_quotes(cmd)?;
948        }
949        Ok(())
950    }
951
952    /// Unsubscribes from quotes for an instrument, updating internal state and forwarding to the client.
953    ///
954    /// # Errors
955    ///
956    /// Returns an error if the underlying client unsubscribe operation fails.
957    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
958        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
959            self.subscriptions_quotes.remove(&cmd.instrument_id);
960            self.client.unsubscribe_quotes(cmd)?;
961        }
962        Ok(())
963    }
964
965    /// Subscribes to trades for an instrument, updating internal state and forwarding to the client.
966    ///
967    /// # Errors
968    ///
969    /// Returns an error if the underlying client subscribe operation fails.
970    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
971        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
972            self.subscriptions_trades.insert(cmd.instrument_id);
973            self.client.subscribe_trades(cmd)?;
974        }
975        Ok(())
976    }
977
978    /// Unsubscribes from trades for an instrument, updating internal state and forwarding to the client.
979    ///
980    /// # Errors
981    ///
982    /// Returns an error if the underlying client unsubscribe operation fails.
983    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
984        if self.subscriptions_trades.contains(&cmd.instrument_id) {
985            self.subscriptions_trades.remove(&cmd.instrument_id);
986            self.client.unsubscribe_trades(cmd)?;
987        }
988        Ok(())
989    }
990
991    /// Subscribes to bars for a bar type, updating internal state and forwarding to the client.
992    ///
993    /// # Errors
994    ///
995    /// Returns an error if the underlying client subscribe operation fails.
996    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
997        if !self.subscriptions_bars.contains(&cmd.bar_type) {
998            self.subscriptions_bars.insert(cmd.bar_type);
999            self.client.subscribe_bars(cmd)?;
1000        }
1001        Ok(())
1002    }
1003
1004    /// Unsubscribes from bars for a bar type, updating internal state and forwarding to the client.
1005    ///
1006    /// # Errors
1007    ///
1008    /// Returns an error if the underlying client unsubscribe operation fails.
1009    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1010        if self.subscriptions_bars.contains(&cmd.bar_type) {
1011            self.subscriptions_bars.remove(&cmd.bar_type);
1012            self.client.unsubscribe_bars(cmd)?;
1013        }
1014        Ok(())
1015    }
1016
1017    /// Subscribes to mark price updates for an instrument, updating internal state and forwarding to the client.
1018    ///
1019    /// # Errors
1020    ///
1021    /// Returns an error if the underlying client subscribe operation fails.
1022    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1023        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1024            self.subscriptions_mark_prices.insert(cmd.instrument_id);
1025            self.client.subscribe_mark_prices(cmd)?;
1026        }
1027        Ok(())
1028    }
1029
1030    /// Unsubscribes from mark price updates for an instrument, updating internal state and forwarding to the client.
1031    ///
1032    /// # Errors
1033    ///
1034    /// Returns an error if the underlying client unsubscribe operation fails.
1035    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1036        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1037            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1038            self.client.unsubscribe_mark_prices(cmd)?;
1039        }
1040        Ok(())
1041    }
1042
1043    /// Subscribes to index price updates for an instrument, updating internal state and forwarding to the client.
1044    ///
1045    /// # Errors
1046    ///
1047    /// Returns an error if the underlying client subscribe operation fails.
1048    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1049        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1050            self.subscriptions_index_prices.insert(cmd.instrument_id);
1051            self.client.subscribe_index_prices(cmd)?;
1052        }
1053        Ok(())
1054    }
1055
1056    /// Unsubscribes from index price updates for an instrument, updating internal state and forwarding to the client.
1057    ///
1058    /// # Errors
1059    ///
1060    /// Returns an error if the underlying client unsubscribe operation fails.
1061    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1062        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1063            self.subscriptions_index_prices.remove(&cmd.instrument_id);
1064            self.client.unsubscribe_index_prices(cmd)?;
1065        }
1066        Ok(())
1067    }
1068
1069    /// Subscribes to funding rate updates for an instrument, updating internal state and forwarding to the client.
1070    ///
1071    /// # Errors
1072    ///
1073    /// Returns an error if the underlying client subscribe operation fails.
1074    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1075        if !self
1076            .subscriptions_funding_rates
1077            .contains(&cmd.instrument_id)
1078        {
1079            self.subscriptions_funding_rates.insert(cmd.instrument_id);
1080            self.client.subscribe_funding_rates(cmd)?;
1081        }
1082        Ok(())
1083    }
1084
1085    /// Unsubscribes from funding rate updates for an instrument, updating internal state and forwarding to the client.
1086    ///
1087    /// # Errors
1088    ///
1089    /// Returns an error if the underlying client unsubscribe operation fails.
1090    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1091        if self
1092            .subscriptions_funding_rates
1093            .contains(&cmd.instrument_id)
1094        {
1095            self.subscriptions_funding_rates.remove(&cmd.instrument_id);
1096            self.client.unsubscribe_funding_rates(cmd)?;
1097        }
1098        Ok(())
1099    }
1100
1101    /// Subscribes to instrument status updates for the specified instrument.
1102    ///
1103    /// # Errors
1104    ///
1105    /// Returns an error if the underlying client subscribe operation fails.
1106    fn subscribe_instrument_status(
1107        &mut self,
1108        cmd: &SubscribeInstrumentStatus,
1109    ) -> anyhow::Result<()> {
1110        if !self
1111            .subscriptions_instrument_status
1112            .contains(&cmd.instrument_id)
1113        {
1114            self.subscriptions_instrument_status
1115                .insert(cmd.instrument_id);
1116            self.client.subscribe_instrument_status(cmd)?;
1117        }
1118        Ok(())
1119    }
1120
1121    /// Unsubscribes from instrument status updates for the specified instrument.
1122    ///
1123    /// # Errors
1124    ///
1125    /// Returns an error if the underlying client unsubscribe operation fails.
1126    fn unsubscribe_instrument_status(
1127        &mut self,
1128        cmd: &UnsubscribeInstrumentStatus,
1129    ) -> anyhow::Result<()> {
1130        if self
1131            .subscriptions_instrument_status
1132            .contains(&cmd.instrument_id)
1133        {
1134            self.subscriptions_instrument_status
1135                .remove(&cmd.instrument_id);
1136            self.client.unsubscribe_instrument_status(cmd)?;
1137        }
1138        Ok(())
1139    }
1140
1141    /// Subscribes to instrument close events for the specified instrument.
1142    ///
1143    /// # Errors
1144    ///
1145    /// Returns an error if the underlying client subscribe operation fails.
1146    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1147        if !self
1148            .subscriptions_instrument_close
1149            .contains(&cmd.instrument_id)
1150        {
1151            self.subscriptions_instrument_close
1152                .insert(cmd.instrument_id);
1153            self.client.subscribe_instrument_close(cmd)?;
1154        }
1155        Ok(())
1156    }
1157
1158    /// Unsubscribes from instrument close events for the specified instrument.
1159    ///
1160    /// # Errors
1161    ///
1162    /// Returns an error if the underlying client unsubscribe operation fails.
1163    fn unsubscribe_instrument_close(
1164        &mut self,
1165        cmd: &UnsubscribeInstrumentClose,
1166    ) -> anyhow::Result<()> {
1167        if self
1168            .subscriptions_instrument_close
1169            .contains(&cmd.instrument_id)
1170        {
1171            self.subscriptions_instrument_close
1172                .remove(&cmd.instrument_id);
1173            self.client.unsubscribe_instrument_close(cmd)?;
1174        }
1175        Ok(())
1176    }
1177
1178    #[cfg(feature = "defi")]
1179    /// Subscribes to block events for the specified blockchain.
1180    ///
1181    /// # Errors
1182    ///
1183    /// Returns an error if the underlying client subscribe operation fails.
1184    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
1185        if !self.subscriptions_blocks.contains(&cmd.chain) {
1186            self.subscriptions_blocks.insert(cmd.chain);
1187            self.client.subscribe_blocks(cmd)?;
1188        }
1189        Ok(())
1190    }
1191
1192    #[cfg(feature = "defi")]
1193    /// Unsubscribes from block events for the specified blockchain.
1194    ///
1195    /// # Errors
1196    ///
1197    /// Returns an error if the underlying client unsubscribe operation fails.
1198    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
1199        if self.subscriptions_blocks.contains(&cmd.chain) {
1200            self.subscriptions_blocks.remove(&cmd.chain);
1201            self.client.unsubscribe_blocks(cmd)?;
1202        }
1203        Ok(())
1204    }
1205
1206    #[cfg(feature = "defi")]
1207    /// Subscribes to pool definition updates for the specified AMM pool.
1208    ///
1209    /// # Errors
1210    ///
1211    /// Returns an error if the underlying client subscribe operation fails.
1212    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
1213        if !self.subscriptions_pools.contains(&cmd.instrument_id) {
1214            self.subscriptions_pools.insert(cmd.instrument_id);
1215            self.client.subscribe_pool(cmd)?;
1216        }
1217        Ok(())
1218    }
1219
1220    #[cfg(feature = "defi")]
1221    /// Subscribes to pool swap events for the specified AMM pool.
1222    ///
1223    /// # Errors
1224    ///
1225    /// Returns an error if the underlying client subscribe operation fails.
1226    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
1227        if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
1228            self.subscriptions_pool_swaps.insert(cmd.instrument_id);
1229            self.client.subscribe_pool_swaps(cmd)?;
1230        }
1231        Ok(())
1232    }
1233
1234    #[cfg(feature = "defi")]
1235    /// Subscribes to pool liquidity update events for the specified AMM pool.
1236    ///
1237    /// # Errors
1238    ///
1239    /// Returns an error if the underlying client subscribe operation fails.
1240    fn subscribe_pool_liquidity_updates(
1241        &mut self,
1242        cmd: &SubscribePoolLiquidityUpdates,
1243    ) -> anyhow::Result<()> {
1244        if !self
1245            .subscriptions_pool_liquidity_updates
1246            .contains(&cmd.instrument_id)
1247        {
1248            self.subscriptions_pool_liquidity_updates
1249                .insert(cmd.instrument_id);
1250            self.client.subscribe_pool_liquidity_updates(cmd)?;
1251        }
1252        Ok(())
1253    }
1254
1255    #[cfg(feature = "defi")]
1256    /// Unsubscribes from pool definition updates for the specified AMM pool.
1257    ///
1258    /// # Errors
1259    ///
1260    /// Returns an error if the underlying client unsubscribe operation fails.
1261    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
1262        if self.subscriptions_pools.contains(&cmd.instrument_id) {
1263            self.subscriptions_pools.remove(&cmd.instrument_id);
1264            self.client.unsubscribe_pool(cmd)?;
1265        }
1266        Ok(())
1267    }
1268
1269    #[cfg(feature = "defi")]
1270    /// Unsubscribes from swap events for the specified AMM pool.
1271    ///
1272    /// # Errors
1273    ///
1274    /// Returns an error if the underlying client unsubscribe operation fails.
1275    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
1276        if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
1277            self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
1278            self.client.unsubscribe_pool_swaps(cmd)?;
1279        }
1280        Ok(())
1281    }
1282
1283    #[cfg(feature = "defi")]
1284    /// Unsubscribes from pool liquidity update events for the specified AMM pool.
1285    ///
1286    /// # Errors
1287    ///
1288    /// Returns an error if the underlying client unsubscribe operation fails.
1289    fn unsubscribe_pool_liquidity_updates(
1290        &mut self,
1291        cmd: &UnsubscribePoolLiquidityUpdates,
1292    ) -> anyhow::Result<()> {
1293        if self
1294            .subscriptions_pool_liquidity_updates
1295            .contains(&cmd.instrument_id)
1296        {
1297            self.subscriptions_pool_liquidity_updates
1298                .remove(&cmd.instrument_id);
1299            self.client.unsubscribe_pool_liquidity_updates(cmd)?;
1300        }
1301        Ok(())
1302    }
1303
1304    // -- REQUEST HANDLERS ------------------------------------------------------------------------
1305
1306    /// Sends a data request to the underlying client.
1307    ///
1308    /// # Errors
1309    ///
1310    /// Returns an error if the client request fails.
1311    pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1312        self.client.request_data(req)
1313    }
1314
1315    /// Sends a single instrument request to the client.
1316    ///
1317    /// # Errors
1318    ///
1319    /// Returns an error if the client fails to process the request.
1320    pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1321        self.client.request_instrument(req)
1322    }
1323
1324    /// Sends a batch instruments request to the client.
1325    ///
1326    /// # Errors
1327    ///
1328    /// Returns an error if the client fails to process the request.
1329    pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1330        self.client.request_instruments(req)
1331    }
1332
1333    /// Sends a quotes request for a given instrument.
1334    ///
1335    /// # Errors
1336    ///
1337    /// Returns an error if the client fails to process the quotes request.
1338    pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1339        self.client.request_quotes(req)
1340    }
1341
1342    /// Sends a trades request for a given instrument.
1343    ///
1344    /// # Errors
1345    ///
1346    /// Returns an error if the client fails to process the trades request.
1347    pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1348        self.client.request_trades(req)
1349    }
1350
1351    /// Sends a bars request for a given instrument and bar type.
1352    ///
1353    /// # Errors
1354    ///
1355    /// Returns an error if the client fails to process the bars request.
1356    pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1357        self.client.request_bars(req)
1358    }
1359
1360    /// Sends an order book depths request for a given instrument.
1361    ///
1362    /// # Errors
1363    ///
1364    /// Returns an error if the client fails to process the order book depths request.
1365    pub fn request_book_depth(&self, req: &RequestBookDepth) -> anyhow::Result<()> {
1366        self.client.request_book_depth(req)
1367    }
1368}
1369
1370#[inline(always)]
1371fn log_not_implemented<T: Debug>(msg: &T) {
1372    log::warn!("{msg:?} – handler not implemented");
1373}
1374
1375#[inline(always)]
1376fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1377    log::error!("Error on {cmd:?}: {e}");
1378}