1use std::{
22 any::Any,
23 fmt::{Debug, Display},
24 ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28use nautilus_common::messages::data::{
29 RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData, RequestInstrument,
30 RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
31 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
32 SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
33 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
34 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
35 UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
36 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
37 UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
38 UnsubscribeTrades,
39};
40#[cfg(feature = "defi")]
41use nautilus_common::messages::defi::{
42 DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
43 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
44 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
45};
46#[cfg(feature = "defi")]
47use nautilus_model::defi::Blockchain;
48use nautilus_model::{
49 data::{BarType, DataType},
50 identifiers::{ClientId, InstrumentId, Venue},
51};
52
53#[async_trait::async_trait]
55pub trait DataClient: Any + Sync + Send {
56 fn client_id(&self) -> ClientId;
58
59 fn venue(&self) -> Option<Venue>;
61
62 fn start(&mut self) -> anyhow::Result<()>;
68
69 fn stop(&mut self) -> anyhow::Result<()>;
75
76 fn reset(&mut self) -> anyhow::Result<()>;
82
83 fn dispose(&mut self) -> anyhow::Result<()>;
89
90 async fn connect(&mut self) -> anyhow::Result<()>;
96
97 async fn disconnect(&mut self) -> anyhow::Result<()>;
103
104 fn is_connected(&self) -> bool;
106
107 fn is_disconnected(&self) -> bool;
109
110 fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
116 log_not_implemented(&cmd);
117 Ok(())
118 }
119
120 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
126 log_not_implemented(&cmd);
127 Ok(())
128 }
129
130 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
136 log_not_implemented(&cmd);
137 Ok(())
138 }
139
140 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
146 log_not_implemented(&cmd);
147 Ok(())
148 }
149
150 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
156 log_not_implemented(&cmd);
157 Ok(())
158 }
159
160 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
166 log_not_implemented(&cmd);
167 Ok(())
168 }
169
170 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
176 log_not_implemented(&cmd);
177 Ok(())
178 }
179
180 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
186 log_not_implemented(&cmd);
187 Ok(())
188 }
189
190 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
196 log_not_implemented(&cmd);
197 Ok(())
198 }
199
200 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
206 log_not_implemented(&cmd);
207 Ok(())
208 }
209
210 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
216 log_not_implemented(&cmd);
217 Ok(())
218 }
219
220 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
226 log_not_implemented(&cmd);
227 Ok(())
228 }
229
230 fn subscribe_instrument_status(
236 &mut self,
237 cmd: &SubscribeInstrumentStatus,
238 ) -> anyhow::Result<()> {
239 log_not_implemented(&cmd);
240 Ok(())
241 }
242
243 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
249 log_not_implemented(&cmd);
250 Ok(())
251 }
252
253 #[cfg(feature = "defi")]
254 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
260 log_not_implemented(&cmd);
261 Ok(())
262 }
263
264 #[cfg(feature = "defi")]
265 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
271 log_not_implemented(&cmd);
272 Ok(())
273 }
274
275 #[cfg(feature = "defi")]
276 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
282 log_not_implemented(&cmd);
283 Ok(())
284 }
285
286 #[cfg(feature = "defi")]
287 fn subscribe_pool_liquidity_updates(
293 &mut self,
294 cmd: &SubscribePoolLiquidityUpdates,
295 ) -> anyhow::Result<()> {
296 log_not_implemented(&cmd);
297 Ok(())
298 }
299
300 fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
306 log_not_implemented(&cmd);
307 Ok(())
308 }
309
310 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
316 log_not_implemented(&cmd);
317 Ok(())
318 }
319
320 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
326 log_not_implemented(&cmd);
327 Ok(())
328 }
329
330 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
336 log_not_implemented(&cmd);
337 Ok(())
338 }
339
340 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
346 log_not_implemented(&cmd);
347 Ok(())
348 }
349
350 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
356 log_not_implemented(&cmd);
357 Ok(())
358 }
359
360 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
366 log_not_implemented(&cmd);
367 Ok(())
368 }
369
370 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
376 log_not_implemented(&cmd);
377 Ok(())
378 }
379
380 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
386 log_not_implemented(&cmd);
387 Ok(())
388 }
389
390 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
396 log_not_implemented(&cmd);
397 Ok(())
398 }
399
400 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
406 log_not_implemented(&cmd);
407 Ok(())
408 }
409
410 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
416 log_not_implemented(&cmd);
417 Ok(())
418 }
419
420 fn unsubscribe_instrument_status(
426 &mut self,
427 cmd: &UnsubscribeInstrumentStatus,
428 ) -> anyhow::Result<()> {
429 log_not_implemented(&cmd);
430 Ok(())
431 }
432
433 fn unsubscribe_instrument_close(
439 &mut self,
440 cmd: &UnsubscribeInstrumentClose,
441 ) -> anyhow::Result<()> {
442 log_not_implemented(&cmd);
443 Ok(())
444 }
445
446 #[cfg(feature = "defi")]
447 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
453 log_not_implemented(&cmd);
454 Ok(())
455 }
456
457 #[cfg(feature = "defi")]
458 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
464 log_not_implemented(&cmd);
465 Ok(())
466 }
467
468 #[cfg(feature = "defi")]
469 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
475 log_not_implemented(&cmd);
476 Ok(())
477 }
478
479 #[cfg(feature = "defi")]
480 fn unsubscribe_pool_liquidity_updates(
486 &mut self,
487 cmd: &UnsubscribePoolLiquidityUpdates,
488 ) -> anyhow::Result<()> {
489 log_not_implemented(&cmd);
490 Ok(())
491 }
492
493 fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
499 log_not_implemented(&request);
500 Ok(())
501 }
502
503 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
509 log_not_implemented(&request);
510 Ok(())
511 }
512
513 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
519 log_not_implemented(&request);
520 Ok(())
521 }
522
523 fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
529 log_not_implemented(&request);
530 Ok(())
531 }
532
533 fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
539 log_not_implemented(&request);
540 Ok(())
541 }
542
543 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
549 log_not_implemented(&request);
550 Ok(())
551 }
552
553 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
559 log_not_implemented(&request);
560 Ok(())
561 }
562
563 fn request_book_depth(&self, request: &RequestBookDepth) -> anyhow::Result<()> {
569 log_not_implemented(&request);
570 Ok(())
571 }
572}
573
574pub struct DataClientAdapter {
576 client: Box<dyn DataClient>,
577 pub client_id: ClientId,
578 pub venue: Option<Venue>,
579 pub handles_book_deltas: bool,
580 pub handles_book_snapshots: bool,
581 pub subscriptions_custom: AHashSet<DataType>,
582 pub subscriptions_book_deltas: AHashSet<InstrumentId>,
583 pub subscriptions_book_depth10: AHashSet<InstrumentId>,
584 pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
585 pub subscriptions_quotes: AHashSet<InstrumentId>,
586 pub subscriptions_trades: AHashSet<InstrumentId>,
587 pub subscriptions_bars: AHashSet<BarType>,
588 pub subscriptions_instrument_status: AHashSet<InstrumentId>,
589 pub subscriptions_instrument_close: AHashSet<InstrumentId>,
590 pub subscriptions_instrument: AHashSet<InstrumentId>,
591 pub subscriptions_instrument_venue: AHashSet<Venue>,
592 pub subscriptions_mark_prices: AHashSet<InstrumentId>,
593 pub subscriptions_index_prices: AHashSet<InstrumentId>,
594 pub subscriptions_funding_rates: AHashSet<InstrumentId>,
595 #[cfg(feature = "defi")]
596 pub subscriptions_blocks: AHashSet<Blockchain>,
597 #[cfg(feature = "defi")]
598 pub subscriptions_pools: AHashSet<InstrumentId>,
599 #[cfg(feature = "defi")]
600 pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
601 #[cfg(feature = "defi")]
602 pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
603}
604
605impl Deref for DataClientAdapter {
606 type Target = Box<dyn DataClient>;
607
608 fn deref(&self) -> &Self::Target {
609 &self.client
610 }
611}
612
613impl DerefMut for DataClientAdapter {
614 fn deref_mut(&mut self) -> &mut Self::Target {
615 &mut self.client
616 }
617}
618
619impl Debug for DataClientAdapter {
620 #[rustfmt::skip]
621 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
622 f.debug_struct(stringify!(DataClientAdapter))
623 .field("client_id", &self.client_id)
624 .field("venue", &self.venue)
625 .field("handles_book_deltas", &self.handles_book_deltas)
626 .field("handles_book_snapshots", &self.handles_book_snapshots)
627 .field("subscriptions_custom", &self.subscriptions_custom)
628 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
629 .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
630 .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
631 .field("subscriptions_quotes", &self.subscriptions_quotes)
632 .field("subscriptions_trades", &self.subscriptions_trades)
633 .field("subscriptions_bars", &self.subscriptions_bars)
634 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
635 .field("subscriptions_index_prices", &self.subscriptions_index_prices)
636 .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
637 .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
638 .field("subscriptions_instrument", &self.subscriptions_instrument)
639 .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
640 .finish()
641 }
642}
643
644impl DataClientAdapter {
645 #[must_use]
647 pub fn new(
648 client_id: ClientId,
649 venue: Option<Venue>,
650 handles_order_book_deltas: bool,
651 handles_order_book_snapshots: bool,
652 client: Box<dyn DataClient>,
653 ) -> Self {
654 Self {
655 client,
656 client_id,
657 venue,
658 handles_book_deltas: handles_order_book_deltas,
659 handles_book_snapshots: handles_order_book_snapshots,
660 subscriptions_custom: AHashSet::new(),
661 subscriptions_book_deltas: AHashSet::new(),
662 subscriptions_book_depth10: AHashSet::new(),
663 subscriptions_book_snapshots: AHashSet::new(),
664 subscriptions_quotes: AHashSet::new(),
665 subscriptions_trades: AHashSet::new(),
666 subscriptions_mark_prices: AHashSet::new(),
667 subscriptions_index_prices: AHashSet::new(),
668 subscriptions_funding_rates: AHashSet::new(),
669 subscriptions_bars: AHashSet::new(),
670 subscriptions_instrument_status: AHashSet::new(),
671 subscriptions_instrument_close: AHashSet::new(),
672 subscriptions_instrument: AHashSet::new(),
673 subscriptions_instrument_venue: AHashSet::new(),
674 #[cfg(feature = "defi")]
675 subscriptions_blocks: AHashSet::new(),
676 #[cfg(feature = "defi")]
677 subscriptions_pools: AHashSet::new(),
678 #[cfg(feature = "defi")]
679 subscriptions_pool_swaps: AHashSet::new(),
680 #[cfg(feature = "defi")]
681 subscriptions_pool_liquidity_updates: AHashSet::new(),
682 }
683 }
684
685 #[allow(clippy::borrowed_box)]
686 #[must_use]
687 pub fn get_client(&self) -> &Box<dyn DataClient> {
688 &self.client
689 }
690
691 #[inline]
692 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
693 if let Err(e) = match cmd {
694 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
695 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
696 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
697 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
698 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
699 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
700 SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
701 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
702 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
703 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
704 SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
705 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
706 SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
707 SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
708 } {
709 log_command_error(&cmd, &e);
710 }
711 }
712
713 #[cfg(feature = "defi")]
714 #[inline]
715 pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
716 if let Err(e) = match cmd {
717 DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
718 DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
719 DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_pool_swaps(cmd),
720 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
721 self.subscribe_pool_liquidity_updates(cmd)
722 }
723 } {
724 log_command_error(&cmd, &e);
725 }
726 }
727
728 #[inline]
729 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
730 if let Err(e) = match cmd {
731 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
732 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
733 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
734 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
735 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
736 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
737 UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
738 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
739 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
740 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
741 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
742 UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
743 UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
744 UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
745 } {
746 log_command_error(&cmd, &e);
747 }
748 }
749
750 #[cfg(feature = "defi")]
751 #[inline]
752 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
753 if let Err(e) = match cmd {
754 DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
755 DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
756 DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_pool_swaps(cmd),
757 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
758 self.unsubscribe_pool_liquidity_updates(cmd)
759 }
760 } {
761 log_command_error(&cmd, &e);
762 }
763 }
764
765 pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
773 if !self.subscriptions_custom.contains(&cmd.data_type) {
774 self.subscriptions_custom.insert(cmd.data_type.clone());
775 self.client.subscribe(cmd)?;
776 }
777 Ok(())
778 }
779
780 pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
786 if self.subscriptions_custom.contains(&cmd.data_type) {
787 self.subscriptions_custom.remove(&cmd.data_type);
788 self.client.unsubscribe(cmd)?;
789 }
790 Ok(())
791 }
792
793 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
799 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
800 self.subscriptions_instrument_venue.insert(cmd.venue);
801 self.client.subscribe_instruments(cmd)?;
802 }
803
804 Ok(())
805 }
806
807 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
813 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
814 self.subscriptions_instrument_venue.remove(&cmd.venue);
815 self.client.unsubscribe_instruments(cmd)?;
816 }
817
818 Ok(())
819 }
820
821 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
827 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
828 self.subscriptions_instrument.insert(cmd.instrument_id);
829 self.client.subscribe_instrument(cmd)?;
830 }
831
832 Ok(())
833 }
834
835 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
841 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
842 self.subscriptions_instrument.remove(&cmd.instrument_id);
843 self.client.unsubscribe_instrument(cmd)?;
844 }
845
846 Ok(())
847 }
848
849 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
855 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
856 self.subscriptions_book_deltas.insert(cmd.instrument_id);
857 self.client.subscribe_book_deltas(cmd)?;
858 }
859
860 Ok(())
861 }
862
863 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
869 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
870 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
871 self.client.unsubscribe_book_deltas(cmd)?;
872 }
873
874 Ok(())
875 }
876
877 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
883 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
884 self.subscriptions_book_depth10.insert(cmd.instrument_id);
885 self.client.subscribe_book_depth10(cmd)?;
886 }
887
888 Ok(())
889 }
890
891 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
897 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
898 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
899 self.client.unsubscribe_book_depth10(cmd)?;
900 }
901
902 Ok(())
903 }
904
905 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
911 if !self
912 .subscriptions_book_snapshots
913 .contains(&cmd.instrument_id)
914 {
915 self.subscriptions_book_snapshots.insert(cmd.instrument_id);
916 self.client.subscribe_book_snapshots(cmd)?;
917 }
918
919 Ok(())
920 }
921
922 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
928 if self
929 .subscriptions_book_snapshots
930 .contains(&cmd.instrument_id)
931 {
932 self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
933 self.client.unsubscribe_book_snapshots(cmd)?;
934 }
935
936 Ok(())
937 }
938
939 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
945 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
946 self.subscriptions_quotes.insert(cmd.instrument_id);
947 self.client.subscribe_quotes(cmd)?;
948 }
949 Ok(())
950 }
951
952 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
958 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
959 self.subscriptions_quotes.remove(&cmd.instrument_id);
960 self.client.unsubscribe_quotes(cmd)?;
961 }
962 Ok(())
963 }
964
965 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
971 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
972 self.subscriptions_trades.insert(cmd.instrument_id);
973 self.client.subscribe_trades(cmd)?;
974 }
975 Ok(())
976 }
977
978 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
984 if self.subscriptions_trades.contains(&cmd.instrument_id) {
985 self.subscriptions_trades.remove(&cmd.instrument_id);
986 self.client.unsubscribe_trades(cmd)?;
987 }
988 Ok(())
989 }
990
991 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
997 if !self.subscriptions_bars.contains(&cmd.bar_type) {
998 self.subscriptions_bars.insert(cmd.bar_type);
999 self.client.subscribe_bars(cmd)?;
1000 }
1001 Ok(())
1002 }
1003
1004 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1010 if self.subscriptions_bars.contains(&cmd.bar_type) {
1011 self.subscriptions_bars.remove(&cmd.bar_type);
1012 self.client.unsubscribe_bars(cmd)?;
1013 }
1014 Ok(())
1015 }
1016
1017 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1023 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1024 self.subscriptions_mark_prices.insert(cmd.instrument_id);
1025 self.client.subscribe_mark_prices(cmd)?;
1026 }
1027 Ok(())
1028 }
1029
1030 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1036 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1037 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1038 self.client.unsubscribe_mark_prices(cmd)?;
1039 }
1040 Ok(())
1041 }
1042
1043 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1049 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1050 self.subscriptions_index_prices.insert(cmd.instrument_id);
1051 self.client.subscribe_index_prices(cmd)?;
1052 }
1053 Ok(())
1054 }
1055
1056 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1062 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1063 self.subscriptions_index_prices.remove(&cmd.instrument_id);
1064 self.client.unsubscribe_index_prices(cmd)?;
1065 }
1066 Ok(())
1067 }
1068
1069 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1075 if !self
1076 .subscriptions_funding_rates
1077 .contains(&cmd.instrument_id)
1078 {
1079 self.subscriptions_funding_rates.insert(cmd.instrument_id);
1080 self.client.subscribe_funding_rates(cmd)?;
1081 }
1082 Ok(())
1083 }
1084
1085 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1091 if self
1092 .subscriptions_funding_rates
1093 .contains(&cmd.instrument_id)
1094 {
1095 self.subscriptions_funding_rates.remove(&cmd.instrument_id);
1096 self.client.unsubscribe_funding_rates(cmd)?;
1097 }
1098 Ok(())
1099 }
1100
1101 fn subscribe_instrument_status(
1107 &mut self,
1108 cmd: &SubscribeInstrumentStatus,
1109 ) -> anyhow::Result<()> {
1110 if !self
1111 .subscriptions_instrument_status
1112 .contains(&cmd.instrument_id)
1113 {
1114 self.subscriptions_instrument_status
1115 .insert(cmd.instrument_id);
1116 self.client.subscribe_instrument_status(cmd)?;
1117 }
1118 Ok(())
1119 }
1120
1121 fn unsubscribe_instrument_status(
1127 &mut self,
1128 cmd: &UnsubscribeInstrumentStatus,
1129 ) -> anyhow::Result<()> {
1130 if self
1131 .subscriptions_instrument_status
1132 .contains(&cmd.instrument_id)
1133 {
1134 self.subscriptions_instrument_status
1135 .remove(&cmd.instrument_id);
1136 self.client.unsubscribe_instrument_status(cmd)?;
1137 }
1138 Ok(())
1139 }
1140
1141 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1147 if !self
1148 .subscriptions_instrument_close
1149 .contains(&cmd.instrument_id)
1150 {
1151 self.subscriptions_instrument_close
1152 .insert(cmd.instrument_id);
1153 self.client.subscribe_instrument_close(cmd)?;
1154 }
1155 Ok(())
1156 }
1157
1158 fn unsubscribe_instrument_close(
1164 &mut self,
1165 cmd: &UnsubscribeInstrumentClose,
1166 ) -> anyhow::Result<()> {
1167 if self
1168 .subscriptions_instrument_close
1169 .contains(&cmd.instrument_id)
1170 {
1171 self.subscriptions_instrument_close
1172 .remove(&cmd.instrument_id);
1173 self.client.unsubscribe_instrument_close(cmd)?;
1174 }
1175 Ok(())
1176 }
1177
1178 #[cfg(feature = "defi")]
1179 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
1185 if !self.subscriptions_blocks.contains(&cmd.chain) {
1186 self.subscriptions_blocks.insert(cmd.chain);
1187 self.client.subscribe_blocks(cmd)?;
1188 }
1189 Ok(())
1190 }
1191
1192 #[cfg(feature = "defi")]
1193 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
1199 if self.subscriptions_blocks.contains(&cmd.chain) {
1200 self.subscriptions_blocks.remove(&cmd.chain);
1201 self.client.unsubscribe_blocks(cmd)?;
1202 }
1203 Ok(())
1204 }
1205
1206 #[cfg(feature = "defi")]
1207 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
1213 if !self.subscriptions_pools.contains(&cmd.instrument_id) {
1214 self.subscriptions_pools.insert(cmd.instrument_id);
1215 self.client.subscribe_pool(cmd)?;
1216 }
1217 Ok(())
1218 }
1219
1220 #[cfg(feature = "defi")]
1221 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
1227 if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
1228 self.subscriptions_pool_swaps.insert(cmd.instrument_id);
1229 self.client.subscribe_pool_swaps(cmd)?;
1230 }
1231 Ok(())
1232 }
1233
1234 #[cfg(feature = "defi")]
1235 fn subscribe_pool_liquidity_updates(
1241 &mut self,
1242 cmd: &SubscribePoolLiquidityUpdates,
1243 ) -> anyhow::Result<()> {
1244 if !self
1245 .subscriptions_pool_liquidity_updates
1246 .contains(&cmd.instrument_id)
1247 {
1248 self.subscriptions_pool_liquidity_updates
1249 .insert(cmd.instrument_id);
1250 self.client.subscribe_pool_liquidity_updates(cmd)?;
1251 }
1252 Ok(())
1253 }
1254
1255 #[cfg(feature = "defi")]
1256 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
1262 if self.subscriptions_pools.contains(&cmd.instrument_id) {
1263 self.subscriptions_pools.remove(&cmd.instrument_id);
1264 self.client.unsubscribe_pool(cmd)?;
1265 }
1266 Ok(())
1267 }
1268
1269 #[cfg(feature = "defi")]
1270 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
1276 if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
1277 self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
1278 self.client.unsubscribe_pool_swaps(cmd)?;
1279 }
1280 Ok(())
1281 }
1282
1283 #[cfg(feature = "defi")]
1284 fn unsubscribe_pool_liquidity_updates(
1290 &mut self,
1291 cmd: &UnsubscribePoolLiquidityUpdates,
1292 ) -> anyhow::Result<()> {
1293 if self
1294 .subscriptions_pool_liquidity_updates
1295 .contains(&cmd.instrument_id)
1296 {
1297 self.subscriptions_pool_liquidity_updates
1298 .remove(&cmd.instrument_id);
1299 self.client.unsubscribe_pool_liquidity_updates(cmd)?;
1300 }
1301 Ok(())
1302 }
1303
1304 pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1312 self.client.request_data(req)
1313 }
1314
1315 pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1321 self.client.request_instrument(req)
1322 }
1323
1324 pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1330 self.client.request_instruments(req)
1331 }
1332
1333 pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1339 self.client.request_quotes(req)
1340 }
1341
1342 pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1348 self.client.request_trades(req)
1349 }
1350
1351 pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1357 self.client.request_bars(req)
1358 }
1359
1360 pub fn request_book_depth(&self, req: &RequestBookDepth) -> anyhow::Result<()> {
1366 self.client.request_book_depth(req)
1367 }
1368}
1369
1370#[inline(always)]
1371fn log_not_implemented<T: Debug>(msg: &T) {
1372 log::warn!("{msg:?} – handler not implemented");
1373}
1374
1375#[inline(always)]
1376fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1377 log::error!("Error on {cmd:?}: {e}");
1378}