1use std::{
22 fmt::{Debug, Display},
23 ops::{Deref, DerefMut},
24};
25
26use ahash::AHashSet;
27use async_trait::async_trait;
28use nautilus_common::messages::data::{
29 RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData, RequestInstrument,
30 RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
31 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
32 SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
33 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
34 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
35 UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
36 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
37 UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
38 UnsubscribeTrades,
39};
40#[cfg(feature = "defi")]
41use nautilus_common::messages::defi::{
42 RequestPoolSnapshot, SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects,
43 SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks,
44 UnsubscribePool, UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
45 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
46};
47#[cfg(feature = "defi")]
48use nautilus_model::defi::Blockchain;
49use nautilus_model::{
50 data::{BarType, DataType},
51 identifiers::{ClientId, InstrumentId, Venue},
52};
53
54#[cfg(feature = "defi")]
55#[allow(unused_imports)] use crate::defi::client as _;
57
58#[async_trait(?Send)]
65pub trait DataClient {
66 fn client_id(&self) -> ClientId;
68
69 fn venue(&self) -> Option<Venue>;
71
72 fn start(&mut self) -> anyhow::Result<()>;
78
79 fn stop(&mut self) -> anyhow::Result<()>;
85
86 fn reset(&mut self) -> anyhow::Result<()>;
92
93 fn dispose(&mut self) -> anyhow::Result<()>;
99
100 fn is_connected(&self) -> bool;
102
103 fn is_disconnected(&self) -> bool;
105
106 async fn connect(&mut self) -> anyhow::Result<()> {
115 Ok(())
116 }
117
118 async fn disconnect(&mut self) -> anyhow::Result<()> {
127 Ok(())
128 }
129
130 fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
136 log_not_implemented(&cmd);
137 Ok(())
138 }
139
140 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
146 log_not_implemented(&cmd);
147 Ok(())
148 }
149
150 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
156 log_not_implemented(&cmd);
157 Ok(())
158 }
159
160 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
166 log_not_implemented(&cmd);
167 Ok(())
168 }
169
170 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
176 log_not_implemented(&cmd);
177 Ok(())
178 }
179
180 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
186 log_not_implemented(&cmd);
187 Ok(())
188 }
189
190 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
196 log_not_implemented(&cmd);
197 Ok(())
198 }
199
200 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
206 log_not_implemented(&cmd);
207 Ok(())
208 }
209
210 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
216 log_not_implemented(&cmd);
217 Ok(())
218 }
219
220 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
226 log_not_implemented(&cmd);
227 Ok(())
228 }
229
230 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
236 log_not_implemented(&cmd);
237 Ok(())
238 }
239
240 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
246 log_not_implemented(&cmd);
247 Ok(())
248 }
249
250 fn subscribe_instrument_status(
256 &mut self,
257 cmd: &SubscribeInstrumentStatus,
258 ) -> anyhow::Result<()> {
259 log_not_implemented(&cmd);
260 Ok(())
261 }
262
263 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
269 log_not_implemented(&cmd);
270 Ok(())
271 }
272
273 #[cfg(feature = "defi")]
274 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
280 log_not_implemented(&cmd);
281 Ok(())
282 }
283
284 #[cfg(feature = "defi")]
285 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
291 log_not_implemented(&cmd);
292 Ok(())
293 }
294
295 #[cfg(feature = "defi")]
296 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
302 log_not_implemented(&cmd);
303 Ok(())
304 }
305
306 #[cfg(feature = "defi")]
307 fn subscribe_pool_liquidity_updates(
313 &mut self,
314 cmd: &SubscribePoolLiquidityUpdates,
315 ) -> anyhow::Result<()> {
316 log_not_implemented(&cmd);
317 Ok(())
318 }
319
320 #[cfg(feature = "defi")]
321 fn subscribe_pool_fee_collects(
327 &mut self,
328 cmd: &SubscribePoolFeeCollects,
329 ) -> anyhow::Result<()> {
330 log_not_implemented(&cmd);
331 Ok(())
332 }
333
334 #[cfg(feature = "defi")]
335 fn subscribe_pool_flash_events(
341 &mut self,
342 cmd: &SubscribePoolFlashEvents,
343 ) -> anyhow::Result<()> {
344 log_not_implemented(&cmd);
345 Ok(())
346 }
347
348 fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
354 log_not_implemented(&cmd);
355 Ok(())
356 }
357
358 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
364 log_not_implemented(&cmd);
365 Ok(())
366 }
367
368 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
374 log_not_implemented(&cmd);
375 Ok(())
376 }
377
378 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
384 log_not_implemented(&cmd);
385 Ok(())
386 }
387
388 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
394 log_not_implemented(&cmd);
395 Ok(())
396 }
397
398 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
404 log_not_implemented(&cmd);
405 Ok(())
406 }
407
408 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
414 log_not_implemented(&cmd);
415 Ok(())
416 }
417
418 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
424 log_not_implemented(&cmd);
425 Ok(())
426 }
427
428 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
434 log_not_implemented(&cmd);
435 Ok(())
436 }
437
438 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
444 log_not_implemented(&cmd);
445 Ok(())
446 }
447
448 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
454 log_not_implemented(&cmd);
455 Ok(())
456 }
457
458 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
464 log_not_implemented(&cmd);
465 Ok(())
466 }
467
468 fn unsubscribe_instrument_status(
474 &mut self,
475 cmd: &UnsubscribeInstrumentStatus,
476 ) -> anyhow::Result<()> {
477 log_not_implemented(&cmd);
478 Ok(())
479 }
480
481 fn unsubscribe_instrument_close(
487 &mut self,
488 cmd: &UnsubscribeInstrumentClose,
489 ) -> anyhow::Result<()> {
490 log_not_implemented(&cmd);
491 Ok(())
492 }
493
494 #[cfg(feature = "defi")]
495 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
501 log_not_implemented(&cmd);
502 Ok(())
503 }
504
505 #[cfg(feature = "defi")]
506 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
512 log_not_implemented(&cmd);
513 Ok(())
514 }
515
516 #[cfg(feature = "defi")]
517 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
523 log_not_implemented(&cmd);
524 Ok(())
525 }
526
527 #[cfg(feature = "defi")]
528 fn unsubscribe_pool_liquidity_updates(
534 &mut self,
535 cmd: &UnsubscribePoolLiquidityUpdates,
536 ) -> anyhow::Result<()> {
537 log_not_implemented(&cmd);
538 Ok(())
539 }
540
541 #[cfg(feature = "defi")]
542 fn unsubscribe_pool_fee_collects(
548 &mut self,
549 cmd: &UnsubscribePoolFeeCollects,
550 ) -> anyhow::Result<()> {
551 log_not_implemented(&cmd);
552 Ok(())
553 }
554
555 #[cfg(feature = "defi")]
556 fn unsubscribe_pool_flash_events(
562 &mut self,
563 cmd: &UnsubscribePoolFlashEvents,
564 ) -> anyhow::Result<()> {
565 log_not_implemented(&cmd);
566 Ok(())
567 }
568
569 fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
575 log_not_implemented(&request);
576 Ok(())
577 }
578
579 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
585 log_not_implemented(&request);
586 Ok(())
587 }
588
589 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
595 log_not_implemented(&request);
596 Ok(())
597 }
598
599 fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
605 log_not_implemented(&request);
606 Ok(())
607 }
608
609 fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
615 log_not_implemented(&request);
616 Ok(())
617 }
618
619 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
625 log_not_implemented(&request);
626 Ok(())
627 }
628
629 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
635 log_not_implemented(&request);
636 Ok(())
637 }
638
639 fn request_book_depth(&self, request: &RequestBookDepth) -> anyhow::Result<()> {
645 log_not_implemented(&request);
646 Ok(())
647 }
648
649 #[cfg(feature = "defi")]
650 fn request_pool_snapshot(&self, request: &RequestPoolSnapshot) -> anyhow::Result<()> {
656 log_not_implemented(&request);
657 Ok(())
658 }
659}
660
/// Wraps a boxed [`DataClient`] together with per-category sets recording which
/// subscriptions have been forwarded to it.
///
/// The sets are what the adapter's subscribe/unsubscribe methods consult to
/// avoid sending the same command to the underlying client twice.
pub struct DataClientAdapter {
    /// The wrapped client that actually performs subscriptions and requests.
    pub(crate) client: Box<dyn DataClient>,
    /// Identifier of the wrapped client, as supplied to the constructor.
    pub client_id: ClientId,
    /// Venue associated with the wrapped client, if any.
    pub venue: Option<Venue>,
    /// Constructor-supplied flag for order book delta handling.
    // NOTE(review): not read anywhere in this file — presumably consulted by the data engine; confirm.
    pub handles_book_deltas: bool,
    /// Constructor-supplied flag for order book snapshot handling.
    // NOTE(review): not read anywhere in this file — presumably consulted by the data engine; confirm.
    pub handles_book_snapshots: bool,
    /// Custom data types currently tracked as subscribed.
    pub subscriptions_custom: AHashSet<DataType>,
    /// Instruments tracked as subscribed for order book deltas.
    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for depth-10 order book data.
    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for order book snapshots.
    pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for quotes.
    pub subscriptions_quotes: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for trades.
    pub subscriptions_trades: AHashSet<InstrumentId>,
    /// Bar types tracked as subscribed.
    pub subscriptions_bars: AHashSet<BarType>,
    /// Instruments tracked as subscribed for instrument status updates.
    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for instrument close updates.
    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for instrument definitions.
    pub subscriptions_instrument: AHashSet<InstrumentId>,
    /// Venues tracked as subscribed for all of their instrument definitions.
    pub subscriptions_instrument_venue: AHashSet<Venue>,
    /// Instruments tracked as subscribed for mark prices.
    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for index prices.
    pub subscriptions_index_prices: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for funding rates.
    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
    // The `defi` sets below are only initialised (empty) in this file; they are
    // presumably maintained by `crate::defi::client` — TODO confirm.
    /// Blockchains tracked as subscribed for blocks (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_blocks: AHashSet<Blockchain>,
    /// Pools tracked as subscribed (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pools: AHashSet<InstrumentId>,
    /// Pools tracked as subscribed for swaps (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
    /// Pools tracked as subscribed for liquidity updates (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
    /// Pools tracked as subscribed for fee collects (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
    /// Pools tracked as subscribed for flash events (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_flash: AHashSet<InstrumentId>,
}
695
/// Exposes the wrapped client so [`DataClient`] methods can be called directly
/// on the adapter.
///
/// Inherent methods on [`DataClientAdapter`] take precedence during method
/// resolution, so a same-named adapter method (e.g. `subscribe`) is chosen
/// over the trait method reachable through this deref.
impl Deref for DataClientAdapter {
    type Target = Box<dyn DataClient>;

