1use std::{
22 any::Any,
23 fmt::{Debug, Display},
24 ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28use nautilus_common::messages::data::{
29 RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData, RequestInstrument,
30 RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
31 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
32 SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
33 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
34 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
35 UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
36 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
37 UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
38 UnsubscribeTrades,
39};
40#[cfg(feature = "defi")]
41use nautilus_common::messages::defi::{
42 RequestPoolSnapshot, SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects,
43 SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks,
44 UnsubscribePool, UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
45 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
46};
47#[cfg(feature = "defi")]
48use nautilus_model::defi::Blockchain;
49use nautilus_model::{
50 data::{BarType, DataType},
51 identifiers::{ClientId, InstrumentId, Venue},
52};
53
54#[cfg(feature = "defi")]
55#[allow(unused_imports)] use crate::defi::client as _;
57
58#[async_trait::async_trait]
60pub trait DataClient: Any + Sync + Send {
61 fn client_id(&self) -> ClientId;
63
64 fn venue(&self) -> Option<Venue>;
66
67 fn start(&mut self) -> anyhow::Result<()>;
73
74 fn stop(&mut self) -> anyhow::Result<()>;
80
81 fn reset(&mut self) -> anyhow::Result<()>;
87
88 fn dispose(&mut self) -> anyhow::Result<()>;
94
95 async fn connect(&mut self) -> anyhow::Result<()>;
101
102 async fn disconnect(&mut self) -> anyhow::Result<()>;
108
109 fn is_connected(&self) -> bool;
111
112 fn is_disconnected(&self) -> bool;
114
115 fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
121 log_not_implemented(&cmd);
122 Ok(())
123 }
124
125 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
131 log_not_implemented(&cmd);
132 Ok(())
133 }
134
135 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
141 log_not_implemented(&cmd);
142 Ok(())
143 }
144
145 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
151 log_not_implemented(&cmd);
152 Ok(())
153 }
154
155 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
161 log_not_implemented(&cmd);
162 Ok(())
163 }
164
165 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
171 log_not_implemented(&cmd);
172 Ok(())
173 }
174
175 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
181 log_not_implemented(&cmd);
182 Ok(())
183 }
184
185 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
191 log_not_implemented(&cmd);
192 Ok(())
193 }
194
195 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
201 log_not_implemented(&cmd);
202 Ok(())
203 }
204
205 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
211 log_not_implemented(&cmd);
212 Ok(())
213 }
214
215 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
221 log_not_implemented(&cmd);
222 Ok(())
223 }
224
225 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
231 log_not_implemented(&cmd);
232 Ok(())
233 }
234
235 fn subscribe_instrument_status(
241 &mut self,
242 cmd: &SubscribeInstrumentStatus,
243 ) -> anyhow::Result<()> {
244 log_not_implemented(&cmd);
245 Ok(())
246 }
247
248 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
254 log_not_implemented(&cmd);
255 Ok(())
256 }
257
258 #[cfg(feature = "defi")]
259 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
265 log_not_implemented(&cmd);
266 Ok(())
267 }
268
269 #[cfg(feature = "defi")]
270 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
276 log_not_implemented(&cmd);
277 Ok(())
278 }
279
280 #[cfg(feature = "defi")]
281 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
287 log_not_implemented(&cmd);
288 Ok(())
289 }
290
291 #[cfg(feature = "defi")]
292 fn subscribe_pool_liquidity_updates(
298 &mut self,
299 cmd: &SubscribePoolLiquidityUpdates,
300 ) -> anyhow::Result<()> {
301 log_not_implemented(&cmd);
302 Ok(())
303 }
304
305 #[cfg(feature = "defi")]
306 fn subscribe_pool_fee_collects(
312 &mut self,
313 cmd: &SubscribePoolFeeCollects,
314 ) -> anyhow::Result<()> {
315 log_not_implemented(&cmd);
316 Ok(())
317 }
318
319 #[cfg(feature = "defi")]
320 fn subscribe_pool_flash_events(
326 &mut self,
327 cmd: &SubscribePoolFlashEvents,
328 ) -> anyhow::Result<()> {
329 log_not_implemented(&cmd);
330 Ok(())
331 }
332
333 fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
339 log_not_implemented(&cmd);
340 Ok(())
341 }
342
343 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
349 log_not_implemented(&cmd);
350 Ok(())
351 }
352
353 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
359 log_not_implemented(&cmd);
360 Ok(())
361 }
362
363 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
369 log_not_implemented(&cmd);
370 Ok(())
371 }
372
373 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
379 log_not_implemented(&cmd);
380 Ok(())
381 }
382
383 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
389 log_not_implemented(&cmd);
390 Ok(())
391 }
392
393 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
399 log_not_implemented(&cmd);
400 Ok(())
401 }
402
403 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
409 log_not_implemented(&cmd);
410 Ok(())
411 }
412
413 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
419 log_not_implemented(&cmd);
420 Ok(())
421 }
422
423 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
429 log_not_implemented(&cmd);
430 Ok(())
431 }
432
433 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
439 log_not_implemented(&cmd);
440 Ok(())
441 }
442
443 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
449 log_not_implemented(&cmd);
450 Ok(())
451 }
452
453 fn unsubscribe_instrument_status(
459 &mut self,
460 cmd: &UnsubscribeInstrumentStatus,
461 ) -> anyhow::Result<()> {
462 log_not_implemented(&cmd);
463 Ok(())
464 }
465
466 fn unsubscribe_instrument_close(
472 &mut self,
473 cmd: &UnsubscribeInstrumentClose,
474 ) -> anyhow::Result<()> {
475 log_not_implemented(&cmd);
476 Ok(())
477 }
478
479 #[cfg(feature = "defi")]
480 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
486 log_not_implemented(&cmd);
487 Ok(())
488 }
489
490 #[cfg(feature = "defi")]
491 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
497 log_not_implemented(&cmd);
498 Ok(())
499 }
500
501 #[cfg(feature = "defi")]
502 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
508 log_not_implemented(&cmd);
509 Ok(())
510 }
511
512 #[cfg(feature = "defi")]
513 fn unsubscribe_pool_liquidity_updates(
519 &mut self,
520 cmd: &UnsubscribePoolLiquidityUpdates,
521 ) -> anyhow::Result<()> {
522 log_not_implemented(&cmd);
523 Ok(())
524 }
525
526 #[cfg(feature = "defi")]
527 fn unsubscribe_pool_fee_collects(
533 &mut self,
534 cmd: &UnsubscribePoolFeeCollects,
535 ) -> anyhow::Result<()> {
536 log_not_implemented(&cmd);
537 Ok(())
538 }
539
540 #[cfg(feature = "defi")]
541 fn unsubscribe_pool_flash_events(
547 &mut self,
548 cmd: &UnsubscribePoolFlashEvents,
549 ) -> anyhow::Result<()> {
550 log_not_implemented(&cmd);
551 Ok(())
552 }
553
554 fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
560 log_not_implemented(&request);
561 Ok(())
562 }
563
564 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
570 log_not_implemented(&request);
571 Ok(())
572 }
573
574 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
580 log_not_implemented(&request);
581 Ok(())
582 }
583
584 fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
590 log_not_implemented(&request);
591 Ok(())
592 }
593
594 fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
600 log_not_implemented(&request);
601 Ok(())
602 }
603
604 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
610 log_not_implemented(&request);
611 Ok(())
612 }
613
614 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
620 log_not_implemented(&request);
621 Ok(())
622 }
623
624 fn request_book_depth(&self, request: &RequestBookDepth) -> anyhow::Result<()> {
630 log_not_implemented(&request);
631 Ok(())
632 }
633
634 #[cfg(feature = "defi")]
635 fn request_pool_snapshot(&self, request: &RequestPoolSnapshot) -> anyhow::Result<()> {
641 log_not_implemented(&request);
642 Ok(())
643 }
644}
645
646pub struct DataClientAdapter {
648 pub(crate) client: Box<dyn DataClient>,
649 pub client_id: ClientId,
650 pub venue: Option<Venue>,
651 pub handles_book_deltas: bool,
652 pub handles_book_snapshots: bool,
653 pub subscriptions_custom: AHashSet<DataType>,
654 pub subscriptions_book_deltas: AHashSet<InstrumentId>,
655 pub subscriptions_book_depth10: AHashSet<InstrumentId>,
656 pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
657 pub subscriptions_quotes: AHashSet<InstrumentId>,
658 pub subscriptions_trades: AHashSet<InstrumentId>,
659 pub subscriptions_bars: AHashSet<BarType>,
660 pub subscriptions_instrument_status: AHashSet<InstrumentId>,
661 pub subscriptions_instrument_close: AHashSet<InstrumentId>,
662 pub subscriptions_instrument: AHashSet<InstrumentId>,
663 pub subscriptions_instrument_venue: AHashSet<Venue>,
664 pub subscriptions_mark_prices: AHashSet<InstrumentId>,
665 pub subscriptions_index_prices: AHashSet<InstrumentId>,
666 pub subscriptions_funding_rates: AHashSet<InstrumentId>,
667 #[cfg(feature = "defi")]
668 pub subscriptions_blocks: AHashSet<Blockchain>,
669 #[cfg(feature = "defi")]
670 pub subscriptions_pools: AHashSet<InstrumentId>,
671 #[cfg(feature = "defi")]
672 pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
673 #[cfg(feature = "defi")]
674 pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
675 #[cfg(feature = "defi")]
676 pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
677 #[cfg(feature = "defi")]
678 pub subscriptions_pool_flash: AHashSet<InstrumentId>,
679}
680
681impl Deref for DataClientAdapter {
682 type Target = Box<dyn DataClient>;
683
684 fn deref(&self) -> &Self::Target {
685 &self.client
686 }
687}
688
689impl DerefMut for DataClientAdapter {
690 fn deref_mut(&mut self) -> &mut Self::Target {
691 &mut self.client
692 }
693}
694
695impl Debug for DataClientAdapter {
696 #[rustfmt::skip]
697 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
698 f.debug_struct(stringify!(DataClientAdapter))
699 .field("client_id", &self.client_id)
700 .field("venue", &self.venue)
701 .field("handles_book_deltas", &self.handles_book_deltas)
702 .field("handles_book_snapshots", &self.handles_book_snapshots)
703 .field("subscriptions_custom", &self.subscriptions_custom)
704 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
705 .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
706 .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
707 .field("subscriptions_quotes", &self.subscriptions_quotes)
708 .field("subscriptions_trades", &self.subscriptions_trades)
709 .field("subscriptions_bars", &self.subscriptions_bars)
710 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
711 .field("subscriptions_index_prices", &self.subscriptions_index_prices)
712 .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
713 .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
714 .field("subscriptions_instrument", &self.subscriptions_instrument)
715 .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
716 .finish()
717 }
718}
719
720impl DataClientAdapter {
721 #[must_use]
723 pub fn new(
724 client_id: ClientId,
725 venue: Option<Venue>,
726 handles_order_book_deltas: bool,
727 handles_order_book_snapshots: bool,
728 client: Box<dyn DataClient>,
729 ) -> Self {
730 Self {
731 client,
732 client_id,
733 venue,
734 handles_book_deltas: handles_order_book_deltas,
735 handles_book_snapshots: handles_order_book_snapshots,
736 subscriptions_custom: AHashSet::new(),
737 subscriptions_book_deltas: AHashSet::new(),
738 subscriptions_book_depth10: AHashSet::new(),
739 subscriptions_book_snapshots: AHashSet::new(),
740 subscriptions_quotes: AHashSet::new(),
741 subscriptions_trades: AHashSet::new(),
742 subscriptions_mark_prices: AHashSet::new(),
743 subscriptions_index_prices: AHashSet::new(),
744 subscriptions_funding_rates: AHashSet::new(),
745 subscriptions_bars: AHashSet::new(),
746 subscriptions_instrument_status: AHashSet::new(),
747 subscriptions_instrument_close: AHashSet::new(),
748 subscriptions_instrument: AHashSet::new(),
749 subscriptions_instrument_venue: AHashSet::new(),
750 #[cfg(feature = "defi")]
751 subscriptions_blocks: AHashSet::new(),
752 #[cfg(feature = "defi")]
753 subscriptions_pools: AHashSet::new(),
754 #[cfg(feature = "defi")]
755 subscriptions_pool_swaps: AHashSet::new(),
756 #[cfg(feature = "defi")]
757 subscriptions_pool_liquidity_updates: AHashSet::new(),
758 #[cfg(feature = "defi")]
759 subscriptions_pool_fee_collects: AHashSet::new(),
760 #[cfg(feature = "defi")]
761 subscriptions_pool_flash: AHashSet::new(),
762 }
763 }
764
765 #[allow(clippy::borrowed_box)]
766 #[must_use]
767 pub fn get_client(&self) -> &Box<dyn DataClient> {
768 &self.client
769 }
770
771 #[inline]
772 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
773 if let Err(e) = match cmd {
774 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
775 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
776 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
777 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
778 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
779 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
780 SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
781 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
782 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
783 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
784 SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
785 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
786 SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
787 SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
788 } {
789 log_command_error(&cmd, &e);
790 }
791 }
792
793 #[inline]
794 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
795 if let Err(e) = match cmd {
796 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
797 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
798 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
799 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
800 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
801 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
802 UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
803 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
804 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
805 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
806 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
807 UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
808 UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
809 UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
810 } {
811 log_command_error(&cmd, &e);
812 }
813 }
814
815 pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
823 if !self.subscriptions_custom.contains(&cmd.data_type) {
824 self.subscriptions_custom.insert(cmd.data_type.clone());
825 self.client.subscribe(cmd)?;
826 }
827 Ok(())
828 }
829
830 pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
836 if self.subscriptions_custom.contains(&cmd.data_type) {
837 self.subscriptions_custom.remove(&cmd.data_type);
838 self.client.unsubscribe(cmd)?;
839 }
840 Ok(())
841 }
842
843 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
849 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
850 self.subscriptions_instrument_venue.insert(cmd.venue);
851 self.client.subscribe_instruments(cmd)?;
852 }
853
854 Ok(())
855 }
856
857 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
863 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
864 self.subscriptions_instrument_venue.remove(&cmd.venue);
865 self.client.unsubscribe_instruments(cmd)?;
866 }
867
868 Ok(())
869 }
870
871 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
877 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
878 self.subscriptions_instrument.insert(cmd.instrument_id);
879 self.client.subscribe_instrument(cmd)?;
880 }
881
882 Ok(())
883 }
884
885 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
891 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
892 self.subscriptions_instrument.remove(&cmd.instrument_id);
893 self.client.unsubscribe_instrument(cmd)?;
894 }
895
896 Ok(())
897 }
898
899 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
905 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
906 self.subscriptions_book_deltas.insert(cmd.instrument_id);
907 self.client.subscribe_book_deltas(cmd)?;
908 }
909
910 Ok(())
911 }
912
913 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
919 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
920 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
921 self.client.unsubscribe_book_deltas(cmd)?;
922 }
923
924 Ok(())
925 }
926
927 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
933 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
934 self.subscriptions_book_depth10.insert(cmd.instrument_id);
935 self.client.subscribe_book_depth10(cmd)?;
936 }
937
938 Ok(())
939 }
940
941 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
947 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
948 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
949 self.client.unsubscribe_book_depth10(cmd)?;
950 }
951
952 Ok(())
953 }
954
955 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
961 if !self
962 .subscriptions_book_snapshots
963 .contains(&cmd.instrument_id)
964 {
965 self.subscriptions_book_snapshots.insert(cmd.instrument_id);
966 self.client.subscribe_book_snapshots(cmd)?;
967 }
968
969 Ok(())
970 }
971
972 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
978 if self
979 .subscriptions_book_snapshots
980 .contains(&cmd.instrument_id)
981 {
982 self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
983 self.client.unsubscribe_book_snapshots(cmd)?;
984 }
985
986 Ok(())
987 }
988
989 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
995 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
996 self.subscriptions_quotes.insert(cmd.instrument_id);
997 self.client.subscribe_quotes(cmd)?;
998 }
999 Ok(())
1000 }
1001
1002 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1008 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
1009 self.subscriptions_quotes.remove(&cmd.instrument_id);
1010 self.client.unsubscribe_quotes(cmd)?;
1011 }
1012 Ok(())
1013 }
1014
1015 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
1021 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
1022 self.subscriptions_trades.insert(cmd.instrument_id);
1023 self.client.subscribe_trades(cmd)?;
1024 }
1025 Ok(())
1026 }
1027
1028 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1034 if self.subscriptions_trades.contains(&cmd.instrument_id) {
1035 self.subscriptions_trades.remove(&cmd.instrument_id);
1036 self.client.unsubscribe_trades(cmd)?;
1037 }
1038 Ok(())
1039 }
1040
1041 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1047 if !self.subscriptions_bars.contains(&cmd.bar_type) {
1048 self.subscriptions_bars.insert(cmd.bar_type);
1049 self.client.subscribe_bars(cmd)?;
1050 }
1051 Ok(())
1052 }
1053
1054 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1060 if self.subscriptions_bars.contains(&cmd.bar_type) {
1061 self.subscriptions_bars.remove(&cmd.bar_type);
1062 self.client.unsubscribe_bars(cmd)?;
1063 }
1064 Ok(())
1065 }
1066
1067 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1073 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1074 self.subscriptions_mark_prices.insert(cmd.instrument_id);
1075 self.client.subscribe_mark_prices(cmd)?;
1076 }
1077 Ok(())
1078 }
1079
1080 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1086 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1087 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1088 self.client.unsubscribe_mark_prices(cmd)?;
1089 }
1090 Ok(())
1091 }
1092
1093 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1099 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1100 self.subscriptions_index_prices.insert(cmd.instrument_id);
1101 self.client.subscribe_index_prices(cmd)?;
1102 }
1103 Ok(())
1104 }
1105
1106 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1112 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1113 self.subscriptions_index_prices.remove(&cmd.instrument_id);
1114 self.client.unsubscribe_index_prices(cmd)?;
1115 }
1116 Ok(())
1117 }
1118
1119 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1125 if !self
1126 .subscriptions_funding_rates
1127 .contains(&cmd.instrument_id)
1128 {
1129 self.subscriptions_funding_rates.insert(cmd.instrument_id);
1130 self.client.subscribe_funding_rates(cmd)?;
1131 }
1132 Ok(())
1133 }
1134
1135 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1141 if self
1142 .subscriptions_funding_rates
1143 .contains(&cmd.instrument_id)
1144 {
1145 self.subscriptions_funding_rates.remove(&cmd.instrument_id);
1146 self.client.unsubscribe_funding_rates(cmd)?;
1147 }
1148 Ok(())
1149 }
1150
1151 fn subscribe_instrument_status(
1157 &mut self,
1158 cmd: &SubscribeInstrumentStatus,
1159 ) -> anyhow::Result<()> {
1160 if !self
1161 .subscriptions_instrument_status
1162 .contains(&cmd.instrument_id)
1163 {
1164 self.subscriptions_instrument_status
1165 .insert(cmd.instrument_id);
1166 self.client.subscribe_instrument_status(cmd)?;
1167 }
1168 Ok(())
1169 }
1170
1171 fn unsubscribe_instrument_status(
1177 &mut self,
1178 cmd: &UnsubscribeInstrumentStatus,
1179 ) -> anyhow::Result<()> {
1180 if self
1181 .subscriptions_instrument_status
1182 .contains(&cmd.instrument_id)
1183 {
1184 self.subscriptions_instrument_status
1185 .remove(&cmd.instrument_id);
1186 self.client.unsubscribe_instrument_status(cmd)?;
1187 }
1188 Ok(())
1189 }
1190
1191 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1197 if !self
1198 .subscriptions_instrument_close
1199 .contains(&cmd.instrument_id)
1200 {
1201 self.subscriptions_instrument_close
1202 .insert(cmd.instrument_id);
1203 self.client.subscribe_instrument_close(cmd)?;
1204 }
1205 Ok(())
1206 }
1207
1208 fn unsubscribe_instrument_close(
1214 &mut self,
1215 cmd: &UnsubscribeInstrumentClose,
1216 ) -> anyhow::Result<()> {
1217 if self
1218 .subscriptions_instrument_close
1219 .contains(&cmd.instrument_id)
1220 {
1221 self.subscriptions_instrument_close
1222 .remove(&cmd.instrument_id);
1223 self.client.unsubscribe_instrument_close(cmd)?;
1224 }
1225 Ok(())
1226 }
1227
1228 pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1236 self.client.request_data(req)
1237 }
1238
1239 pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1245 self.client.request_instrument(req)
1246 }
1247
1248 pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1254 self.client.request_instruments(req)
1255 }
1256
1257 pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1263 self.client.request_quotes(req)
1264 }
1265
1266 pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1272 self.client.request_trades(req)
1273 }
1274
1275 pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1281 self.client.request_bars(req)
1282 }
1283
1284 pub fn request_book_depth(&self, req: &RequestBookDepth) -> anyhow::Result<()> {
1290 self.client.request_book_depth(req)
1291 }
1292}
1293
1294#[inline(always)]
1295fn log_not_implemented<T: Debug>(msg: &T) {
1296 log::warn!("{msg:?} – handler not implemented");
1297}
1298
1299#[inline(always)]
1300fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1301 log::error!("Error on {cmd:?}: {e}");
1302}