nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17//!
18//! Provides the `DataClientAdapter` for managing subscriptions and requests,
19//! and utilities for constructing data responses.
20
21use std::{
22    fmt::Debug,
23    ops::{Deref, DerefMut},
24};
25
26use ahash::AHashSet;
27use nautilus_common::{
28    clients::{DataClient, log_command_error},
29    messages::data::{
30        RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData, RequestInstrument,
31        RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
32        SubscribeBookDepth10, SubscribeCommand, SubscribeCustomData, SubscribeFundingRates,
33        SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
34        SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
35        SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
36        UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
37        UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
38        UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
39    },
40};
41#[cfg(feature = "defi")]
42use nautilus_model::defi::Blockchain;
43use nautilus_model::{
44    data::{BarType, DataType},
45    identifiers::{ClientId, InstrumentId, Venue},
46};
47
48#[cfg(feature = "defi")]
49#[allow(unused_imports)] // Brings DeFi impl blocks into scope
50use crate::defi::client as _;
51
52/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
53pub struct DataClientAdapter {
54    pub(crate) client: Box<dyn DataClient>,
55    pub client_id: ClientId,
56    pub venue: Option<Venue>,
57    pub handles_book_deltas: bool,
58    pub handles_book_snapshots: bool,
59    pub subscriptions_custom: AHashSet<DataType>,
60    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
61    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
62    pub subscriptions_quotes: AHashSet<InstrumentId>,
63    pub subscriptions_trades: AHashSet<InstrumentId>,
64    pub subscriptions_bars: AHashSet<BarType>,
65    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
66    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
67    pub subscriptions_instrument: AHashSet<InstrumentId>,
68    pub subscriptions_instrument_venue: AHashSet<Venue>,
69    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
70    pub subscriptions_index_prices: AHashSet<InstrumentId>,
71    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
72    #[cfg(feature = "defi")]
73    pub subscriptions_blocks: AHashSet<Blockchain>,
74    #[cfg(feature = "defi")]
75    pub subscriptions_pools: AHashSet<InstrumentId>,
76    #[cfg(feature = "defi")]
77    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
78    #[cfg(feature = "defi")]
79    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
80    #[cfg(feature = "defi")]
81    pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
82    #[cfg(feature = "defi")]
83    pub subscriptions_pool_flash: AHashSet<InstrumentId>,
84}
85
86impl Deref for DataClientAdapter {
87    type Target = Box<dyn DataClient>;
88
89    fn deref(&self) -> &Self::Target {
90        &self.client
91    }
92}
93
94impl DerefMut for DataClientAdapter {
95    fn deref_mut(&mut self) -> &mut Self::Target {
96        &mut self.client
97    }
98}
99
100impl Debug for DataClientAdapter {
101    #[rustfmt::skip]
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct(stringify!(DataClientAdapter))
104            .field("client_id", &self.client_id)
105            .field("venue", &self.venue)
106            .field("handles_book_deltas", &self.handles_book_deltas)
107            .field("handles_book_snapshots", &self.handles_book_snapshots)
108            .field("subscriptions_custom", &self.subscriptions_custom)
109            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
110            .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
111            .field("subscriptions_quotes", &self.subscriptions_quotes)
112            .field("subscriptions_trades", &self.subscriptions_trades)
113            .field("subscriptions_bars", &self.subscriptions_bars)
114            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
115            .field("subscriptions_index_prices", &self.subscriptions_index_prices)
116            .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
117            .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
118            .field("subscriptions_instrument", &self.subscriptions_instrument)
119            .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
120            .finish()
121    }
122}
123
124impl DataClientAdapter {
125    /// Creates a new [`DataClientAdapter`] with the given client and clock.
126    #[must_use]
127    pub fn new(
128        client_id: ClientId,
129        venue: Option<Venue>,
130        handles_order_book_deltas: bool,
131        handles_order_book_snapshots: bool,
132        client: Box<dyn DataClient>,
133    ) -> Self {
134        Self {
135            client,
136            client_id,
137            venue,
138            handles_book_deltas: handles_order_book_deltas,
139            handles_book_snapshots: handles_order_book_snapshots,
140            subscriptions_custom: AHashSet::new(),
141            subscriptions_book_deltas: AHashSet::new(),
142            subscriptions_book_depth10: AHashSet::new(),
143            subscriptions_quotes: AHashSet::new(),
144            subscriptions_trades: AHashSet::new(),
145            subscriptions_mark_prices: AHashSet::new(),
146            subscriptions_index_prices: AHashSet::new(),
147            subscriptions_funding_rates: AHashSet::new(),
148            subscriptions_bars: AHashSet::new(),
149            subscriptions_instrument_status: AHashSet::new(),
150            subscriptions_instrument_close: AHashSet::new(),
151            subscriptions_instrument: AHashSet::new(),
152            subscriptions_instrument_venue: AHashSet::new(),
153            #[cfg(feature = "defi")]
154            subscriptions_blocks: AHashSet::new(),
155            #[cfg(feature = "defi")]
156            subscriptions_pools: AHashSet::new(),
157            #[cfg(feature = "defi")]
158            subscriptions_pool_swaps: AHashSet::new(),
159            #[cfg(feature = "defi")]
160            subscriptions_pool_liquidity_updates: AHashSet::new(),
161            #[cfg(feature = "defi")]
162            subscriptions_pool_fee_collects: AHashSet::new(),
163            #[cfg(feature = "defi")]
164            subscriptions_pool_flash: AHashSet::new(),
165        }
166    }
167
168    #[allow(clippy::borrowed_box)]
169    #[must_use]
170    pub fn get_client(&self) -> &Box<dyn DataClient> {
171        &self.client
172    }
173
174    /// Connects the underlying client to the data provider.
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if the connection fails.
179    pub async fn connect(&mut self) -> anyhow::Result<()> {
180        self.client.connect().await
181    }
182
183    /// Disconnects the underlying client from the data provider.
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if the disconnection fails.
188    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
189        self.client.disconnect().await
190    }
191
192    #[inline]
193    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
194        if let Err(e) = match cmd {
195            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
196            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
197            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
198            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
199            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
200            SubscribeCommand::BookSnapshots(_) => Ok(()), // Handled internally by engine
201            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
202            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
203            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
204            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
205            SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
206            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
207            SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
208            SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
209        } {
210            log_command_error(&cmd, &e);
211        }
212    }
213
214    #[inline]
215    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
216        if let Err(e) = match cmd {
217            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
218            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
219            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
220            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
221            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
222            UnsubscribeCommand::BookSnapshots(_) => Ok(()), // Handled internally by engine
223            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
224            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
225            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
226            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
227            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
228            UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
229            UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
230            UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
231        } {
232            log_command_error(&cmd, &e);
233        }
234    }
235
236    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
237
238    /// Subscribes to a custom data type, updating internal state and forwarding to the client.
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if the underlying client subscribe operation fails.
243    pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
244        if !self.subscriptions_custom.contains(&cmd.data_type) {
245            self.subscriptions_custom.insert(cmd.data_type.clone());
246            self.client.subscribe(cmd)?;
247        }
248        Ok(())
249    }
250
251    /// Unsubscribes from a custom data type, updating internal state and forwarding to the client.
252    ///
253    /// # Errors
254    ///
255    /// Returns an error if the underlying client unsubscribe operation fails.
256    pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
257        if self.subscriptions_custom.contains(&cmd.data_type) {
258            self.subscriptions_custom.remove(&cmd.data_type);
259            self.client.unsubscribe(cmd)?;
260        }
261        Ok(())
262    }
263
264    /// Subscribes to instrument definitions for a venue, updating internal state and forwarding to the client.
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if the underlying client subscribe operation fails.
269    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
270        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
271            self.subscriptions_instrument_venue.insert(cmd.venue);
272            self.client.subscribe_instruments(cmd)?;
273        }
274
275        Ok(())
276    }
277
278    /// Unsubscribes from instrument definition updates for a venue, updating internal state and forwarding to the client.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the underlying client unsubscribe operation fails.
283    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
284        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
285            self.subscriptions_instrument_venue.remove(&cmd.venue);
286            self.client.unsubscribe_instruments(cmd)?;
287        }
288
289        Ok(())
290    }
291
292    /// Subscribes to instrument definitions for a single instrument, updating internal state and forwarding to the client.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if the underlying client subscribe operation fails.
297    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
298        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
299            self.subscriptions_instrument.insert(cmd.instrument_id);
300            self.client.subscribe_instrument(cmd)?;
301        }
302
303        Ok(())
304    }
305
306    /// Unsubscribes from instrument definition updates for a single instrument, updating internal state and forwarding to the client.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if the underlying client unsubscribe operation fails.
311    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
312        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
313            self.subscriptions_instrument.remove(&cmd.instrument_id);
314            self.client.unsubscribe_instrument(cmd)?;
315        }
316
317        Ok(())
318    }
319
320    /// Subscribes to book deltas updates for an instrument, updating internal state and forwarding to the client.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if the underlying client subscribe operation fails.
325    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
326        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
327            self.subscriptions_book_deltas.insert(cmd.instrument_id);
328            self.client.subscribe_book_deltas(cmd)?;
329        }
330
331        Ok(())
332    }
333
334    /// Unsubscribes from book deltas for an instrument, updating internal state and forwarding to the client.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if the underlying client unsubscribe operation fails.
339    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
340        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
341            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
342            self.client.unsubscribe_book_deltas(cmd)?;
343        }
344
345        Ok(())
346    }
347
348    /// Subscribes to book depth updates for an instrument, updating internal state and forwarding to the client.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if the underlying client subscribe operation fails.
353    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
354        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
355            self.subscriptions_book_depth10.insert(cmd.instrument_id);
356            self.client.subscribe_book_depth10(cmd)?;
357        }
358
359        Ok(())
360    }
361
362    /// Unsubscribes from book depth updates for an instrument, updating internal state and forwarding to the client.
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if the underlying client unsubscribe operation fails.
367    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
368        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
369            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
370            self.client.unsubscribe_book_depth10(cmd)?;
371        }
372
373        Ok(())
374    }
375
376    /// Subscribes to quotes for an instrument, updating internal state and forwarding to the client.
377    ///
378    /// # Errors
379    ///
380    /// Returns an error if the underlying client subscribe operation fails.
381    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
382        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
383            self.subscriptions_quotes.insert(cmd.instrument_id);
384            self.client.subscribe_quotes(cmd)?;
385        }
386        Ok(())
387    }
388
389    /// Unsubscribes from quotes for an instrument, updating internal state and forwarding to the client.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if the underlying client unsubscribe operation fails.
394    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
395        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
396            self.subscriptions_quotes.remove(&cmd.instrument_id);
397            self.client.unsubscribe_quotes(cmd)?;
398        }
399        Ok(())
400    }
401
402    /// Subscribes to trades for an instrument, updating internal state and forwarding to the client.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the underlying client subscribe operation fails.
407    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
408        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
409            self.subscriptions_trades.insert(cmd.instrument_id);
410            self.client.subscribe_trades(cmd)?;
411        }
412        Ok(())
413    }
414
415    /// Unsubscribes from trades for an instrument, updating internal state and forwarding to the client.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if the underlying client unsubscribe operation fails.
420    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
421        if self.subscriptions_trades.contains(&cmd.instrument_id) {
422            self.subscriptions_trades.remove(&cmd.instrument_id);
423            self.client.unsubscribe_trades(cmd)?;
424        }
425        Ok(())
426    }
427
428    /// Subscribes to bars for a bar type, updating internal state and forwarding to the client.
429    ///
430    /// # Errors
431    ///
432    /// Returns an error if the underlying client subscribe operation fails.
433    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
434        if !self.subscriptions_bars.contains(&cmd.bar_type) {
435            self.subscriptions_bars.insert(cmd.bar_type);
436            self.client.subscribe_bars(cmd)?;
437        }
438        Ok(())
439    }
440
441    /// Unsubscribes from bars for a bar type, updating internal state and forwarding to the client.
442    ///
443    /// # Errors
444    ///
445    /// Returns an error if the underlying client unsubscribe operation fails.
446    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
447        if self.subscriptions_bars.contains(&cmd.bar_type) {
448            self.subscriptions_bars.remove(&cmd.bar_type);
449            self.client.unsubscribe_bars(cmd)?;
450        }
451        Ok(())
452    }
453
454    /// Subscribes to mark price updates for an instrument, updating internal state and forwarding to the client.
455    ///
456    /// # Errors
457    ///
458    /// Returns an error if the underlying client subscribe operation fails.
459    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
460        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
461            self.subscriptions_mark_prices.insert(cmd.instrument_id);
462            self.client.subscribe_mark_prices(cmd)?;
463        }
464        Ok(())
465    }
466
467    /// Unsubscribes from mark price updates for an instrument, updating internal state and forwarding to the client.
468    ///
469    /// # Errors
470    ///
471    /// Returns an error if the underlying client unsubscribe operation fails.
472    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
473        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
474            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
475            self.client.unsubscribe_mark_prices(cmd)?;
476        }
477        Ok(())
478    }
479
480    /// Subscribes to index price updates for an instrument, updating internal state and forwarding to the client.
481    ///
482    /// # Errors
483    ///
484    /// Returns an error if the underlying client subscribe operation fails.
485    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
486        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
487            self.subscriptions_index_prices.insert(cmd.instrument_id);
488            self.client.subscribe_index_prices(cmd)?;
489        }
490        Ok(())
491    }
492
493    /// Unsubscribes from index price updates for an instrument, updating internal state and forwarding to the client.
494    ///
495    /// # Errors
496    ///
497    /// Returns an error if the underlying client unsubscribe operation fails.
498    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
499        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
500            self.subscriptions_index_prices.remove(&cmd.instrument_id);
501            self.client.unsubscribe_index_prices(cmd)?;
502        }
503        Ok(())
504    }
505
506    /// Subscribes to funding rate updates for an instrument, updating internal state and forwarding to the client.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if the underlying client subscribe operation fails.
511    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
512        if !self
513            .subscriptions_funding_rates
514            .contains(&cmd.instrument_id)
515        {
516            self.subscriptions_funding_rates.insert(cmd.instrument_id);
517            self.client.subscribe_funding_rates(cmd)?;
518        }
519        Ok(())
520    }
521
522    /// Unsubscribes from funding rate updates for an instrument, updating internal state and forwarding to the client.
523    ///
524    /// # Errors
525    ///
526    /// Returns an error if the underlying client unsubscribe operation fails.
527    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
528        if self
529            .subscriptions_funding_rates
530            .contains(&cmd.instrument_id)
531        {
532            self.subscriptions_funding_rates.remove(&cmd.instrument_id);
533            self.client.unsubscribe_funding_rates(cmd)?;
534        }
535        Ok(())
536    }
537
538    /// Subscribes to instrument status updates for the specified instrument.
539    ///
540    /// # Errors
541    ///
542    /// Returns an error if the underlying client subscribe operation fails.
543    fn subscribe_instrument_status(
544        &mut self,
545        cmd: &SubscribeInstrumentStatus,
546    ) -> anyhow::Result<()> {
547        if !self
548            .subscriptions_instrument_status
549            .contains(&cmd.instrument_id)
550        {
551            self.subscriptions_instrument_status
552                .insert(cmd.instrument_id);
553            self.client.subscribe_instrument_status(cmd)?;
554        }
555        Ok(())
556    }
557
558    /// Unsubscribes from instrument status updates for the specified instrument.
559    ///
560    /// # Errors
561    ///
562    /// Returns an error if the underlying client unsubscribe operation fails.
563    fn unsubscribe_instrument_status(
564        &mut self,
565        cmd: &UnsubscribeInstrumentStatus,
566    ) -> anyhow::Result<()> {
567        if self
568            .subscriptions_instrument_status
569            .contains(&cmd.instrument_id)
570        {
571            self.subscriptions_instrument_status
572                .remove(&cmd.instrument_id);
573            self.client.unsubscribe_instrument_status(cmd)?;
574        }
575        Ok(())
576    }
577
578    /// Subscribes to instrument close events for the specified instrument.
579    ///
580    /// # Errors
581    ///
582    /// Returns an error if the underlying client subscribe operation fails.
583    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
584        if !self
585            .subscriptions_instrument_close
586            .contains(&cmd.instrument_id)
587        {
588            self.subscriptions_instrument_close
589                .insert(cmd.instrument_id);
590            self.client.subscribe_instrument_close(cmd)?;
591        }
592        Ok(())
593    }
594
595    /// Unsubscribes from instrument close events for the specified instrument.
596    ///
597    /// # Errors
598    ///
599    /// Returns an error if the underlying client unsubscribe operation fails.
600    fn unsubscribe_instrument_close(
601        &mut self,
602        cmd: &UnsubscribeInstrumentClose,
603    ) -> anyhow::Result<()> {
604        if self
605            .subscriptions_instrument_close
606            .contains(&cmd.instrument_id)
607        {
608            self.subscriptions_instrument_close
609                .remove(&cmd.instrument_id);
610            self.client.unsubscribe_instrument_close(cmd)?;
611        }
612        Ok(())
613    }
614
615    // -- REQUEST HANDLERS ------------------------------------------------------------------------
616
617    /// Sends a data request to the underlying client.
618    ///
619    /// # Errors
620    ///
621    /// Returns an error if the client request fails.
622    pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
623        self.client.request_data(req)
624    }
625
626    /// Sends a single instrument request to the client.
627    ///
628    /// # Errors
629    ///
630    /// Returns an error if the client fails to process the request.
631    pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
632        self.client.request_instrument(req)
633    }
634
635    /// Sends a batch instruments request to the client.
636    ///
637    /// # Errors
638    ///
639    /// Returns an error if the client fails to process the request.
640    pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
641        self.client.request_instruments(req)
642    }
643
644    /// Sends a book snapshot request for a given instrument.
645    ///
646    /// # Errors
647    ///
648    /// Returns an error if the client fails to process the book snapshot request.
649    pub fn request_book_snapshot(&self, req: &RequestBookSnapshot) -> anyhow::Result<()> {
650        self.client.request_book_snapshot(req)
651    }
652
653    /// Sends a quotes request for a given instrument.
654    ///
655    /// # Errors
656    ///
657    /// Returns an error if the client fails to process the quotes request.
658    pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
659        self.client.request_quotes(req)
660    }
661
662    /// Sends a trades request for a given instrument.
663    ///
664    /// # Errors
665    ///
666    /// Returns an error if the client fails to process the trades request.
667    pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
668        self.client.request_trades(req)
669    }
670
671    /// Sends a bars request for a given instrument and bar type.
672    ///
673    /// # Errors
674    ///
675    /// Returns an error if the client fails to process the bars request.
676    pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
677        self.client.request_bars(req)
678    }
679
680    /// Sends an order book depths request for a given instrument.
681    ///
682    /// # Errors
683    ///
684    /// Returns an error if the client fails to process the order book depths request.
685    pub fn request_book_depth(&self, req: &RequestBookDepth) -> anyhow::Result<()> {
686        self.client.request_book_depth(req)
687    }
688}