    /// Returns a shared reference to the boxed client.
    fn deref(&self) -> &Self::Target {
        &self.client
    }
}
703
/// Mutable counterpart of the `Deref` impl: gives `&mut` access to the wrapped
/// client so `&mut self` trait methods can be reached through the adapter.
impl DerefMut for DataClientAdapter {
    /// Returns an exclusive reference to the boxed client.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.client
    }
}
709
710impl Debug for DataClientAdapter {
711 #[rustfmt::skip]
712 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
713 f.debug_struct(stringify!(DataClientAdapter))
714 .field("client_id", &self.client_id)
715 .field("venue", &self.venue)
716 .field("handles_book_deltas", &self.handles_book_deltas)
717 .field("handles_book_snapshots", &self.handles_book_snapshots)
718 .field("subscriptions_custom", &self.subscriptions_custom)
719 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
720 .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
721 .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
722 .field("subscriptions_quotes", &self.subscriptions_quotes)
723 .field("subscriptions_trades", &self.subscriptions_trades)
724 .field("subscriptions_bars", &self.subscriptions_bars)
725 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
726 .field("subscriptions_index_prices", &self.subscriptions_index_prices)
727 .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
728 .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
729 .field("subscriptions_instrument", &self.subscriptions_instrument)
730 .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
731 .finish()
732 }
733}
734
735impl DataClientAdapter {
736 #[must_use]
738 pub fn new(
739 client_id: ClientId,
740 venue: Option<Venue>,
741 handles_order_book_deltas: bool,
742 handles_order_book_snapshots: bool,
743 client: Box<dyn DataClient>,
744 ) -> Self {
745 Self {
746 client,
747 client_id,
748 venue,
749 handles_book_deltas: handles_order_book_deltas,
750 handles_book_snapshots: handles_order_book_snapshots,
751 subscriptions_custom: AHashSet::new(),
752 subscriptions_book_deltas: AHashSet::new(),
753 subscriptions_book_depth10: AHashSet::new(),
754 subscriptions_book_snapshots: AHashSet::new(),
755 subscriptions_quotes: AHashSet::new(),
756 subscriptions_trades: AHashSet::new(),
757 subscriptions_mark_prices: AHashSet::new(),
758 subscriptions_index_prices: AHashSet::new(),
759 subscriptions_funding_rates: AHashSet::new(),
760 subscriptions_bars: AHashSet::new(),
761 subscriptions_instrument_status: AHashSet::new(),
762 subscriptions_instrument_close: AHashSet::new(),
763 subscriptions_instrument: AHashSet::new(),
764 subscriptions_instrument_venue: AHashSet::new(),
765 #[cfg(feature = "defi")]
766 subscriptions_blocks: AHashSet::new(),
767 #[cfg(feature = "defi")]
768 subscriptions_pools: AHashSet::new(),
769 #[cfg(feature = "defi")]
770 subscriptions_pool_swaps: AHashSet::new(),
771 #[cfg(feature = "defi")]
772 subscriptions_pool_liquidity_updates: AHashSet::new(),
773 #[cfg(feature = "defi")]
774 subscriptions_pool_fee_collects: AHashSet::new(),
775 #[cfg(feature = "defi")]
776 subscriptions_pool_flash: AHashSet::new(),
777 }
778 }
779
780 #[allow(clippy::borrowed_box)]
781 #[must_use]
782 pub fn get_client(&self) -> &Box<dyn DataClient> {
783 &self.client
784 }
785
786 pub async fn connect(&mut self) -> anyhow::Result<()> {
792 self.client.connect().await
793 }
794
795 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
801 self.client.disconnect().await
802 }
803
804 #[inline]
805 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
806 if let Err(e) = match cmd {
807 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
808 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
809 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
810 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
811 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
812 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
813 SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
814 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
815 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
816 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
817 SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
818 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
819 SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
820 SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
821 } {
822 log_command_error(&cmd, &e);
823 }
824 }
825
826 #[inline]
827 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
828 if let Err(e) = match cmd {
829 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
830 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
831 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
832 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
833 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
834 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
835 UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
836 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
837 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
838 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
839 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
840 UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
841 UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
842 UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
843 } {
844 log_command_error(&cmd, &e);
845 }
846 }
847
848 pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
856 if !self.subscriptions_custom.contains(&cmd.data_type) {
857 self.subscriptions_custom.insert(cmd.data_type.clone());
858 self.client.subscribe(cmd)?;
859 }
860 Ok(())
861 }
862
863 pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
869 if self.subscriptions_custom.contains(&cmd.data_type) {
870 self.subscriptions_custom.remove(&cmd.data_type);
871 self.client.unsubscribe(cmd)?;
872 }
873 Ok(())
874 }
875
876 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
882 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
883 self.subscriptions_instrument_venue.insert(cmd.venue);
884 self.client.subscribe_instruments(cmd)?;
885 }
886
887 Ok(())
888 }
889
890 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
896 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
897 self.subscriptions_instrument_venue.remove(&cmd.venue);
898 self.client.unsubscribe_instruments(cmd)?;
899 }
900
901 Ok(())
902 }
903
904 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
910 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
911 self.subscriptions_instrument.insert(cmd.instrument_id);
912 self.client.subscribe_instrument(cmd)?;
913 }
914
915 Ok(())
916 }
917
918 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
924 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
925 self.subscriptions_instrument.remove(&cmd.instrument_id);
926 self.client.unsubscribe_instrument(cmd)?;
927 }
928
929 Ok(())
930 }
931
932 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
938 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
939 self.subscriptions_book_deltas.insert(cmd.instrument_id);
940 self.client.subscribe_book_deltas(cmd)?;
941 }
942
943 Ok(())
944 }
945
946 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
952 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
953 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
954 self.client.unsubscribe_book_deltas(cmd)?;
955 }
956
957 Ok(())
958 }
959
960 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
966 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
967 self.subscriptions_book_depth10.insert(cmd.instrument_id);
968 self.client.subscribe_book_depth10(cmd)?;
969 }
970
971 Ok(())
972 }
973
974 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
980 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
981 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
982 self.client.unsubscribe_book_depth10(cmd)?;
983 }
984
985 Ok(())
986 }
987
988 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
994 if !self
995 .subscriptions_book_snapshots
996 .contains(&cmd.instrument_id)
997 {
998 self.subscriptions_book_snapshots.insert(cmd.instrument_id);
999 self.client.subscribe_book_snapshots(cmd)?;
1000 }
1001
1002 Ok(())
1003 }
1004
1005 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1011 if self
1012 .subscriptions_book_snapshots
1013 .contains(&cmd.instrument_id)
1014 {
1015 self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
1016 self.client.unsubscribe_book_snapshots(cmd)?;
1017 }
1018
1019 Ok(())
1020 }
1021
1022 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
1028 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
1029 self.subscriptions_quotes.insert(cmd.instrument_id);
1030 self.client.subscribe_quotes(cmd)?;
1031 }
1032 Ok(())
1033 }
1034
1035 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1041 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
1042 self.subscriptions_quotes.remove(&cmd.instrument_id);
1043 self.client.unsubscribe_quotes(cmd)?;
1044 }
1045 Ok(())
1046 }
1047
1048 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
1054 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
1055 self.subscriptions_trades.insert(cmd.instrument_id);
1056 self.client.subscribe_trades(cmd)?;
1057 }
1058 Ok(())
1059 }
1060
1061 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1067 if self.subscriptions_trades.contains(&cmd.instrument_id) {
1068 self.subscriptions_trades.remove(&cmd.instrument_id);
1069 self.client.unsubscribe_trades(cmd)?;
1070 }
1071 Ok(())
1072 }
1073
1074 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1080 if !self.subscriptions_bars.contains(&cmd.bar_type) {
1081 self.subscriptions_bars.insert(cmd.bar_type);
1082 self.client.subscribe_bars(cmd)?;
1083 }
1084 Ok(())
1085 }
1086
1087 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1093 if self.subscriptions_bars.contains(&cmd.bar_type) {
1094 self.subscriptions_bars.remove(&cmd.bar_type);
1095 self.client.unsubscribe_bars(cmd)?;
1096 }
1097 Ok(())
1098 }
1099
1100 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1106 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1107 self.subscriptions_mark_prices.insert(cmd.instrument_id);
1108 self.client.subscribe_mark_prices(cmd)?;
1109 }
1110 Ok(())
1111 }
1112
1113 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1119 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
1120 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
1121 self.client.unsubscribe_mark_prices(cmd)?;
1122 }
1123 Ok(())
1124 }
1125
1126 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1132 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1133 self.subscriptions_index_prices.insert(cmd.instrument_id);
1134 self.client.subscribe_index_prices(cmd)?;
1135 }
1136 Ok(())
1137 }
1138
1139 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1145 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
1146 self.subscriptions_index_prices.remove(&cmd.instrument_id);
1147 self.client.unsubscribe_index_prices(cmd)?;
1148 }
1149 Ok(())
1150 }
1151
1152 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1158 if !self
1159 .subscriptions_funding_rates
1160 .contains(&cmd.instrument_id)
1161 {
1162 self.subscriptions_funding_rates.insert(cmd.instrument_id);
1163 self.client.subscribe_funding_rates(cmd)?;
1164 }
1165 Ok(())
1166 }
1167
1168 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1174 if self
1175 .subscriptions_funding_rates
1176 .contains(&cmd.instrument_id)
1177 {
1178 self.subscriptions_funding_rates.remove(&cmd.instrument_id);
1179 self.client.unsubscribe_funding_rates(cmd)?;
1180 }
1181 Ok(())
1182 }
1183
1184 fn subscribe_instrument_status(
1190 &mut self,
1191 cmd: &SubscribeInstrumentStatus,
1192 ) -> anyhow::Result<()> {
1193 if !self
1194 .subscriptions_instrument_status
1195 .contains(&cmd.instrument_id)
1196 {
1197 self.subscriptions_instrument_status
1198 .insert(cmd.instrument_id);
1199 self.client.subscribe_instrument_status(cmd)?;
1200 }
1201 Ok(())
1202 }
1203
1204 fn unsubscribe_instrument_status(
1210 &mut self,
1211 cmd: &UnsubscribeInstrumentStatus,
1212 ) -> anyhow::Result<()> {
1213 if self
1214 .subscriptions_instrument_status
1215 .contains(&cmd.instrument_id)
1216 {
1217 self.subscriptions_instrument_status
1218 .remove(&cmd.instrument_id);
1219 self.client.unsubscribe_instrument_status(cmd)?;
1220 }
1221 Ok(())
1222 }
1223
1224 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
1230 if !self
1231 .subscriptions_instrument_close
1232 .contains(&cmd.instrument_id)
1233 {
1234 self.subscriptions_instrument_close
1235 .insert(cmd.instrument_id);
1236 self.client.subscribe_instrument_close(cmd)?;
1237 }
1238 Ok(())
1239 }
1240
1241 fn unsubscribe_instrument_close(
1247 &mut self,
1248 cmd: &UnsubscribeInstrumentClose,
1249 ) -> anyhow::Result<()> {
1250 if self
1251 .subscriptions_instrument_close
1252 .contains(&cmd.instrument_id)
1253 {
1254 self.subscriptions_instrument_close
1255 .remove(&cmd.instrument_id);
1256 self.client.unsubscribe_instrument_close(cmd)?;
1257 }
1258 Ok(())
1259 }
1260
1261 pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
1269 self.client.request_data(req)
1270 }
1271
1272 pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
1278 self.client.request_instrument(req)
1279 }
1280
1281 pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
1287 self.client.request_instruments(req)
1288 }
1289
1290 pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
1296 self.client.request_quotes(req)
1297 }
1298
1299 pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1305 self.client.request_trades(req)
1306 }
1307
1308 pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1314 self.client.request_bars(req)
1315 }
1316
1317 pub fn request_book_depth(&self, req: &RequestBookDepth) -> anyhow::Result<()> {
1323 self.client.request_book_depth(req)
1324 }
1325}
1326
1327#[inline(always)]
1328fn log_not_implemented<T: Debug>(msg: &T) {
1329 log::warn!("{msg:?} – handler not implemented");
1330}
1331
1332#[inline(always)]
1333fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1334 log::error!("Error on {cmd:?}: {e}");
1335}