1use std::{
22 any::Any,
23 fmt::{Debug, Display},
24 ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28#[cfg(feature = "defi")]
29use alloy_primitives::Address;
30use nautilus_common::messages::data::{
31 RequestBars, RequestBookSnapshot, RequestCustomData, RequestInstrument, RequestInstruments,
32 RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
33 SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeIndexPrices,
34 SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
35 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas,
36 UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
37 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
38 UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
39 UnsubscribeTrades,
40};
41#[cfg(feature = "defi")]
42use nautilus_common::messages::defi::{
43 DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
44 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
45 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
46};
47#[cfg(feature = "defi")]
48use nautilus_model::defi::Blockchain;
49use nautilus_model::{
50 data::{BarType, DataType},
51 identifiers::{ClientId, InstrumentId, Venue},
52};
53
54#[async_trait::async_trait]
56pub trait DataClient: Any + Sync + Send {
57 fn client_id(&self) -> ClientId;
59
60 fn venue(&self) -> Option<Venue>;
62
63 fn start(&mut self) -> anyhow::Result<()>;
69
70 fn stop(&mut self) -> anyhow::Result<()>;
76
77 fn reset(&mut self) -> anyhow::Result<()>;
83
84 fn dispose(&mut self) -> anyhow::Result<()>;
90
91 async fn connect(&mut self) -> anyhow::Result<()>;
97
98 async fn disconnect(&mut self) -> anyhow::Result<()>;
104
105 fn is_connected(&self) -> bool;
107
108 fn is_disconnected(&self) -> bool;
110
111 fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
117 log_not_implemented(&cmd);
118 Ok(())
119 }
120
121 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
127 log_not_implemented(&cmd);
128 Ok(())
129 }
130
131 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
137 log_not_implemented(&cmd);
138 Ok(())
139 }
140
141 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
147 log_not_implemented(&cmd);
148 Ok(())
149 }
150
151 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
157 log_not_implemented(&cmd);
158 Ok(())
159 }
160
161 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
167 log_not_implemented(&cmd);
168 Ok(())
169 }
170
171 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
177 log_not_implemented(&cmd);
178 Ok(())
179 }
180
181 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
187 log_not_implemented(&cmd);
188 Ok(())
189 }
190
191 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
197 log_not_implemented(&cmd);
198 Ok(())
199 }
200
201 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
207 log_not_implemented(&cmd);
208 Ok(())
209 }
210
211 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
217 log_not_implemented(&cmd);
218 Ok(())
219 }
220
221 fn subscribe_instrument_status(
227 &mut self,
228 cmd: &SubscribeInstrumentStatus,
229 ) -> anyhow::Result<()> {
230 log_not_implemented(&cmd);
231 Ok(())
232 }
233
234 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
240 log_not_implemented(&cmd);
241 Ok(())
242 }
243
244 #[cfg(feature = "defi")]
245 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
251 log_not_implemented(&cmd);
252 Ok(())
253 }
254
255 #[cfg(feature = "defi")]
256 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
262 log_not_implemented(&cmd);
263 Ok(())
264 }
265
266 #[cfg(feature = "defi")]
267 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
273 log_not_implemented(&cmd);
274 Ok(())
275 }
276
277 #[cfg(feature = "defi")]
278 fn subscribe_pool_liquidity_updates(
284 &mut self,
285 cmd: &SubscribePoolLiquidityUpdates,
286 ) -> anyhow::Result<()> {
287 log_not_implemented(&cmd);
288 Ok(())
289 }
290
291 fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
297 log_not_implemented(&cmd);
298 Ok(())
299 }
300
301 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
307 log_not_implemented(&cmd);
308 Ok(())
309 }
310
311 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
317 log_not_implemented(&cmd);
318 Ok(())
319 }
320
321 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
327 log_not_implemented(&cmd);
328 Ok(())
329 }
330
331 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
337 log_not_implemented(&cmd);
338 Ok(())
339 }
340
341 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
347 log_not_implemented(&cmd);
348 Ok(())
349 }
350
351 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
357 log_not_implemented(&cmd);
358 Ok(())
359 }
360
361 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
367 log_not_implemented(&cmd);
368 Ok(())
369 }
370
371 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
377 log_not_implemented(&cmd);
378 Ok(())
379 }
380
381 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
387 log_not_implemented(&cmd);
388 Ok(())
389 }
390
391 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
397 log_not_implemented(&cmd);
398 Ok(())
399 }
400
401 fn unsubscribe_instrument_status(
407 &mut self,
408 cmd: &UnsubscribeInstrumentStatus,
409 ) -> anyhow::Result<()> {
410 log_not_implemented(&cmd);
411 Ok(())
412 }
413
414 fn unsubscribe_instrument_close(
420 &mut self,
421 cmd: &UnsubscribeInstrumentClose,
422 ) -> anyhow::Result<()> {
423 log_not_implemented(&cmd);
424 Ok(())
425 }
426
427 #[cfg(feature = "defi")]
428 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
434 log_not_implemented(&cmd);
435 Ok(())
436 }
437
438 #[cfg(feature = "defi")]
439 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
445 log_not_implemented(&cmd);
446 Ok(())
447 }
448
449 #[cfg(feature = "defi")]
450 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
456 log_not_implemented(&cmd);
457 Ok(())
458 }
459
460 #[cfg(feature = "defi")]
461 fn unsubscribe_pool_liquidity_updates(
467 &mut self,
468 cmd: &UnsubscribePoolLiquidityUpdates,
469 ) -> anyhow::Result<()> {
470 log_not_implemented(&cmd);
471 Ok(())
472 }
473
474 fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
480 log_not_implemented(&request);
481 Ok(())
482 }
483
484 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
490 log_not_implemented(&request);
491 Ok(())
492 }
493
494 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
500 log_not_implemented(&request);
501 Ok(())
502 }
503
504 fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
510 log_not_implemented(&request);
511 Ok(())
512 }
513
514 fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
520 log_not_implemented(&request);
521 Ok(())
522 }
523
524 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
530 log_not_implemented(&request);
531 Ok(())
532 }
533
534 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
540 log_not_implemented(&request);
541 Ok(())
542 }
543}
544
545pub struct DataClientAdapter {
547 client: Box<dyn DataClient>,
548 pub client_id: ClientId,
549 pub venue: Option<Venue>,
550 pub handles_book_deltas: bool,
551 pub handles_book_snapshots: bool,
552 pub subscriptions_custom: AHashSet<DataType>,
553 pub subscriptions_book_deltas: AHashSet<InstrumentId>,
554 pub subscriptions_book_depth10: AHashSet<InstrumentId>,
555 pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
556 pub subscriptions_quotes: AHashSet<InstrumentId>,
557 pub subscriptions_trades: AHashSet<InstrumentId>,
558 pub subscriptions_bars: AHashSet<BarType>,
559 pub subscriptions_instrument_status: AHashSet<InstrumentId>,
560 pub subscriptions_instrument_close: AHashSet<InstrumentId>,
561 pub subscriptions_instrument: AHashSet<InstrumentId>,
562 pub subscriptions_instrument_venue: AHashSet<Venue>,
563 pub subscriptions_mark_prices: AHashSet<InstrumentId>,
564 pub subscriptions_index_prices: AHashSet<InstrumentId>,
565 #[cfg(feature = "defi")]
566 pub subscriptions_blocks: AHashSet<Blockchain>,
567 #[cfg(feature = "defi")]
568 pub subscriptions_pools: AHashSet<Address>,
569 #[cfg(feature = "defi")]
570 pub subscriptions_pool_swaps: AHashSet<Address>,
571 #[cfg(feature = "defi")]
572 pub subscriptions_pool_liquidity_updates: AHashSet<Address>,
573}
574
575impl Deref for DataClientAdapter {
576 type Target = Box<dyn DataClient>;
577
578 fn deref(&self) -> &Self::Target {
579 &self.client
580 }
581}
582
583impl DerefMut for DataClientAdapter {
584 fn deref_mut(&mut self) -> &mut Self::Target {
585 &mut self.client
586 }
587}
588
589impl Debug for DataClientAdapter {
590 #[rustfmt::skip]
591 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592 f.debug_struct(stringify!(DataClientAdapter))
593 .field("client_id", &self.client_id)
594 .field("venue", &self.venue)
595 .field("handles_book_deltas", &self.handles_book_deltas)
596 .field("handles_book_snapshots", &self.handles_book_snapshots)
597 .field("subscriptions_custom", &self.subscriptions_custom)
598 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
599 .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
600 .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
601 .field("subscriptions_quotes", &self.subscriptions_quotes)
602 .field("subscriptions_trades", &self.subscriptions_trades)
603 .field("subscriptions_bars", &self.subscriptions_bars)
604 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
605 .field("subscriptions_index_prices", &self.subscriptions_index_prices)
606 .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
607 .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
608 .field("subscriptions_instrument", &self.subscriptions_instrument)
609 .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
610 .finish()
611 }
612}
613
614impl DataClientAdapter {
615 #[must_use]
617 pub fn new(
618 client_id: ClientId,
619 venue: Option<Venue>,
620 handles_order_book_deltas: bool,
621 handles_order_book_snapshots: bool,
622 client: Box<dyn DataClient>,
623 ) -> Self {
624 Self {
625 client,
626 client_id,
627 venue,
628 handles_book_deltas: handles_order_book_deltas,
629 handles_book_snapshots: handles_order_book_snapshots,
630 subscriptions_custom: AHashSet::new(),
631 subscriptions_book_deltas: AHashSet::new(),
632 subscriptions_book_depth10: AHashSet::new(),
633 subscriptions_book_snapshots: AHashSet::new(),
634 subscriptions_quotes: AHashSet::new(),
635 subscriptions_trades: AHashSet::new(),
636 subscriptions_mark_prices: AHashSet::new(),
637 subscriptions_index_prices: AHashSet::new(),
638 subscriptions_bars: AHashSet::new(),
639 subscriptions_instrument_status: AHashSet::new(),
640 subscriptions_instrument_close: AHashSet::new(),
641 subscriptions_instrument: AHashSet::new(),
642 subscriptions_instrument_venue: AHashSet::new(),
643 #[cfg(feature = "defi")]
644 subscriptions_blocks: AHashSet::new(),
645 #[cfg(feature = "defi")]
646 subscriptions_pools: AHashSet::new(),
647 #[cfg(feature = "defi")]
648 subscriptions_pool_swaps: AHashSet::new(),
649 #[cfg(feature = "defi")]
650 subscriptions_pool_liquidity_updates: AHashSet::new(),
651 }
652 }
653
654 #[allow(clippy::borrowed_box)]
655 #[must_use]
656 pub fn get_client(&self) -> &Box<dyn DataClient> {
657 &self.client
658 }
659
660 #[inline]
661 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
662 if let Err(e) = match cmd {
663 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
664 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
665 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
666 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
667 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
668 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
669 SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
670 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
671 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
672 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
673 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
674 SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
675 SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
676 } {
677 log_command_error(&cmd, &e);
678 }
679 }
680
681 #[cfg(feature = "defi")]
682 #[inline]
683 pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
684 if let Err(e) = match cmd {
685 DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
686 DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
687 DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_swaps(cmd),
688 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
689 self.subscribe_pool_liquidity_updates(cmd)
690 }
691 } {
692 log_command_error(&cmd, &e);
693 }
694 }
695
696 #[inline]
697 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
698 if let Err(e) = match cmd {
699 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
700 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
701 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
702 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
703 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
704 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
705 UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
706 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
707 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
708 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
709 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
710 UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
711 UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
712 } {
713 log_command_error(&cmd, &e);
714 }
715 }
716
717 #[cfg(feature = "defi")]
718 #[inline]
719 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
720 if let Err(e) = match cmd {
721 DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
722 DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
723 DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_swaps(cmd),
724 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
725 self.unsubscribe_pool_liquidity_updates(cmd)
726 }
727 } {
728 log_command_error(&cmd, &e);
729 }
730 }
731
732 pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
740 if !self.subscriptions_custom.contains(&cmd.data_type) {
741 self.subscriptions_custom.insert(cmd.data_type.clone());
742 self.client.subscribe(cmd)?;
743 }
744 Ok(())
745 }
746
747 pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
753 if self.subscriptions_custom.contains(&cmd.data_type) {
754 self.subscriptions_custom.remove(&cmd.data_type);
755 self.client.unsubscribe(cmd)?;
756 }
757 Ok(())
758 }
759
760 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
766 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
767 self.subscriptions_instrument_venue.insert(cmd.venue);
768 self.client.subscribe_instruments(cmd)?;
769 }
770
771 Ok(())
772 }
773
774 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
780 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
781 self.subscriptions_instrument_venue.remove(&cmd.venue);
782 self.client.unsubscribe_instruments(cmd)?;
783 }
784
785 Ok(())
786 }
787
788 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
794 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
795 self.subscriptions_instrument.insert(cmd.instrument_id);
796 self.client.subscribe_instrument(cmd)?;
797 }
798
799 Ok(())
800 }
801
802 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
808 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
809 self.subscriptions_instrument.remove(&cmd.instrument_id);
810 self.client.unsubscribe_instrument(cmd)?;
811 }
812
813 Ok(())
814 }
815
816 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
822 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
823 self.subscriptions_book_deltas.insert(cmd.instrument_id);
824 self.client.subscribe_book_deltas(cmd)?;
825 }
826
827 Ok(())
828 }
829
830 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
836 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
837 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
838 self.client.unsubscribe_book_deltas(cmd)?;
839 }
840
841 Ok(())
842 }
843
844 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
850 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
851 self.subscriptions_book_depth10.insert(cmd.instrument_id);
852 self.client.subscribe_book_depth10(cmd)?;
853 }
854
855 Ok(())
856 }
857
858 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
864 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
865 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
866 self.client.unsubscribe_book_depth10(cmd)?;
867 }
868
869 Ok(())
870 }
871
872 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
878 if !self
879 .subscriptions_book_snapshots
880 .contains(&cmd.instrument_id)
881 {
882 self.subscriptions_book_snapshots.insert(cmd.instrument_id);
883 self.client.subscribe_book_snapshots(cmd)?;
884 }
885
886 Ok(())
887 }
888
889 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
895 if self
896 .subscriptions_book_snapshots
897 .contains(&cmd.instrument_id)
898 {
899 self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
900 self.client.unsubscribe_book_snapshots(cmd)?;
901 }
902
903 Ok(())
904 }
905
906 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
912 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
913 self.subscriptions_quotes.insert(cmd.instrument_id);
914 self.client.subscribe_quotes(cmd)?;
915 }
916 Ok(())
917 }
918
919 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
925 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
926 self.subscriptions_quotes.remove(&cmd.instrument_id);
927 self.client.unsubscribe_quotes(cmd)?;
928 }
929 Ok(())
930 }
931
932 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
938 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
939 self.subscriptions_trades.insert(cmd.instrument_id);
940 self.client.subscribe_trades(cmd)?;
941 }
942 Ok(())
943 }
944
945 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
951 if self.subscriptions_trades.contains(&cmd.instrument_id) {
952 self.subscriptions_trades.remove(&cmd.instrument_id);
953 self.client.unsubscribe_trades(cmd)?;
954 }
955 Ok(())
956 }
957
958 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
964 if !self.subscriptions_bars.contains(&cmd.bar_type) {
965 self.subscriptions_bars.insert(cmd.bar_type);
966 self.client.subscribe_bars(cmd)?;
967 }
968 Ok(())
969 }
970
971 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
977 if self.subscriptions_bars.contains(&cmd.bar_type) {
978 self.subscriptions_bars.remove(&cmd.bar_type);
979 self.client.unsubscribe_bars(cmd)?;
980 }
981 Ok(())
982 }
983
984 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
990 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
991 self.subscriptions_mark_prices.insert(cmd.instrument_id);
992 self.client.subscribe_mark_prices(cmd)?;
993 }
994 Ok(())
995 }
996
997 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1003 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1004 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1005 self.client.unsubscribe_mark_prices(cmd)?;
1006 }
1007 Ok(())
1008 }
1009
1010 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1016 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1017 self.subscriptions_index_prices.insert(cmd.instrument_id);
1018 self.client.subscribe_index_prices(cmd)?;
1019 }
1020 Ok(())
1021 }
1022
1023 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1029 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1030 self.subscriptions_index_prices.remove(&cmd.instrument_id);
1031 self.client.unsubscribe_index_prices(cmd)?;
1032 }
1033 Ok(())
1034 }
1035
1036 fn subscribe_instrument_status(
1042 &mut self,
1043 cmd: &SubscribeInstrumentStatus,
1044 ) -> anyhow::Result<()> {
1045 if !self
1046 .subscriptions_instrument_status
1047 .contains(&cmd.instrument_id)
1048 {
1049 self.subscriptions_instrument_status
1050 .insert(cmd.instrument_id);
1051 self.client.subscribe_instrument_status(cmd)?;
1052 }
1053 Ok(())
1054 }
1055
1056 fn unsubscribe_instrument_status(
1062 &mut self,
1063 cmd: &UnsubscribeInstrumentStatus,
1064 ) -> anyhow::Result<()> {
1065 if self
1066 .subscriptions_instrument_status
1067 .contains(&cmd.instrument_id)
1068 {
1069 self.subscriptions_instrument_status
1070 .remove(&cmd.instrument_id);
1071 self.client.unsubscribe_instrument_status(cmd)?;
1072 }
1073 Ok(())
1074 }
1075
1076 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1082 if !self
1083 .subscriptions_instrument_close
1084 .contains(&cmd.instrument_id)
1085 {
1086 self.subscriptions_instrument_close
1087 .insert(cmd.instrument_id);
1088 self.client.subscribe_instrument_close(cmd)?;
1089 }
1090 Ok(())
1091 }
1092
1093 fn unsubscribe_instrument_close(
1099 &mut self,
1100 cmd: &UnsubscribeInstrumentClose,
1101 ) -> anyhow::Result<()> {
1102 if self
1103 .subscriptions_instrument_close
1104 .contains(&cmd.instrument_id)
1105 {
1106 self.subscriptions_instrument_close
1107 .remove(&cmd.instrument_id);
1108 self.client.unsubscribe_instrument_close(cmd)?;
1109 }
1110 Ok(())
1111 }
1112
1113 #[cfg(feature = "defi")]
1114 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
1120 if !self.subscriptions_blocks.contains(&cmd.chain) {
1121 self.subscriptions_blocks.insert(cmd.chain);
1122 self.client.subscribe_blocks(cmd)?;
1123 }
1124 Ok(())
1125 }
1126
1127 #[cfg(feature = "defi")]
1128 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
1134 if self.subscriptions_blocks.contains(&cmd.chain) {
1135 self.subscriptions_blocks.remove(&cmd.chain);
1136 self.client.unsubscribe_blocks(cmd)?;
1137 }
1138 Ok(())
1139 }
1140
1141 #[cfg(feature = "defi")]
1142 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
1148 if !self.subscriptions_pools.contains(&cmd.address) {
1149 self.subscriptions_pools.insert(cmd.address);
1150 self.client.subscribe_pool(cmd)?;
1151 }
1152 Ok(())
1153 }
1154
1155 #[cfg(feature = "defi")]
1156 fn subscribe_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
1162 if !self.subscriptions_pool_swaps.contains(&cmd.address) {
1163 self.subscriptions_pool_swaps.insert(cmd.address);
1164 self.client.subscribe_pool_swaps(cmd)?;
1165 }
1166 Ok(())
1167 }
1168
1169 #[cfg(feature = "defi")]
1170 fn subscribe_pool_liquidity_updates(
1176 &mut self,
1177 cmd: &SubscribePoolLiquidityUpdates,
1178 ) -> anyhow::Result<()> {
1179 if !self
1180 .subscriptions_pool_liquidity_updates
1181 .contains(&cmd.address)
1182 {
1183 self.subscriptions_pool_liquidity_updates
1184 .insert(cmd.address);
1185 self.client.subscribe_pool_liquidity_updates(cmd)?;
1186 }
1187 Ok(())
1188 }
1189
1190 #[cfg(feature = "defi")]
1191 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
1197 if self.subscriptions_pools.contains(&cmd.address) {
1198 self.subscriptions_pools.remove(&cmd.address);
1199 self.client.unsubscribe_pool(cmd)?;
1200 }
1201 Ok(())
1202 }
1203
1204 #[cfg(feature = "defi")]
1205 fn unsubscribe_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
1211 if self.subscriptions_pool_swaps.contains(&cmd.address) {
1212 self.subscriptions_pool_swaps.remove(&cmd.address);
1213 self.client.unsubscribe_pool_swaps(cmd)?;
1214 }
1215 Ok(())
1216 }
1217
1218 #[cfg(feature = "defi")]
1219 fn unsubscribe_pool_liquidity_updates(
1225 &mut self,
1226 cmd: &UnsubscribePoolLiquidityUpdates,
1227 ) -> anyhow::Result<()> {
1228 if self
1229 .subscriptions_pool_liquidity_updates
1230 .contains(&cmd.address)
1231 {
1232 self.subscriptions_pool_liquidity_updates
1233 .remove(&cmd.address);
1234 self.client.unsubscribe_pool_liquidity_updates(cmd)?;
1235 }
1236 Ok(())
1237 }
1238
1239 pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1247 self.client.request_data(req)
1248 }
1249
1250 pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1256 self.client.request_instrument(req)
1257 }
1258
1259 pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1265 self.client.request_instruments(req)
1266 }
1267
1268 pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1274 self.client.request_quotes(req)
1275 }
1276
1277 pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1283 self.client.request_trades(req)
1284 }
1285
1286 pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1292 self.client.request_bars(req)
1293 }
1294}
1295
1296#[inline(always)]
1297fn log_not_implemented<T: Debug>(msg: &T) {
1298 log::warn!("{msg:?} – handler not implemented");
1299}
1300
1301#[inline(always)]
1302fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1303 log::error!("Error on {cmd:?}: {e}");
1304}