nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17//!
18//! Defines the `DataClient` trait, the `DataClientAdapter` for managing subscriptions and requests,
19//! and utilities for constructing data responses.
20
21use std::{
22    any::Any,
23    fmt::{Debug, Display},
24    ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28use nautilus_common::messages::data::{
29    RequestBars, RequestBookSnapshot, RequestCustomData, RequestInstrument, RequestInstruments,
30    RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
31    SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeFundingRates,
32    SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus,
33    SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, UnsubscribeBars,
34    UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand,
35    UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
36    UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
37    UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38};
39#[cfg(feature = "defi")]
40use nautilus_common::messages::defi::{
41    DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
42    SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
43    UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
44};
45#[cfg(feature = "defi")]
46use nautilus_model::defi::Blockchain;
47use nautilus_model::{
48    data::{BarType, DataType},
49    identifiers::{ClientId, InstrumentId, Venue},
50};
51
52/// Defines the interface for a data client, managing connections, subscriptions, and requests.
53#[async_trait::async_trait]
54pub trait DataClient: Any + Sync + Send {
55    /// Returns the unique identifier for this data client.
56    fn client_id(&self) -> ClientId;
57
58    /// Returns the optional venue this client is associated with.
59    fn venue(&self) -> Option<Venue>;
60
61    /// Starts the data client.
62    ///
63    /// # Errors
64    ///
65    /// Returns an error if the operation fails.
66    fn start(&mut self) -> anyhow::Result<()>;
67
68    /// Stops the data client.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the operation fails.
73    fn stop(&mut self) -> anyhow::Result<()>;
74
75    /// Resets the data client to its initial state.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if the operation fails.
80    fn reset(&mut self) -> anyhow::Result<()>;
81
82    /// Disposes of client resources and cleans up.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if the operation fails.
87    fn dispose(&mut self) -> anyhow::Result<()>;
88
89    /// Connects external API's if needed.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the operation fails.
94    async fn connect(&mut self) -> anyhow::Result<()>;
95
96    /// Disconnects external API's if needed.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the operation fails.
101    async fn disconnect(&mut self) -> anyhow::Result<()>;
102
103    /// Returns `true` if the client is currently connected.
104    fn is_connected(&self) -> bool;
105
106    /// Returns `true` if the client is currently disconnected.
107    fn is_disconnected(&self) -> bool;
108
109    /// Subscribes to custom data types according to the command.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if the subscribe operation fails.
114    fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
115        log_not_implemented(&cmd);
116        Ok(())
117    }
118
119    /// Subscribes to instruments list for the specified venue.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the subscribe operation fails.
124    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
125        log_not_implemented(&cmd);
126        Ok(())
127    }
128
129    /// Subscribes to data for a single instrument.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the subscribe operation fails.
134    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
135        log_not_implemented(&cmd);
136        Ok(())
137    }
138
139    /// Subscribes to order book delta updates for the specified instrument.
140    ///
141    /// # Errors
142    ///
143    /// Returns an error if the subscribe operation fails.
144    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
145        log_not_implemented(&cmd);
146        Ok(())
147    }
148
149    /// Subscribes to top 10 order book depth updates for the specified instrument.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the subscribe operation fails.
154    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
155        log_not_implemented(&cmd);
156        Ok(())
157    }
158
159    /// Subscribes to periodic order book snapshots for the specified instrument.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the subscribe operation fails.
164    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
165        log_not_implemented(&cmd);
166        Ok(())
167    }
168
169    /// Subscribes to quote updates for the specified instrument.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the subscribe operation fails.
174    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
175        log_not_implemented(&cmd);
176        Ok(())
177    }
178
179    /// Subscribes to trade updates for the specified instrument.
180    ///
181    /// # Errors
182    ///
183    /// Returns an error if the subscribe operation fails.
184    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
185        log_not_implemented(&cmd);
186        Ok(())
187    }
188
189    /// Subscribes to mark price updates for the specified instrument.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if the subscribe operation fails.
194    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
195        log_not_implemented(&cmd);
196        Ok(())
197    }
198
199    /// Subscribes to index price updates for the specified instrument.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if the subscribe operation fails.
204    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
205        log_not_implemented(&cmd);
206        Ok(())
207    }
208
209    /// Subscribes to funding rate updates for the specified instrument.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the subscribe operation fails.
214    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
215        log_not_implemented(&cmd);
216        Ok(())
217    }
218
219    /// Subscribes to bar updates of the specified bar type.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if the subscribe operation fails.
224    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
225        log_not_implemented(&cmd);
226        Ok(())
227    }
228
229    /// Subscribes to status updates for the specified instrument.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if the subscribe operation fails.
234    fn subscribe_instrument_status(
235        &mut self,
236        cmd: &SubscribeInstrumentStatus,
237    ) -> anyhow::Result<()> {
238        log_not_implemented(&cmd);
239        Ok(())
240    }
241
242    /// Subscribes to instrument close events for the specified instrument.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the subscription operation fails.
247    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
248        log_not_implemented(&cmd);
249        Ok(())
250    }
251
252    #[cfg(feature = "defi")]
253    /// Subscribes to blocks for a specified blockchain.
254    ///
255    /// # Errors
256    ///
257    /// Returns an error if the subscription operation fails.
258    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
259        log_not_implemented(&cmd);
260        Ok(())
261    }
262
263    #[cfg(feature = "defi")]
264    /// Subscribes to pool definition updates for a specified AMM pool.
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if the subscription operation fails.
269    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
270        log_not_implemented(&cmd);
271        Ok(())
272    }
273
274    #[cfg(feature = "defi")]
275    /// Subscribes to pool swaps for a specified AMM pool.
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if the subscription operation fails.
280    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
281        log_not_implemented(&cmd);
282        Ok(())
283    }
284
285    #[cfg(feature = "defi")]
286    /// Subscribes to pool liquidity updates for a specified AMM pool.
287    ///
288    /// # Errors
289    ///
290    /// Returns an error if the subscription operation fails.
291    fn subscribe_pool_liquidity_updates(
292        &mut self,
293        cmd: &SubscribePoolLiquidityUpdates,
294    ) -> anyhow::Result<()> {
295        log_not_implemented(&cmd);
296        Ok(())
297    }
298
299    /// Unsubscribes from custom data types according to the command.
300    ///
301    /// # Errors
302    ///
303    /// Returns an error if the unsubscribe operation fails.
304    fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
305        log_not_implemented(&cmd);
306        Ok(())
307    }
308
309    /// Unsubscribes from instruments list for the specified venue.
310    ///
311    /// # Errors
312    ///
313    /// Returns an error if the unsubscribe operation fails.
314    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
315        log_not_implemented(&cmd);
316        Ok(())
317    }
318
319    /// Unsubscribes from data for the specified instrument.
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if the unsubscribe operation fails.
324    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
325        log_not_implemented(&cmd);
326        Ok(())
327    }
328
329    /// Unsubscribes from order book delta updates for the specified instrument.
330    ///
331    /// # Errors
332    ///
333    /// Returns an error if the unsubscribe operation fails.
334    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
335        log_not_implemented(&cmd);
336        Ok(())
337    }
338
339    /// Unsubscribes from top 10 order book depth updates for the specified instrument.
340    ///
341    /// # Errors
342    ///
343    /// Returns an error if the unsubscribe operation fails.
344    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
345        log_not_implemented(&cmd);
346        Ok(())
347    }
348
349    /// Unsubscribes from periodic order book snapshots for the specified instrument.
350    ///
351    /// # Errors
352    ///
353    /// Returns an error if the unsubscribe operation fails.
354    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
355        log_not_implemented(&cmd);
356        Ok(())
357    }
358
359    /// Unsubscribes from quote updates for the specified instrument.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if the unsubscribe operation fails.
364    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
365        log_not_implemented(&cmd);
366        Ok(())
367    }
368
369    /// Unsubscribes from trade updates for the specified instrument.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the unsubscribe operation fails.
374    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
375        log_not_implemented(&cmd);
376        Ok(())
377    }
378
379    /// Unsubscribes from mark price updates for the specified instrument.
380    ///
381    /// # Errors
382    ///
383    /// Returns an error if the unsubscribe operation fails.
384    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
385        log_not_implemented(&cmd);
386        Ok(())
387    }
388
389    /// Unsubscribes from index price updates for the specified instrument.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if the unsubscribe operation fails.
394    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
395        log_not_implemented(&cmd);
396        Ok(())
397    }
398
399    /// Unsubscribes from funding rate updates for the specified instrument.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the unsubscribe operation fails.
404    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
405        log_not_implemented(&cmd);
406        Ok(())
407    }
408
409    /// Unsubscribes from bar updates of the specified bar type.
410    ///
411    /// # Errors
412    ///
413    /// Returns an error if the unsubscribe operation fails.
414    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
415        log_not_implemented(&cmd);
416        Ok(())
417    }
418
419    /// Unsubscribes from instrument status updates for the specified instrument.
420    ///
421    /// # Errors
422    ///
423    /// Returns an error if the unsubscribe operation fails.
424    fn unsubscribe_instrument_status(
425        &mut self,
426        cmd: &UnsubscribeInstrumentStatus,
427    ) -> anyhow::Result<()> {
428        log_not_implemented(&cmd);
429        Ok(())
430    }
431
432    /// Unsubscribes from instrument close events for the specified instrument.
433    ///
434    /// # Errors
435    ///
436    /// Returns an error if the unsubscribe operation fails.
437    fn unsubscribe_instrument_close(
438        &mut self,
439        cmd: &UnsubscribeInstrumentClose,
440    ) -> anyhow::Result<()> {
441        log_not_implemented(&cmd);
442        Ok(())
443    }
444
445    #[cfg(feature = "defi")]
446    /// Unsubscribes from blocks for a specified blockchain.
447    ///
448    /// # Errors
449    ///
450    /// Returns an error if the subscription operation fails.
451    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
452        log_not_implemented(&cmd);
453        Ok(())
454    }
455
456    #[cfg(feature = "defi")]
457    /// Unsubscribes from pool definition updates for a specified AMM pool.
458    ///
459    /// # Errors
460    ///
461    /// Returns an error if the subscription operation fails.
462    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
463        log_not_implemented(&cmd);
464        Ok(())
465    }
466
467    #[cfg(feature = "defi")]
468    /// Unsubscribes from swaps for a specified AMM pool.
469    ///
470    /// # Errors
471    ///
472    /// Returns an error if the subscription operation fails.
473    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
474        log_not_implemented(&cmd);
475        Ok(())
476    }
477
478    #[cfg(feature = "defi")]
479    /// Unsubscribes from pool liquidity updates for a specified AMM pool.
480    ///
481    /// # Errors
482    ///
483    /// Returns an error if the subscription operation fails.
484    fn unsubscribe_pool_liquidity_updates(
485        &mut self,
486        cmd: &UnsubscribePoolLiquidityUpdates,
487    ) -> anyhow::Result<()> {
488        log_not_implemented(&cmd);
489        Ok(())
490    }
491
492    /// Sends a custom data request to the provider.
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if the data request fails.
497    fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
498        log_not_implemented(&request);
499        Ok(())
500    }
501
502    /// Requests a list of instruments from the provider for a given venue.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the instruments request fails.
507    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
508        log_not_implemented(&request);
509        Ok(())
510    }
511
512    /// Requests detailed data for a single instrument.
513    ///
514    /// # Errors
515    ///
516    /// Returns an error if the instrument request fails.
517    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
518        log_not_implemented(&request);
519        Ok(())
520    }
521
522    /// Requests a snapshot of the order book for a specified instrument.
523    ///
524    /// # Errors
525    ///
526    /// Returns an error if the book snapshot request fails.
527    fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
528        log_not_implemented(&request);
529        Ok(())
530    }
531
532    /// Requests historical or streaming quote data for a specified instrument.
533    ///
534    /// # Errors
535    ///
536    /// Returns an error if the quotes request fails.
537    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
538        log_not_implemented(&request);
539        Ok(())
540    }
541
542    /// Requests historical or streaming trade data for a specified instrument.
543    ///
544    /// # Errors
545    ///
546    /// Returns an error if the trades request fails.
547    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
548        log_not_implemented(&request);
549        Ok(())
550    }
551
552    /// Requests historical or streaming bar data for a specified instrument and bar type.
553    ///
554    /// # Errors
555    ///
556    /// Returns an error if the bars request fails.
557    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
558        log_not_implemented(&request);
559        Ok(())
560    }
561}
562
/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
///
/// Each `subscriptions_*` field is a set of keys, one set per kind of data. The handlers
/// visible in this file add a key before forwarding a subscribe command to the client and
/// drop it before forwarding an unsubscribe command, so a repeated command for the same
/// key is not forwarded twice.
pub struct DataClientAdapter {
    /// The wrapped client that commands are forwarded to.
    client: Box<dyn DataClient>,
    /// Identifier supplied to the constructor.
    pub client_id: ClientId,
    /// Venue supplied to the constructor, if any.
    pub venue: Option<Venue>,
    /// Constructor flag; presumably whether the client processes book deltas itself.
    // NOTE(review): not read anywhere in the visible part of the file; confirm meaning.
    pub handles_book_deltas: bool,
    /// Constructor flag; presumably whether the client processes book snapshots itself.
    // NOTE(review): not read anywhere in the visible part of the file; confirm meaning.
    pub handles_book_snapshots: bool,
    /// Custom data types currently recorded as subscribed.
    pub subscriptions_custom: AHashSet<DataType>,
    /// Instrument IDs recorded for book delta subscriptions.
    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
    /// Instrument IDs recorded for top-10 book depth subscriptions.
    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
    /// Instrument IDs recorded for book snapshot subscriptions.
    pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
    /// Instrument IDs recorded for quote subscriptions.
    pub subscriptions_quotes: AHashSet<InstrumentId>,
    /// Instrument IDs recorded for trade subscriptions.
    pub subscriptions_trades: AHashSet<InstrumentId>,
    /// Bar types recorded for bar subscriptions.
    pub subscriptions_bars: AHashSet<BarType>,
    /// Instrument IDs recorded for instrument status subscriptions.
    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
    /// Instrument IDs recorded for instrument close subscriptions.
    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
    /// Instrument IDs recorded for single-instrument definition subscriptions.
    pub subscriptions_instrument: AHashSet<InstrumentId>,
    /// Venues recorded for venue-wide instrument definition subscriptions.
    pub subscriptions_instrument_venue: AHashSet<Venue>,
    /// Instrument IDs recorded for mark price subscriptions.
    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
    /// Instrument IDs recorded for index price subscriptions.
    pub subscriptions_index_prices: AHashSet<InstrumentId>,
    /// Instrument IDs recorded for funding rate subscriptions.
    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
    /// Blockchains recorded for block subscriptions.
    #[cfg(feature = "defi")]
    pub subscriptions_blocks: AHashSet<Blockchain>,
    /// Instrument IDs recorded for pool definition subscriptions.
    #[cfg(feature = "defi")]
    pub subscriptions_pools: AHashSet<InstrumentId>,
    /// Instrument IDs recorded for pool swap subscriptions.
    #[cfg(feature = "defi")]
    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
    /// Instrument IDs recorded for pool liquidity update subscriptions.
    #[cfg(feature = "defi")]
    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
}
593
// Lets `DataClient` methods that the adapter does not define itself (for example
// `is_connected`) be called directly on the adapter. Inherent adapter methods with the
// same name as a trait method, such as `subscribe`, take precedence over this deref path.
impl Deref for DataClientAdapter {
    type Target = Box<dyn DataClient>;

