1use std::{
17 any::Any,
18 cell::{Ref, RefCell, RefMut},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24 sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33 Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36 data::{
37 Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38 MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
39 close::InstrumentClose,
40 },
41 enums::BookType,
42 events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
43 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
44 instruments::InstrumentAny,
45 orderbook::OrderBook,
46};
47use ustr::Ustr;
48
49#[cfg(feature = "indicators")]
50use super::indicators::Indicators;
51use super::{
52 Actor,
53 registry::{get_actor_unchecked, try_get_actor_unchecked},
54};
55#[cfg(feature = "defi")]
56use crate::defi;
57#[cfg(feature = "defi")]
58#[allow(unused_imports)]
59use crate::defi::data_actor as _; use crate::{
61 cache::Cache,
62 clock::Clock,
63 component::Component,
64 enums::{ComponentState, ComponentTrigger},
65 logging::{CMD, RECV, REQ, SEND},
66 messages::{
67 data::{
68 BarsResponse, BookResponse, CustomDataResponse, DataCommand, FundingRatesResponse,
69 InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
70 RequestBookSnapshot, RequestCommand, RequestCustomData, RequestFundingRates,
71 RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
72 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
73 SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
74 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
75 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
76 UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
77 UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
78 UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
79 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
80 },
81 system::ShutdownSystem,
82 },
83 msgbus::{
84 self, MStr, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
85 switchboard::{
86 MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
87 get_custom_topic, get_funding_rate_topic, get_index_price_topic,
88 get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
89 get_instruments_topic, get_mark_price_topic, get_order_cancels_topic,
90 get_order_fills_topic, get_quotes_topic, get_trades_topic,
91 },
92 },
93 signal::Signal,
94 timer::{TimeEvent, TimeEventCallback},
95};
96
97#[derive(Debug, Clone)]
99#[cfg_attr(
100 feature = "python",
101 pyo3::pyclass(
102 module = "nautilus_trader.core.nautilus_pyo3.common",
103 subclass,
104 from_py_object
105 )
106)]
107pub struct DataActorConfig {
108 pub actor_id: Option<ActorId>,
110 pub log_events: bool,
112 pub log_commands: bool,
114}
115
116impl Default for DataActorConfig {
117 fn default() -> Self {
118 Self {
119 actor_id: None,
120 log_events: true,
121 log_commands: true,
122 }
123 }
124}
125
126#[derive(Debug, Clone)]
128#[cfg_attr(
129 feature = "python",
130 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
131)]
132pub struct ImportableActorConfig {
133 pub actor_path: String,
135 pub config_path: String,
137 pub config: HashMap<String, serde_json::Value>,
139}
140
141type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
142
143pub trait DataActor:
144 Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
145{
146 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
152 Ok(IndexMap::new())
153 }
154
155 #[allow(unused_variables)]
161 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
162 Ok(())
163 }
164
165 fn on_start(&mut self) -> anyhow::Result<()> {
171 log::warn!(
172 "The `on_start` handler was called when not overridden, \
173 it's expected that any actions required when starting the actor \
174 occur here, such as subscribing/requesting data"
175 );
176 Ok(())
177 }
178
179 fn on_stop(&mut self) -> anyhow::Result<()> {
185 log::warn!(
186 "The `on_stop` handler was called when not overridden, \
187 it's expected that any actions required when stopping the actor \
188 occur here, such as unsubscribing from data",
189 );
190 Ok(())
191 }
192
193 fn on_resume(&mut self) -> anyhow::Result<()> {
199 log::warn!(
200 "The `on_resume` handler was called when not overridden, \
201 it's expected that any actions required when resuming the actor \
202 following a stop occur here"
203 );
204 Ok(())
205 }
206
207 fn on_reset(&mut self) -> anyhow::Result<()> {
213 log::warn!(
214 "The `on_reset` handler was called when not overridden, \
215 it's expected that any actions required when resetting the actor \
216 occur here, such as resetting indicators and other state"
217 );
218 Ok(())
219 }
220
221 fn on_dispose(&mut self) -> anyhow::Result<()> {
227 Ok(())
228 }
229
230 fn on_degrade(&mut self) -> anyhow::Result<()> {
236 Ok(())
237 }
238
239 fn on_fault(&mut self) -> anyhow::Result<()> {
245 Ok(())
246 }
247
248 #[allow(unused_variables)]
254 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
255 Ok(())
256 }
257
258 #[allow(unused_variables)]
264 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
265 Ok(())
266 }
267
268 #[allow(unused_variables)]
274 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
275 Ok(())
276 }
277
278 #[allow(unused_variables)]
284 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
285 Ok(())
286 }
287
288 #[allow(unused_variables)]
294 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
295 Ok(())
296 }
297
298 #[allow(unused_variables)]
304 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
305 Ok(())
306 }
307
308 #[allow(unused_variables)]
314 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
315 Ok(())
316 }
317
318 #[allow(unused_variables)]
324 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
325 Ok(())
326 }
327
328 #[allow(unused_variables)]
334 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
335 Ok(())
336 }
337
338 #[allow(unused_variables)]
344 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
345 Ok(())
346 }
347
348 #[allow(unused_variables)]
354 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
355 Ok(())
356 }
357
358 #[allow(unused_variables)]
364 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
365 Ok(())
366 }
367
368 #[allow(unused_variables)]
374 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
375 Ok(())
376 }
377
378 #[allow(unused_variables)]
384 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
385 Ok(())
386 }
387
388 #[allow(unused_variables)]
394 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
395 Ok(())
396 }
397
398 #[allow(unused_variables)]
404 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
405 Ok(())
406 }
407
408 #[cfg(feature = "defi")]
409 #[allow(unused_variables)]
415 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
416 Ok(())
417 }
418
419 #[cfg(feature = "defi")]
420 #[allow(unused_variables)]
426 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
427 Ok(())
428 }
429
430 #[cfg(feature = "defi")]
431 #[allow(unused_variables)]
437 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
438 Ok(())
439 }
440
441 #[cfg(feature = "defi")]
442 #[allow(unused_variables)]
448 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
449 Ok(())
450 }
451
452 #[cfg(feature = "defi")]
453 #[allow(unused_variables)]
459 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
460 Ok(())
461 }
462
463 #[cfg(feature = "defi")]
464 #[allow(unused_variables)]
470 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
471 Ok(())
472 }
473
474 #[allow(unused_variables)]
480 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
481 Ok(())
482 }
483
484 #[allow(unused_variables)]
490 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
491 Ok(())
492 }
493
494 #[allow(unused_variables)]
500 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
501 Ok(())
502 }
503
504 #[allow(unused_variables)]
510 fn on_historical_funding_rates(
511 &mut self,
512 funding_rates: &[FundingRateUpdate],
513 ) -> anyhow::Result<()> {
514 Ok(())
515 }
516
517 #[allow(unused_variables)]
523 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
524 Ok(())
525 }
526
527 #[allow(unused_variables)]
533 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
534 Ok(())
535 }
536
537 #[allow(unused_variables)]
543 fn on_historical_index_prices(
544 &mut self,
545 index_prices: &[IndexPriceUpdate],
546 ) -> anyhow::Result<()> {
547 Ok(())
548 }
549
550 fn handle_time_event(&mut self, event: &TimeEvent) {
552 log_received(&event);
553
554 if let Err(e) = DataActor::on_time_event(self, event) {
555 log_error(&e);
556 }
557 }
558
559 fn handle_data(&mut self, data: &dyn Any) {
561 log_received(&data);
562
563 if self.not_running() {
564 log_not_running(&data);
565 return;
566 }
567
568 if let Err(e) = self.on_data(data) {
569 log_error(&e);
570 }
571 }
572
573 fn handle_signal(&mut self, signal: &Signal) {
575 log_received(&signal);
576
577 if self.not_running() {
578 log_not_running(&signal);
579 return;
580 }
581
582 if let Err(e) = self.on_signal(signal) {
583 log_error(&e);
584 }
585 }
586
587 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
589 log_received(&instrument);
590
591 if self.not_running() {
592 log_not_running(&instrument);
593 return;
594 }
595
596 if let Err(e) = self.on_instrument(instrument) {
597 log_error(&e);
598 }
599 }
600
601 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
603 log_received(&deltas);
604
605 if self.not_running() {
606 log_not_running(&deltas);
607 return;
608 }
609
610 if let Err(e) = self.on_book_deltas(deltas) {
611 log_error(&e);
612 }
613 }
614
615 fn handle_book(&mut self, book: &OrderBook) {
617 log_received(&book);
618
619 if self.not_running() {
620 log_not_running(&book);
621 return;
622 }
623
624 if let Err(e) = self.on_book(book) {
625 log_error(&e);
626 };
627 }
628
629 fn handle_quote(&mut self, quote: &QuoteTick) {
631 log_received("e);
632
633 if self.not_running() {
634 log_not_running("e);
635 return;
636 }
637
638 if let Err(e) = self.on_quote(quote) {
639 log_error(&e);
640 }
641 }
642
643 fn handle_trade(&mut self, trade: &TradeTick) {
645 log_received(&trade);
646
647 if self.not_running() {
648 log_not_running(&trade);
649 return;
650 }
651
652 if let Err(e) = self.on_trade(trade) {
653 log_error(&e);
654 }
655 }
656
657 fn handle_bar(&mut self, bar: &Bar) {
659 log_received(&bar);
660
661 if self.not_running() {
662 log_not_running(&bar);
663 return;
664 }
665
666 if let Err(e) = self.on_bar(bar) {
667 log_error(&e);
668 }
669 }
670
671 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
673 log_received(&mark_price);
674
675 if self.not_running() {
676 log_not_running(&mark_price);
677 return;
678 }
679
680 if let Err(e) = self.on_mark_price(mark_price) {
681 log_error(&e);
682 }
683 }
684
685 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
687 log_received(&index_price);
688
689 if self.not_running() {
690 log_not_running(&index_price);
691 return;
692 }
693
694 if let Err(e) = self.on_index_price(index_price) {
695 log_error(&e);
696 }
697 }
698
699 fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
701 log_received(&funding_rate);
702
703 if self.not_running() {
704 log_not_running(&funding_rate);
705 return;
706 }
707
708 if let Err(e) = self.on_funding_rate(funding_rate) {
709 log_error(&e);
710 }
711 }
712
713 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
715 log_received(&status);
716
717 if self.not_running() {
718 log_not_running(&status);
719 return;
720 }
721
722 if let Err(e) = self.on_instrument_status(status) {
723 log_error(&e);
724 }
725 }
726
727 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
729 log_received(&close);
730
731 if self.not_running() {
732 log_not_running(&close);
733 return;
734 }
735
736 if let Err(e) = self.on_instrument_close(close) {
737 log_error(&e);
738 }
739 }
740
741 fn handle_order_filled(&mut self, event: &OrderFilled) {
743 log_received(&event);
744
745 if event.strategy_id.inner() == self.actor_id().inner() {
749 return;
750 }
751
752 if self.not_running() {
753 log_not_running(&event);
754 return;
755 }
756
757 if let Err(e) = self.on_order_filled(event) {
758 log_error(&e);
759 }
760 }
761
762 fn handle_order_canceled(&mut self, event: &OrderCanceled) {
764 log_received(&event);
765
766 if event.strategy_id.inner() == self.actor_id().inner() {
770 return;
771 }
772
773 if self.not_running() {
774 log_not_running(&event);
775 return;
776 }
777
778 if let Err(e) = self.on_order_canceled(event) {
779 log_error(&e);
780 }
781 }
782
783 #[cfg(feature = "defi")]
784 fn handle_block(&mut self, block: &Block) {
786 log_received(&block);
787
788 if self.not_running() {
789 log_not_running(&block);
790 return;
791 }
792
793 if let Err(e) = self.on_block(block) {
794 log_error(&e);
795 }
796 }
797
798 #[cfg(feature = "defi")]
799 fn handle_pool(&mut self, pool: &Pool) {
801 log_received(&pool);
802
803 if self.not_running() {
804 log_not_running(&pool);
805 return;
806 }
807
808 if let Err(e) = self.on_pool(pool) {
809 log_error(&e);
810 }
811 }
812
813 #[cfg(feature = "defi")]
814 fn handle_pool_swap(&mut self, swap: &PoolSwap) {
816 log_received(&swap);
817
818 if self.not_running() {
819 log_not_running(&swap);
820 return;
821 }
822
823 if let Err(e) = self.on_pool_swap(swap) {
824 log_error(&e);
825 }
826 }
827
828 #[cfg(feature = "defi")]
829 fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
831 log_received(&update);
832
833 if self.not_running() {
834 log_not_running(&update);
835 return;
836 }
837
838 if let Err(e) = self.on_pool_liquidity_update(update) {
839 log_error(&e);
840 }
841 }
842
843 #[cfg(feature = "defi")]
844 fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
846 log_received(&collect);
847
848 if self.not_running() {
849 log_not_running(&collect);
850 return;
851 }
852
853 if let Err(e) = self.on_pool_fee_collect(collect) {
854 log_error(&e);
855 }
856 }
857
858 #[cfg(feature = "defi")]
859 fn handle_pool_flash(&mut self, flash: &PoolFlash) {
861 log_received(&flash);
862
863 if self.not_running() {
864 log_not_running(&flash);
865 return;
866 }
867
868 if let Err(e) = self.on_pool_flash(flash) {
869 log_error(&e);
870 }
871 }
872
873 fn handle_historical_data(&mut self, data: &dyn Any) {
875 log_received(&data);
876
877 if let Err(e) = self.on_historical_data(data) {
878 log_error(&e);
879 }
880 }
881
882 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
884 log_received(&resp);
885
886 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
887 log_error(&e);
888 }
889 }
890
891 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
893 log_received(&resp);
894
895 if let Err(e) = self.on_instrument(&resp.data) {
896 log_error(&e);
897 }
898 }
899
900 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
902 log_received(&resp);
903
904 for inst in &resp.data {
905 if let Err(e) = self.on_instrument(inst) {
906 log_error(&e);
907 }
908 }
909 }
910
911 fn handle_book_response(&mut self, resp: &BookResponse) {
913 log_received(&resp);
914
915 if let Err(e) = self.on_book(&resp.data) {
916 log_error(&e);
917 }
918 }
919
920 fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
922 log_received(&resp);
923
924 if let Err(e) = self.on_historical_quotes(&resp.data) {
925 log_error(&e);
926 }
927 }
928
929 fn handle_trades_response(&mut self, resp: &TradesResponse) {
931 log_received(&resp);
932
933 if let Err(e) = self.on_historical_trades(&resp.data) {
934 log_error(&e);
935 }
936 }
937
938 fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
940 log_received(&resp);
941
942 if let Err(e) = self.on_historical_funding_rates(&resp.data) {
943 log_error(&e);
944 }
945 }
946
947 fn handle_bars_response(&mut self, resp: &BarsResponse) {
949 log_received(&resp);
950
951 if let Err(e) = self.on_historical_bars(&resp.data) {
952 log_error(&e);
953 }
954 }
955
956 fn subscribe_data(
958 &mut self,
959 data_type: DataType,
960 client_id: Option<ClientId>,
961 params: Option<IndexMap<String, String>>,
962 ) where
963 Self: 'static + Debug + Sized,
964 {
965 let actor_id = self.actor_id().inner();
966 let handler = ShareableMessageHandler::from_any(move |data: &dyn Any| {
967 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
968 });
969
970 DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
971 }
972
973 fn subscribe_quotes(
975 &mut self,
976 instrument_id: InstrumentId,
977 client_id: Option<ClientId>,
978 params: Option<IndexMap<String, String>>,
979 ) where
980 Self: 'static + Debug + Sized,
981 {
982 let actor_id = self.actor_id().inner();
983 let topic = get_quotes_topic(instrument_id);
984
985 let handler = TypedHandler::from(move |quote: &QuoteTick| {
986 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
987 actor.handle_quote(quote);
988 } else {
989 log::error!("Actor {actor_id} not found for quote handling");
990 }
991 });
992
993 DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
994 }
995
996 fn subscribe_instruments(
998 &mut self,
999 venue: Venue,
1000 client_id: Option<ClientId>,
1001 params: Option<IndexMap<String, String>>,
1002 ) where
1003 Self: 'static + Debug + Sized,
1004 {
1005 let actor_id = self.actor_id().inner();
1006 let topic = get_instruments_topic(venue);
1007
1008 let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1009 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1010 actor.handle_instrument(instrument);
1011 } else {
1012 log::error!("Actor {actor_id} not found for instruments handling");
1013 }
1014 });
1015
1016 DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
1017 }
1018
1019 fn subscribe_instrument(
1021 &mut self,
1022 instrument_id: InstrumentId,
1023 client_id: Option<ClientId>,
1024 params: Option<IndexMap<String, String>>,
1025 ) where
1026 Self: 'static + Debug + Sized,
1027 {
1028 let actor_id = self.actor_id().inner();
1029 let topic = get_instrument_topic(instrument_id);
1030
1031 let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1032 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1033 actor.handle_instrument(instrument);
1034 } else {
1035 log::error!("Actor {actor_id} not found for instrument handling");
1036 }
1037 });
1038
1039 DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1040 }
1041
1042 fn subscribe_book_deltas(
1044 &mut self,
1045 instrument_id: InstrumentId,
1046 book_type: BookType,
1047 depth: Option<NonZeroUsize>,
1048 client_id: Option<ClientId>,
1049 managed: bool,
1050 params: Option<IndexMap<String, String>>,
1051 ) where
1052 Self: 'static + Debug + Sized,
1053 {
1054 let actor_id = self.actor_id().inner();
1055 let topic = get_book_deltas_topic(instrument_id);
1056
1057 let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1058 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1059 });
1060
1061 DataActorCore::subscribe_book_deltas(
1062 self,
1063 topic,
1064 handler,
1065 instrument_id,
1066 book_type,
1067 depth,
1068 client_id,
1069 managed,
1070 params,
1071 );
1072 }
1073
1074 fn subscribe_book_at_interval(
1076 &mut self,
1077 instrument_id: InstrumentId,
1078 book_type: BookType,
1079 depth: Option<NonZeroUsize>,
1080 interval_ms: NonZeroUsize,
1081 client_id: Option<ClientId>,
1082 params: Option<IndexMap<String, String>>,
1083 ) where
1084 Self: 'static + Debug + Sized,
1085 {
1086 let actor_id = self.actor_id().inner();
1087 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1088
1089 let handler = TypedHandler::from(move |book: &OrderBook| {
1090 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1091 });
1092
1093 DataActorCore::subscribe_book_at_interval(
1094 self,
1095 topic,
1096 handler,
1097 instrument_id,
1098 book_type,
1099 depth,
1100 interval_ms,
1101 client_id,
1102 params,
1103 );
1104 }
1105
1106 fn subscribe_trades(
1108 &mut self,
1109 instrument_id: InstrumentId,
1110 client_id: Option<ClientId>,
1111 params: Option<IndexMap<String, String>>,
1112 ) where
1113 Self: 'static + Debug + Sized,
1114 {
1115 let actor_id = self.actor_id().inner();
1116 let topic = get_trades_topic(instrument_id);
1117
1118 let handler = TypedHandler::from(move |trade: &TradeTick| {
1119 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1120 });
1121
1122 DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1123 }
1124
1125 fn subscribe_bars(
1127 &mut self,
1128 bar_type: BarType,
1129 client_id: Option<ClientId>,
1130 params: Option<IndexMap<String, String>>,
1131 ) where
1132 Self: 'static + Debug + Sized,
1133 {
1134 let actor_id = self.actor_id().inner();
1135 let topic = get_bars_topic(bar_type);
1136
1137 let handler = TypedHandler::from(move |bar: &Bar| {
1138 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1139 });
1140
1141 DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1142 }
1143
1144 fn subscribe_mark_prices(
1146 &mut self,
1147 instrument_id: InstrumentId,
1148 client_id: Option<ClientId>,
1149 params: Option<IndexMap<String, String>>,
1150 ) where
1151 Self: 'static + Debug + Sized,
1152 {
1153 let actor_id = self.actor_id().inner();
1154 let topic = get_mark_price_topic(instrument_id);
1155
1156 let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1157 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1158 });
1159
1160 DataActorCore::subscribe_mark_prices(
1161 self,
1162 topic,
1163 handler,
1164 instrument_id,
1165 client_id,
1166 params,
1167 );
1168 }
1169
1170 fn subscribe_index_prices(
1172 &mut self,
1173 instrument_id: InstrumentId,
1174 client_id: Option<ClientId>,
1175 params: Option<IndexMap<String, String>>,
1176 ) where
1177 Self: 'static + Debug + Sized,
1178 {
1179 let actor_id = self.actor_id().inner();
1180 let topic = get_index_price_topic(instrument_id);
1181
1182 let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1183 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1184 });
1185
1186 DataActorCore::subscribe_index_prices(
1187 self,
1188 topic,
1189 handler,
1190 instrument_id,
1191 client_id,
1192 params,
1193 );
1194 }
1195
1196 fn subscribe_funding_rates(
1198 &mut self,
1199 instrument_id: InstrumentId,
1200 client_id: Option<ClientId>,
1201 params: Option<IndexMap<String, String>>,
1202 ) where
1203 Self: 'static + Debug + Sized,
1204 {
1205 let actor_id = self.actor_id().inner();
1206 let topic = get_funding_rate_topic(instrument_id);
1207
1208 let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1209 get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1210 });
1211
1212 DataActorCore::subscribe_funding_rates(
1213 self,
1214 topic,
1215 handler,
1216 instrument_id,
1217 client_id,
1218 params,
1219 );
1220 }
1221
1222 fn subscribe_instrument_status(
1224 &mut self,
1225 instrument_id: InstrumentId,
1226 client_id: Option<ClientId>,
1227 params: Option<IndexMap<String, String>>,
1228 ) where
1229 Self: 'static + Debug + Sized,
1230 {
1231 let actor_id = self.actor_id().inner();
1232 let topic = get_instrument_status_topic(instrument_id);
1233
1234 let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1235 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1236 });
1237
1238 DataActorCore::subscribe_instrument_status(
1239 self,
1240 topic,
1241 handler,
1242 instrument_id,
1243 client_id,
1244 params,
1245 );
1246 }
1247
1248 fn subscribe_instrument_close(
1250 &mut self,
1251 instrument_id: InstrumentId,
1252 client_id: Option<ClientId>,
1253 params: Option<IndexMap<String, String>>,
1254 ) where
1255 Self: 'static + Debug + Sized,
1256 {
1257 let actor_id = self.actor_id().inner();
1258 let topic = get_instrument_close_topic(instrument_id);
1259
1260 let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1261 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1262 });
1263
1264 DataActorCore::subscribe_instrument_close(
1265 self,
1266 topic,
1267 handler,
1268 instrument_id,
1269 client_id,
1270 params,
1271 );
1272 }
1273
1274 fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1276 where
1277 Self: 'static + Debug + Sized,
1278 {
1279 let actor_id = self.actor_id().inner();
1280 let topic = get_order_fills_topic(instrument_id);
1281
1282 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1283 if let OrderEventAny::Filled(filled) = event {
1284 get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1285 }
1286 });
1287
1288 DataActorCore::subscribe_order_fills(self, topic, handler);
1289 }
1290
1291 fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1293 where
1294 Self: 'static + Debug + Sized,
1295 {
1296 let actor_id = self.actor_id().inner();
1297 let topic = get_order_cancels_topic(instrument_id);
1298
1299 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1300 if let OrderEventAny::Canceled(canceled) = event {
1301 get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1302 }
1303 });
1304
1305 DataActorCore::subscribe_order_cancels(self, topic, handler);
1306 }
1307
1308 #[cfg(feature = "defi")]
1309 fn subscribe_blocks(
1311 &mut self,
1312 chain: Blockchain,
1313 client_id: Option<ClientId>,
1314 params: Option<IndexMap<String, String>>,
1315 ) where
1316 Self: 'static + Debug + Sized,
1317 {
1318 let actor_id = self.actor_id().inner();
1319 let topic = defi::switchboard::get_defi_blocks_topic(chain);
1320
1321 let handler = TypedHandler::from(move |block: &Block| {
1322 get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1323 });
1324
1325 DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1326 }
1327
1328 #[cfg(feature = "defi")]
1329 fn subscribe_pool(
1331 &mut self,
1332 instrument_id: InstrumentId,
1333 client_id: Option<ClientId>,
1334 params: Option<IndexMap<String, String>>,
1335 ) where
1336 Self: 'static + Debug + Sized,
1337 {
1338 let actor_id = self.actor_id().inner();
1339 let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1340
1341 let handler = TypedHandler::from(move |pool: &Pool| {
1342 get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1343 });
1344
1345 DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1346 }
1347
1348 #[cfg(feature = "defi")]
1349 fn subscribe_pool_swaps(
1351 &mut self,
1352 instrument_id: InstrumentId,
1353 client_id: Option<ClientId>,
1354 params: Option<IndexMap<String, String>>,
1355 ) where
1356 Self: 'static + Debug + Sized,
1357 {
1358 let actor_id = self.actor_id().inner();
1359 let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1360
1361 let handler = TypedHandler::from(move |swap: &PoolSwap| {
1362 get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1363 });
1364
1365 DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1366 }
1367
1368 #[cfg(feature = "defi")]
1369 fn subscribe_pool_liquidity_updates(
1371 &mut self,
1372 instrument_id: InstrumentId,
1373 client_id: Option<ClientId>,
1374 params: Option<IndexMap<String, String>>,
1375 ) where
1376 Self: 'static + Debug + Sized,
1377 {
1378 let actor_id = self.actor_id().inner();
1379 let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1380
1381 let handler = TypedHandler::from(move |update: &PoolLiquidityUpdate| {
1382 get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1383 });
1384
1385 DataActorCore::subscribe_pool_liquidity_updates(
1386 self,
1387 topic,
1388 handler,
1389 instrument_id,
1390 client_id,
1391 params,
1392 );
1393 }
1394
1395 #[cfg(feature = "defi")]
1396 fn subscribe_pool_fee_collects(
1398 &mut self,
1399 instrument_id: InstrumentId,
1400 client_id: Option<ClientId>,
1401 params: Option<IndexMap<String, String>>,
1402 ) where
1403 Self: 'static + Debug + Sized,
1404 {
1405 let actor_id = self.actor_id().inner();
1406 let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1407
1408 let handler = TypedHandler::from(move |collect: &PoolFeeCollect| {
1409 get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1410 });
1411
1412 DataActorCore::subscribe_pool_fee_collects(
1413 self,
1414 topic,
1415 handler,
1416 instrument_id,
1417 client_id,
1418 params,
1419 );
1420 }
1421
1422 #[cfg(feature = "defi")]
1423 fn subscribe_pool_flash_events(
1425 &mut self,
1426 instrument_id: InstrumentId,
1427 client_id: Option<ClientId>,
1428 params: Option<IndexMap<String, String>>,
1429 ) where
1430 Self: 'static + Debug + Sized,
1431 {
1432 let actor_id = self.actor_id().inner();
1433 let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1434
1435 let handler = TypedHandler::from(move |flash: &PoolFlash| {
1436 get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1437 });
1438
1439 DataActorCore::subscribe_pool_flash_events(
1440 self,
1441 topic,
1442 handler,
1443 instrument_id,
1444 client_id,
1445 params,
1446 );
1447 }
1448
1449 fn unsubscribe_data(
1451 &mut self,
1452 data_type: DataType,
1453 client_id: Option<ClientId>,
1454 params: Option<IndexMap<String, String>>,
1455 ) where
1456 Self: 'static + Debug + Sized,
1457 {
1458 DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1459 }
1460
1461 fn unsubscribe_instruments(
1463 &mut self,
1464 venue: Venue,
1465 client_id: Option<ClientId>,
1466 params: Option<IndexMap<String, String>>,
1467 ) where
1468 Self: 'static + Debug + Sized,
1469 {
1470 DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1471 }
1472
1473 fn unsubscribe_instrument(
1475 &mut self,
1476 instrument_id: InstrumentId,
1477 client_id: Option<ClientId>,
1478 params: Option<IndexMap<String, String>>,
1479 ) where
1480 Self: 'static + Debug + Sized,
1481 {
1482 DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1483 }
1484
1485 fn unsubscribe_book_deltas(
1487 &mut self,
1488 instrument_id: InstrumentId,
1489 client_id: Option<ClientId>,
1490 params: Option<IndexMap<String, String>>,
1491 ) where
1492 Self: 'static + Debug + Sized,
1493 {
1494 DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1495 }
1496
1497 fn unsubscribe_book_at_interval(
1499 &mut self,
1500 instrument_id: InstrumentId,
1501 interval_ms: NonZeroUsize,
1502 client_id: Option<ClientId>,
1503 params: Option<IndexMap<String, String>>,
1504 ) where
1505 Self: 'static + Debug + Sized,
1506 {
1507 DataActorCore::unsubscribe_book_at_interval(
1508 self,
1509 instrument_id,
1510 interval_ms,
1511 client_id,
1512 params,
1513 );
1514 }
1515
1516 fn unsubscribe_quotes(
1518 &mut self,
1519 instrument_id: InstrumentId,
1520 client_id: Option<ClientId>,
1521 params: Option<IndexMap<String, String>>,
1522 ) where
1523 Self: 'static + Debug + Sized,
1524 {
1525 DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1526 }
1527
1528 fn unsubscribe_trades(
1530 &mut self,
1531 instrument_id: InstrumentId,
1532 client_id: Option<ClientId>,
1533 params: Option<IndexMap<String, String>>,
1534 ) where
1535 Self: 'static + Debug + Sized,
1536 {
1537 DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1538 }
1539
1540 fn unsubscribe_bars(
1542 &mut self,
1543 bar_type: BarType,
1544 client_id: Option<ClientId>,
1545 params: Option<IndexMap<String, String>>,
1546 ) where
1547 Self: 'static + Debug + Sized,
1548 {
1549 DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1550 }
1551
1552 fn unsubscribe_mark_prices(
1554 &mut self,
1555 instrument_id: InstrumentId,
1556 client_id: Option<ClientId>,
1557 params: Option<IndexMap<String, String>>,
1558 ) where
1559 Self: 'static + Debug + Sized,
1560 {
1561 DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1562 }
1563
1564 fn unsubscribe_index_prices(
1566 &mut self,
1567 instrument_id: InstrumentId,
1568 client_id: Option<ClientId>,
1569 params: Option<IndexMap<String, String>>,
1570 ) where
1571 Self: 'static + Debug + Sized,
1572 {
1573 DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1574 }
1575
1576 fn unsubscribe_funding_rates(
1578 &mut self,
1579 instrument_id: InstrumentId,
1580 client_id: Option<ClientId>,
1581 params: Option<IndexMap<String, String>>,
1582 ) where
1583 Self: 'static + Debug + Sized,
1584 {
1585 DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1586 }
1587
1588 fn unsubscribe_instrument_status(
1590 &mut self,
1591 instrument_id: InstrumentId,
1592 client_id: Option<ClientId>,
1593 params: Option<IndexMap<String, String>>,
1594 ) where
1595 Self: 'static + Debug + Sized,
1596 {
1597 DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1598 }
1599
1600 fn unsubscribe_instrument_close(
1602 &mut self,
1603 instrument_id: InstrumentId,
1604 client_id: Option<ClientId>,
1605 params: Option<IndexMap<String, String>>,
1606 ) where
1607 Self: 'static + Debug + Sized,
1608 {
1609 DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1610 }
1611
1612 fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1614 where
1615 Self: 'static + Debug + Sized,
1616 {
1617 DataActorCore::unsubscribe_order_fills(self, instrument_id);
1618 }
1619
1620 fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1622 where
1623 Self: 'static + Debug + Sized,
1624 {
1625 DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1626 }
1627
1628 #[cfg(feature = "defi")]
1629 fn unsubscribe_blocks(
1631 &mut self,
1632 chain: Blockchain,
1633 client_id: Option<ClientId>,
1634 params: Option<IndexMap<String, String>>,
1635 ) where
1636 Self: 'static + Debug + Sized,
1637 {
1638 DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1639 }
1640
1641 #[cfg(feature = "defi")]
1642 fn unsubscribe_pool(
1644 &mut self,
1645 instrument_id: InstrumentId,
1646 client_id: Option<ClientId>,
1647 params: Option<IndexMap<String, String>>,
1648 ) where
1649 Self: 'static + Debug + Sized,
1650 {
1651 DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1652 }
1653
1654 #[cfg(feature = "defi")]
1655 fn unsubscribe_pool_swaps(
1657 &mut self,
1658 instrument_id: InstrumentId,
1659 client_id: Option<ClientId>,
1660 params: Option<IndexMap<String, String>>,
1661 ) where
1662 Self: 'static + Debug + Sized,
1663 {
1664 DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1665 }
1666
1667 #[cfg(feature = "defi")]
1668 fn unsubscribe_pool_liquidity_updates(
1670 &mut self,
1671 instrument_id: InstrumentId,
1672 client_id: Option<ClientId>,
1673 params: Option<IndexMap<String, String>>,
1674 ) where
1675 Self: 'static + Debug + Sized,
1676 {
1677 DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1678 }
1679
1680 #[cfg(feature = "defi")]
1681 fn unsubscribe_pool_fee_collects(
1683 &mut self,
1684 instrument_id: InstrumentId,
1685 client_id: Option<ClientId>,
1686 params: Option<IndexMap<String, String>>,
1687 ) where
1688 Self: 'static + Debug + Sized,
1689 {
1690 DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1691 }
1692
1693 #[cfg(feature = "defi")]
1694 fn unsubscribe_pool_flash_events(
1696 &mut self,
1697 instrument_id: InstrumentId,
1698 client_id: Option<ClientId>,
1699 params: Option<IndexMap<String, String>>,
1700 ) where
1701 Self: 'static + Debug + Sized,
1702 {
1703 DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1704 }
1705
1706 fn request_data(
1712 &mut self,
1713 data_type: DataType,
1714 client_id: ClientId,
1715 start: Option<DateTime<Utc>>,
1716 end: Option<DateTime<Utc>>,
1717 limit: Option<NonZeroUsize>,
1718 params: Option<IndexMap<String, String>>,
1719 ) -> anyhow::Result<UUID4>
1720 where
1721 Self: 'static + Debug + Sized,
1722 {
1723 let actor_id = self.actor_id().inner();
1724 let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
1725 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1726 });
1727
1728 DataActorCore::request_data(
1729 self, data_type, client_id, start, end, limit, params, handler,
1730 )
1731 }
1732
1733 fn request_instrument(
1739 &mut self,
1740 instrument_id: InstrumentId,
1741 start: Option<DateTime<Utc>>,
1742 end: Option<DateTime<Utc>>,
1743 client_id: Option<ClientId>,
1744 params: Option<IndexMap<String, String>>,
1745 ) -> anyhow::Result<UUID4>
1746 where
1747 Self: 'static + Debug + Sized,
1748 {
1749 let actor_id = self.actor_id().inner();
1750 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
1751 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1752 });
1753
1754 DataActorCore::request_instrument(
1755 self,
1756 instrument_id,
1757 start,
1758 end,
1759 client_id,
1760 params,
1761 handler,
1762 )
1763 }
1764
1765 fn request_instruments(
1771 &mut self,
1772 venue: Option<Venue>,
1773 start: Option<DateTime<Utc>>,
1774 end: Option<DateTime<Utc>>,
1775 client_id: Option<ClientId>,
1776 params: Option<IndexMap<String, String>>,
1777 ) -> anyhow::Result<UUID4>
1778 where
1779 Self: 'static + Debug + Sized,
1780 {
1781 let actor_id = self.actor_id().inner();
1782 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
1783 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1784 });
1785
1786 DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1787 }
1788
1789 fn request_book_snapshot(
1795 &mut self,
1796 instrument_id: InstrumentId,
1797 depth: Option<NonZeroUsize>,
1798 client_id: Option<ClientId>,
1799 params: Option<IndexMap<String, String>>,
1800 ) -> anyhow::Result<UUID4>
1801 where
1802 Self: 'static + Debug + Sized,
1803 {
1804 let actor_id = self.actor_id().inner();
1805 let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
1806 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1807 });
1808
1809 DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1810 }
1811
1812 fn request_quotes(
1818 &mut self,
1819 instrument_id: InstrumentId,
1820 start: Option<DateTime<Utc>>,
1821 end: Option<DateTime<Utc>>,
1822 limit: Option<NonZeroUsize>,
1823 client_id: Option<ClientId>,
1824 params: Option<IndexMap<String, String>>,
1825 ) -> anyhow::Result<UUID4>
1826 where
1827 Self: 'static + Debug + Sized,
1828 {
1829 let actor_id = self.actor_id().inner();
1830 let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
1831 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1832 });
1833
1834 DataActorCore::request_quotes(
1835 self,
1836 instrument_id,
1837 start,
1838 end,
1839 limit,
1840 client_id,
1841 params,
1842 handler,
1843 )
1844 }
1845
1846 fn request_trades(
1852 &mut self,
1853 instrument_id: InstrumentId,
1854 start: Option<DateTime<Utc>>,
1855 end: Option<DateTime<Utc>>,
1856 limit: Option<NonZeroUsize>,
1857 client_id: Option<ClientId>,
1858 params: Option<IndexMap<String, String>>,
1859 ) -> anyhow::Result<UUID4>
1860 where
1861 Self: 'static + Debug + Sized,
1862 {
1863 let actor_id = self.actor_id().inner();
1864 let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
1865 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1866 });
1867
1868 DataActorCore::request_trades(
1869 self,
1870 instrument_id,
1871 start,
1872 end,
1873 limit,
1874 client_id,
1875 params,
1876 handler,
1877 )
1878 }
1879
1880 fn request_funding_rates(
1886 &mut self,
1887 instrument_id: InstrumentId,
1888 start: Option<DateTime<Utc>>,
1889 end: Option<DateTime<Utc>>,
1890 limit: Option<NonZeroUsize>,
1891 client_id: Option<ClientId>,
1892 params: Option<IndexMap<String, String>>,
1893 ) -> anyhow::Result<UUID4>
1894 where
1895 Self: 'static + Debug + Sized,
1896 {
1897 let actor_id = self.actor_id().inner();
1898 let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
1899 get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
1900 });
1901
1902 DataActorCore::request_funding_rates(
1903 self,
1904 instrument_id,
1905 start,
1906 end,
1907 limit,
1908 client_id,
1909 params,
1910 handler,
1911 )
1912 }
1913
1914 fn request_bars(
1920 &mut self,
1921 bar_type: BarType,
1922 start: Option<DateTime<Utc>>,
1923 end: Option<DateTime<Utc>>,
1924 limit: Option<NonZeroUsize>,
1925 client_id: Option<ClientId>,
1926 params: Option<IndexMap<String, String>>,
1927 ) -> anyhow::Result<UUID4>
1928 where
1929 Self: 'static + Debug + Sized,
1930 {
1931 let actor_id = self.actor_id().inner();
1932 let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
1933 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1934 });
1935
1936 DataActorCore::request_bars(
1937 self, bar_type, start, end, limit, client_id, params, handler,
1938 )
1939 }
1940}
1941
1942impl<T> Actor for T
1944where
1945 T: DataActor + Debug + 'static,
1946{
1947 fn id(&self) -> Ustr {
1948 self.actor_id.inner()
1949 }
1950
1951 #[allow(unused_variables)]
1952 fn handle(&mut self, msg: &dyn Any) {
1953 }
1955
1956 fn as_any(&self) -> &dyn Any {
1957 self
1958 }
1959}
1960
1961impl<T> Component for T
1963where
1964 T: DataActor + Debug + 'static,
1965{
1966 fn component_id(&self) -> ComponentId {
1967 ComponentId::new(self.actor_id.inner().as_str())
1968 }
1969
1970 fn state(&self) -> ComponentState {
1971 self.state
1972 }
1973
1974 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1975 self.state = self.state.transition(&trigger)?;
1976 log::info!("{}", self.state.variant_name());
1977 Ok(())
1978 }
1979
1980 fn register(
1981 &mut self,
1982 trader_id: TraderId,
1983 clock: Rc<RefCell<dyn Clock>>,
1984 cache: Rc<RefCell<Cache>>,
1985 ) -> anyhow::Result<()> {
1986 DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1987
1988 let actor_id = self.actor_id().inner();
1990 let callback = TimeEventCallback::from(move |event: TimeEvent| {
1991 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1992 actor.handle_time_event(&event);
1993 } else {
1994 log::error!("Actor {actor_id} not found for time event handling");
1995 }
1996 });
1997
1998 clock.borrow_mut().register_default_handler(callback);
1999
2000 self.initialize()
2001 }
2002
2003 fn on_start(&mut self) -> anyhow::Result<()> {
2004 DataActor::on_start(self)
2005 }
2006
2007 fn on_stop(&mut self) -> anyhow::Result<()> {
2008 DataActor::on_stop(self)
2009 }
2010
2011 fn on_resume(&mut self) -> anyhow::Result<()> {
2012 DataActor::on_resume(self)
2013 }
2014
2015 fn on_degrade(&mut self) -> anyhow::Result<()> {
2016 DataActor::on_degrade(self)
2017 }
2018
2019 fn on_fault(&mut self) -> anyhow::Result<()> {
2020 DataActor::on_fault(self)
2021 }
2022
2023 fn on_reset(&mut self) -> anyhow::Result<()> {
2024 DataActor::on_reset(self)
2025 }
2026
2027 fn on_dispose(&mut self) -> anyhow::Result<()> {
2028 DataActor::on_dispose(self)
2029 }
2030}
2031
2032#[derive(Clone)]
2034#[allow(
2035 dead_code,
2036 reason = "TODO: Under development (pending_requests, signal_classes)"
2037)]
2038pub struct DataActorCore {
2039 pub actor_id: ActorId,
2041 pub config: DataActorConfig,
2043 trader_id: Option<TraderId>,
2044 clock: Option<Rc<RefCell<dyn Clock>>>, cache: Option<Rc<RefCell<Cache>>>, state: ComponentState,
2047 topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
2048 deltas_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDeltas>>,
2049 depth10_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDepth10>>,
2050 book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
2051 quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
2052 trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
2053 bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
2054 mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
2055 index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
2056 funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
2057 order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
2058 #[cfg(feature = "defi")]
2059 block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
2060 #[cfg(feature = "defi")]
2061 pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
2062 #[cfg(feature = "defi")]
2063 pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
2064 #[cfg(feature = "defi")]
2065 pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
2066 #[cfg(feature = "defi")]
2067 pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
2068 #[cfg(feature = "defi")]
2069 pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
2070 warning_events: AHashSet<String>, pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2072 signal_classes: AHashMap<String, String>,
2073 #[cfg(feature = "indicators")]
2074 indicators: Indicators,
2075}
2076
2077impl Debug for DataActorCore {
2078 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2079 f.debug_struct(stringify!(DataActorCore))
2080 .field("actor_id", &self.actor_id)
2081 .field("config", &self.config)
2082 .field("state", &self.state)
2083 .field("trader_id", &self.trader_id)
2084 .finish()
2085 }
2086}
2087
2088impl DataActorCore {
2089 pub(crate) fn add_subscription_any(
2093 &mut self,
2094 topic: MStr<Topic>,
2095 handler: ShareableMessageHandler,
2096 ) {
2097 if self.topic_handlers.contains_key(&topic) {
2098 log::warn!(
2099 "Actor {} attempted duplicate subscription to topic '{topic}'",
2100 self.actor_id,
2101 );
2102 return;
2103 }
2104
2105 self.topic_handlers.insert(topic, handler.clone());
2106 msgbus::subscribe_any(topic.into(), handler, None);
2107 }
2108
2109 pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2113 if let Some(handler) = self.topic_handlers.remove(&topic) {
2114 msgbus::unsubscribe_any(topic.into(), handler);
2115 } else {
2116 log::warn!(
2117 "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2118 self.actor_id,
2119 );
2120 }
2121 }
2122
2123 pub(crate) fn add_quote_subscription(
2124 &mut self,
2125 topic: MStr<Topic>,
2126 handler: TypedHandler<QuoteTick>,
2127 ) {
2128 if self.quote_handlers.contains_key(&topic) {
2129 log::warn!(
2130 "Actor {} attempted duplicate quote subscription to '{topic}'",
2131 self.actor_id
2132 );
2133 return;
2134 }
2135 self.quote_handlers.insert(topic, handler.clone());
2136 msgbus::subscribe_quotes(topic.into(), handler, None);
2137 }
2138
2139 #[allow(dead_code)]
2140 pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2141 if let Some(handler) = self.quote_handlers.remove(&topic) {
2142 msgbus::unsubscribe_quotes(topic.into(), &handler);
2143 }
2144 }
2145
2146 pub(crate) fn add_trade_subscription(
2147 &mut self,
2148 topic: MStr<Topic>,
2149 handler: TypedHandler<TradeTick>,
2150 ) {
2151 if self.trade_handlers.contains_key(&topic) {
2152 log::warn!(
2153 "Actor {} attempted duplicate trade subscription to '{topic}'",
2154 self.actor_id
2155 );
2156 return;
2157 }
2158 self.trade_handlers.insert(topic, handler.clone());
2159 msgbus::subscribe_trades(topic.into(), handler, None);
2160 }
2161
2162 #[allow(dead_code)]
2163 pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2164 if let Some(handler) = self.trade_handlers.remove(&topic) {
2165 msgbus::unsubscribe_trades(topic.into(), &handler);
2166 }
2167 }
2168
2169 pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2170 if self.bar_handlers.contains_key(&topic) {
2171 log::warn!(
2172 "Actor {} attempted duplicate bar subscription to '{topic}'",
2173 self.actor_id
2174 );
2175 return;
2176 }
2177 self.bar_handlers.insert(topic, handler.clone());
2178 msgbus::subscribe_bars(topic.into(), handler, None);
2179 }
2180
2181 #[allow(dead_code)]
2182 pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2183 if let Some(handler) = self.bar_handlers.remove(&topic) {
2184 msgbus::unsubscribe_bars(topic.into(), &handler);
2185 }
2186 }
2187
2188 pub(crate) fn add_order_event_subscription(
2189 &mut self,
2190 topic: MStr<Topic>,
2191 handler: TypedHandler<OrderEventAny>,
2192 ) {
2193 if self.order_event_handlers.contains_key(&topic) {
2194 log::warn!(
2195 "Actor {} attempted duplicate order event subscription to '{topic}'",
2196 self.actor_id
2197 );
2198 return;
2199 }
2200 self.order_event_handlers.insert(topic, handler.clone());
2201 msgbus::subscribe_order_events(topic.into(), handler, None);
2202 }
2203
2204 #[allow(dead_code)]
2205 pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2206 if let Some(handler) = self.order_event_handlers.remove(&topic) {
2207 msgbus::unsubscribe_order_events(topic.into(), &handler);
2208 }
2209 }
2210
2211 pub(crate) fn add_deltas_subscription(
2212 &mut self,
2213 topic: MStr<Topic>,
2214 handler: TypedHandler<OrderBookDeltas>,
2215 ) {
2216 if self.deltas_handlers.contains_key(&topic) {
2217 log::warn!(
2218 "Actor {} attempted duplicate deltas subscription to '{topic}'",
2219 self.actor_id
2220 );
2221 return;
2222 }
2223 self.deltas_handlers.insert(topic, handler.clone());
2224 msgbus::subscribe_book_deltas(topic.into(), handler, None);
2225 }
2226
2227 #[allow(dead_code)]
2228 pub(crate) fn remove_deltas_subscription(&mut self, topic: MStr<Topic>) {
2229 if let Some(handler) = self.deltas_handlers.remove(&topic) {
2230 msgbus::unsubscribe_book_deltas(topic.into(), &handler);
2231 }
2232 }
2233
2234 #[allow(dead_code)]
2235 pub(crate) fn add_depth10_subscription(
2236 &mut self,
2237 topic: MStr<Topic>,
2238 handler: TypedHandler<OrderBookDepth10>,
2239 ) {
2240 if self.depth10_handlers.contains_key(&topic) {
2241 log::warn!(
2242 "Actor {} attempted duplicate depth10 subscription to '{topic}'",
2243 self.actor_id
2244 );
2245 return;
2246 }
2247 self.depth10_handlers.insert(topic, handler.clone());
2248 msgbus::subscribe_book_depth10(topic.into(), handler, None);
2249 }
2250
2251 #[allow(dead_code)]
2252 pub(crate) fn remove_depth10_subscription(&mut self, topic: MStr<Topic>) {
2253 if let Some(handler) = self.depth10_handlers.remove(&topic) {
2254 msgbus::unsubscribe_book_depth10(topic.into(), &handler);
2255 }
2256 }
2257
2258 pub(crate) fn add_instrument_subscription(
2259 &mut self,
2260 topic: MStr<Topic>,
2261 handler: ShareableMessageHandler,
2262 ) {
2263 if self.topic_handlers.contains_key(&topic) {
2264 log::warn!(
2265 "Actor {} attempted duplicate instrument subscription to '{topic}'",
2266 self.actor_id
2267 );
2268 return;
2269 }
2270 self.topic_handlers.insert(topic, handler.clone());
2271 msgbus::subscribe_any(topic.into(), handler, None);
2272 }
2273
2274 #[allow(dead_code)]
2275 pub(crate) fn remove_instrument_subscription(&mut self, topic: MStr<Topic>) {
2276 if let Some(handler) = self.topic_handlers.remove(&topic) {
2277 msgbus::unsubscribe_any(topic.into(), handler);
2278 }
2279 }
2280
2281 pub(crate) fn add_instrument_close_subscription(
2282 &mut self,
2283 topic: MStr<Topic>,
2284 handler: ShareableMessageHandler,
2285 ) {
2286 if self.topic_handlers.contains_key(&topic) {
2287 log::warn!(
2288 "Actor {} attempted duplicate instrument close subscription to '{topic}'",
2289 self.actor_id
2290 );
2291 return;
2292 }
2293 self.topic_handlers.insert(topic, handler.clone());
2294 msgbus::subscribe_any(topic.into(), handler, None);
2295 }
2296
2297 #[allow(dead_code)]
2298 pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
2299 if let Some(handler) = self.topic_handlers.remove(&topic) {
2300 msgbus::unsubscribe_any(topic.into(), handler);
2301 }
2302 }
2303
2304 pub(crate) fn add_book_snapshot_subscription(
2305 &mut self,
2306 topic: MStr<Topic>,
2307 handler: TypedHandler<OrderBook>,
2308 ) {
2309 if self.book_handlers.contains_key(&topic) {
2310 log::warn!(
2311 "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
2312 self.actor_id
2313 );
2314 return;
2315 }
2316 self.book_handlers.insert(topic, handler.clone());
2317 msgbus::subscribe_book_snapshots(topic.into(), handler, None);
2318 }
2319
2320 #[allow(dead_code)]
2321 pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
2322 if let Some(handler) = self.book_handlers.remove(&topic) {
2323 msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
2324 }
2325 }
2326
2327 pub(crate) fn add_mark_price_subscription(
2328 &mut self,
2329 topic: MStr<Topic>,
2330 handler: TypedHandler<MarkPriceUpdate>,
2331 ) {
2332 if self.mark_price_handlers.contains_key(&topic) {
2333 log::warn!(
2334 "Actor {} attempted duplicate mark price subscription to '{topic}'",
2335 self.actor_id
2336 );
2337 return;
2338 }
2339 self.mark_price_handlers.insert(topic, handler.clone());
2340 msgbus::subscribe_mark_prices(topic.into(), handler, None);
2341 }
2342
2343 #[allow(dead_code)]
2344 pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
2345 if let Some(handler) = self.mark_price_handlers.remove(&topic) {
2346 msgbus::unsubscribe_mark_prices(topic.into(), &handler);
2347 }
2348 }
2349
2350 pub(crate) fn add_index_price_subscription(
2351 &mut self,
2352 topic: MStr<Topic>,
2353 handler: TypedHandler<IndexPriceUpdate>,
2354 ) {
2355 if self.index_price_handlers.contains_key(&topic) {
2356 log::warn!(
2357 "Actor {} attempted duplicate index price subscription to '{topic}'",
2358 self.actor_id
2359 );
2360 return;
2361 }
2362 self.index_price_handlers.insert(topic, handler.clone());
2363 msgbus::subscribe_index_prices(topic.into(), handler, None);
2364 }
2365
2366 #[allow(dead_code)]
2367 pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
2368 if let Some(handler) = self.index_price_handlers.remove(&topic) {
2369 msgbus::unsubscribe_index_prices(topic.into(), &handler);
2370 }
2371 }
2372
2373 pub(crate) fn add_funding_rate_subscription(
2374 &mut self,
2375 topic: MStr<Topic>,
2376 handler: TypedHandler<FundingRateUpdate>,
2377 ) {
2378 if self.funding_rate_handlers.contains_key(&topic) {
2379 log::warn!(
2380 "Actor {} attempted duplicate funding rate subscription to '{topic}'",
2381 self.actor_id
2382 );
2383 return;
2384 }
2385 self.funding_rate_handlers.insert(topic, handler.clone());
2386 msgbus::subscribe_funding_rates(topic.into(), handler, None);
2387 }
2388
2389 #[allow(dead_code)]
2390 pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
2391 if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
2392 msgbus::unsubscribe_funding_rates(topic.into(), &handler);
2393 }
2394 }
2395
2396 #[cfg(feature = "defi")]
2397 pub(crate) fn add_block_subscription(
2398 &mut self,
2399 topic: MStr<Topic>,
2400 handler: TypedHandler<Block>,
2401 ) {
2402 if self.block_handlers.contains_key(&topic) {
2403 log::warn!(
2404 "Actor {} attempted duplicate block subscription to '{topic}'",
2405 self.actor_id
2406 );
2407 return;
2408 }
2409 self.block_handlers.insert(topic, handler.clone());
2410 msgbus::subscribe_defi_blocks(topic.into(), handler, None);
2411 }
2412
2413 #[cfg(feature = "defi")]
2414 #[allow(dead_code)]
2415 pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
2416 if let Some(handler) = self.block_handlers.remove(&topic) {
2417 msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
2418 }
2419 }
2420
2421 #[cfg(feature = "defi")]
2422 pub(crate) fn add_pool_subscription(
2423 &mut self,
2424 topic: MStr<Topic>,
2425 handler: TypedHandler<Pool>,
2426 ) {
2427 if self.pool_handlers.contains_key(&topic) {
2428 log::warn!(
2429 "Actor {} attempted duplicate pool subscription to '{topic}'",
2430 self.actor_id
2431 );
2432 return;
2433 }
2434 self.pool_handlers.insert(topic, handler.clone());
2435 msgbus::subscribe_defi_pools(topic.into(), handler, None);
2436 }
2437
2438 #[cfg(feature = "defi")]
2439 #[allow(dead_code)]
2440 pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
2441 if let Some(handler) = self.pool_handlers.remove(&topic) {
2442 msgbus::unsubscribe_defi_pools(topic.into(), &handler);
2443 }
2444 }
2445
2446 #[cfg(feature = "defi")]
2447 pub(crate) fn add_pool_swap_subscription(
2448 &mut self,
2449 topic: MStr<Topic>,
2450 handler: TypedHandler<PoolSwap>,
2451 ) {
2452 if self.pool_swap_handlers.contains_key(&topic) {
2453 log::warn!(
2454 "Actor {} attempted duplicate pool swap subscription to '{topic}'",
2455 self.actor_id
2456 );
2457 return;
2458 }
2459 self.pool_swap_handlers.insert(topic, handler.clone());
2460 msgbus::subscribe_defi_swaps(topic.into(), handler, None);
2461 }
2462
2463 #[cfg(feature = "defi")]
2464 #[allow(dead_code)]
2465 pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
2466 if let Some(handler) = self.pool_swap_handlers.remove(&topic) {
2467 msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
2468 }
2469 }
2470
2471 #[cfg(feature = "defi")]
2472 pub(crate) fn add_pool_liquidity_subscription(
2473 &mut self,
2474 topic: MStr<Topic>,
2475 handler: TypedHandler<PoolLiquidityUpdate>,
2476 ) {
2477 if self.pool_liquidity_handlers.contains_key(&topic) {
2478 log::warn!(
2479 "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
2480 self.actor_id
2481 );
2482 return;
2483 }
2484 self.pool_liquidity_handlers.insert(topic, handler.clone());
2485 msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
2486 }
2487
2488 #[cfg(feature = "defi")]
2489 #[allow(dead_code)]
2490 pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
2491 if let Some(handler) = self.pool_liquidity_handlers.remove(&topic) {
2492 msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
2493 }
2494 }
2495
2496 #[cfg(feature = "defi")]
2497 pub(crate) fn add_pool_collect_subscription(
2498 &mut self,
2499 topic: MStr<Topic>,
2500 handler: TypedHandler<PoolFeeCollect>,
2501 ) {
2502 if self.pool_collect_handlers.contains_key(&topic) {
2503 log::warn!(
2504 "Actor {} attempted duplicate pool collect subscription to '{topic}'",
2505 self.actor_id
2506 );
2507 return;
2508 }
2509 self.pool_collect_handlers.insert(topic, handler.clone());
2510 msgbus::subscribe_defi_collects(topic.into(), handler, None);
2511 }
2512
2513 #[cfg(feature = "defi")]
2514 #[allow(dead_code)]
2515 pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
2516 if let Some(handler) = self.pool_collect_handlers.remove(&topic) {
2517 msgbus::unsubscribe_defi_collects(topic.into(), &handler);
2518 }
2519 }
2520
2521 #[cfg(feature = "defi")]
2522 pub(crate) fn add_pool_flash_subscription(
2523 &mut self,
2524 topic: MStr<Topic>,
2525 handler: TypedHandler<PoolFlash>,
2526 ) {
2527 if self.pool_flash_handlers.contains_key(&topic) {
2528 log::warn!(
2529 "Actor {} attempted duplicate pool flash subscription to '{topic}'",
2530 self.actor_id
2531 );
2532 return;
2533 }
2534 self.pool_flash_handlers.insert(topic, handler.clone());
2535 msgbus::subscribe_defi_flash(topic.into(), handler, None);
2536 }
2537
2538 #[cfg(feature = "defi")]
2539 #[allow(dead_code)]
2540 pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
2541 if let Some(handler) = self.pool_flash_handlers.remove(&topic) {
2542 msgbus::unsubscribe_defi_flash(topic.into(), &handler);
2543 }
2544 }
2545
2546 pub fn new(config: DataActorConfig) -> Self {
2548 let actor_id = config
2549 .actor_id
2550 .unwrap_or_else(|| Self::default_actor_id(&config));
2551
2552 Self {
2553 actor_id,
2554 config,
2555 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
2559 topic_handlers: AHashMap::new(),
2560 deltas_handlers: AHashMap::new(),
2561 depth10_handlers: AHashMap::new(),
2562 book_handlers: AHashMap::new(),
2563 quote_handlers: AHashMap::new(),
2564 trade_handlers: AHashMap::new(),
2565 bar_handlers: AHashMap::new(),
2566 mark_price_handlers: AHashMap::new(),
2567 index_price_handlers: AHashMap::new(),
2568 funding_rate_handlers: AHashMap::new(),
2569 order_event_handlers: AHashMap::new(),
2570 #[cfg(feature = "defi")]
2571 block_handlers: AHashMap::new(),
2572 #[cfg(feature = "defi")]
2573 pool_handlers: AHashMap::new(),
2574 #[cfg(feature = "defi")]
2575 pool_swap_handlers: AHashMap::new(),
2576 #[cfg(feature = "defi")]
2577 pool_liquidity_handlers: AHashMap::new(),
2578 #[cfg(feature = "defi")]
2579 pool_collect_handlers: AHashMap::new(),
2580 #[cfg(feature = "defi")]
2581 pool_flash_handlers: AHashMap::new(),
2582 warning_events: AHashSet::new(),
2583 pending_requests: AHashMap::new(),
2584 signal_classes: AHashMap::new(),
2585 #[cfg(feature = "indicators")]
2586 indicators: Indicators::default(),
2587 }
2588 }
2589
2590 #[must_use]
2592 pub fn mem_address(&self) -> String {
2593 format!("{self:p}")
2594 }
2595
2596 pub fn state(&self) -> ComponentState {
2598 self.state
2599 }
2600
2601 pub fn trader_id(&self) -> Option<TraderId> {
2603 self.trader_id
2604 }
2605
2606 pub fn actor_id(&self) -> ActorId {
2608 self.actor_id
2609 }
2610
2611 fn default_actor_id(config: &DataActorConfig) -> ActorId {
2612 let memory_address = std::ptr::from_ref(config) as usize;
2613 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2614 }
2615
2616 pub fn timestamp_ns(&self) -> UnixNanos {
2618 self.clock_ref().timestamp_ns()
2619 }
2620
2621 pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2627 self.clock
2628 .as_ref()
2629 .unwrap_or_else(|| {
2630 panic!(
2631 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2632 self.actor_id, self.trader_id
2633 )
2634 })
2635 .borrow_mut()
2636 }
2637
2638 pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2644 self.clock
2645 .as_ref()
2646 .expect("DataActor must be registered before accessing clock")
2647 .clone()
2648 }
2649
2650 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2651 self.clock
2652 .as_ref()
2653 .unwrap_or_else(|| {
2654 panic!(
2655 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2656 self.actor_id, self.trader_id
2657 )
2658 })
2659 .borrow()
2660 }
2661
2662 pub fn cache(&self) -> Ref<'_, Cache> {
2668 self.cache
2669 .as_ref()
2670 .expect("DataActor must be registered before accessing cache")
2671 .borrow()
2672 }
2673
2674 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2680 self.cache
2681 .as_ref()
2682 .expect("DataActor must be registered before accessing cache")
2683 .clone()
2684 }
2685
2686 pub fn register(
2695 &mut self,
2696 trader_id: TraderId,
2697 clock: Rc<RefCell<dyn Clock>>,
2698 cache: Rc<RefCell<Cache>>,
2699 ) -> anyhow::Result<()> {
2700 if let Some(existing_trader_id) = self.trader_id {
2701 anyhow::bail!(
2702 "DataActor {} already registered with trader {existing_trader_id}",
2703 self.actor_id
2704 );
2705 }
2706
2707 {
2709 let _timestamp = clock.borrow().timestamp_ns();
2710 }
2711
2712 {
2714 let _cache_borrow = cache.borrow();
2715 }
2716
2717 self.trader_id = Some(trader_id);
2718 self.clock = Some(clock);
2719 self.cache = Some(cache);
2720
2721 if !self.is_properly_registered() {
2723 anyhow::bail!(
2724 "DataActor {} registration incomplete - validation failed",
2725 self.actor_id
2726 );
2727 }
2728
2729 log::debug!("Registered {} with trader {trader_id}", self.actor_id);
2730 Ok(())
2731 }
2732
2733 pub fn register_warning_event(&mut self, event_type: &str) {
2735 self.warning_events.insert(event_type.to_string());
2736 log::debug!("Registered event type '{event_type}' for warning logs");
2737 }
2738
2739 pub fn deregister_warning_event(&mut self, event_type: &str) {
2741 self.warning_events.remove(event_type);
2742 log::debug!("Deregistered event type '{event_type}' from warning logs");
2743 }
2744
2745 pub fn is_registered(&self) -> bool {
2746 self.trader_id.is_some()
2747 }
2748
2749 pub(crate) fn check_registered(&self) {
2750 assert!(
2751 self.is_registered(),
2752 "Actor has not been registered with a Trader"
2753 );
2754 }
2755
2756 fn is_properly_registered(&self) -> bool {
2758 self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2759 }
2760
2761 pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2762 if self.config.log_commands {
2763 log::info!("{CMD}{SEND} {command:?}");
2764 }
2765
2766 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2767 msgbus::send_data_command(endpoint, command);
2768 }
2769
2770 #[allow(dead_code)]
2771 fn send_data_req(&self, request: RequestCommand) {
2772 if self.config.log_commands {
2773 log::info!("{REQ}{SEND} {request:?}");
2774 }
2775
2776 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2779 msgbus::send_any(endpoint, request.as_any());
2780 }
2781
2782 pub fn shutdown_system(&self, reason: Option<String>) {
2788 self.check_registered();
2789
2790 let command = ShutdownSystem::new(
2792 self.trader_id().unwrap(),
2793 self.actor_id.inner(),
2794 reason,
2795 UUID4::new(),
2796 self.timestamp_ns(),
2797 );
2798
2799 let endpoint = "command.system.shutdown".into();
2800 msgbus::send_any(endpoint, command.as_any());
2801 }
2802
2803 pub fn subscribe_data(
2811 &mut self,
2812 handler: ShareableMessageHandler,
2813 data_type: DataType,
2814 client_id: Option<ClientId>,
2815 params: Option<IndexMap<String, String>>,
2816 ) {
2817 assert!(
2818 self.is_properly_registered(),
2819 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
2820 self.actor_id,
2821 self.trader_id,
2822 self.clock.is_some(),
2823 self.cache.is_some()
2824 );
2825
2826 let topic = get_custom_topic(&data_type);
2827 self.add_subscription_any(topic, handler);
2828
2829 if client_id.is_none() {
2831 return;
2832 }
2833
2834 let command = SubscribeCommand::Data(SubscribeCustomData {
2835 data_type,
2836 client_id,
2837 venue: None,
2838 command_id: UUID4::new(),
2839 ts_init: self.timestamp_ns(),
2840 correlation_id: None,
2841 params,
2842 });
2843
2844 self.send_data_cmd(DataCommand::Subscribe(command));
2845 }
2846
2847 pub fn subscribe_quotes(
2849 &mut self,
2850 topic: MStr<Topic>,
2851 handler: TypedHandler<QuoteTick>,
2852 instrument_id: InstrumentId,
2853 client_id: Option<ClientId>,
2854 params: Option<IndexMap<String, String>>,
2855 ) {
2856 self.check_registered();
2857
2858 self.add_quote_subscription(topic, handler);
2859
2860 let command = SubscribeCommand::Quotes(SubscribeQuotes {
2861 instrument_id,
2862 client_id,
2863 venue: Some(instrument_id.venue),
2864 command_id: UUID4::new(),
2865 ts_init: self.timestamp_ns(),
2866 correlation_id: None,
2867 params,
2868 });
2869
2870 self.send_data_cmd(DataCommand::Subscribe(command));
2871 }
2872
2873 pub fn subscribe_instruments(
2875 &mut self,
2876 topic: MStr<Topic>,
2877 handler: ShareableMessageHandler,
2878 venue: Venue,
2879 client_id: Option<ClientId>,
2880 params: Option<IndexMap<String, String>>,
2881 ) {
2882 self.check_registered();
2883
2884 self.add_instrument_subscription(topic, handler);
2885
2886 let command = SubscribeCommand::Instruments(SubscribeInstruments {
2887 client_id,
2888 venue,
2889 command_id: UUID4::new(),
2890 ts_init: self.timestamp_ns(),
2891 correlation_id: None,
2892 params,
2893 });
2894
2895 self.send_data_cmd(DataCommand::Subscribe(command));
2896 }
2897
2898 pub fn subscribe_instrument(
2900 &mut self,
2901 topic: MStr<Topic>,
2902 handler: ShareableMessageHandler,
2903 instrument_id: InstrumentId,
2904 client_id: Option<ClientId>,
2905 params: Option<IndexMap<String, String>>,
2906 ) {
2907 self.check_registered();
2908
2909 self.add_instrument_subscription(topic, handler);
2910
2911 let command = SubscribeCommand::Instrument(SubscribeInstrument {
2912 instrument_id,
2913 client_id,
2914 venue: Some(instrument_id.venue),
2915 command_id: UUID4::new(),
2916 ts_init: self.timestamp_ns(),
2917 correlation_id: None,
2918 params,
2919 });
2920
2921 self.send_data_cmd(DataCommand::Subscribe(command));
2922 }
2923
2924 #[allow(clippy::too_many_arguments)]
2926 pub fn subscribe_book_deltas(
2927 &mut self,
2928 topic: MStr<Topic>,
2929 handler: TypedHandler<OrderBookDeltas>,
2930 instrument_id: InstrumentId,
2931 book_type: BookType,
2932 depth: Option<NonZeroUsize>,
2933 client_id: Option<ClientId>,
2934 managed: bool,
2935 params: Option<IndexMap<String, String>>,
2936 ) {
2937 self.check_registered();
2938
2939 self.add_deltas_subscription(topic, handler);
2940
2941 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2942 instrument_id,
2943 book_type,
2944 client_id,
2945 venue: Some(instrument_id.venue),
2946 command_id: UUID4::new(),
2947 ts_init: self.timestamp_ns(),
2948 depth,
2949 managed,
2950 correlation_id: None,
2951 params,
2952 });
2953
2954 self.send_data_cmd(DataCommand::Subscribe(command));
2955 }
2956
2957 #[allow(clippy::too_many_arguments)]
2959 pub fn subscribe_book_at_interval(
2960 &mut self,
2961 topic: MStr<Topic>,
2962 handler: TypedHandler<OrderBook>,
2963 instrument_id: InstrumentId,
2964 book_type: BookType,
2965 depth: Option<NonZeroUsize>,
2966 interval_ms: NonZeroUsize,
2967 client_id: Option<ClientId>,
2968 params: Option<IndexMap<String, String>>,
2969 ) {
2970 self.check_registered();
2971
2972 self.add_book_snapshot_subscription(topic, handler);
2973
2974 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2975 instrument_id,
2976 book_type,
2977 client_id,
2978 venue: Some(instrument_id.venue),
2979 command_id: UUID4::new(),
2980 ts_init: self.timestamp_ns(),
2981 depth,
2982 interval_ms,
2983 correlation_id: None,
2984 params,
2985 });
2986
2987 self.send_data_cmd(DataCommand::Subscribe(command));
2988 }
2989
2990 pub fn subscribe_trades(
2992 &mut self,
2993 topic: MStr<Topic>,
2994 handler: TypedHandler<TradeTick>,
2995 instrument_id: InstrumentId,
2996 client_id: Option<ClientId>,
2997 params: Option<IndexMap<String, String>>,
2998 ) {
2999 self.check_registered();
3000
3001 self.add_trade_subscription(topic, handler);
3002
3003 let command = SubscribeCommand::Trades(SubscribeTrades {
3004 instrument_id,
3005 client_id,
3006 venue: Some(instrument_id.venue),
3007 command_id: UUID4::new(),
3008 ts_init: self.timestamp_ns(),
3009 correlation_id: None,
3010 params,
3011 });
3012
3013 self.send_data_cmd(DataCommand::Subscribe(command));
3014 }
3015
3016 pub fn subscribe_bars(
3018 &mut self,
3019 topic: MStr<Topic>,
3020 handler: TypedHandler<Bar>,
3021 bar_type: BarType,
3022 client_id: Option<ClientId>,
3023 params: Option<IndexMap<String, String>>,
3024 ) {
3025 self.check_registered();
3026
3027 self.add_bar_subscription(topic, handler);
3028
3029 let command = SubscribeCommand::Bars(SubscribeBars {
3030 bar_type,
3031 client_id,
3032 venue: Some(bar_type.instrument_id().venue),
3033 command_id: UUID4::new(),
3034 ts_init: self.timestamp_ns(),
3035 correlation_id: None,
3036 params,
3037 });
3038
3039 self.send_data_cmd(DataCommand::Subscribe(command));
3040 }
3041
3042 pub fn subscribe_mark_prices(
3044 &mut self,
3045 topic: MStr<Topic>,
3046 handler: TypedHandler<MarkPriceUpdate>,
3047 instrument_id: InstrumentId,
3048 client_id: Option<ClientId>,
3049 params: Option<IndexMap<String, String>>,
3050 ) {
3051 self.check_registered();
3052
3053 self.add_mark_price_subscription(topic, handler);
3054
3055 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3056 instrument_id,
3057 client_id,
3058 venue: Some(instrument_id.venue),
3059 command_id: UUID4::new(),
3060 ts_init: self.timestamp_ns(),
3061 correlation_id: None,
3062 params,
3063 });
3064
3065 self.send_data_cmd(DataCommand::Subscribe(command));
3066 }
3067
3068 pub fn subscribe_index_prices(
3070 &mut self,
3071 topic: MStr<Topic>,
3072 handler: TypedHandler<IndexPriceUpdate>,
3073 instrument_id: InstrumentId,
3074 client_id: Option<ClientId>,
3075 params: Option<IndexMap<String, String>>,
3076 ) {
3077 self.check_registered();
3078
3079 self.add_index_price_subscription(topic, handler);
3080
3081 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
3082 instrument_id,
3083 client_id,
3084 venue: Some(instrument_id.venue),
3085 command_id: UUID4::new(),
3086 ts_init: self.timestamp_ns(),
3087 correlation_id: None,
3088 params,
3089 });
3090
3091 self.send_data_cmd(DataCommand::Subscribe(command));
3092 }
3093
3094 pub fn subscribe_funding_rates(
3096 &mut self,
3097 topic: MStr<Topic>,
3098 handler: TypedHandler<FundingRateUpdate>,
3099 instrument_id: InstrumentId,
3100 client_id: Option<ClientId>,
3101 params: Option<IndexMap<String, String>>,
3102 ) {
3103 self.check_registered();
3104
3105 self.add_funding_rate_subscription(topic, handler);
3106
3107 let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
3108 instrument_id,
3109 client_id,
3110 venue: Some(instrument_id.venue),
3111 command_id: UUID4::new(),
3112 ts_init: self.timestamp_ns(),
3113 correlation_id: None,
3114 params,
3115 });
3116
3117 self.send_data_cmd(DataCommand::Subscribe(command));
3118 }
3119
3120 pub fn subscribe_instrument_status(
3122 &mut self,
3123 topic: MStr<Topic>,
3124 handler: ShareableMessageHandler,
3125 instrument_id: InstrumentId,
3126 client_id: Option<ClientId>,
3127 params: Option<IndexMap<String, String>>,
3128 ) {
3129 self.check_registered();
3130
3131 self.add_subscription_any(topic, handler);
3132
3133 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
3134 instrument_id,
3135 client_id,
3136 venue: Some(instrument_id.venue),
3137 command_id: UUID4::new(),
3138 ts_init: self.timestamp_ns(),
3139 correlation_id: None,
3140 params,
3141 });
3142
3143 self.send_data_cmd(DataCommand::Subscribe(command));
3144 }
3145
3146 pub fn subscribe_instrument_close(
3148 &mut self,
3149 topic: MStr<Topic>,
3150 handler: ShareableMessageHandler,
3151 instrument_id: InstrumentId,
3152 client_id: Option<ClientId>,
3153 params: Option<IndexMap<String, String>>,
3154 ) {
3155 self.check_registered();
3156
3157 self.add_instrument_close_subscription(topic, handler);
3158
3159 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
3160 instrument_id,
3161 client_id,
3162 venue: Some(instrument_id.venue),
3163 command_id: UUID4::new(),
3164 ts_init: self.timestamp_ns(),
3165 correlation_id: None,
3166 params,
3167 });
3168
3169 self.send_data_cmd(DataCommand::Subscribe(command));
3170 }
3171
3172 pub fn subscribe_order_fills(
3174 &mut self,
3175 topic: MStr<Topic>,
3176 handler: TypedHandler<OrderEventAny>,
3177 ) {
3178 self.check_registered();
3179 self.add_order_event_subscription(topic, handler);
3180 }
3181
3182 pub fn subscribe_order_cancels(
3184 &mut self,
3185 topic: MStr<Topic>,
3186 handler: TypedHandler<OrderEventAny>,
3187 ) {
3188 self.check_registered();
3189 self.add_order_event_subscription(topic, handler);
3190 }
3191
3192 pub fn unsubscribe_data(
3194 &mut self,
3195 data_type: DataType,
3196 client_id: Option<ClientId>,
3197 params: Option<IndexMap<String, String>>,
3198 ) {
3199 self.check_registered();
3200
3201 let topic = get_custom_topic(&data_type);
3202 self.remove_subscription_any(topic);
3203
3204 if client_id.is_none() {
3205 return;
3206 }
3207
3208 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
3209 data_type,
3210 client_id,
3211 venue: None,
3212 command_id: UUID4::new(),
3213 ts_init: self.timestamp_ns(),
3214 correlation_id: None,
3215 params,
3216 });
3217
3218 self.send_data_cmd(DataCommand::Unsubscribe(command));
3219 }
3220
3221 pub fn unsubscribe_instruments(
3223 &mut self,
3224 venue: Venue,
3225 client_id: Option<ClientId>,
3226 params: Option<IndexMap<String, String>>,
3227 ) {
3228 self.check_registered();
3229
3230 let topic = get_instruments_topic(venue);
3231 self.remove_instrument_subscription(topic);
3232
3233 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
3234 client_id,
3235 venue,
3236 command_id: UUID4::new(),
3237 ts_init: self.timestamp_ns(),
3238 correlation_id: None,
3239 params,
3240 });
3241
3242 self.send_data_cmd(DataCommand::Unsubscribe(command));
3243 }
3244
3245 pub fn unsubscribe_instrument(
3247 &mut self,
3248 instrument_id: InstrumentId,
3249 client_id: Option<ClientId>,
3250 params: Option<IndexMap<String, String>>,
3251 ) {
3252 self.check_registered();
3253
3254 let topic = get_instrument_topic(instrument_id);
3255 self.remove_instrument_subscription(topic);
3256
3257 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
3258 instrument_id,
3259 client_id,
3260 venue: Some(instrument_id.venue),
3261 command_id: UUID4::new(),
3262 ts_init: self.timestamp_ns(),
3263 correlation_id: None,
3264 params,
3265 });
3266
3267 self.send_data_cmd(DataCommand::Unsubscribe(command));
3268 }
3269
3270 pub fn unsubscribe_book_deltas(
3272 &mut self,
3273 instrument_id: InstrumentId,
3274 client_id: Option<ClientId>,
3275 params: Option<IndexMap<String, String>>,
3276 ) {
3277 self.check_registered();
3278
3279 let topic = get_book_deltas_topic(instrument_id);
3280 self.remove_deltas_subscription(topic);
3281
3282 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
3283 instrument_id,
3284 client_id,
3285 venue: Some(instrument_id.venue),
3286 command_id: UUID4::new(),
3287 ts_init: self.timestamp_ns(),
3288 correlation_id: None,
3289 params,
3290 });
3291
3292 self.send_data_cmd(DataCommand::Unsubscribe(command));
3293 }
3294
3295 pub fn unsubscribe_book_at_interval(
3297 &mut self,
3298 instrument_id: InstrumentId,
3299 interval_ms: NonZeroUsize,
3300 client_id: Option<ClientId>,
3301 params: Option<IndexMap<String, String>>,
3302 ) {
3303 self.check_registered();
3304
3305 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
3306 self.remove_book_snapshot_subscription(topic);
3307
3308 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
3309 instrument_id,
3310 client_id,
3311 venue: Some(instrument_id.venue),
3312 command_id: UUID4::new(),
3313 ts_init: self.timestamp_ns(),
3314 correlation_id: None,
3315 params,
3316 });
3317
3318 self.send_data_cmd(DataCommand::Unsubscribe(command));
3319 }
3320
3321 pub fn unsubscribe_quotes(
3323 &mut self,
3324 instrument_id: InstrumentId,
3325 client_id: Option<ClientId>,
3326 params: Option<IndexMap<String, String>>,
3327 ) {
3328 self.check_registered();
3329
3330 let topic = get_quotes_topic(instrument_id);
3331 self.remove_quote_subscription(topic);
3332
3333 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
3334 instrument_id,
3335 client_id,
3336 venue: Some(instrument_id.venue),
3337 command_id: UUID4::new(),
3338 ts_init: self.timestamp_ns(),
3339 correlation_id: None,
3340 params,
3341 });
3342
3343 self.send_data_cmd(DataCommand::Unsubscribe(command));
3344 }
3345
3346 pub fn unsubscribe_trades(
3348 &mut self,
3349 instrument_id: InstrumentId,
3350 client_id: Option<ClientId>,
3351 params: Option<IndexMap<String, String>>,
3352 ) {
3353 self.check_registered();
3354
3355 let topic = get_trades_topic(instrument_id);
3356 self.remove_trade_subscription(topic);
3357
3358 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
3359 instrument_id,
3360 client_id,
3361 venue: Some(instrument_id.venue),
3362 command_id: UUID4::new(),
3363 ts_init: self.timestamp_ns(),
3364 correlation_id: None,
3365 params,
3366 });
3367
3368 self.send_data_cmd(DataCommand::Unsubscribe(command));
3369 }
3370
3371 pub fn unsubscribe_bars(
3373 &mut self,
3374 bar_type: BarType,
3375 client_id: Option<ClientId>,
3376 params: Option<IndexMap<String, String>>,
3377 ) {
3378 self.check_registered();
3379
3380 let topic = get_bars_topic(bar_type);
3381 self.remove_bar_subscription(topic);
3382
3383 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
3384 bar_type,
3385 client_id,
3386 venue: Some(bar_type.instrument_id().venue),
3387 command_id: UUID4::new(),
3388 ts_init: self.timestamp_ns(),
3389 correlation_id: None,
3390 params,
3391 });
3392
3393 self.send_data_cmd(DataCommand::Unsubscribe(command));
3394 }
3395
3396 pub fn unsubscribe_mark_prices(
3398 &mut self,
3399 instrument_id: InstrumentId,
3400 client_id: Option<ClientId>,
3401 params: Option<IndexMap<String, String>>,
3402 ) {
3403 self.check_registered();
3404
3405 let topic = get_mark_price_topic(instrument_id);
3406 self.remove_mark_price_subscription(topic);
3407
3408 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
3409 instrument_id,
3410 client_id,
3411 venue: Some(instrument_id.venue),
3412 command_id: UUID4::new(),
3413 ts_init: self.timestamp_ns(),
3414 correlation_id: None,
3415 params,
3416 });
3417
3418 self.send_data_cmd(DataCommand::Unsubscribe(command));
3419 }
3420
3421 pub fn unsubscribe_index_prices(
3423 &mut self,
3424 instrument_id: InstrumentId,
3425 client_id: Option<ClientId>,
3426 params: Option<IndexMap<String, String>>,
3427 ) {
3428 self.check_registered();
3429
3430 let topic = get_index_price_topic(instrument_id);
3431 self.remove_index_price_subscription(topic);
3432
3433 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
3434 instrument_id,
3435 client_id,
3436 venue: Some(instrument_id.venue),
3437 command_id: UUID4::new(),
3438 ts_init: self.timestamp_ns(),
3439 correlation_id: None,
3440 params,
3441 });
3442
3443 self.send_data_cmd(DataCommand::Unsubscribe(command));
3444 }
3445
3446 pub fn unsubscribe_funding_rates(
3448 &mut self,
3449 instrument_id: InstrumentId,
3450 client_id: Option<ClientId>,
3451 params: Option<IndexMap<String, String>>,
3452 ) {
3453 self.check_registered();
3454
3455 let topic = get_funding_rate_topic(instrument_id);
3456 self.remove_funding_rate_subscription(topic);
3457
3458 let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
3459 instrument_id,
3460 client_id,
3461 venue: Some(instrument_id.venue),
3462 command_id: UUID4::new(),
3463 ts_init: self.timestamp_ns(),
3464 correlation_id: None,
3465 params,
3466 });
3467
3468 self.send_data_cmd(DataCommand::Unsubscribe(command));
3469 }
3470
3471 pub fn unsubscribe_instrument_status(
3473 &mut self,
3474 instrument_id: InstrumentId,
3475 client_id: Option<ClientId>,
3476 params: Option<IndexMap<String, String>>,
3477 ) {
3478 self.check_registered();
3479
3480 let topic = get_instrument_status_topic(instrument_id);
3481 self.remove_subscription_any(topic);
3482
3483 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
3484 instrument_id,
3485 client_id,
3486 venue: Some(instrument_id.venue),
3487 command_id: UUID4::new(),
3488 ts_init: self.timestamp_ns(),
3489 correlation_id: None,
3490 params,
3491 });
3492
3493 self.send_data_cmd(DataCommand::Unsubscribe(command));
3494 }
3495
3496 pub fn unsubscribe_instrument_close(
3498 &mut self,
3499 instrument_id: InstrumentId,
3500 client_id: Option<ClientId>,
3501 params: Option<IndexMap<String, String>>,
3502 ) {
3503 self.check_registered();
3504
3505 let topic = get_instrument_close_topic(instrument_id);
3506 self.remove_instrument_close_subscription(topic);
3507
3508 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3509 instrument_id,
3510 client_id,
3511 venue: Some(instrument_id.venue),
3512 command_id: UUID4::new(),
3513 ts_init: self.timestamp_ns(),
3514 correlation_id: None,
3515 params,
3516 });
3517
3518 self.send_data_cmd(DataCommand::Unsubscribe(command));
3519 }
3520
3521 pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
3523 self.check_registered();
3524
3525 let topic = get_order_fills_topic(instrument_id);
3526 self.remove_order_event_subscription(topic);
3527 }
3528
3529 pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
3531 self.check_registered();
3532
3533 let topic = get_order_cancels_topic(instrument_id);
3534 self.remove_order_event_subscription(topic);
3535 }
3536
3537 #[allow(clippy::too_many_arguments)]
3543 pub fn request_data(
3544 &self,
3545 data_type: DataType,
3546 client_id: ClientId,
3547 start: Option<DateTime<Utc>>,
3548 end: Option<DateTime<Utc>>,
3549 limit: Option<NonZeroUsize>,
3550 params: Option<IndexMap<String, String>>,
3551 handler: ShareableMessageHandler,
3552 ) -> anyhow::Result<UUID4> {
3553 self.check_registered();
3554
3555 let now = self.clock_ref().utc_now();
3556 check_timestamps(now, start, end)?;
3557
3558 let request_id = UUID4::new();
3559 let command = RequestCommand::Data(RequestCustomData {
3560 client_id,
3561 data_type,
3562 start,
3563 end,
3564 limit,
3565 request_id,
3566 ts_init: self.timestamp_ns(),
3567 params,
3568 });
3569
3570 get_message_bus()
3571 .borrow_mut()
3572 .register_response_handler(command.request_id(), handler)?;
3573
3574 self.send_data_cmd(DataCommand::Request(command));
3575
3576 Ok(request_id)
3577 }
3578
3579 pub fn request_instrument(
3585 &self,
3586 instrument_id: InstrumentId,
3587 start: Option<DateTime<Utc>>,
3588 end: Option<DateTime<Utc>>,
3589 client_id: Option<ClientId>,
3590 params: Option<IndexMap<String, String>>,
3591 handler: ShareableMessageHandler,
3592 ) -> anyhow::Result<UUID4> {
3593 self.check_registered();
3594
3595 let now = self.clock_ref().utc_now();
3596 check_timestamps(now, start, end)?;
3597
3598 let request_id = UUID4::new();
3599 let command = RequestCommand::Instrument(RequestInstrument {
3600 instrument_id,
3601 start,
3602 end,
3603 client_id,
3604 request_id,
3605 ts_init: now.into(),
3606 params,
3607 });
3608
3609 get_message_bus()
3610 .borrow_mut()
3611 .register_response_handler(command.request_id(), handler)?;
3612
3613 self.send_data_cmd(DataCommand::Request(command));
3614
3615 Ok(request_id)
3616 }
3617
3618 pub fn request_instruments(
3624 &self,
3625 venue: Option<Venue>,
3626 start: Option<DateTime<Utc>>,
3627 end: Option<DateTime<Utc>>,
3628 client_id: Option<ClientId>,
3629 params: Option<IndexMap<String, String>>,
3630 handler: ShareableMessageHandler,
3631 ) -> anyhow::Result<UUID4> {
3632 self.check_registered();
3633
3634 let now = self.clock_ref().utc_now();
3635 check_timestamps(now, start, end)?;
3636
3637 let request_id = UUID4::new();
3638 let command = RequestCommand::Instruments(RequestInstruments {
3639 venue,
3640 start,
3641 end,
3642 client_id,
3643 request_id,
3644 ts_init: now.into(),
3645 params,
3646 });
3647
3648 get_message_bus()
3649 .borrow_mut()
3650 .register_response_handler(command.request_id(), handler)?;
3651
3652 self.send_data_cmd(DataCommand::Request(command));
3653
3654 Ok(request_id)
3655 }
3656
3657 pub fn request_book_snapshot(
3663 &self,
3664 instrument_id: InstrumentId,
3665 depth: Option<NonZeroUsize>,
3666 client_id: Option<ClientId>,
3667 params: Option<IndexMap<String, String>>,
3668 handler: ShareableMessageHandler,
3669 ) -> anyhow::Result<UUID4> {
3670 self.check_registered();
3671
3672 let request_id = UUID4::new();
3673 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3674 instrument_id,
3675 depth,
3676 client_id,
3677 request_id,
3678 ts_init: self.timestamp_ns(),
3679 params,
3680 });
3681
3682 get_message_bus()
3683 .borrow_mut()
3684 .register_response_handler(command.request_id(), handler)?;
3685
3686 self.send_data_cmd(DataCommand::Request(command));
3687
3688 Ok(request_id)
3689 }
3690
3691 #[allow(clippy::too_many_arguments)]
3697 pub fn request_quotes(
3698 &self,
3699 instrument_id: InstrumentId,
3700 start: Option<DateTime<Utc>>,
3701 end: Option<DateTime<Utc>>,
3702 limit: Option<NonZeroUsize>,
3703 client_id: Option<ClientId>,
3704 params: Option<IndexMap<String, String>>,
3705 handler: ShareableMessageHandler,
3706 ) -> anyhow::Result<UUID4> {
3707 self.check_registered();
3708
3709 let now = self.clock_ref().utc_now();
3710 check_timestamps(now, start, end)?;
3711
3712 let request_id = UUID4::new();
3713 let command = RequestCommand::Quotes(RequestQuotes {
3714 instrument_id,
3715 start,
3716 end,
3717 limit,
3718 client_id,
3719 request_id,
3720 ts_init: now.into(),
3721 params,
3722 });
3723
3724 get_message_bus()
3725 .borrow_mut()
3726 .register_response_handler(command.request_id(), handler)?;
3727
3728 self.send_data_cmd(DataCommand::Request(command));
3729
3730 Ok(request_id)
3731 }
3732
3733 #[allow(clippy::too_many_arguments)]
3739 pub fn request_trades(
3740 &self,
3741 instrument_id: InstrumentId,
3742 start: Option<DateTime<Utc>>,
3743 end: Option<DateTime<Utc>>,
3744 limit: Option<NonZeroUsize>,
3745 client_id: Option<ClientId>,
3746 params: Option<IndexMap<String, String>>,
3747 handler: ShareableMessageHandler,
3748 ) -> anyhow::Result<UUID4> {
3749 self.check_registered();
3750
3751 let now = self.clock_ref().utc_now();
3752 check_timestamps(now, start, end)?;
3753
3754 let request_id = UUID4::new();
3755 let command = RequestCommand::Trades(RequestTrades {
3756 instrument_id,
3757 start,
3758 end,
3759 limit,
3760 client_id,
3761 request_id,
3762 ts_init: now.into(),
3763 params,
3764 });
3765
3766 get_message_bus()
3767 .borrow_mut()
3768 .register_response_handler(command.request_id(), handler)?;
3769
3770 self.send_data_cmd(DataCommand::Request(command));
3771
3772 Ok(request_id)
3773 }
3774
3775 #[allow(clippy::too_many_arguments)]
3781 pub fn request_funding_rates(
3782 &self,
3783 instrument_id: InstrumentId,
3784 start: Option<DateTime<Utc>>,
3785 end: Option<DateTime<Utc>>,
3786 limit: Option<NonZeroUsize>,
3787 client_id: Option<ClientId>,
3788 params: Option<IndexMap<String, String>>,
3789 handler: ShareableMessageHandler,
3790 ) -> anyhow::Result<UUID4> {
3791 self.check_registered();
3792
3793 let now = self.clock_ref().utc_now();
3794 check_timestamps(now, start, end)?;
3795
3796 let request_id = UUID4::new();
3797 let command = RequestCommand::FundingRates(RequestFundingRates {
3798 instrument_id,
3799 start,
3800 end,
3801 limit,
3802 client_id,
3803 request_id,
3804 ts_init: now.into(),
3805 params,
3806 });
3807
3808 get_message_bus()
3809 .borrow_mut()
3810 .register_response_handler(command.request_id(), handler)?;
3811
3812 self.send_data_cmd(DataCommand::Request(command));
3813
3814 Ok(request_id)
3815 }
3816
3817 #[allow(clippy::too_many_arguments)]
3823 pub fn request_bars(
3824 &self,
3825 bar_type: BarType,
3826 start: Option<DateTime<Utc>>,
3827 end: Option<DateTime<Utc>>,
3828 limit: Option<NonZeroUsize>,
3829 client_id: Option<ClientId>,
3830 params: Option<IndexMap<String, String>>,
3831 handler: ShareableMessageHandler,
3832 ) -> anyhow::Result<UUID4> {
3833 self.check_registered();
3834
3835 let now = self.clock_ref().utc_now();
3836 check_timestamps(now, start, end)?;
3837
3838 let request_id = UUID4::new();
3839 let command = RequestCommand::Bars(RequestBars {
3840 bar_type,
3841 start,
3842 end,
3843 limit,
3844 client_id,
3845 request_id,
3846 ts_init: now.into(),
3847 params,
3848 });
3849
3850 get_message_bus()
3851 .borrow_mut()
3852 .register_response_handler(command.request_id(), handler)?;
3853
3854 self.send_data_cmd(DataCommand::Request(command));
3855
3856 Ok(request_id)
3857 }
3858
3859 #[cfg(test)]
3860 pub fn quote_handler_count(&self) -> usize {
3861 self.quote_handlers.len()
3862 }
3863
3864 #[cfg(test)]
3865 pub fn trade_handler_count(&self) -> usize {
3866 self.trade_handlers.len()
3867 }
3868
3869 #[cfg(test)]
3870 pub fn bar_handler_count(&self) -> usize {
3871 self.bar_handlers.len()
3872 }
3873
3874 #[cfg(test)]
3875 pub fn deltas_handler_count(&self) -> usize {
3876 self.deltas_handlers.len()
3877 }
3878
3879 #[cfg(test)]
3880 pub fn has_quote_handler(&self, topic: &str) -> bool {
3881 self.quote_handlers
3882 .contains_key(&MStr::<Topic>::from(topic))
3883 }
3884
3885 #[cfg(test)]
3886 pub fn has_trade_handler(&self, topic: &str) -> bool {
3887 self.trade_handlers
3888 .contains_key(&MStr::<Topic>::from(topic))
3889 }
3890
3891 #[cfg(test)]
3892 pub fn has_bar_handler(&self, topic: &str) -> bool {
3893 self.bar_handlers.contains_key(&MStr::<Topic>::from(topic))
3894 }
3895
3896 #[cfg(test)]
3897 pub fn has_deltas_handler(&self, topic: &str) -> bool {
3898 self.deltas_handlers
3899 .contains_key(&MStr::<Topic>::from(topic))
3900 }
3901}
3902
3903fn check_timestamps(
3904 now: DateTime<Utc>,
3905 start: Option<DateTime<Utc>>,
3906 end: Option<DateTime<Utc>>,
3907) -> anyhow::Result<()> {
3908 if let Some(start) = start {
3909 check_predicate_true(start <= now, "start was > now")?;
3910 }
3911 if let Some(end) = end {
3912 check_predicate_true(end <= now, "end was > now")?;
3913 }
3914
3915 if let (Some(start), Some(end)) = (start, end) {
3916 check_predicate_true(start < end, "start was >= end")?;
3917 }
3918
3919 Ok(())
3920}
3921
3922fn log_error(e: &anyhow::Error) {
3923 log::error!("{e}");
3924}
3925
3926fn log_not_running<T>(msg: &T)
3927where
3928 T: Debug,
3929{
3930 log::trace!("Received message when not running - skipping {msg:?}");
3931}
3932
3933fn log_received<T>(msg: &T)
3934where
3935 T: Debug,
3936{
3937 log::debug!("{RECV} {msg:?}");
3938}