nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17//!
18//! Defines the `DataClient` trait, the `DataClientAdapter` for managing subscriptions and requests,
19//! and utilities for constructing data responses.
20
21use std::{
22    fmt::{Debug, Display},
23    ops::{Deref, DerefMut},
24};
25
26use ahash::AHashSet;
27use async_trait::async_trait;
28use nautilus_common::messages::data::{
29    RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData, RequestInstrument,
30    RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
31    SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
32    SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
33    SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
34    SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
35    UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
36    UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
37    UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
38    UnsubscribeTrades,
39};
40#[cfg(feature = "defi")]
41use nautilus_common::messages::defi::{
42    RequestPoolSnapshot, SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects,
43    SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks,
44    UnsubscribePool, UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
45    UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
46};
47#[cfg(feature = "defi")]
48use nautilus_model::defi::Blockchain;
49use nautilus_model::{
50    data::{BarType, DataType},
51    identifiers::{ClientId, InstrumentId, Venue},
52};
53
54#[cfg(feature = "defi")]
55#[allow(unused_imports)] // Brings DeFi impl blocks into scope
56use crate::defi::client as _;
57
58/// Defines the interface for a data client, managing connections, subscriptions, and requests.
59///
60/// # Thread safety
61///
62/// Client instances are not intended to be sent across threads. The `?Send` bound
63/// allows implementations to hold non-Send state for any Python interop.
64#[async_trait(?Send)]
65pub trait DataClient {
66    /// Returns the unique identifier for this data client.
67    fn client_id(&self) -> ClientId;
68
69    /// Returns the optional venue this client is associated with.
70    fn venue(&self) -> Option<Venue>;
71
72    /// Starts the data client.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the operation fails.
77    fn start(&mut self) -> anyhow::Result<()>;
78
79    /// Stops the data client.
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if the operation fails.
84    fn stop(&mut self) -> anyhow::Result<()>;
85
86    /// Resets the data client to its initial state.
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the operation fails.
91    fn reset(&mut self) -> anyhow::Result<()>;
92
93    /// Disposes of client resources and cleans up.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the operation fails.
98    fn dispose(&mut self) -> anyhow::Result<()>;
99
100    /// Returns `true` if the client is currently connected.
101    fn is_connected(&self) -> bool;
102
103    /// Returns `true` if the client is currently disconnected.
104    fn is_disconnected(&self) -> bool;
105
106    /// Connects the client to the data provider.
107    ///
108    /// For live clients, this triggers the actual connection to external APIs.
109    /// For backtest clients, this is a no-op.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if the connection fails.
114    async fn connect(&mut self) -> anyhow::Result<()> {
115        Ok(())
116    }
117
118    /// Disconnects the client from the data provider.
119    ///
120    /// For live clients, this closes connections to external APIs.
121    /// For backtest clients, this is a no-op.
122    ///
123    /// # Errors
124    ///
125    /// Returns an error if the disconnection fails.
126    async fn disconnect(&mut self) -> anyhow::Result<()> {
127        Ok(())
128    }
129
130    /// Subscribes to custom data types according to the command.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the subscribe operation fails.
135    fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
136        log_not_implemented(&cmd);
137        Ok(())
138    }
139
140    /// Subscribes to instruments list for the specified venue.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the subscribe operation fails.
145    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
146        log_not_implemented(&cmd);
147        Ok(())
148    }
149
150    /// Subscribes to data for a single instrument.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the subscribe operation fails.
155    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
156        log_not_implemented(&cmd);
157        Ok(())
158    }
159
160    /// Subscribes to order book delta updates for the specified instrument.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if the subscribe operation fails.
165    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
166        log_not_implemented(&cmd);
167        Ok(())
168    }
169
170    /// Subscribes to top 10 order book depth updates for the specified instrument.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if the subscribe operation fails.
175    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
176        log_not_implemented(&cmd);
177        Ok(())
178    }
179
180    /// Subscribes to periodic order book snapshots for the specified instrument.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the subscribe operation fails.
185    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
186        log_not_implemented(&cmd);
187        Ok(())
188    }
189
190    /// Subscribes to quote updates for the specified instrument.
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the subscribe operation fails.
195    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
196        log_not_implemented(&cmd);
197        Ok(())
198    }
199
200    /// Subscribes to trade updates for the specified instrument.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if the subscribe operation fails.
205    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
206        log_not_implemented(&cmd);
207        Ok(())
208    }
209
210    /// Subscribes to mark price updates for the specified instrument.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if the subscribe operation fails.
215    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
216        log_not_implemented(&cmd);
217        Ok(())
218    }
219
220    /// Subscribes to index price updates for the specified instrument.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the subscribe operation fails.
225    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
226        log_not_implemented(&cmd);
227        Ok(())
228    }
229
230    /// Subscribes to funding rate updates for the specified instrument.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if the subscribe operation fails.
235    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
236        log_not_implemented(&cmd);
237        Ok(())
238    }
239
240    /// Subscribes to bar updates of the specified bar type.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if the subscribe operation fails.
245    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
246        log_not_implemented(&cmd);
247        Ok(())
248    }
249
250    /// Subscribes to status updates for the specified instrument.
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if the subscribe operation fails.
255    fn subscribe_instrument_status(
256        &mut self,
257        cmd: &SubscribeInstrumentStatus,
258    ) -> anyhow::Result<()> {
259        log_not_implemented(&cmd);
260        Ok(())
261    }
262
263    /// Subscribes to instrument close events for the specified instrument.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the subscription operation fails.
268    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
269        log_not_implemented(&cmd);
270        Ok(())
271    }
272
273    #[cfg(feature = "defi")]
274    /// Subscribes to blocks for a specified blockchain.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the subscription operation fails.
279    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
280        log_not_implemented(&cmd);
281        Ok(())
282    }
283
284    #[cfg(feature = "defi")]
285    /// Subscribes to pool definition updates for a specified AMM pool.
286    ///
287    /// # Errors
288    ///
289    /// Returns an error if the subscription operation fails.
290    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
291        log_not_implemented(&cmd);
292        Ok(())
293    }
294
295    #[cfg(feature = "defi")]
296    /// Subscribes to pool swaps for a specified AMM pool.
297    ///
298    /// # Errors
299    ///
300    /// Returns an error if the subscription operation fails.
301    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
302        log_not_implemented(&cmd);
303        Ok(())
304    }
305
306    #[cfg(feature = "defi")]
307    /// Subscribes to pool liquidity updates for a specified AMM pool.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if the subscription operation fails.
312    fn subscribe_pool_liquidity_updates(
313        &mut self,
314        cmd: &SubscribePoolLiquidityUpdates,
315    ) -> anyhow::Result<()> {
316        log_not_implemented(&cmd);
317        Ok(())
318    }
319
320    #[cfg(feature = "defi")]
321    /// Subscribes to pool fee collects for a specified AMM pool.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if the subscription operation fails.
326    fn subscribe_pool_fee_collects(
327        &mut self,
328        cmd: &SubscribePoolFeeCollects,
329    ) -> anyhow::Result<()> {
330        log_not_implemented(&cmd);
331        Ok(())
332    }
333
334    #[cfg(feature = "defi")]
335    /// Subscribes to pool flash loan events for a specified AMM pool.
336    ///
337    /// # Errors
338    ///
339    /// Returns an error if the subscription operation fails.
340    fn subscribe_pool_flash_events(
341        &mut self,
342        cmd: &SubscribePoolFlashEvents,
343    ) -> anyhow::Result<()> {
344        log_not_implemented(&cmd);
345        Ok(())
346    }
347
348    /// Unsubscribes from custom data types according to the command.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if the unsubscribe operation fails.
353    fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
354        log_not_implemented(&cmd);
355        Ok(())
356    }
357
358    /// Unsubscribes from instruments list for the specified venue.
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if the unsubscribe operation fails.
363    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
364        log_not_implemented(&cmd);
365        Ok(())
366    }
367
368    /// Unsubscribes from data for the specified instrument.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the unsubscribe operation fails.
373    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
374        log_not_implemented(&cmd);
375        Ok(())
376    }
377
378    /// Unsubscribes from order book delta updates for the specified instrument.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if the unsubscribe operation fails.
383    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
384        log_not_implemented(&cmd);
385        Ok(())
386    }
387
388    /// Unsubscribes from top 10 order book depth updates for the specified instrument.
389    ///
390    /// # Errors
391    ///
392    /// Returns an error if the unsubscribe operation fails.
393    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
394        log_not_implemented(&cmd);
395        Ok(())
396    }
397
398    /// Unsubscribes from periodic order book snapshots for the specified instrument.
399    ///
400    /// # Errors
401    ///
402    /// Returns an error if the unsubscribe operation fails.
403    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
404        log_not_implemented(&cmd);
405        Ok(())
406    }
407
408    /// Unsubscribes from quote updates for the specified instrument.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if the unsubscribe operation fails.
413    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
414        log_not_implemented(&cmd);
415        Ok(())
416    }
417
418    /// Unsubscribes from trade updates for the specified instrument.
419    ///
420    /// # Errors
421    ///
422    /// Returns an error if the unsubscribe operation fails.
423    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
424        log_not_implemented(&cmd);
425        Ok(())
426    }
427
428    /// Unsubscribes from mark price updates for the specified instrument.
429    ///
430    /// # Errors
431    ///
432    /// Returns an error if the unsubscribe operation fails.
433    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
434        log_not_implemented(&cmd);
435        Ok(())
436    }
437
438    /// Unsubscribes from index price updates for the specified instrument.
439    ///
440    /// # Errors
441    ///
442    /// Returns an error if the unsubscribe operation fails.
443    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
444        log_not_implemented(&cmd);
445        Ok(())
446    }
447
448    /// Unsubscribes from funding rate updates for the specified instrument.
449    ///
450    /// # Errors
451    ///
452    /// Returns an error if the unsubscribe operation fails.
453    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
454        log_not_implemented(&cmd);
455        Ok(())
456    }
457
458    /// Unsubscribes from bar updates of the specified bar type.
459    ///
460    /// # Errors
461    ///
462    /// Returns an error if the unsubscribe operation fails.
463    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
464        log_not_implemented(&cmd);
465        Ok(())
466    }
467
468    /// Unsubscribes from instrument status updates for the specified instrument.
469    ///
470    /// # Errors
471    ///
472    /// Returns an error if the unsubscribe operation fails.
473    fn unsubscribe_instrument_status(
474        &mut self,
475        cmd: &UnsubscribeInstrumentStatus,
476    ) -> anyhow::Result<()> {
477        log_not_implemented(&cmd);
478        Ok(())
479    }
480
481    /// Unsubscribes from instrument close events for the specified instrument.
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if the unsubscribe operation fails.
486    fn unsubscribe_instrument_close(
487        &mut self,
488        cmd: &UnsubscribeInstrumentClose,
489    ) -> anyhow::Result<()> {
490        log_not_implemented(&cmd);
491        Ok(())
492    }
493
494    #[cfg(feature = "defi")]
495    /// Unsubscribes from blocks for a specified blockchain.
496    ///
497    /// # Errors
498    ///
499    /// Returns an error if the subscription operation fails.
500    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
501        log_not_implemented(&cmd);
502        Ok(())
503    }
504
505    #[cfg(feature = "defi")]
506    /// Unsubscribes from pool definition updates for a specified AMM pool.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if the subscription operation fails.
511    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
512        log_not_implemented(&cmd);
513        Ok(())
514    }
515
516    #[cfg(feature = "defi")]
517    /// Unsubscribes from swaps for a specified AMM pool.
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if the subscription operation fails.
522    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
523        log_not_implemented(&cmd);
524        Ok(())
525    }
526
527    #[cfg(feature = "defi")]
528    /// Unsubscribes from pool liquidity updates for a specified AMM pool.
529    ///
530    /// # Errors
531    ///
532    /// Returns an error if the subscription operation fails.
533    fn unsubscribe_pool_liquidity_updates(
534        &mut self,
535        cmd: &UnsubscribePoolLiquidityUpdates,
536    ) -> anyhow::Result<()> {
537        log_not_implemented(&cmd);
538        Ok(())
539    }
540
541    #[cfg(feature = "defi")]
542    /// Unsubscribes from pool fee collects for a specified AMM pool.
543    ///
544    /// # Errors
545    ///
546    /// Returns an error if the subscription operation fails.
547    fn unsubscribe_pool_fee_collects(
548        &mut self,
549        cmd: &UnsubscribePoolFeeCollects,
550    ) -> anyhow::Result<()> {
551        log_not_implemented(&cmd);
552        Ok(())
553    }
554
555    #[cfg(feature = "defi")]
556    /// Unsubscribes from pool flash loan events for a specified AMM pool.
557    ///
558    /// # Errors
559    ///
560    /// Returns an error if the subscription operation fails.
561    fn unsubscribe_pool_flash_events(
562        &mut self,
563        cmd: &UnsubscribePoolFlashEvents,
564    ) -> anyhow::Result<()> {
565        log_not_implemented(&cmd);
566        Ok(())
567    }
568
569    /// Sends a custom data request to the provider.
570    ///
571    /// # Errors
572    ///
573    /// Returns an error if the data request fails.
574    fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
575        log_not_implemented(&request);
576        Ok(())
577    }
578
579    /// Requests a list of instruments from the provider for a given venue.
580    ///
581    /// # Errors
582    ///
583    /// Returns an error if the instruments request fails.
584    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
585        log_not_implemented(&request);
586        Ok(())
587    }
588
589    /// Requests detailed data for a single instrument.
590    ///
591    /// # Errors
592    ///
593    /// Returns an error if the instrument request fails.
594    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
595        log_not_implemented(&request);
596        Ok(())
597    }
598
599    /// Requests a snapshot of the order book for a specified instrument.
600    ///
601    /// # Errors
602    ///
603    /// Returns an error if the book snapshot request fails.
604    fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
605        log_not_implemented(&request);
606        Ok(())
607    }
608
609    /// Requests historical or streaming quote data for a specified instrument.
610    ///
611    /// # Errors
612    ///
613    /// Returns an error if the quotes request fails.
614    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
615        log_not_implemented(&request);
616        Ok(())
617    }
618
619    /// Requests historical or streaming trade data for a specified instrument.
620    ///
621    /// # Errors
622    ///
623    /// Returns an error if the trades request fails.
624    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
625        log_not_implemented(&request);
626        Ok(())
627    }
628
629    /// Requests historical or streaming bar data for a specified instrument and bar type.
630    ///
631    /// # Errors
632    ///
633    /// Returns an error if the bars request fails.
634    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
635        log_not_implemented(&request);
636        Ok(())
637    }
638
639    /// Requests historical order book depth data for a specified instrument.
640    ///
641    /// # Errors
642    ///
643    /// Returns an error if the order book depths request fails.
644    fn request_book_depth(&self, request: &RequestBookDepth) -> anyhow::Result<()> {
645        log_not_implemented(&request);
646        Ok(())
647    }
648
649    #[cfg(feature = "defi")]
650    /// Requests a snapshot of a specific AMM pool.
651    ///
652    /// # Errors
653    ///
654    /// Returns an error if the pool snapshot request fails.
655    fn request_pool_snapshot(&self, request: &RequestPoolSnapshot) -> anyhow::Result<()> {
656        log_not_implemented(&request);
657        Ok(())
658    }
659}
660
/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
///
/// Each `subscriptions_*` field is a set of keys for one kind of data. The handlers visible
/// alongside this type (`subscribe`, `unsubscribe`, `subscribe_instruments`) update their set
/// and forward the command to the wrapped client only when the key was not already tracked
/// (or, for unsubscribe, only when it was).
pub struct DataClientAdapter {
    /// The wrapped client that commands are forwarded to.
    pub(crate) client: Box<dyn DataClient>,
    /// Client identifier supplied at construction.
    pub client_id: ClientId,
    /// Optional venue supplied at construction.
    pub venue: Option<Venue>,
    /// Flag supplied at construction; presumably whether the client handles order book
    /// deltas itself — TODO confirm against the consumer of this flag.
    pub handles_book_deltas: bool,
    /// Flag supplied at construction; presumably whether the client handles order book
    /// snapshots itself — TODO confirm against the consumer of this flag.
    pub handles_book_snapshots: bool,
    /// Custom data types currently tracked by `subscribe` / `unsubscribe`.
    pub subscriptions_custom: AHashSet<DataType>,
    /// Instruments tracked for order book delta subscriptions.
    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
    /// Instruments tracked for order book depth-10 subscriptions.
    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
    /// Instruments tracked for order book snapshot subscriptions.
    pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
    /// Instruments tracked for quote subscriptions.
    pub subscriptions_quotes: AHashSet<InstrumentId>,
    /// Instruments tracked for trade subscriptions.
    pub subscriptions_trades: AHashSet<InstrumentId>,
    /// Bar types tracked for bar subscriptions.
    pub subscriptions_bars: AHashSet<BarType>,
    /// Instruments tracked for instrument status subscriptions.
    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
    /// Instruments tracked for instrument close subscriptions.
    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
    /// Instruments tracked for single-instrument subscriptions.
    pub subscriptions_instrument: AHashSet<InstrumentId>,
    /// Venues tracked by `subscribe_instruments`.
    pub subscriptions_instrument_venue: AHashSet<Venue>,
    /// Instruments tracked for mark price subscriptions.
    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
    /// Instruments tracked for index price subscriptions.
    pub subscriptions_index_prices: AHashSet<InstrumentId>,
    /// Instruments tracked for funding rate subscriptions.
    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
    /// Blockchains tracked for block subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_blocks: AHashSet<Blockchain>,
    /// Pools tracked for pool definition subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pools: AHashSet<InstrumentId>,
    /// Pools tracked for swap subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
    /// Pools tracked for liquidity update subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
    /// Pools tracked for fee collect subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
    /// Pools tracked for flash event subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_flash: AHashSet<InstrumentId>,
}
695
696impl Deref for DataClientAdapter {
697    type Target = Box<dyn DataClient>;
698
699    fn deref(&self) -> &Self::Target {
700        &self.client
701    }
702}
703
704impl DerefMut for DataClientAdapter {
705    fn deref_mut(&mut self) -> &mut Self::Target {
706        &mut self.client
707    }
708}
709
710impl Debug for DataClientAdapter {
711    #[rustfmt::skip]
712    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
713        f.debug_struct(stringify!(DataClientAdapter))
714            .field("client_id", &self.client_id)
715            .field("venue", &self.venue)
716            .field("handles_book_deltas", &self.handles_book_deltas)
717            .field("handles_book_snapshots", &self.handles_book_snapshots)
718            .field("subscriptions_custom", &self.subscriptions_custom)
719            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
720            .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
721            .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
722            .field("subscriptions_quotes", &self.subscriptions_quotes)
723            .field("subscriptions_trades", &self.subscriptions_trades)
724            .field("subscriptions_bars", &self.subscriptions_bars)
725            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
726            .field("subscriptions_index_prices", &self.subscriptions_index_prices)
727            .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
728            .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
729            .field("subscriptions_instrument", &self.subscriptions_instrument)
730            .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
731            .finish()
732    }
733}
734
735impl DataClientAdapter {
736    /// Creates a new [`DataClientAdapter`] with the given client and clock.
737    #[must_use]
738    pub fn new(
739        client_id: ClientId,
740        venue: Option<Venue>,
741        handles_order_book_deltas: bool,
742        handles_order_book_snapshots: bool,
743        client: Box<dyn DataClient>,
744    ) -> Self {
745        Self {
746            client,
747            client_id,
748            venue,
749            handles_book_deltas: handles_order_book_deltas,
750            handles_book_snapshots: handles_order_book_snapshots,
751            subscriptions_custom: AHashSet::new(),
752            subscriptions_book_deltas: AHashSet::new(),
753            subscriptions_book_depth10: AHashSet::new(),
754            subscriptions_book_snapshots: AHashSet::new(),
755            subscriptions_quotes: AHashSet::new(),
756            subscriptions_trades: AHashSet::new(),
757            subscriptions_mark_prices: AHashSet::new(),
758            subscriptions_index_prices: AHashSet::new(),
759            subscriptions_funding_rates: AHashSet::new(),
760            subscriptions_bars: AHashSet::new(),
761            subscriptions_instrument_status: AHashSet::new(),
762            subscriptions_instrument_close: AHashSet::new(),
763            subscriptions_instrument: AHashSet::new(),
764            subscriptions_instrument_venue: AHashSet::new(),
765            #[cfg(feature = "defi")]
766            subscriptions_blocks: AHashSet::new(),
767            #[cfg(feature = "defi")]
768            subscriptions_pools: AHashSet::new(),
769            #[cfg(feature = "defi")]
770            subscriptions_pool_swaps: AHashSet::new(),
771            #[cfg(feature = "defi")]
772            subscriptions_pool_liquidity_updates: AHashSet::new(),
773            #[cfg(feature = "defi")]
774            subscriptions_pool_fee_collects: AHashSet::new(),
775            #[cfg(feature = "defi")]
776            subscriptions_pool_flash: AHashSet::new(),
777        }
778    }
779
780    #[allow(clippy::borrowed_box)]
781    #[must_use]
782    pub fn get_client(&self) -> &Box<dyn DataClient> {
783        &self.client
784    }
785
786    /// Connects the underlying client to the data provider.
787    ///
788    /// # Errors
789    ///
790    /// Returns an error if the connection fails.
791    pub async fn connect(&mut self) -> anyhow::Result<()> {
792        self.client.connect().await
793    }
794
795    /// Disconnects the underlying client from the data provider.
796    ///
797    /// # Errors
798    ///
799    /// Returns an error if the disconnection fails.
800    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
801        self.client.disconnect().await
802    }
803
804    #[inline]
805    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
806        if let Err(e) = match cmd {
807            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
808            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
809            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
810            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
811            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
812            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
813            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
814            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
815            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
816            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
817            SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
818            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
819            SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
820            SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
821        } {
822            log_command_error(&cmd, &e);
823        }
824    }
825
826    #[inline]
827    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
828        if let Err(e) = match cmd {
829            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
830            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
831            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
832            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
833            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
834            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
835            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
836            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
837            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
838            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
839            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
840            UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
841            UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
842            UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
843        } {
844            log_command_error(&cmd, &e);
845        }
846    }
847
848    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
849
850    /// Subscribes to a custom data type, updating internal state and forwarding to the client.
851    ///
852    /// # Errors
853    ///
854    /// Returns an error if the underlying client subscribe operation fails.
855    pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
856        if !self.subscriptions_custom.contains(&cmd.data_type) {
857            self.subscriptions_custom.insert(cmd.data_type.clone());
858            self.client.subscribe(cmd)?;
859        }
860        Ok(())
861    }
862
863    /// Unsubscribes from a custom data type, updating internal state and forwarding to the client.
864    ///
865    /// # Errors
866    ///
867    /// Returns an error if the underlying client unsubscribe operation fails.
868    pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
869        if self.subscriptions_custom.contains(&cmd.data_type) {
870            self.subscriptions_custom.remove(&cmd.data_type);
871            self.client.unsubscribe(cmd)?;
872        }
873        Ok(())
874    }
875
876    /// Subscribes to instrument definitions for a venue, updating internal state and forwarding to the client.
877    ///
878    /// # Errors
879    ///
880    /// Returns an error if the underlying client subscribe operation fails.
881    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
882        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
883            self.subscriptions_instrument_venue.insert(cmd.venue);
884            self.client.subscribe_instruments(cmd)?;
885        }
886
887        Ok(())
888    }
889
890    /// Unsubscribes from instrument definition updates for a venue, updating internal state and forwarding to the client.
891    ///
892    /// # Errors
893    ///
894    /// Returns an error if the underlying client unsubscribe operation fails.
895    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
896        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
897            self.subscriptions_instrument_venue.remove(&cmd.venue);
898            self.client.unsubscribe_instruments(cmd)?;
899        }
900
901        Ok(())
902    }
903
904    /// Subscribes to instrument definitions for a single instrument, updating internal state and forwarding to the client.
905    ///
906    /// # Errors
907    ///
908    /// Returns an error if the underlying client subscribe operation fails.
909    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
910        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
911            self.subscriptions_instrument.insert(cmd.instrument_id);
912            self.client.subscribe_instrument(cmd)?;
913        }
914
915        Ok(())
916    }
917
918    /// Unsubscribes from instrument definition updates for a single instrument, updating internal state and forwarding to the client.
919    ///
920    /// # Errors
921    ///
922    /// Returns an error if the underlying client unsubscribe operation fails.
923    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
924        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
925            self.subscriptions_instrument.remove(&cmd.instrument_id);
926            self.client.unsubscribe_instrument(cmd)?;
927        }
928
929        Ok(())
930    }
931
932    /// Subscribes to book deltas updates for an instrument, updating internal state and forwarding to the client.
933    ///
934    /// # Errors
935    ///
936    /// Returns an error if the underlying client subscribe operation fails.
937    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
938        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
939            self.subscriptions_book_deltas.insert(cmd.instrument_id);
940            self.client.subscribe_book_deltas(cmd)?;
941        }
942
943        Ok(())
944    }
945
946    /// Unsubscribes from book deltas for an instrument, updating internal state and forwarding to the client.
947    ///
948    /// # Errors
949    ///
950    /// Returns an error if the underlying client unsubscribe operation fails.
951    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
952        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
953            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
954            self.client.unsubscribe_book_deltas(cmd)?;
955        }
956
957        Ok(())
958    }
959
960    /// Subscribes to book depth updates for an instrument, updating internal state and forwarding to the client.
961    ///
962    /// # Errors
963    ///
964    /// Returns an error if the underlying client subscribe operation fails.
965    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
966        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
967            self.subscriptions_book_depth10.insert(cmd.instrument_id);
968            self.client.subscribe_book_depth10(cmd)?;
969        }
970
971        Ok(())
972    }
973
974    /// Unsubscribes from book depth updates for an instrument, updating internal state and forwarding to the client.
975    ///
976    /// # Errors
977    ///
978    /// Returns an error if the underlying client unsubscribe operation fails.
979    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
980        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
981            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
982            self.client.unsubscribe_book_depth10(cmd)?;
983        }
984
985        Ok(())
986    }
987
988    /// Subscribes to book snapshots for an instrument, updating internal state and forwarding to the client.
989    ///
990    /// # Errors
991    ///
992    /// Returns an error if the underlying client subscribe operation fails.
993    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
994        if !self
995            .subscriptions_book_snapshots
996            .contains(&cmd.instrument_id)
997        {
998            self.subscriptions_book_snapshots.insert(cmd.instrument_id);
999            self.client.subscribe_book_snapshots(cmd)?;
1000        }
1001
1002        Ok(())
1003    }
1004
1005    /// Unsubscribes from book snapshots for an instrument, updating internal state and forwarding to the client.
1006    ///
1007    /// # Errors
1008    ///
1009    /// Returns an error if the underlying client unsubscribe operation fails.
1010    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1011        if self
1012            .subscriptions_book_snapshots
1013            .contains(&cmd.instrument_id)
1014        {
1015            self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
1016            self.client.unsubscribe_book_snapshots(cmd)?;
1017        }
1018
1019        Ok(())
1020    }
1021
1022    /// Subscribes to quotes for an instrument, updating internal state and forwarding to the client.
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns an error if the underlying client subscribe operation fails.
1027    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
1028        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
1029            self.subscriptions_quotes.insert(cmd.instrument_id);
1030            self.client.subscribe_quotes(cmd)?;
1031        }
1032        Ok(())
1033    }
1034
1035    /// Unsubscribes from quotes for an instrument, updating internal state and forwarding to the client.
1036    ///
1037    /// # Errors
1038    ///
1039    /// Returns an error if the underlying client unsubscribe operation fails.
1040    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1041        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
1042            self.subscriptions_quotes.remove(&cmd.instrument_id);
1043            self.client.unsubscribe_quotes(cmd)?;
1044        }
1045        Ok(())
1046    }
1047
1048    /// Subscribes to trades for an instrument, updating internal state and forwarding to the client.
1049    ///
1050    /// # Errors
1051    ///
1052    /// Returns an error if the underlying client subscribe operation fails.
1053    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
1054        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
1055            self.subscriptions_trades.insert(cmd.instrument_id);
1056            self.client.subscribe_trades(cmd)?;
1057        }
1058        Ok(())
1059    }
1060
1061    /// Unsubscribes from trades for an instrument, updating internal state and forwarding to the client.
1062    ///
1063    /// # Errors
1064    ///
1065    /// Returns an error if the underlying client unsubscribe operation fails.
1066    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1067        if self.subscriptions_trades.contains(&cmd.instrument_id) {
1068            self.subscriptions_trades.remove(&cmd.instrument_id);
1069            self.client.unsubscribe_trades(cmd)?;
1070        }
1071        Ok(())
1072    }
1073
1074    /// Subscribes to bars for a bar type, updating internal state and forwarding to the client.
1075    ///
1076    /// # Errors
1077    ///
1078    /// Returns an error if the underlying client subscribe operation fails.
1079    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1080        if !self.subscriptions_bars.contains(&cmd.bar_type) {
1081            self.subscriptions_bars.insert(cmd.bar_type);
1082            self.client.subscribe_bars(cmd)?;
1083        }
1084        Ok(())
1085    }
1086
1087    /// Unsubscribes from bars for a bar type, updating internal state and forwarding to the client.
1088    ///
1089    /// # Errors
1090    ///
1091    /// Returns an error if the underlying client unsubscribe operation fails.
1092    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1093        if self.subscriptions_bars.contains(&cmd.bar_type) {
1094            self.subscriptions_bars.remove(&cmd.bar_type);
1095            self.client.unsubscribe_bars(cmd)?;
1096        }
1097        Ok(())
1098    }
1099
1100    /// Subscribes to mark price updates for an instrument, updating internal state and forwarding to the client.
1101    ///
1102    /// # Errors
1103    ///
1104    /// Returns an error if the underlying client subscribe operation fails.
1105    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1106        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1107            self.subscriptions_mark_prices.insert(cmd.instrument_id);
1108            self.client.subscribe_mark_prices(cmd)?;
1109        }
1110        Ok(())
1111    }
1112
1113    /// Unsubscribes from mark price updates for an instrument, updating internal state and forwarding to the client.
1114    ///
1115    /// # Errors
1116    ///
1117    /// Returns an error if the underlying client unsubscribe operation fails.
1118    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1119        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1120            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1121            self.client.unsubscribe_mark_prices(cmd)?;
1122        }
1123        Ok(())
1124    }
1125
1126    /// Subscribes to index price updates for an instrument, updating internal state and forwarding to the client.
1127    ///
1128    /// # Errors
1129    ///
1130    /// Returns an error if the underlying client subscribe operation fails.
1131    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1132        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1133            self.subscriptions_index_prices.insert(cmd.instrument_id);
1134            self.client.subscribe_index_prices(cmd)?;
1135        }
1136        Ok(())
1137    }
1138
1139    /// Unsubscribes from index price updates for an instrument, updating internal state and forwarding to the client.
1140    ///
1141    /// # Errors
1142    ///
1143    /// Returns an error if the underlying client unsubscribe operation fails.
1144    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1145        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1146            self.subscriptions_index_prices.remove(&cmd.instrument_id);
1147            self.client.unsubscribe_index_prices(cmd)?;
1148        }
1149        Ok(())
1150    }
1151
1152    /// Subscribes to funding rate updates for an instrument, updating internal state and forwarding to the client.
1153    ///
1154    /// # Errors
1155    ///
1156    /// Returns an error if the underlying client subscribe operation fails.
1157    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1158        if !self
1159            .subscriptions_funding_rates
1160            .contains(&cmd.instrument_id)
1161        {
1162            self.subscriptions_funding_rates.insert(cmd.instrument_id);
1163            self.client.subscribe_funding_rates(cmd)?;
1164        }
1165        Ok(())
1166    }
1167
1168    /// Unsubscribes from funding rate updates for an instrument, updating internal state and forwarding to the client.
1169    ///
1170    /// # Errors
1171    ///
1172    /// Returns an error if the underlying client unsubscribe operation fails.
1173    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1174        if self
1175            .subscriptions_funding_rates
1176            .contains(&cmd.instrument_id)
1177        {
1178            self.subscriptions_funding_rates.remove(&cmd.instrument_id);
1179            self.client.unsubscribe_funding_rates(cmd)?;
1180        }
1181        Ok(())
1182    }
1183
1184    /// Subscribes to instrument status updates for the specified instrument.
1185    ///
1186    /// # Errors
1187    ///
1188    /// Returns an error if the underlying client subscribe operation fails.
1189    fn subscribe_instrument_status(
1190        &mut self,
1191        cmd: &SubscribeInstrumentStatus,
1192    ) -> anyhow::Result<()> {
1193        if !self
1194            .subscriptions_instrument_status
1195            .contains(&cmd.instrument_id)
1196        {
1197            self.subscriptions_instrument_status
1198                .insert(cmd.instrument_id);
1199            self.client.subscribe_instrument_status(cmd)?;
1200        }
1201        Ok(())
1202    }
1203
1204    /// Unsubscribes from instrument status updates for the specified instrument.
1205    ///
1206    /// # Errors
1207    ///
1208    /// Returns an error if the underlying client unsubscribe operation fails.
1209    fn unsubscribe_instrument_status(
1210        &mut self,
1211        cmd: &UnsubscribeInstrumentStatus,
1212    ) -> anyhow::Result<()> {
1213        if self
1214            .subscriptions_instrument_status
1215            .contains(&cmd.instrument_id)
1216        {
1217            self.subscriptions_instrument_status
1218                .remove(&cmd.instrument_id);
1219            self.client.unsubscribe_instrument_status(cmd)?;
1220        }
1221        Ok(())
1222    }
1223
1224    /// Subscribes to instrument close events for the specified instrument.
1225    ///
1226    /// # Errors
1227    ///
1228    /// Returns an error if the underlying client subscribe operation fails.
1229    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1230        if !self
1231            .subscriptions_instrument_close
1232            .contains(&cmd.instrument_id)
1233        {
1234            self.subscriptions_instrument_close
1235                .insert(cmd.instrument_id);
1236            self.client.subscribe_instrument_close(cmd)?;
1237        }
1238        Ok(())
1239    }
1240
1241    /// Unsubscribes from instrument close events for the specified instrument.
1242    ///
1243    /// # Errors
1244    ///
1245    /// Returns an error if the underlying client unsubscribe operation fails.
1246    fn unsubscribe_instrument_close(
1247        &mut self,
1248        cmd: &UnsubscribeInstrumentClose,
1249    ) -> anyhow::Result<()> {
1250        if self
1251            .subscriptions_instrument_close
1252            .contains(&cmd.instrument_id)
1253        {
1254            self.subscriptions_instrument_close
1255                .remove(&cmd.instrument_id);
1256            self.client.unsubscribe_instrument_close(cmd)?;
1257        }
1258        Ok(())
1259    }
1260
1261    // -- REQUEST HANDLERS ------------------------------------------------------------------------
1262
1263    /// Sends a data request to the underlying client.
1264    ///
1265    /// # Errors
1266    ///
1267    /// Returns an error if the client request fails.
1268    pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1269        self.client.request_data(req)
1270    }
1271
1272    /// Sends a single instrument request to the client.
1273    ///
1274    /// # Errors
1275    ///
1276    /// Returns an error if the client fails to process the request.
1277    pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1278        self.client.request_instrument(req)
1279    }
1280
1281    /// Sends a batch instruments request to the client.
1282    ///
1283    /// # Errors
1284    ///
1285    /// Returns an error if the client fails to process the request.
1286    pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1287        self.client.request_instruments(req)
1288    }
1289
1290    /// Sends a quotes request for a given instrument.
1291    ///
1292    /// # Errors
1293    ///
1294    /// Returns an error if the client fails to process the quotes request.
1295    pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1296        self.client.request_quotes(req)
1297    }
1298
1299    /// Sends a trades request for a given instrument.
1300    ///
1301    /// # Errors
1302    ///
1303    /// Returns an error if the client fails to process the trades request.
1304    pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1305        self.client.request_trades(req)
1306    }
1307
1308    /// Sends a bars request for a given instrument and bar type.
1309    ///
1310    /// # Errors
1311    ///
1312    /// Returns an error if the client fails to process the bars request.
1313    pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1314        self.client.request_bars(req)
1315    }
1316
1317    /// Sends an order book depths request for a given instrument.
1318    ///
1319    /// # Errors
1320    ///
1321    /// Returns an error if the client fails to process the order book depths request.
1322    pub fn request_book_depth(&self, req: &RequestBookDepth) -> anyhow::Result<()> {
1323        self.client.request_book_depth(req)
1324    }
1325}
1326
1327#[inline(always)]
1328fn log_not_implemented<T: Debug>(msg: &T) {
1329    log::warn!("{msg:?} – handler not implemented");
1330}
1331
1332#[inline(always)]
1333fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1334    log::error!("Error on {cmd:?}: {e}");
1335}