    /// Borrows the wrapped client.
    fn deref(&self) -> &Self::Target {
        &self.client
    }
}
601
// Mutable counterpart of the `Deref` impl: gives access to the `&mut self` methods of the
// wrapped client (for example `start` or `connect`) through the adapter.
impl DerefMut for DataClientAdapter {
    /// Mutably borrows the wrapped client.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.client
    }
}
607
608impl Debug for DataClientAdapter {
609    #[rustfmt::skip]
610    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
611        f.debug_struct(stringify!(DataClientAdapter))
612            .field("client_id", &self.client_id)
613            .field("venue", &self.venue)
614            .field("handles_book_deltas", &self.handles_book_deltas)
615            .field("handles_book_snapshots", &self.handles_book_snapshots)
616            .field("subscriptions_custom", &self.subscriptions_custom)
617            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
618            .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
619            .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
620            .field("subscriptions_quotes", &self.subscriptions_quotes)
621            .field("subscriptions_trades", &self.subscriptions_trades)
622            .field("subscriptions_bars", &self.subscriptions_bars)
623            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
624            .field("subscriptions_index_prices", &self.subscriptions_index_prices)
625            .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
626            .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
627            .field("subscriptions_instrument", &self.subscriptions_instrument)
628            .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
629            .finish()
630    }
631}
632
633impl DataClientAdapter {
634    /// Creates a new [`DataClientAdapter`] with the given client and clock.
635    #[must_use]
636    pub fn new(
637        client_id: ClientId,
638        venue: Option<Venue>,
639        handles_order_book_deltas: bool,
640        handles_order_book_snapshots: bool,
641        client: Box<dyn DataClient>,
642    ) -> Self {
643        Self {
644            client,
645            client_id,
646            venue,
647            handles_book_deltas: handles_order_book_deltas,
648            handles_book_snapshots: handles_order_book_snapshots,
649            subscriptions_custom: AHashSet::new(),
650            subscriptions_book_deltas: AHashSet::new(),
651            subscriptions_book_depth10: AHashSet::new(),
652            subscriptions_book_snapshots: AHashSet::new(),
653            subscriptions_quotes: AHashSet::new(),
654            subscriptions_trades: AHashSet::new(),
655            subscriptions_mark_prices: AHashSet::new(),
656            subscriptions_index_prices: AHashSet::new(),
657            subscriptions_funding_rates: AHashSet::new(),
658            subscriptions_bars: AHashSet::new(),
659            subscriptions_instrument_status: AHashSet::new(),
660            subscriptions_instrument_close: AHashSet::new(),
661            subscriptions_instrument: AHashSet::new(),
662            subscriptions_instrument_venue: AHashSet::new(),
663            #[cfg(feature = "defi")]
664            subscriptions_blocks: AHashSet::new(),
665            #[cfg(feature = "defi")]
666            subscriptions_pools: AHashSet::new(),
667            #[cfg(feature = "defi")]
668            subscriptions_pool_swaps: AHashSet::new(),
669            #[cfg(feature = "defi")]
670            subscriptions_pool_liquidity_updates: AHashSet::new(),
671        }
672    }
673
    /// Returns a shared reference to the boxed client wrapped by this adapter.
    // The return type is `&Box<dyn DataClient>` rather than `&dyn DataClient`, which is
    // what the `borrowed_box` lint would otherwise flag; the signature is kept for callers.
    #[allow(clippy::borrowed_box)]
    #[must_use]
    pub fn get_client(&self) -> &Box<dyn DataClient> {
        &self.client
    }
679
680    #[inline]
681    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
682        if let Err(e) = match cmd {
683            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
684            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
685            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
686            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
687            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
688            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
689            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
690            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
691            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
692            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
693            SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
694            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
695            SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
696            SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
697        } {
698            log_command_error(&cmd, &e);
699        }
700    }
701
702    #[cfg(feature = "defi")]
703    #[inline]
704    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
705        if let Err(e) = match cmd {
706            DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
707            DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
708            DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_pool_swaps(cmd),
709            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
710                self.subscribe_pool_liquidity_updates(cmd)
711            }
712        } {
713            log_command_error(&cmd, &e);
714        }
715    }
716
717    #[inline]
718    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
719        if let Err(e) = match cmd {
720            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
721            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
722            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
723            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
724            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
725            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
726            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
727            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
728            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
729            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
730            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
731            UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
732            UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
733            UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
734        } {
735            log_command_error(&cmd, &e);
736        }
737    }
738
739    #[cfg(feature = "defi")]
740    #[inline]
741    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
742        if let Err(e) = match cmd {
743            DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
744            DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
745            DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_pool_swaps(cmd),
746            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
747                self.unsubscribe_pool_liquidity_updates(cmd)
748            }
749        } {
750            log_command_error(&cmd, &e);
751        }
752    }
753
754    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
755
756    /// Subscribes to a custom data type, updating internal state and forwarding to the client.
757    ///
758    /// # Errors
759    ///
760    /// Returns an error if the underlying client subscribe operation fails.
761    pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
762        if !self.subscriptions_custom.contains(&cmd.data_type) {
763            self.subscriptions_custom.insert(cmd.data_type.clone());
764            self.client.subscribe(cmd)?;
765        }
766        Ok(())
767    }
768
769    /// Unsubscribes from a custom data type, updating internal state and forwarding to the client.
770    ///
771    /// # Errors
772    ///
773    /// Returns an error if the underlying client unsubscribe operation fails.
774    pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
775        if self.subscriptions_custom.contains(&cmd.data_type) {
776            self.subscriptions_custom.remove(&cmd.data_type);
777            self.client.unsubscribe(cmd)?;
778        }
779        Ok(())
780    }
781
782    /// Subscribes to instrument definitions for a venue, updating internal state and forwarding to the client.
783    ///
784    /// # Errors
785    ///
786    /// Returns an error if the underlying client subscribe operation fails.
787    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
788        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
789            self.subscriptions_instrument_venue.insert(cmd.venue);
790            self.client.subscribe_instruments(cmd)?;
791        }
792
793        Ok(())
794    }
795
796    /// Unsubscribes from instrument definition updates for a venue, updating internal state and forwarding to the client.
797    ///
798    /// # Errors
799    ///
800    /// Returns an error if the underlying client unsubscribe operation fails.
801    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
802        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
803            self.subscriptions_instrument_venue.remove(&cmd.venue);
804            self.client.unsubscribe_instruments(cmd)?;
805        }
806
807        Ok(())
808    }
809
810    /// Subscribes to instrument definitions for a single instrument, updating internal state and forwarding to the client.
811    ///
812    /// # Errors
813    ///
814    /// Returns an error if the underlying client subscribe operation fails.
815    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
816        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
817            self.subscriptions_instrument.insert(cmd.instrument_id);
818            self.client.subscribe_instrument(cmd)?;
819        }
820
821        Ok(())
822    }
823
824    /// Unsubscribes from instrument definition updates for a single instrument, updating internal state and forwarding to the client.
825    ///
826    /// # Errors
827    ///
828    /// Returns an error if the underlying client unsubscribe operation fails.
829    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
830        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
831            self.subscriptions_instrument.remove(&cmd.instrument_id);
832            self.client.unsubscribe_instrument(cmd)?;
833        }
834
835        Ok(())
836    }
837
838    /// Subscribes to book deltas updates for an instrument, updating internal state and forwarding to the client.
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if the underlying client subscribe operation fails.
843    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
844        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
845            self.subscriptions_book_deltas.insert(cmd.instrument_id);
846            self.client.subscribe_book_deltas(cmd)?;
847        }
848
849        Ok(())
850    }
851
852    /// Unsubscribes from book deltas for an instrument, updating internal state and forwarding to the client.
853    ///
854    /// # Errors
855    ///
856    /// Returns an error if the underlying client unsubscribe operation fails.
857    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
858        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
859            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
860            self.client.unsubscribe_book_deltas(cmd)?;
861        }
862
863        Ok(())
864    }
865
866    /// Subscribes to book depth updates for an instrument, updating internal state and forwarding to the client.
867    ///
868    /// # Errors
869    ///
870    /// Returns an error if the underlying client subscribe operation fails.
871    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
872        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
873            self.subscriptions_book_depth10.insert(cmd.instrument_id);
874            self.client.subscribe_book_depth10(cmd)?;
875        }
876
877        Ok(())
878    }
879
880    /// Unsubscribes from book depth updates for an instrument, updating internal state and forwarding to the client.
881    ///
882    /// # Errors
883    ///
884    /// Returns an error if the underlying client unsubscribe operation fails.
885    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
886        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
887            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
888            self.client.unsubscribe_book_depth10(cmd)?;
889        }
890
891        Ok(())
892    }
893
894    /// Subscribes to book snapshots for an instrument, updating internal state and forwarding to the client.
895    ///
896    /// # Errors
897    ///
898    /// Returns an error if the underlying client subscribe operation fails.
899    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
900        if !self
901            .subscriptions_book_snapshots
902            .contains(&cmd.instrument_id)
903        {
904            self.subscriptions_book_snapshots.insert(cmd.instrument_id);
905            self.client.subscribe_book_snapshots(cmd)?;
906        }
907
908        Ok(())
909    }
910
911    /// Unsubscribes from book snapshots for an instrument, updating internal state and forwarding to the client.
912    ///
913    /// # Errors
914    ///
915    /// Returns an error if the underlying client unsubscribe operation fails.
916    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
917        if self
918            .subscriptions_book_snapshots
919            .contains(&cmd.instrument_id)
920        {
921            self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
922            self.client.unsubscribe_book_snapshots(cmd)?;
923        }
924
925        Ok(())
926    }
927
928    /// Subscribes to quotes for an instrument, updating internal state and forwarding to the client.
929    ///
930    /// # Errors
931    ///
932    /// Returns an error if the underlying client subscribe operation fails.
933    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
934        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
935            self.subscriptions_quotes.insert(cmd.instrument_id);
936            self.client.subscribe_quotes(cmd)?;
937        }
938        Ok(())
939    }
940
941    /// Unsubscribes from quotes for an instrument, updating internal state and forwarding to the client.
942    ///
943    /// # Errors
944    ///
945    /// Returns an error if the underlying client unsubscribe operation fails.
946    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
947        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
948            self.subscriptions_quotes.remove(&cmd.instrument_id);
949            self.client.unsubscribe_quotes(cmd)?;
950        }
951        Ok(())
952    }
953
954    /// Subscribes to trades for an instrument, updating internal state and forwarding to the client.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the underlying client subscribe operation fails.
959    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
960        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
961            self.subscriptions_trades.insert(cmd.instrument_id);
962            self.client.subscribe_trades(cmd)?;
963        }
964        Ok(())
965    }
966
967    /// Unsubscribes from trades for an instrument, updating internal state and forwarding to the client.
968    ///
969    /// # Errors
970    ///
971    /// Returns an error if the underlying client unsubscribe operation fails.
972    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
973        if self.subscriptions_trades.contains(&cmd.instrument_id) {
974            self.subscriptions_trades.remove(&cmd.instrument_id);
975            self.client.unsubscribe_trades(cmd)?;
976        }
977        Ok(())
978    }
979
980    /// Subscribes to bars for a bar type, updating internal state and forwarding to the client.
981    ///
982    /// # Errors
983    ///
984    /// Returns an error if the underlying client subscribe operation fails.
985    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
986        if !self.subscriptions_bars.contains(&cmd.bar_type) {
987            self.subscriptions_bars.insert(cmd.bar_type);
988            self.client.subscribe_bars(cmd)?;
989        }
990        Ok(())
991    }
992
993    /// Unsubscribes from bars for a bar type, updating internal state and forwarding to the client.
994    ///
995    /// # Errors
996    ///
997    /// Returns an error if the underlying client unsubscribe operation fails.
998    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
999        if self.subscriptions_bars.contains(&cmd.bar_type) {
1000            self.subscriptions_bars.remove(&cmd.bar_type);
1001            self.client.unsubscribe_bars(cmd)?;
1002        }
1003        Ok(())
1004    }
1005
1006    /// Subscribes to mark price updates for an instrument, updating internal state and forwarding to the client.
1007    ///
1008    /// # Errors
1009    ///
1010    /// Returns an error if the underlying client subscribe operation fails.
1011    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1012        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1013            self.subscriptions_mark_prices.insert(cmd.instrument_id);
1014            self.client.subscribe_mark_prices(cmd)?;
1015        }
1016        Ok(())
1017    }
1018
1019    /// Unsubscribes from mark price updates for an instrument, updating internal state and forwarding to the client.
1020    ///
1021    /// # Errors
1022    ///
1023    /// Returns an error if the underlying client unsubscribe operation fails.
1024    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1025        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1026            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1027            self.client.unsubscribe_mark_prices(cmd)?;
1028        }
1029        Ok(())
1030    }
1031
1032    /// Subscribes to index price updates for an instrument, updating internal state and forwarding to the client.
1033    ///
1034    /// # Errors
1035    ///
1036    /// Returns an error if the underlying client subscribe operation fails.
1037    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1038        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1039            self.subscriptions_index_prices.insert(cmd.instrument_id);
1040            self.client.subscribe_index_prices(cmd)?;
1041        }
1042        Ok(())
1043    }
1044
1045    /// Unsubscribes from index price updates for an instrument, updating internal state and forwarding to the client.
1046    ///
1047    /// # Errors
1048    ///
1049    /// Returns an error if the underlying client unsubscribe operation fails.
1050    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1051        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1052            self.subscriptions_index_prices.remove(&cmd.instrument_id);
1053            self.client.unsubscribe_index_prices(cmd)?;
1054        }
1055        Ok(())
1056    }
1057
1058    /// Subscribes to funding rate updates for an instrument, updating internal state and forwarding to the client.
1059    ///
1060    /// # Errors
1061    ///
1062    /// Returns an error if the underlying client subscribe operation fails.
1063    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1064        if !self
1065            .subscriptions_funding_rates
1066            .contains(&cmd.instrument_id)
1067        {
1068            self.subscriptions_funding_rates.insert(cmd.instrument_id);
1069            self.client.subscribe_funding_rates(cmd)?;
1070        }
1071        Ok(())
1072    }
1073
1074    /// Unsubscribes from funding rate updates for an instrument, updating internal state and forwarding to the client.
1075    ///
1076    /// # Errors
1077    ///
1078    /// Returns an error if the underlying client unsubscribe operation fails.
1079    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1080        if self
1081            .subscriptions_funding_rates
1082            .contains(&cmd.instrument_id)
1083        {
1084            self.subscriptions_funding_rates.remove(&cmd.instrument_id);
1085            self.client.unsubscribe_funding_rates(cmd)?;
1086        }
1087        Ok(())
1088    }
1089
1090    /// Subscribes to instrument status updates for the specified instrument.
1091    ///
1092    /// # Errors
1093    ///
1094    /// Returns an error if the underlying client subscribe operation fails.
1095    fn subscribe_instrument_status(
1096        &mut self,
1097        cmd: &SubscribeInstrumentStatus,
1098    ) -> anyhow::Result<()> {
1099        if !self
1100            .subscriptions_instrument_status
1101            .contains(&cmd.instrument_id)
1102        {
1103            self.subscriptions_instrument_status
1104                .insert(cmd.instrument_id);
1105            self.client.subscribe_instrument_status(cmd)?;
1106        }
1107        Ok(())
1108    }
1109
1110    /// Unsubscribes from instrument status updates for the specified instrument.
1111    ///
1112    /// # Errors
1113    ///
1114    /// Returns an error if the underlying client unsubscribe operation fails.
1115    fn unsubscribe_instrument_status(
1116        &mut self,
1117        cmd: &UnsubscribeInstrumentStatus,
1118    ) -> anyhow::Result<()> {
1119        if self
1120            .subscriptions_instrument_status
1121            .contains(&cmd.instrument_id)
1122        {
1123            self.subscriptions_instrument_status
1124                .remove(&cmd.instrument_id);
1125            self.client.unsubscribe_instrument_status(cmd)?;
1126        }
1127        Ok(())
1128    }
1129
1130    /// Subscribes to instrument close events for the specified instrument.
1131    ///
1132    /// # Errors
1133    ///
1134    /// Returns an error if the underlying client subscribe operation fails.
1135    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1136        if !self
1137            .subscriptions_instrument_close
1138            .contains(&cmd.instrument_id)
1139        {
1140            self.subscriptions_instrument_close
1141                .insert(cmd.instrument_id);
1142            self.client.subscribe_instrument_close(cmd)?;
1143        }
1144        Ok(())
1145    }
1146
1147    /// Unsubscribes from instrument close events for the specified instrument.
1148    ///
1149    /// # Errors
1150    ///
1151    /// Returns an error if the underlying client unsubscribe operation fails.
1152    fn unsubscribe_instrument_close(
1153        &mut self,
1154        cmd: &UnsubscribeInstrumentClose,
1155    ) -> anyhow::Result<()> {
1156        if self
1157            .subscriptions_instrument_close
1158            .contains(&cmd.instrument_id)
1159        {
1160            self.subscriptions_instrument_close
1161                .remove(&cmd.instrument_id);
1162            self.client.unsubscribe_instrument_close(cmd)?;
1163        }
1164        Ok(())
1165    }
1166
1167    #[cfg(feature = "defi")]
1168    /// Subscribes to block events for the specified blockchain.
1169    ///
1170    /// # Errors
1171    ///
1172    /// Returns an error if the underlying client subscribe operation fails.
1173    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
1174        if !self.subscriptions_blocks.contains(&cmd.chain) {
1175            self.subscriptions_blocks.insert(cmd.chain);
1176            self.client.subscribe_blocks(cmd)?;
1177        }
1178        Ok(())
1179    }
1180
1181    #[cfg(feature = "defi")]
1182    /// Unsubscribes from block events for the specified blockchain.
1183    ///
1184    /// # Errors
1185    ///
1186    /// Returns an error if the underlying client unsubscribe operation fails.
1187    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
1188        if self.subscriptions_blocks.contains(&cmd.chain) {
1189            self.subscriptions_blocks.remove(&cmd.chain);
1190            self.client.unsubscribe_blocks(cmd)?;
1191        }
1192        Ok(())
1193    }
1194
1195    #[cfg(feature = "defi")]
1196    /// Subscribes to pool definition updates for the specified AMM pool.
1197    ///
1198    /// # Errors
1199    ///
1200    /// Returns an error if the underlying client subscribe operation fails.
1201    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
1202        if !self.subscriptions_pools.contains(&cmd.instrument_id) {
1203            self.subscriptions_pools.insert(cmd.instrument_id);
1204            self.client.subscribe_pool(cmd)?;
1205        }
1206        Ok(())
1207    }
1208
1209    #[cfg(feature = "defi")]
1210    /// Subscribes to pool swap events for the specified AMM pool.
1211    ///
1212    /// # Errors
1213    ///
1214    /// Returns an error if the underlying client subscribe operation fails.
1215    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
1216        if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
1217            self.subscriptions_pool_swaps.insert(cmd.instrument_id);
1218            self.client.subscribe_pool_swaps(cmd)?;
1219        }
1220        Ok(())
1221    }
1222
1223    #[cfg(feature = "defi")]
1224    /// Subscribes to pool liquidity update events for the specified AMM pool.
1225    ///
1226    /// # Errors
1227    ///
1228    /// Returns an error if the underlying client subscribe operation fails.
1229    fn subscribe_pool_liquidity_updates(
1230        &mut self,
1231        cmd: &SubscribePoolLiquidityUpdates,
1232    ) -> anyhow::Result<()> {
1233        if !self
1234            .subscriptions_pool_liquidity_updates
1235            .contains(&cmd.instrument_id)
1236        {
1237            self.subscriptions_pool_liquidity_updates
1238                .insert(cmd.instrument_id);
1239            self.client.subscribe_pool_liquidity_updates(cmd)?;
1240        }
1241        Ok(())
1242    }
1243
1244    #[cfg(feature = "defi")]
1245    /// Unsubscribes from pool definition updates for the specified AMM pool.
1246    ///
1247    /// # Errors
1248    ///
1249    /// Returns an error if the underlying client unsubscribe operation fails.
1250    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
1251        if self.subscriptions_pools.contains(&cmd.instrument_id) {
1252            self.subscriptions_pools.remove(&cmd.instrument_id);
1253            self.client.unsubscribe_pool(cmd)?;
1254        }
1255        Ok(())
1256    }
1257
1258    #[cfg(feature = "defi")]
1259    /// Unsubscribes from swap events for the specified AMM pool.
1260    ///
1261    /// # Errors
1262    ///
1263    /// Returns an error if the underlying client unsubscribe operation fails.
1264    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
1265        if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
1266            self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
1267            self.client.unsubscribe_pool_swaps(cmd)?;
1268        }
1269        Ok(())
1270    }
1271
1272    #[cfg(feature = "defi")]
1273    /// Unsubscribes from pool liquidity update events for the specified AMM pool.
1274    ///
1275    /// # Errors
1276    ///
1277    /// Returns an error if the underlying client unsubscribe operation fails.
1278    fn unsubscribe_pool_liquidity_updates(
1279        &mut self,
1280        cmd: &UnsubscribePoolLiquidityUpdates,
1281    ) -> anyhow::Result<()> {
1282        if self
1283            .subscriptions_pool_liquidity_updates
1284            .contains(&cmd.instrument_id)
1285        {
1286            self.subscriptions_pool_liquidity_updates
1287                .remove(&cmd.instrument_id);
1288            self.client.unsubscribe_pool_liquidity_updates(cmd)?;
1289        }
1290        Ok(())
1291    }
1292
1293    // -- REQUEST HANDLERS ------------------------------------------------------------------------
1294
    /// Forwards a custom data request to the underlying client.
    ///
    /// This is a direct pass-through: the adapter's subscription sets are not touched and
    /// the client's result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if the client request fails.
    pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
        self.client.request_data(req)
    }
