1use std::{
22 any::Any,
23 fmt::{Debug, Display},
24 ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28use nautilus_common::messages::data::{
29 RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData, RequestInstrument,
30 RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
31 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
32 SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
33 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
34 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
35 UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
36 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
37 UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
38 UnsubscribeTrades,
39};
40#[cfg(feature = "defi")]
41use nautilus_common::messages::defi::{
42 RequestPoolSnapshot, SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects,
43 SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks,
44 UnsubscribePool, UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
45 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
46};
47#[cfg(feature = "defi")]
48use nautilus_model::defi::Blockchain;
49use nautilus_model::{
50 data::{BarType, DataType},
51 identifiers::{ClientId, InstrumentId, Venue},
52};
53
54#[cfg(feature = "defi")]
55#[allow(unused_imports)] use crate::defi::client as _;
57
58#[async_trait::async_trait]
60pub trait DataClient: Any + Sync + Send {
61 fn client_id(&self) -> ClientId;
63
64 fn venue(&self) -> Option<Venue>;
66
67 fn start(&mut self) -> anyhow::Result<()>;
73
74 fn stop(&mut self) -> anyhow::Result<()>;
80
81 fn reset(&mut self) -> anyhow::Result<()>;
87
88 fn dispose(&mut self) -> anyhow::Result<()>;
94
95 async fn connect(&mut self) -> anyhow::Result<()> {
101 Ok(())
102 }
103
104 async fn disconnect(&mut self) -> anyhow::Result<()> {
110 Ok(())
111 }
112
113 fn is_connected(&self) -> bool;
115
116 fn is_disconnected(&self) -> bool;
118
119 fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
125 log_not_implemented(&cmd);
126 Ok(())
127 }
128
129 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
135 log_not_implemented(&cmd);
136 Ok(())
137 }
138
139 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
145 log_not_implemented(&cmd);
146 Ok(())
147 }
148
149 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
155 log_not_implemented(&cmd);
156 Ok(())
157 }
158
159 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
165 log_not_implemented(&cmd);
166 Ok(())
167 }
168
169 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
175 log_not_implemented(&cmd);
176 Ok(())
177 }
178
179 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
185 log_not_implemented(&cmd);
186 Ok(())
187 }
188
189 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
195 log_not_implemented(&cmd);
196 Ok(())
197 }
198
199 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
205 log_not_implemented(&cmd);
206 Ok(())
207 }
208
209 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
215 log_not_implemented(&cmd);
216 Ok(())
217 }
218
219 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
225 log_not_implemented(&cmd);
226 Ok(())
227 }
228
229 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
235 log_not_implemented(&cmd);
236 Ok(())
237 }
238
239 fn subscribe_instrument_status(
245 &mut self,
246 cmd: &SubscribeInstrumentStatus,
247 ) -> anyhow::Result<()> {
248 log_not_implemented(&cmd);
249 Ok(())
250 }
251
252 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
258 log_not_implemented(&cmd);
259 Ok(())
260 }
261
262 #[cfg(feature = "defi")]
263 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
269 log_not_implemented(&cmd);
270 Ok(())
271 }
272
273 #[cfg(feature = "defi")]
274 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
280 log_not_implemented(&cmd);
281 Ok(())
282 }
283
284 #[cfg(feature = "defi")]
285 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
291 log_not_implemented(&cmd);
292 Ok(())
293 }
294
295 #[cfg(feature = "defi")]
296 fn subscribe_pool_liquidity_updates(
302 &mut self,
303 cmd: &SubscribePoolLiquidityUpdates,
304 ) -> anyhow::Result<()> {
305 log_not_implemented(&cmd);
306 Ok(())
307 }
308
309 #[cfg(feature = "defi")]
310 fn subscribe_pool_fee_collects(
316 &mut self,
317 cmd: &SubscribePoolFeeCollects,
318 ) -> anyhow::Result<()> {
319 log_not_implemented(&cmd);
320 Ok(())
321 }
322
323 #[cfg(feature = "defi")]
324 fn subscribe_pool_flash_events(
330 &mut self,
331 cmd: &SubscribePoolFlashEvents,
332 ) -> anyhow::Result<()> {
333 log_not_implemented(&cmd);
334 Ok(())
335 }
336
337 fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
343 log_not_implemented(&cmd);
344 Ok(())
345 }
346
347 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
353 log_not_implemented(&cmd);
354 Ok(())
355 }
356
357 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
363 log_not_implemented(&cmd);
364 Ok(())
365 }
366
367 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
373 log_not_implemented(&cmd);
374 Ok(())
375 }
376
377 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
383 log_not_implemented(&cmd);
384 Ok(())
385 }
386
387 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
393 log_not_implemented(&cmd);
394 Ok(())
395 }
396
397 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
403 log_not_implemented(&cmd);
404 Ok(())
405 }
406
407 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
413 log_not_implemented(&cmd);
414 Ok(())
415 }
416
417 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
423 log_not_implemented(&cmd);
424 Ok(())
425 }
426
427 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
433 log_not_implemented(&cmd);
434 Ok(())
435 }
436
437 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
443 log_not_implemented(&cmd);
444 Ok(())
445 }
446
447 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
453 log_not_implemented(&cmd);
454 Ok(())
455 }
456
457 fn unsubscribe_instrument_status(
463 &mut self,
464 cmd: &UnsubscribeInstrumentStatus,
465 ) -> anyhow::Result<()> {
466 log_not_implemented(&cmd);
467 Ok(())
468 }
469
470 fn unsubscribe_instrument_close(
476 &mut self,
477 cmd: &UnsubscribeInstrumentClose,
478 ) -> anyhow::Result<()> {
479 log_not_implemented(&cmd);
480 Ok(())
481 }
482
483 #[cfg(feature = "defi")]
484 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
490 log_not_implemented(&cmd);
491 Ok(())
492 }
493
494 #[cfg(feature = "defi")]
495 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
501 log_not_implemented(&cmd);
502 Ok(())
503 }
504
505 #[cfg(feature = "defi")]
506 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
512 log_not_implemented(&cmd);
513 Ok(())
514 }
515
516 #[cfg(feature = "defi")]
517 fn unsubscribe_pool_liquidity_updates(
523 &mut self,
524 cmd: &UnsubscribePoolLiquidityUpdates,
525 ) -> anyhow::Result<()> {
526 log_not_implemented(&cmd);
527 Ok(())
528 }
529
530 #[cfg(feature = "defi")]
531 fn unsubscribe_pool_fee_collects(
537 &mut self,
538 cmd: &UnsubscribePoolFeeCollects,
539 ) -> anyhow::Result<()> {
540 log_not_implemented(&cmd);
541 Ok(())
542 }
543
544 #[cfg(feature = "defi")]
545 fn unsubscribe_pool_flash_events(
551 &mut self,
552 cmd: &UnsubscribePoolFlashEvents,
553 ) -> anyhow::Result<()> {
554 log_not_implemented(&cmd);
555 Ok(())
556 }
557
558 fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
564 log_not_implemented(&request);
565 Ok(())
566 }
567
568 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
574 log_not_implemented(&request);
575 Ok(())
576 }
577
578 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
584 log_not_implemented(&request);
585 Ok(())
586 }
587
588 fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
594 log_not_implemented(&request);
595 Ok(())
596 }
597
598 fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
604 log_not_implemented(&request);
605 Ok(())
606 }
607
608 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
614 log_not_implemented(&request);
615 Ok(())
616 }
617
618 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
624 log_not_implemented(&request);
625 Ok(())
626 }
627
628 fn request_book_depth(&self, request: &RequestBookDepth) -> anyhow::Result<()> {
634 log_not_implemented(&request);
635 Ok(())
636 }
637
638 #[cfg(feature = "defi")]
639 fn request_pool_snapshot(&self, request: &RequestPoolSnapshot) -> anyhow::Result<()> {
645 log_not_implemented(&request);
646 Ok(())
647 }
648}
649
/// Wraps a boxed [`DataClient`] together with the sets of subscriptions that have been
/// forwarded to it, so repeated subscribe/unsubscribe commands can be filtered out.
pub struct DataClientAdapter {
    /// The wrapped client that commands and requests are forwarded to.
    pub(crate) client: Box<dyn DataClient>,
    /// Client identifier supplied at construction.
    pub client_id: ClientId,
    /// Optional venue supplied at construction.
    pub venue: Option<Venue>,
    /// Flag supplied at construction (`handles_order_book_deltas`).
    pub handles_book_deltas: bool,
    /// Flag supplied at construction (`handles_order_book_snapshots`).
    pub handles_book_snapshots: bool,
    /// Data types with an active custom data subscription.
    pub subscriptions_custom: AHashSet<DataType>,
    /// Instruments with an active book deltas subscription.
    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
    /// Instruments with an active book depth10 subscription.
    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
    /// Instruments with an active book snapshots subscription.
    pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
    /// Instruments with an active quotes subscription.
    pub subscriptions_quotes: AHashSet<InstrumentId>,
    /// Instruments with an active trades subscription.
    pub subscriptions_trades: AHashSet<InstrumentId>,
    /// Bar types with an active bars subscription.
    pub subscriptions_bars: AHashSet<BarType>,
    /// Instruments with an active instrument status subscription.
    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
    /// Instruments with an active instrument close subscription.
    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
    /// Instruments with an active single-instrument subscription.
    pub subscriptions_instrument: AHashSet<InstrumentId>,
    /// Venues with an active instruments subscription.
    pub subscriptions_instrument_venue: AHashSet<Venue>,
    /// Instruments with an active mark prices subscription.
    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
    /// Instruments with an active index prices subscription.
    pub subscriptions_index_prices: AHashSet<InstrumentId>,
    /// Instruments with an active funding rates subscription.
    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
    // NOTE(review): the `defi` sets below are only initialised in this section; the code
    // that maintains them is presumably elsewhere — confirm before relying on the wording.
    /// Blockchains tracked for block subscriptions.
    #[cfg(feature = "defi")]
    pub subscriptions_blocks: AHashSet<Blockchain>,
    /// Instruments tracked for pool subscriptions.
    #[cfg(feature = "defi")]
    pub subscriptions_pools: AHashSet<InstrumentId>,
    /// Instruments tracked for pool swap subscriptions.
    #[cfg(feature = "defi")]
    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
    /// Instruments tracked for pool liquidity update subscriptions.
    #[cfg(feature = "defi")]
    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
    /// Instruments tracked for pool fee collect subscriptions.
    #[cfg(feature = "defi")]
    pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
    /// Instruments tracked for pool flash event subscriptions.
    #[cfg(feature = "defi")]
    pub subscriptions_pool_flash: AHashSet<InstrumentId>,
}
684
685impl Deref for DataClientAdapter {
686 type Target = Box<dyn DataClient>;
687
688 fn deref(&self) -> &Self::Target {
689 &self.client
690 }
691}
692
693impl DerefMut for DataClientAdapter {
694 fn deref_mut(&mut self) -> &mut Self::Target {
695 &mut self.client
696 }
697}
698
699impl Debug for DataClientAdapter {
700 #[rustfmt::skip]
701 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
702 f.debug_struct(stringify!(DataClientAdapter))
703 .field("client_id", &self.client_id)
704 .field("venue", &self.venue)
705 .field("handles_book_deltas", &self.handles_book_deltas)
706 .field("handles_book_snapshots", &self.handles_book_snapshots)
707 .field("subscriptions_custom", &self.subscriptions_custom)
708 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
709 .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
710 .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
711 .field("subscriptions_quotes", &self.subscriptions_quotes)
712 .field("subscriptions_trades", &self.subscriptions_trades)
713 .field("subscriptions_bars", &self.subscriptions_bars)
714 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
715 .field("subscriptions_index_prices", &self.subscriptions_index_prices)
716 .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
717 .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
718 .field("subscriptions_instrument", &self.subscriptions_instrument)
719 .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
720 .finish()
721 }
722}
723
724impl DataClientAdapter {
725 #[must_use]
727 pub fn new(
728 client_id: ClientId,
729 venue: Option<Venue>,
730 handles_order_book_deltas: bool,
731 handles_order_book_snapshots: bool,
732 client: Box<dyn DataClient>,
733 ) -> Self {
734 Self {
735 client,
736 client_id,
737 venue,
738 handles_book_deltas: handles_order_book_deltas,
739 handles_book_snapshots: handles_order_book_snapshots,
740 subscriptions_custom: AHashSet::new(),
741 subscriptions_book_deltas: AHashSet::new(),
742 subscriptions_book_depth10: AHashSet::new(),
743 subscriptions_book_snapshots: AHashSet::new(),
744 subscriptions_quotes: AHashSet::new(),
745 subscriptions_trades: AHashSet::new(),
746 subscriptions_mark_prices: AHashSet::new(),
747 subscriptions_index_prices: AHashSet::new(),
748 subscriptions_funding_rates: AHashSet::new(),
749 subscriptions_bars: AHashSet::new(),
750 subscriptions_instrument_status: AHashSet::new(),
751 subscriptions_instrument_close: AHashSet::new(),
752 subscriptions_instrument: AHashSet::new(),
753 subscriptions_instrument_venue: AHashSet::new(),
754 #[cfg(feature = "defi")]
755 subscriptions_blocks: AHashSet::new(),
756 #[cfg(feature = "defi")]
757 subscriptions_pools: AHashSet::new(),
758 #[cfg(feature = "defi")]
759 subscriptions_pool_swaps: AHashSet::new(),
760 #[cfg(feature = "defi")]
761 subscriptions_pool_liquidity_updates: AHashSet::new(),
762 #[cfg(feature = "defi")]
763 subscriptions_pool_fee_collects: AHashSet::new(),
764 #[cfg(feature = "defi")]
765 subscriptions_pool_flash: AHashSet::new(),
766 }
767 }
768
769 #[allow(clippy::borrowed_box)]
770 #[must_use]
771 pub fn get_client(&self) -> &Box<dyn DataClient> {
772 &self.client
773 }
774
775 #[inline]
776 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
777 if let Err(e) = match cmd {
778 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
779 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
780 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
781 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
782 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
783 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
784 SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
785 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
786 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
787 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
788 SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
789 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
790 SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
791 SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
792 } {
793 log_command_error(&cmd, &e);
794 }
795 }
796
797 #[inline]
798 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
799 if let Err(e) = match cmd {
800 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
801 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
802 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
803 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
804 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
805 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
806 UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
807 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
808 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
809 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
810 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
811 UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
812 UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
813 UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
814 } {
815 log_command_error(&cmd, &e);
816 }
817 }
818
819 pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
827 if !self.subscriptions_custom.contains(&cmd.data_type) {
828 self.subscriptions_custom.insert(cmd.data_type.clone());
829 self.client.subscribe(cmd)?;
830 }
831 Ok(())
832 }
833
834 pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
840 if self.subscriptions_custom.contains(&cmd.data_type) {
841 self.subscriptions_custom.remove(&cmd.data_type);
842 self.client.unsubscribe(cmd)?;
843 }
844 Ok(())
845 }
846
847 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
853 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
854 self.subscriptions_instrument_venue.insert(cmd.venue);
855 self.client.subscribe_instruments(cmd)?;
856 }
857
858 Ok(())
859 }
860
861 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
867 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
868 self.subscriptions_instrument_venue.remove(&cmd.venue);
869 self.client.unsubscribe_instruments(cmd)?;
870 }
871
872 Ok(())
873 }
874
875 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
881 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
882 self.subscriptions_instrument.insert(cmd.instrument_id);
883 self.client.subscribe_instrument(cmd)?;
884 }
885
886 Ok(())
887 }
888
889 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
895 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
896 self.subscriptions_instrument.remove(&cmd.instrument_id);
897 self.client.unsubscribe_instrument(cmd)?;
898 }
899
900 Ok(())
901 }
902
903 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
909 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
910 self.subscriptions_book_deltas.insert(cmd.instrument_id);
911 self.client.subscribe_book_deltas(cmd)?;
912 }
913
914 Ok(())
915 }
916
917 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
923 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
924 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
925 self.client.unsubscribe_book_deltas(cmd)?;
926 }
927
928 Ok(())
929 }
930
931 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
937 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
938 self.subscriptions_book_depth10.insert(cmd.instrument_id);
939 self.client.subscribe_book_depth10(cmd)?;
940 }
941
942 Ok(())
943 }
944
945 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
951 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
952 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
953 self.client.unsubscribe_book_depth10(cmd)?;
954 }
955
956 Ok(())
957 }
958
959 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
965 if !self
966 .subscriptions_book_snapshots
967 .contains(&cmd.instrument_id)
968 {
969 self.subscriptions_book_snapshots.insert(cmd.instrument_id);
970 self.client.subscribe_book_snapshots(cmd)?;
971 }
972
973 Ok(())
974 }
975
976 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
982 if self
983 .subscriptions_book_snapshots
984 .contains(&cmd.instrument_id)
985 {
986 self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
987 self.client.unsubscribe_book_snapshots(cmd)?;
988 }
989
990 Ok(())
991 }
992
993 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
999 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
1000 self.subscriptions_quotes.insert(cmd.instrument_id);
1001 self.client.subscribe_quotes(cmd)?;
1002 }
1003 Ok(())
1004 }
1005
1006 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1012 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
1013 self.subscriptions_quotes.remove(&cmd.instrument_id);
1014 self.client.unsubscribe_quotes(cmd)?;
1015 }
1016 Ok(())
1017 }
1018
1019 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
1025 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
1026 self.subscriptions_trades.insert(cmd.instrument_id);
1027 self.client.subscribe_trades(cmd)?;
1028 }
1029 Ok(())
1030 }
1031
1032 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1038 if self.subscriptions_trades.contains(&cmd.instrument_id) {
1039 self.subscriptions_trades.remove(&cmd.instrument_id);
1040 self.client.unsubscribe_trades(cmd)?;
1041 }
1042 Ok(())
1043 }
1044
1045 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1051 if !self.subscriptions_bars.contains(&cmd.bar_type) {
1052 self.subscriptions_bars.insert(cmd.bar_type);
1053 self.client.subscribe_bars(cmd)?;
1054 }
1055 Ok(())
1056 }
1057
1058 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1064 if self.subscriptions_bars.contains(&cmd.bar_type) {
1065 self.subscriptions_bars.remove(&cmd.bar_type);
1066 self.client.unsubscribe_bars(cmd)?;
1067 }
1068 Ok(())
1069 }
1070
1071 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1077 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1078 self.subscriptions_mark_prices.insert(cmd.instrument_id);
1079 self.client.subscribe_mark_prices(cmd)?;
1080 }
1081 Ok(())
1082 }
1083
1084 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1090 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1091 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1092 self.client.unsubscribe_mark_prices(cmd)?;
1093 }
1094 Ok(())
1095 }
1096
1097 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1103 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1104 self.subscriptions_index_prices.insert(cmd.instrument_id);
1105 self.client.subscribe_index_prices(cmd)?;
1106 }
1107 Ok(())
1108 }
1109
1110 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1116 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1117 self.subscriptions_index_prices.remove(&cmd.instrument_id);
1118 self.client.unsubscribe_index_prices(cmd)?;
1119 }
1120 Ok(())
1121 }
1122
1123 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1129 if !self
1130 .subscriptions_funding_rates
1131 .contains(&cmd.instrument_id)
1132 {
1133 self.subscriptions_funding_rates.insert(cmd.instrument_id);
1134 self.client.subscribe_funding_rates(cmd)?;
1135 }
1136 Ok(())
1137 }
1138
1139 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1145 if self
1146 .subscriptions_funding_rates
1147 .contains(&cmd.instrument_id)
1148 {
1149 self.subscriptions_funding_rates.remove(&cmd.instrument_id);
1150 self.client.unsubscribe_funding_rates(cmd)?;
1151 }
1152 Ok(())
1153 }
1154
1155 fn subscribe_instrument_status(
1161 &mut self,
1162 cmd: &SubscribeInstrumentStatus,
1163 ) -> anyhow::Result<()> {
1164 if !self
1165 .subscriptions_instrument_status
1166 .contains(&cmd.instrument_id)
1167 {
1168 self.subscriptions_instrument_status
1169 .insert(cmd.instrument_id);
1170 self.client.subscribe_instrument_status(cmd)?;
1171 }
1172 Ok(())
1173 }
1174
1175 fn unsubscribe_instrument_status(
1181 &mut self,
1182 cmd: &UnsubscribeInstrumentStatus,
1183 ) -> anyhow::Result<()> {
1184 if self
1185 .subscriptions_instrument_status
1186 .contains(&cmd.instrument_id)
1187 {
1188 self.subscriptions_instrument_status
1189 .remove(&cmd.instrument_id);
1190 self.client.unsubscribe_instrument_status(cmd)?;
1191 }
1192 Ok(())
1193 }
1194
1195 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1201 if !self
1202 .subscriptions_instrument_close
1203 .contains(&cmd.instrument_id)
1204 {
1205 self.subscriptions_instrument_close
1206 .insert(cmd.instrument_id);
1207 self.client.subscribe_instrument_close(cmd)?;
1208 }
1209 Ok(())
1210 }
1211
1212 fn unsubscribe_instrument_close(
1218 &mut self,
1219 cmd: &UnsubscribeInstrumentClose,
1220 ) -> anyhow::Result<()> {
1221 if self
1222 .subscriptions_instrument_close
1223 .contains(&cmd.instrument_id)
1224 {
1225 self.subscriptions_instrument_close
1226 .remove(&cmd.instrument_id);
1227 self.client.unsubscribe_instrument_close(cmd)?;
1228 }
1229 Ok(())
1230 }
1231
1232 pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1240 self.client.request_data(req)
1241 }
1242
1243 pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1249 self.client.request_instrument(req)
1250 }
1251
1252 pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1258 self.client.request_instruments(req)
1259 }
1260
1261 pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1267 self.client.request_quotes(req)
1268 }
1269
1270 pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1276 self.client.request_trades(req)
1277 }
1278
1279 pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1285 self.client.request_bars(req)
1286 }
1287
1288 pub fn request_book_depth(&self, req: &RequestBookDepth) -> anyhow::Result<()> {
1294 self.client.request_book_depth(req)
1295 }
1296}
1297
1298#[inline(always)]
1299fn log_not_implemented<T: Debug>(msg: &T) {
1300 log::warn!("{msg:?} – handler not implemented");
1301}
1302
1303#[inline(always)]
1304fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1305 log::error!("Error on {cmd:?}: {e}");
1306}