1use std::{
22 any::Any,
23 fmt::{Debug, Display},
24 ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28use nautilus_common::messages::data::{
29 RequestBars, RequestBookSnapshot, RequestCustomData, RequestInstrument, RequestInstruments,
30 RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
31 SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeFundingRates,
32 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus,
33 SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, UnsubscribeBars,
34 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand,
35 UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
36 UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
37 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38};
39#[cfg(feature = "defi")]
40use nautilus_common::messages::defi::{
41 DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
42 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
43 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
44};
45#[cfg(feature = "defi")]
46use nautilus_model::defi::Blockchain;
47use nautilus_model::{
48 data::{BarType, DataType},
49 identifiers::{ClientId, InstrumentId, Venue},
50};
51
52#[async_trait::async_trait]
54pub trait DataClient: Any + Sync + Send {
55 fn client_id(&self) -> ClientId;
57
58 fn venue(&self) -> Option<Venue>;
60
61 fn start(&mut self) -> anyhow::Result<()>;
67
68 fn stop(&mut self) -> anyhow::Result<()>;
74
75 fn reset(&mut self) -> anyhow::Result<()>;
81
82 fn dispose(&mut self) -> anyhow::Result<()>;
88
89 async fn connect(&mut self) -> anyhow::Result<()>;
95
96 async fn disconnect(&mut self) -> anyhow::Result<()>;
102
103 fn is_connected(&self) -> bool;
105
106 fn is_disconnected(&self) -> bool;
108
109 fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
115 log_not_implemented(&cmd);
116 Ok(())
117 }
118
119 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
125 log_not_implemented(&cmd);
126 Ok(())
127 }
128
129 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
135 log_not_implemented(&cmd);
136 Ok(())
137 }
138
139 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
145 log_not_implemented(&cmd);
146 Ok(())
147 }
148
149 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
155 log_not_implemented(&cmd);
156 Ok(())
157 }
158
159 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
165 log_not_implemented(&cmd);
166 Ok(())
167 }
168
169 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
175 log_not_implemented(&cmd);
176 Ok(())
177 }
178
179 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
185 log_not_implemented(&cmd);
186 Ok(())
187 }
188
189 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
195 log_not_implemented(&cmd);
196 Ok(())
197 }
198
199 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
205 log_not_implemented(&cmd);
206 Ok(())
207 }
208
209 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
215 log_not_implemented(&cmd);
216 Ok(())
217 }
218
219 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
225 log_not_implemented(&cmd);
226 Ok(())
227 }
228
229 fn subscribe_instrument_status(
235 &mut self,
236 cmd: &SubscribeInstrumentStatus,
237 ) -> anyhow::Result<()> {
238 log_not_implemented(&cmd);
239 Ok(())
240 }
241
242 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
248 log_not_implemented(&cmd);
249 Ok(())
250 }
251
252 #[cfg(feature = "defi")]
253 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
259 log_not_implemented(&cmd);
260 Ok(())
261 }
262
263 #[cfg(feature = "defi")]
264 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
270 log_not_implemented(&cmd);
271 Ok(())
272 }
273
274 #[cfg(feature = "defi")]
275 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
281 log_not_implemented(&cmd);
282 Ok(())
283 }
284
285 #[cfg(feature = "defi")]
286 fn subscribe_pool_liquidity_updates(
292 &mut self,
293 cmd: &SubscribePoolLiquidityUpdates,
294 ) -> anyhow::Result<()> {
295 log_not_implemented(&cmd);
296 Ok(())
297 }
298
299 fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
305 log_not_implemented(&cmd);
306 Ok(())
307 }
308
309 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
315 log_not_implemented(&cmd);
316 Ok(())
317 }
318
319 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
325 log_not_implemented(&cmd);
326 Ok(())
327 }
328
329 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
335 log_not_implemented(&cmd);
336 Ok(())
337 }
338
339 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
345 log_not_implemented(&cmd);
346 Ok(())
347 }
348
349 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
355 log_not_implemented(&cmd);
356 Ok(())
357 }
358
359 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
365 log_not_implemented(&cmd);
366 Ok(())
367 }
368
369 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
375 log_not_implemented(&cmd);
376 Ok(())
377 }
378
379 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
385 log_not_implemented(&cmd);
386 Ok(())
387 }
388
389 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
395 log_not_implemented(&cmd);
396 Ok(())
397 }
398
399 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
405 log_not_implemented(&cmd);
406 Ok(())
407 }
408
409 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
415 log_not_implemented(&cmd);
416 Ok(())
417 }
418
419 fn unsubscribe_instrument_status(
425 &mut self,
426 cmd: &UnsubscribeInstrumentStatus,
427 ) -> anyhow::Result<()> {
428 log_not_implemented(&cmd);
429 Ok(())
430 }
431
432 fn unsubscribe_instrument_close(
438 &mut self,
439 cmd: &UnsubscribeInstrumentClose,
440 ) -> anyhow::Result<()> {
441 log_not_implemented(&cmd);
442 Ok(())
443 }
444
445 #[cfg(feature = "defi")]
446 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
452 log_not_implemented(&cmd);
453 Ok(())
454 }
455
456 #[cfg(feature = "defi")]
457 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
463 log_not_implemented(&cmd);
464 Ok(())
465 }
466
467 #[cfg(feature = "defi")]
468 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
474 log_not_implemented(&cmd);
475 Ok(())
476 }
477
478 #[cfg(feature = "defi")]
479 fn unsubscribe_pool_liquidity_updates(
485 &mut self,
486 cmd: &UnsubscribePoolLiquidityUpdates,
487 ) -> anyhow::Result<()> {
488 log_not_implemented(&cmd);
489 Ok(())
490 }
491
492 fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
498 log_not_implemented(&request);
499 Ok(())
500 }
501
502 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
508 log_not_implemented(&request);
509 Ok(())
510 }
511
512 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
518 log_not_implemented(&request);
519 Ok(())
520 }
521
522 fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
528 log_not_implemented(&request);
529 Ok(())
530 }
531
532 fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
538 log_not_implemented(&request);
539 Ok(())
540 }
541
542 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
548 log_not_implemented(&request);
549 Ok(())
550 }
551
552 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
558 log_not_implemented(&request);
559 Ok(())
560 }
561}
562
563pub struct DataClientAdapter {
565 client: Box<dyn DataClient>,
566 pub client_id: ClientId,
567 pub venue: Option<Venue>,
568 pub handles_book_deltas: bool,
569 pub handles_book_snapshots: bool,
570 pub subscriptions_custom: AHashSet<DataType>,
571 pub subscriptions_book_deltas: AHashSet<InstrumentId>,
572 pub subscriptions_book_depth10: AHashSet<InstrumentId>,
573 pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
574 pub subscriptions_quotes: AHashSet<InstrumentId>,
575 pub subscriptions_trades: AHashSet<InstrumentId>,
576 pub subscriptions_bars: AHashSet<BarType>,
577 pub subscriptions_instrument_status: AHashSet<InstrumentId>,
578 pub subscriptions_instrument_close: AHashSet<InstrumentId>,
579 pub subscriptions_instrument: AHashSet<InstrumentId>,
580 pub subscriptions_instrument_venue: AHashSet<Venue>,
581 pub subscriptions_mark_prices: AHashSet<InstrumentId>,
582 pub subscriptions_index_prices: AHashSet<InstrumentId>,
583 pub subscriptions_funding_rates: AHashSet<InstrumentId>,
584 #[cfg(feature = "defi")]
585 pub subscriptions_blocks: AHashSet<Blockchain>,
586 #[cfg(feature = "defi")]
587 pub subscriptions_pools: AHashSet<InstrumentId>,
588 #[cfg(feature = "defi")]
589 pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
590 #[cfg(feature = "defi")]
591 pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
592}
593
594impl Deref for DataClientAdapter {
595 type Target = Box<dyn DataClient>;
596
597 fn deref(&self) -> &Self::Target {
598 &self.client
599 }
600}
601
602impl DerefMut for DataClientAdapter {
603 fn deref_mut(&mut self) -> &mut Self::Target {
604 &mut self.client
605 }
606}
607
608impl Debug for DataClientAdapter {
609 #[rustfmt::skip]
610 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
611 f.debug_struct(stringify!(DataClientAdapter))
612 .field("client_id", &self.client_id)
613 .field("venue", &self.venue)
614 .field("handles_book_deltas", &self.handles_book_deltas)
615 .field("handles_book_snapshots", &self.handles_book_snapshots)
616 .field("subscriptions_custom", &self.subscriptions_custom)
617 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
618 .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
619 .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
620 .field("subscriptions_quotes", &self.subscriptions_quotes)
621 .field("subscriptions_trades", &self.subscriptions_trades)
622 .field("subscriptions_bars", &self.subscriptions_bars)
623 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
624 .field("subscriptions_index_prices", &self.subscriptions_index_prices)
625 .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
626 .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
627 .field("subscriptions_instrument", &self.subscriptions_instrument)
628 .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
629 .finish()
630 }
631}
632
633impl DataClientAdapter {
634 #[must_use]
636 pub fn new(
637 client_id: ClientId,
638 venue: Option<Venue>,
639 handles_order_book_deltas: bool,
640 handles_order_book_snapshots: bool,
641 client: Box<dyn DataClient>,
642 ) -> Self {
643 Self {
644 client,
645 client_id,
646 venue,
647 handles_book_deltas: handles_order_book_deltas,
648 handles_book_snapshots: handles_order_book_snapshots,
649 subscriptions_custom: AHashSet::new(),
650 subscriptions_book_deltas: AHashSet::new(),
651 subscriptions_book_depth10: AHashSet::new(),
652 subscriptions_book_snapshots: AHashSet::new(),
653 subscriptions_quotes: AHashSet::new(),
654 subscriptions_trades: AHashSet::new(),
655 subscriptions_mark_prices: AHashSet::new(),
656 subscriptions_index_prices: AHashSet::new(),
657 subscriptions_funding_rates: AHashSet::new(),
658 subscriptions_bars: AHashSet::new(),
659 subscriptions_instrument_status: AHashSet::new(),
660 subscriptions_instrument_close: AHashSet::new(),
661 subscriptions_instrument: AHashSet::new(),
662 subscriptions_instrument_venue: AHashSet::new(),
663 #[cfg(feature = "defi")]
664 subscriptions_blocks: AHashSet::new(),
665 #[cfg(feature = "defi")]
666 subscriptions_pools: AHashSet::new(),
667 #[cfg(feature = "defi")]
668 subscriptions_pool_swaps: AHashSet::new(),
669 #[cfg(feature = "defi")]
670 subscriptions_pool_liquidity_updates: AHashSet::new(),
671 }
672 }
673
674 #[allow(clippy::borrowed_box)]
675 #[must_use]
676 pub fn get_client(&self) -> &Box<dyn DataClient> {
677 &self.client
678 }
679
680 #[inline]
681 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
682 if let Err(e) = match cmd {
683 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
684 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
685 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
686 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
687 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
688 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
689 SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
690 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
691 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
692 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
693 SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
694 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
695 SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
696 SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
697 } {
698 log_command_error(&cmd, &e);
699 }
700 }
701
702 #[cfg(feature = "defi")]
703 #[inline]
704 pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
705 if let Err(e) = match cmd {
706 DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
707 DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
708 DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_pool_swaps(cmd),
709 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
710 self.subscribe_pool_liquidity_updates(cmd)
711 }
712 } {
713 log_command_error(&cmd, &e);
714 }
715 }
716
717 #[inline]
718 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
719 if let Err(e) = match cmd {
720 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
721 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
722 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
723 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
724 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
725 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
726 UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
727 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
728 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
729 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
730 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
731 UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
732 UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
733 UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
734 } {
735 log_command_error(&cmd, &e);
736 }
737 }
738
739 #[cfg(feature = "defi")]
740 #[inline]
741 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
742 if let Err(e) = match cmd {
743 DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
744 DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
745 DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_pool_swaps(cmd),
746 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
747 self.unsubscribe_pool_liquidity_updates(cmd)
748 }
749 } {
750 log_command_error(&cmd, &e);
751 }
752 }
753
754 pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
762 if !self.subscriptions_custom.contains(&cmd.data_type) {
763 self.subscriptions_custom.insert(cmd.data_type.clone());
764 self.client.subscribe(cmd)?;
765 }
766 Ok(())
767 }
768
769 pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
775 if self.subscriptions_custom.contains(&cmd.data_type) {
776 self.subscriptions_custom.remove(&cmd.data_type);
777 self.client.unsubscribe(cmd)?;
778 }
779 Ok(())
780 }
781
782 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
788 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
789 self.subscriptions_instrument_venue.insert(cmd.venue);
790 self.client.subscribe_instruments(cmd)?;
791 }
792
793 Ok(())
794 }
795
796 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
802 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
803 self.subscriptions_instrument_venue.remove(&cmd.venue);
804 self.client.unsubscribe_instruments(cmd)?;
805 }
806
807 Ok(())
808 }
809
810 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
816 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
817 self.subscriptions_instrument.insert(cmd.instrument_id);
818 self.client.subscribe_instrument(cmd)?;
819 }
820
821 Ok(())
822 }
823
824 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
830 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
831 self.subscriptions_instrument.remove(&cmd.instrument_id);
832 self.client.unsubscribe_instrument(cmd)?;
833 }
834
835 Ok(())
836 }
837
838 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
844 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
845 self.subscriptions_book_deltas.insert(cmd.instrument_id);
846 self.client.subscribe_book_deltas(cmd)?;
847 }
848
849 Ok(())
850 }
851
852 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
858 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
859 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
860 self.client.unsubscribe_book_deltas(cmd)?;
861 }
862
863 Ok(())
864 }
865
866 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
872 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
873 self.subscriptions_book_depth10.insert(cmd.instrument_id);
874 self.client.subscribe_book_depth10(cmd)?;
875 }
876
877 Ok(())
878 }
879
880 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
886 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
887 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
888 self.client.unsubscribe_book_depth10(cmd)?;
889 }
890
891 Ok(())
892 }
893
894 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
900 if !self
901 .subscriptions_book_snapshots
902 .contains(&cmd.instrument_id)
903 {
904 self.subscriptions_book_snapshots.insert(cmd.instrument_id);
905 self.client.subscribe_book_snapshots(cmd)?;
906 }
907
908 Ok(())
909 }
910
911 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
917 if self
918 .subscriptions_book_snapshots
919 .contains(&cmd.instrument_id)
920 {
921 self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
922 self.client.unsubscribe_book_snapshots(cmd)?;
923 }
924
925 Ok(())
926 }
927
928 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
934 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
935 self.subscriptions_quotes.insert(cmd.instrument_id);
936 self.client.subscribe_quotes(cmd)?;
937 }
938 Ok(())
939 }
940
941 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
947 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
948 self.subscriptions_quotes.remove(&cmd.instrument_id);
949 self.client.unsubscribe_quotes(cmd)?;
950 }
951 Ok(())
952 }
953
954 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
960 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
961 self.subscriptions_trades.insert(cmd.instrument_id);
962 self.client.subscribe_trades(cmd)?;
963 }
964 Ok(())
965 }
966
967 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
973 if self.subscriptions_trades.contains(&cmd.instrument_id) {
974 self.subscriptions_trades.remove(&cmd.instrument_id);
975 self.client.unsubscribe_trades(cmd)?;
976 }
977 Ok(())
978 }
979
980 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
986 if !self.subscriptions_bars.contains(&cmd.bar_type) {
987 self.subscriptions_bars.insert(cmd.bar_type);
988 self.client.subscribe_bars(cmd)?;
989 }
990 Ok(())
991 }
992
993 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
999 if self.subscriptions_bars.contains(&cmd.bar_type) {
1000 self.subscriptions_bars.remove(&cmd.bar_type);
1001 self.client.unsubscribe_bars(cmd)?;
1002 }
1003 Ok(())
1004 }
1005
1006 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1012 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1013 self.subscriptions_mark_prices.insert(cmd.instrument_id);
1014 self.client.subscribe_mark_prices(cmd)?;
1015 }
1016 Ok(())
1017 }
1018
1019 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1025 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1026 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1027 self.client.unsubscribe_mark_prices(cmd)?;
1028 }
1029 Ok(())
1030 }
1031
1032 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1038 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1039 self.subscriptions_index_prices.insert(cmd.instrument_id);
1040 self.client.subscribe_index_prices(cmd)?;
1041 }
1042 Ok(())
1043 }
1044
1045 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1051 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1052 self.subscriptions_index_prices.remove(&cmd.instrument_id);
1053 self.client.unsubscribe_index_prices(cmd)?;
1054 }
1055 Ok(())
1056 }
1057
1058 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1064 if !self
1065 .subscriptions_funding_rates
1066 .contains(&cmd.instrument_id)
1067 {
1068 self.subscriptions_funding_rates.insert(cmd.instrument_id);
1069 self.client.subscribe_funding_rates(cmd)?;
1070 }
1071 Ok(())
1072 }
1073
1074 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1080 if self
1081 .subscriptions_funding_rates
1082 .contains(&cmd.instrument_id)
1083 {
1084 self.subscriptions_funding_rates.remove(&cmd.instrument_id);
1085 self.client.unsubscribe_funding_rates(cmd)?;
1086 }
1087 Ok(())
1088 }
1089
1090 fn subscribe_instrument_status(
1096 &mut self,
1097 cmd: &SubscribeInstrumentStatus,
1098 ) -> anyhow::Result<()> {
1099 if !self
1100 .subscriptions_instrument_status
1101 .contains(&cmd.instrument_id)
1102 {
1103 self.subscriptions_instrument_status
1104 .insert(cmd.instrument_id);
1105 self.client.subscribe_instrument_status(cmd)?;
1106 }
1107 Ok(())
1108 }
1109
1110 fn unsubscribe_instrument_status(
1116 &mut self,
1117 cmd: &UnsubscribeInstrumentStatus,
1118 ) -> anyhow::Result<()> {
1119 if self
1120 .subscriptions_instrument_status
1121 .contains(&cmd.instrument_id)
1122 {
1123 self.subscriptions_instrument_status
1124 .remove(&cmd.instrument_id);
1125 self.client.unsubscribe_instrument_status(cmd)?;
1126 }
1127 Ok(())
1128 }
1129
1130 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1136 if !self
1137 .subscriptions_instrument_close
1138 .contains(&cmd.instrument_id)
1139 {
1140 self.subscriptions_instrument_close
1141 .insert(cmd.instrument_id);
1142 self.client.subscribe_instrument_close(cmd)?;
1143 }
1144 Ok(())
1145 }
1146
1147 fn unsubscribe_instrument_close(
1153 &mut self,
1154 cmd: &UnsubscribeInstrumentClose,
1155 ) -> anyhow::Result<()> {
1156 if self
1157 .subscriptions_instrument_close
1158 .contains(&cmd.instrument_id)
1159 {
1160 self.subscriptions_instrument_close
1161 .remove(&cmd.instrument_id);
1162 self.client.unsubscribe_instrument_close(cmd)?;
1163 }
1164 Ok(())
1165 }
1166
1167 #[cfg(feature = "defi")]
1168 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
1174 if !self.subscriptions_blocks.contains(&cmd.chain) {
1175 self.subscriptions_blocks.insert(cmd.chain);
1176 self.client.subscribe_blocks(cmd)?;
1177 }
1178 Ok(())
1179 }
1180
1181 #[cfg(feature = "defi")]
1182 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
1188 if self.subscriptions_blocks.contains(&cmd.chain) {
1189 self.subscriptions_blocks.remove(&cmd.chain);
1190 self.client.unsubscribe_blocks(cmd)?;
1191 }
1192 Ok(())
1193 }
1194
1195 #[cfg(feature = "defi")]
1196 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
1202 if !self.subscriptions_pools.contains(&cmd.instrument_id) {
1203 self.subscriptions_pools.insert(cmd.instrument_id);
1204 self.client.subscribe_pool(cmd)?;
1205 }
1206 Ok(())
1207 }
1208
1209 #[cfg(feature = "defi")]
1210 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
1216 if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
1217 self.subscriptions_pool_swaps.insert(cmd.instrument_id);
1218 self.client.subscribe_pool_swaps(cmd)?;
1219 }
1220 Ok(())
1221 }
1222
1223 #[cfg(feature = "defi")]
1224 fn subscribe_pool_liquidity_updates(
1230 &mut self,
1231 cmd: &SubscribePoolLiquidityUpdates,
1232 ) -> anyhow::Result<()> {
1233 if !self
1234 .subscriptions_pool_liquidity_updates
1235 .contains(&cmd.instrument_id)
1236 {
1237 self.subscriptions_pool_liquidity_updates
1238 .insert(cmd.instrument_id);
1239 self.client.subscribe_pool_liquidity_updates(cmd)?;
1240 }
1241 Ok(())
1242 }
1243
1244 #[cfg(feature = "defi")]
1245 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
1251 if self.subscriptions_pools.contains(&cmd.instrument_id) {
1252 self.subscriptions_pools.remove(&cmd.instrument_id);
1253 self.client.unsubscribe_pool(cmd)?;
1254 }
1255 Ok(())
1256 }
1257
1258 #[cfg(feature = "defi")]
1259 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
1265 if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
1266 self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
1267 self.client.unsubscribe_pool_swaps(cmd)?;
1268 }
1269 Ok(())
1270 }
1271
1272 #[cfg(feature = "defi")]
1273 fn unsubscribe_pool_liquidity_updates(
1279 &mut self,
1280 cmd: &UnsubscribePoolLiquidityUpdates,
1281 ) -> anyhow::Result<()> {
1282 if self
1283 .subscriptions_pool_liquidity_updates
1284 .contains(&cmd.instrument_id)
1285 {
1286 self.subscriptions_pool_liquidity_updates
1287 .remove(&cmd.instrument_id);
1288 self.client.unsubscribe_pool_liquidity_updates(cmd)?;
1289 }
1290 Ok(())
1291 }
1292
1293 pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1301 self.client.request_data(req)
1302 }
1303
1304 pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1310 self.client.request_instrument(req)
1311 }
1312
1313 pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1319 self.client.request_instruments(req)
1320 }
1321
1322 pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1328 self.client.request_quotes(req)
1329 }
1330
1331 pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1337 self.client.request_trades(req)
1338 }
1339
1340 pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1346 self.client.request_bars(req)
1347 }
1348}
1349
1350#[inline(always)]
1351fn log_not_implemented<T: Debug>(msg: &T) {
1352 log::warn!("{msg:?} – handler not implemented");
1353}
1354
1355#[inline(always)]
1356fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1357 log::error!("Error on {cmd:?}: {e}");
1358}