1303
    /// Forwards a single instrument request to the underlying client.
    ///
    /// This is a direct pass-through: the adapter's subscription sets are not touched and
    /// the client's result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if the client fails to process the request.
    pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
        self.client.request_instrument(req)
    }
1312
    /// Forwards a batch instruments request to the underlying client.
    ///
    /// This is a direct pass-through: the adapter's subscription sets are not touched and
    /// the client's result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if the client fails to process the request.
    pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
        self.client.request_instruments(req)
    }
1321
    /// Forwards a quotes request to the underlying client.
    ///
    /// This is a direct pass-through: the adapter's subscription sets are not touched and
    /// the client's result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if the client fails to process the quotes request.
    pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
        self.client.request_quotes(req)
    }
1330
    /// Forwards a trades request to the underlying client.
    ///
    /// This is a direct pass-through: the adapter's subscription sets are not touched and
    /// the client's result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if the client fails to process the trades request.
    pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
        self.client.request_trades(req)
    }
1339
    /// Forwards a bars request to the underlying client.
    ///
    /// This is a direct pass-through: the adapter's subscription sets are not touched and
    /// the client's result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if the client fails to process the bars request.
    pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
        self.client.request_bars(req)
    }
1348}
1349
/// Logs a warning that no handler is implemented for `msg`.
///
/// The message is rendered with its `Debug` representation.
#[inline(always)]
fn log_not_implemented<T: Debug>(msg: &T) {
    log::warn!("{msg:?} – handler not implemented");
}
1354
/// Logs an error raised while handling `cmd`.
///
/// The command is rendered with its `Debug` representation and the error `e` with its
/// `Display` representation.
#[inline(always)]
fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
    log::error!("Error on {cmd:?}: {e}");
}