1use std::{
17 any::Any,
18 cell::{Ref, RefCell, RefMut},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24};
25
26use ahash::{AHashMap, AHashSet};
27use chrono::{DateTime, Utc};
28use indexmap::IndexMap;
29use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
30#[cfg(feature = "defi")]
31use nautilus_model::defi::{Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap};
32use nautilus_model::{
33 data::{
34 Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
35 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
36 },
37 enums::BookType,
38 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
39 instruments::InstrumentAny,
40 orderbook::OrderBook,
41};
42use ustr::Ustr;
43
44#[cfg(feature = "indicators")]
45use super::indicators::Indicators;
46use super::{
47 Actor,
48 registry::{get_actor_unchecked, try_get_actor_unchecked},
49};
50#[cfg(feature = "defi")]
51use crate::msgbus::switchboard::{
52 get_defi_blocks_topic, get_defi_liquidity_topic, get_defi_pool_swaps_topic, get_defi_pool_topic,
53};
54use crate::{
55 cache::Cache,
56 clock::Clock,
57 component::Component,
58 enums::{ComponentState, ComponentTrigger},
59 logging::{CMD, RECV, REQ, SEND},
60 messages::{
61 data::{
62 BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
63 InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
64 RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
65 SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
66 SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
67 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
68 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
69 UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
70 UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
71 UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
72 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
73 },
74 system::ShutdownSystem,
75 },
76 msgbus::{
77 self, MStr, Topic, get_message_bus,
78 handler::{ShareableMessageHandler, TypedMessageHandler},
79 switchboard::{
80 MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
81 get_custom_topic, get_funding_rate_topic, get_index_price_topic,
82 get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
83 get_instruments_topic, get_mark_price_topic, get_quotes_topic, get_trades_topic,
84 },
85 },
86 signal::Signal,
87 timer::{TimeEvent, TimeEventCallback},
88};
89
90#[derive(Debug, Clone)]
92#[cfg_attr(
93 feature = "python",
94 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", subclass)
95)]
96pub struct DataActorConfig {
97 pub actor_id: Option<ActorId>,
99 pub log_events: bool,
101 pub log_commands: bool,
103}
104
105impl Default for DataActorConfig {
106 fn default() -> Self {
107 Self {
108 actor_id: None,
109 log_events: true,
110 log_commands: true,
111 }
112 }
113}
114
115#[derive(Debug, Clone)]
117#[cfg_attr(
118 feature = "python",
119 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
120)]
121pub struct ImportableActorConfig {
122 pub actor_path: String,
124 pub config_path: String,
126 pub config: HashMap<String, serde_json::Value>,
128}
129
130type RequestCallback = Box<dyn Fn(UUID4) + Send + Sync>; pub trait DataActor:
133 Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
134{
135 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
141 Ok(IndexMap::new())
142 }
143
144 #[allow(unused_variables)]
150 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
151 Ok(())
152 }
153
154 fn on_start(&mut self) -> anyhow::Result<()> {
160 log::warn!(
161 "The `on_start` handler was called when not overridden, \
162 it's expected that any actions required when starting the actor \
163 occur here, such as subscribing/requesting data"
164 );
165 Ok(())
166 }
167
168 fn on_stop(&mut self) -> anyhow::Result<()> {
174 log::warn!(
175 "The `on_stop` handler was called when not overridden, \
176 it's expected that any actions required when stopping the actor \
177 occur here, such as unsubscribing from data",
178 );
179 Ok(())
180 }
181
182 fn on_resume(&mut self) -> anyhow::Result<()> {
188 log::warn!(
189 "The `on_resume` handler was called when not overridden, \
190 it's expected that any actions required when resuming the actor \
191 following a stop occur here"
192 );
193 Ok(())
194 }
195
196 fn on_reset(&mut self) -> anyhow::Result<()> {
202 log::warn!(
203 "The `on_reset` handler was called when not overridden, \
204 it's expected that any actions required when resetting the actor \
205 occur here, such as resetting indicators and other state"
206 );
207 Ok(())
208 }
209
210 fn on_dispose(&mut self) -> anyhow::Result<()> {
216 Ok(())
217 }
218
219 fn on_degrade(&mut self) -> anyhow::Result<()> {
225 Ok(())
226 }
227
228 fn on_fault(&mut self) -> anyhow::Result<()> {
234 Ok(())
235 }
236
237 #[allow(unused_variables)]
243 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
244 Ok(())
245 }
246
247 #[allow(unused_variables)]
253 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
254 Ok(())
255 }
256
257 #[allow(unused_variables)]
263 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
264 Ok(())
265 }
266
267 #[allow(unused_variables)]
273 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
274 Ok(())
275 }
276
277 #[allow(unused_variables)]
283 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
284 Ok(())
285 }
286
287 #[allow(unused_variables)]
293 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
294 Ok(())
295 }
296
297 #[allow(unused_variables)]
303 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
304 Ok(())
305 }
306
307 #[allow(unused_variables)]
313 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
314 Ok(())
315 }
316
317 #[allow(unused_variables)]
323 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
324 Ok(())
325 }
326
327 #[allow(unused_variables)]
333 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
334 Ok(())
335 }
336
337 #[allow(unused_variables)]
343 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
344 Ok(())
345 }
346
347 #[allow(unused_variables)]
353 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
354 Ok(())
355 }
356
357 #[allow(unused_variables)]
363 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
364 Ok(())
365 }
366
367 #[allow(unused_variables)]
373 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
374 Ok(())
375 }
376
377 #[cfg(feature = "defi")]
378 #[allow(unused_variables)]
384 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
385 Ok(())
386 }
387
388 #[cfg(feature = "defi")]
389 #[allow(unused_variables)]
395 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
396 Ok(())
397 }
398
399 #[cfg(feature = "defi")]
400 #[allow(unused_variables)]
406 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
407 Ok(())
408 }
409
410 #[cfg(feature = "defi")]
411 #[allow(unused_variables)]
417 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
418 Ok(())
419 }
420
421 #[allow(unused_variables)]
427 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
428 Ok(())
429 }
430
431 #[allow(unused_variables)]
437 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
438 Ok(())
439 }
440
441 #[allow(unused_variables)]
447 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
448 Ok(())
449 }
450
451 #[allow(unused_variables)]
457 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
458 Ok(())
459 }
460
461 #[allow(unused_variables)]
467 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
468 Ok(())
469 }
470
471 #[allow(unused_variables)]
477 fn on_historical_index_prices(
478 &mut self,
479 index_prices: &[IndexPriceUpdate],
480 ) -> anyhow::Result<()> {
481 Ok(())
482 }
483
484 fn handle_time_event(&mut self, event: &TimeEvent) {
486 log_received(&event);
487
488 if let Err(e) = DataActor::on_time_event(self, event) {
489 log_error(&e);
490 }
491 }
492
493 fn handle_data(&mut self, data: &dyn Any) {
495 log_received(&data);
496
497 if self.not_running() {
498 log_not_running(&data);
499 return;
500 }
501
502 if let Err(e) = self.on_data(data) {
503 log_error(&e);
504 }
505 }
506
507 fn handle_signal(&mut self, signal: &Signal) {
509 log_received(&signal);
510
511 if self.not_running() {
512 log_not_running(&signal);
513 return;
514 }
515
516 if let Err(e) = self.on_signal(signal) {
517 log_error(&e);
518 }
519 }
520
521 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
523 log_received(&instrument);
524
525 if self.not_running() {
526 log_not_running(&instrument);
527 return;
528 }
529
530 if let Err(e) = self.on_instrument(instrument) {
531 log_error(&e);
532 }
533 }
534
535 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
537 log_received(&deltas);
538
539 if self.not_running() {
540 log_not_running(&deltas);
541 return;
542 }
543
544 if let Err(e) = self.on_book_deltas(deltas) {
545 log_error(&e);
546 }
547 }
548
549 fn handle_book(&mut self, book: &OrderBook) {
551 log_received(&book);
552
553 if self.not_running() {
554 log_not_running(&book);
555 return;
556 }
557
558 if let Err(e) = self.on_book(book) {
559 log_error(&e);
560 };
561 }
562
563 fn handle_quote(&mut self, quote: &QuoteTick) {
565 log_received("e);
566
567 if self.not_running() {
568 log_not_running("e);
569 return;
570 }
571
572 if let Err(e) = self.on_quote(quote) {
573 log_error(&e);
574 }
575 }
576
577 fn handle_trade(&mut self, trade: &TradeTick) {
579 log_received(&trade);
580
581 if self.not_running() {
582 log_not_running(&trade);
583 return;
584 }
585
586 if let Err(e) = self.on_trade(trade) {
587 log_error(&e);
588 }
589 }
590
591 fn handle_bar(&mut self, bar: &Bar) {
593 log_received(&bar);
594
595 if self.not_running() {
596 log_not_running(&bar);
597 return;
598 }
599
600 if let Err(e) = self.on_bar(bar) {
601 log_error(&e);
602 }
603 }
604
605 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
607 log_received(&mark_price);
608
609 if self.not_running() {
610 log_not_running(&mark_price);
611 return;
612 }
613
614 if let Err(e) = self.on_mark_price(mark_price) {
615 log_error(&e);
616 }
617 }
618
619 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
621 log_received(&index_price);
622
623 if self.not_running() {
624 log_not_running(&index_price);
625 return;
626 }
627
628 if let Err(e) = self.on_index_price(index_price) {
629 log_error(&e);
630 }
631 }
632
633 fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
635 log_received(&funding_rate);
636
637 if self.not_running() {
638 log_not_running(&funding_rate);
639 return;
640 }
641
642 if let Err(e) = self.on_funding_rate(funding_rate) {
643 log_error(&e);
644 }
645 }
646
647 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
649 log_received(&status);
650
651 if self.not_running() {
652 log_not_running(&status);
653 return;
654 }
655
656 if let Err(e) = self.on_instrument_status(status) {
657 log_error(&e);
658 }
659 }
660
661 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
663 log_received(&close);
664
665 if self.not_running() {
666 log_not_running(&close);
667 return;
668 }
669
670 if let Err(e) = self.on_instrument_close(close) {
671 log_error(&e);
672 }
673 }
674
675 #[cfg(feature = "defi")]
676 fn handle_block(&mut self, block: &Block) {
678 log_received(&block);
679
680 if self.not_running() {
681 log_not_running(&block);
682 return;
683 }
684
685 if let Err(e) = self.on_block(block) {
686 log_error(&e);
687 }
688 }
689
690 #[cfg(feature = "defi")]
691 fn handle_pool(&mut self, pool: &Pool) {
693 log_received(&pool);
694
695 if self.not_running() {
696 log_not_running(&pool);
697 return;
698 }
699
700 if let Err(e) = self.on_pool(pool) {
701 log_error(&e);
702 }
703 }
704
705 #[cfg(feature = "defi")]
706 fn handle_pool_swap(&mut self, swap: &PoolSwap) {
708 log_received(&swap);
709
710 if self.not_running() {
711 log_not_running(&swap);
712 return;
713 }
714
715 if let Err(e) = self.on_pool_swap(swap) {
716 log_error(&e);
717 }
718 }
719
720 #[cfg(feature = "defi")]
721 fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
723 log_received(&update);
724
725 if self.not_running() {
726 log_not_running(&update);
727 return;
728 }
729
730 if let Err(e) = self.on_pool_liquidity_update(update) {
731 log_error(&e);
732 }
733 }
734
735 fn handle_historical_data(&mut self, data: &dyn Any) {
737 log_received(&data);
738
739 if let Err(e) = self.on_historical_data(data) {
740 log_error(&e);
741 }
742 }
743
744 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
746 log_received(&resp);
747
748 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
749 log_error(&e);
750 }
751 }
752
753 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
755 log_received(&resp);
756
757 if let Err(e) = self.on_instrument(&resp.data) {
758 log_error(&e);
759 }
760 }
761
762 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
764 log_received(&resp);
765
766 for inst in &resp.data {
767 if let Err(e) = self.on_instrument(inst) {
768 log_error(&e);
769 }
770 }
771 }
772
773 fn handle_book_response(&mut self, resp: &BookResponse) {
775 log_received(&resp);
776
777 if let Err(e) = self.on_book(&resp.data) {
778 log_error(&e);
779 }
780 }
781
782 fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
784 log_received(&resp);
785
786 if let Err(e) = self.on_historical_quotes(&resp.data) {
787 log_error(&e);
788 }
789 }
790
791 fn handle_trades_response(&mut self, resp: &TradesResponse) {
793 log_received(&resp);
794
795 if let Err(e) = self.on_historical_trades(&resp.data) {
796 log_error(&e);
797 }
798 }
799
800 fn handle_bars_response(&mut self, resp: &BarsResponse) {
802 log_received(&resp);
803
804 if let Err(e) = self.on_historical_bars(&resp.data) {
805 log_error(&e);
806 }
807 }
808
809 fn subscribe_data(
811 &mut self,
812 data_type: DataType,
813 client_id: Option<ClientId>,
814 params: Option<IndexMap<String, String>>,
815 ) where
816 Self: 'static + Debug + Sized,
817 {
818 let actor_id = self.actor_id().inner();
819 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
820 move |data: &dyn Any| {
821 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
822 },
823 )));
824
825 DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
826 }
827
828 fn subscribe_quotes(
830 &mut self,
831 instrument_id: InstrumentId,
832 client_id: Option<ClientId>,
833 params: Option<IndexMap<String, String>>,
834 ) where
835 Self: 'static + Debug + Sized,
836 {
837 let actor_id = self.actor_id().inner();
838 let topic = get_quotes_topic(instrument_id);
839
840 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
841 move |quote: &QuoteTick| {
842 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
843 actor.handle_quote(quote);
844 } else {
845 log::error!("Actor {actor_id} not found for quote handling");
846 }
847 },
848 )));
849
850 DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
851 }
852
853 fn subscribe_instruments(
855 &mut self,
856 venue: Venue,
857 client_id: Option<ClientId>,
858 params: Option<IndexMap<String, String>>,
859 ) where
860 Self: 'static + Debug + Sized,
861 {
862 let actor_id = self.actor_id().inner();
863 let topic = get_instruments_topic(venue);
864
865 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
866 move |instrument: &InstrumentAny| {
867 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
868 actor.handle_instrument(instrument);
869 } else {
870 log::error!("Actor {actor_id} not found for instruments handling");
871 }
872 },
873 )));
874
875 DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
876 }
877
878 fn subscribe_instrument(
880 &mut self,
881 instrument_id: InstrumentId,
882 client_id: Option<ClientId>,
883 params: Option<IndexMap<String, String>>,
884 ) where
885 Self: 'static + Debug + Sized,
886 {
887 let actor_id = self.actor_id().inner();
888 let topic = get_instrument_topic(instrument_id);
889
890 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
891 move |instrument: &InstrumentAny| {
892 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
893 actor.handle_instrument(instrument);
894 } else {
895 log::error!("Actor {actor_id} not found for instrument handling");
896 }
897 },
898 )));
899
900 DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
901 }
902
903 fn subscribe_book_deltas(
905 &mut self,
906 instrument_id: InstrumentId,
907 book_type: BookType,
908 depth: Option<NonZeroUsize>,
909 client_id: Option<ClientId>,
910 managed: bool,
911 params: Option<IndexMap<String, String>>,
912 ) where
913 Self: 'static + Debug + Sized,
914 {
915 let actor_id = self.actor_id().inner();
916 let topic = get_book_deltas_topic(instrument_id);
917
918 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
919 move |deltas: &OrderBookDeltas| {
920 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
921 },
922 )));
923
924 DataActorCore::subscribe_book_deltas(
925 self,
926 topic,
927 handler,
928 instrument_id,
929 book_type,
930 depth,
931 client_id,
932 managed,
933 params,
934 );
935 }
936
937 fn subscribe_book_at_interval(
939 &mut self,
940 instrument_id: InstrumentId,
941 book_type: BookType,
942 depth: Option<NonZeroUsize>,
943 interval_ms: NonZeroUsize,
944 client_id: Option<ClientId>,
945 params: Option<IndexMap<String, String>>,
946 ) where
947 Self: 'static + Debug + Sized,
948 {
949 let actor_id = self.actor_id().inner();
950 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
951
952 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
953 move |book: &OrderBook| {
954 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
955 },
956 )));
957
958 DataActorCore::subscribe_book_at_interval(
959 self,
960 topic,
961 handler,
962 instrument_id,
963 book_type,
964 depth,
965 interval_ms,
966 client_id,
967 params,
968 );
969 }
970
971 fn subscribe_trades(
973 &mut self,
974 instrument_id: InstrumentId,
975 client_id: Option<ClientId>,
976 params: Option<IndexMap<String, String>>,
977 ) where
978 Self: 'static + Debug + Sized,
979 {
980 let actor_id = self.actor_id().inner();
981 let topic = get_trades_topic(instrument_id);
982
983 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
984 move |trade: &TradeTick| {
985 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
986 },
987 )));
988
989 DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
990 }
991
992 fn subscribe_bars(
994 &mut self,
995 bar_type: BarType,
996 client_id: Option<ClientId>,
997 await_partial: bool,
998 params: Option<IndexMap<String, String>>,
999 ) where
1000 Self: 'static + Debug + Sized,
1001 {
1002 let actor_id = self.actor_id().inner();
1003 let topic = get_bars_topic(bar_type);
1004
1005 let handler =
1006 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
1007 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1008 })));
1009
1010 DataActorCore::subscribe_bars(
1011 self,
1012 topic,
1013 handler,
1014 bar_type,
1015 client_id,
1016 await_partial,
1017 params,
1018 );
1019 }
1020
1021 fn subscribe_mark_prices(
1023 &mut self,
1024 instrument_id: InstrumentId,
1025 client_id: Option<ClientId>,
1026 params: Option<IndexMap<String, String>>,
1027 ) where
1028 Self: 'static + Debug + Sized,
1029 {
1030 let actor_id = self.actor_id().inner();
1031 let topic = get_mark_price_topic(instrument_id);
1032
1033 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1034 move |mark_price: &MarkPriceUpdate| {
1035 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1036 },
1037 )));
1038
1039 DataActorCore::subscribe_mark_prices(
1040 self,
1041 topic,
1042 handler,
1043 instrument_id,
1044 client_id,
1045 params,
1046 );
1047 }
1048
1049 fn subscribe_index_prices(
1051 &mut self,
1052 instrument_id: InstrumentId,
1053 client_id: Option<ClientId>,
1054 params: Option<IndexMap<String, String>>,
1055 ) where
1056 Self: 'static + Debug + Sized,
1057 {
1058 let actor_id = self.actor_id().inner();
1059 let topic = get_index_price_topic(instrument_id);
1060
1061 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1062 move |index_price: &IndexPriceUpdate| {
1063 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1064 },
1065 )));
1066
1067 DataActorCore::subscribe_index_prices(
1068 self,
1069 topic,
1070 handler,
1071 instrument_id,
1072 client_id,
1073 params,
1074 );
1075 }
1076
1077 fn subscribe_funding_rates(
1079 &mut self,
1080 instrument_id: InstrumentId,
1081 client_id: Option<ClientId>,
1082 params: Option<IndexMap<String, String>>,
1083 ) where
1084 Self: 'static + Debug + Sized,
1085 {
1086 let actor_id = self.actor_id().inner();
1087 let topic = get_funding_rate_topic(instrument_id);
1088
1089 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1090 move |funding_rate: &FundingRateUpdate| {
1091 get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1092 },
1093 )));
1094
1095 DataActorCore::subscribe_funding_rates(
1096 self,
1097 topic,
1098 handler,
1099 instrument_id,
1100 client_id,
1101 params,
1102 );
1103 }
1104
1105 fn subscribe_instrument_status(
1107 &mut self,
1108 instrument_id: InstrumentId,
1109 client_id: Option<ClientId>,
1110 params: Option<IndexMap<String, String>>,
1111 ) where
1112 Self: 'static + Debug + Sized,
1113 {
1114 let actor_id = self.actor_id().inner();
1115 let topic = get_instrument_status_topic(instrument_id);
1116
1117 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1118 move |status: &InstrumentStatus| {
1119 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1120 },
1121 )));
1122
1123 DataActorCore::subscribe_instrument_status(
1124 self,
1125 topic,
1126 handler,
1127 instrument_id,
1128 client_id,
1129 params,
1130 );
1131 }
1132
1133 fn subscribe_instrument_close(
1135 &mut self,
1136 instrument_id: InstrumentId,
1137 client_id: Option<ClientId>,
1138 params: Option<IndexMap<String, String>>,
1139 ) where
1140 Self: 'static + Debug + Sized,
1141 {
1142 let actor_id = self.actor_id().inner();
1143 let topic = get_instrument_close_topic(instrument_id);
1144
1145 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1146 move |close: &InstrumentClose| {
1147 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1148 },
1149 )));
1150
1151 DataActorCore::subscribe_instrument_close(
1152 self,
1153 topic,
1154 handler,
1155 instrument_id,
1156 client_id,
1157 params,
1158 );
1159 }
1160
1161 #[cfg(feature = "defi")]
1162 fn subscribe_blocks(
1164 &mut self,
1165 chain: Blockchain,
1166 client_id: Option<ClientId>,
1167 params: Option<IndexMap<String, String>>,
1168 ) where
1169 Self: 'static + Debug + Sized,
1170 {
1171 let actor_id = self.actor_id().inner();
1172 let topic = get_defi_blocks_topic(chain);
1173
1174 let handler =
1175 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |block: &Block| {
1176 get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1177 })));
1178
1179 DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1180 }
1181
1182 #[cfg(feature = "defi")]
1183 fn subscribe_pool(
1185 &mut self,
1186 instrument_id: InstrumentId,
1187 client_id: Option<ClientId>,
1188 params: Option<IndexMap<String, String>>,
1189 ) where
1190 Self: 'static + Debug + Sized,
1191 {
1192 let actor_id = self.actor_id().inner();
1193 let topic = get_defi_pool_topic(instrument_id);
1194
1195 let handler =
1196 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |pool: &Pool| {
1197 get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1198 })));
1199
1200 DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1201 }
1202
1203 #[cfg(feature = "defi")]
1204 fn subscribe_pool_swaps(
1206 &mut self,
1207 instrument_id: InstrumentId,
1208 client_id: Option<ClientId>,
1209 params: Option<IndexMap<String, String>>,
1210 ) where
1211 Self: 'static + Debug + Sized,
1212 {
1213 let actor_id = self.actor_id().inner();
1214 let topic = get_defi_pool_swaps_topic(instrument_id);
1215
1216 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1217 move |swap: &PoolSwap| {
1218 get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1219 },
1220 )));
1221
1222 DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1223 }
1224
1225 #[cfg(feature = "defi")]
1226 fn subscribe_pool_liquidity_updates(
1228 &mut self,
1229 instrument_id: InstrumentId,
1230 client_id: Option<ClientId>,
1231 params: Option<IndexMap<String, String>>,
1232 ) where
1233 Self: 'static + Debug + Sized,
1234 {
1235 let actor_id = self.actor_id().inner();
1236 let topic = get_defi_liquidity_topic(instrument_id);
1237
1238 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1239 move |update: &PoolLiquidityUpdate| {
1240 get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1241 },
1242 )));
1243
1244 DataActorCore::subscribe_pool_liquidity_updates(
1245 self,
1246 topic,
1247 handler,
1248 instrument_id,
1249 client_id,
1250 params,
1251 );
1252 }
1253
1254 fn unsubscribe_data(
1256 &mut self,
1257 data_type: DataType,
1258 client_id: Option<ClientId>,
1259 params: Option<IndexMap<String, String>>,
1260 ) where
1261 Self: 'static + Debug + Sized,
1262 {
1263 DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1264 }
1265
1266 fn unsubscribe_instruments(
1268 &mut self,
1269 venue: Venue,
1270 client_id: Option<ClientId>,
1271 params: Option<IndexMap<String, String>>,
1272 ) where
1273 Self: 'static + Debug + Sized,
1274 {
1275 DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1276 }
1277
1278 fn unsubscribe_instrument(
1280 &mut self,
1281 instrument_id: InstrumentId,
1282 client_id: Option<ClientId>,
1283 params: Option<IndexMap<String, String>>,
1284 ) where
1285 Self: 'static + Debug + Sized,
1286 {
1287 DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1288 }
1289
1290 fn unsubscribe_book_deltas(
1292 &mut self,
1293 instrument_id: InstrumentId,
1294 client_id: Option<ClientId>,
1295 params: Option<IndexMap<String, String>>,
1296 ) where
1297 Self: 'static + Debug + Sized,
1298 {
1299 DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1300 }
1301
1302 fn unsubscribe_book_at_interval(
1304 &mut self,
1305 instrument_id: InstrumentId,
1306 interval_ms: NonZeroUsize,
1307 client_id: Option<ClientId>,
1308 params: Option<IndexMap<String, String>>,
1309 ) where
1310 Self: 'static + Debug + Sized,
1311 {
1312 DataActorCore::unsubscribe_book_at_interval(
1313 self,
1314 instrument_id,
1315 interval_ms,
1316 client_id,
1317 params,
1318 );
1319 }
1320
1321 fn unsubscribe_quotes(
1323 &mut self,
1324 instrument_id: InstrumentId,
1325 client_id: Option<ClientId>,
1326 params: Option<IndexMap<String, String>>,
1327 ) where
1328 Self: 'static + Debug + Sized,
1329 {
1330 DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1331 }
1332
1333 fn unsubscribe_trades(
1335 &mut self,
1336 instrument_id: InstrumentId,
1337 client_id: Option<ClientId>,
1338 params: Option<IndexMap<String, String>>,
1339 ) where
1340 Self: 'static + Debug + Sized,
1341 {
1342 DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1343 }
1344
1345 fn unsubscribe_bars(
1347 &mut self,
1348 bar_type: BarType,
1349 client_id: Option<ClientId>,
1350 params: Option<IndexMap<String, String>>,
1351 ) where
1352 Self: 'static + Debug + Sized,
1353 {
1354 DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1355 }
1356
1357 fn unsubscribe_mark_prices(
1359 &mut self,
1360 instrument_id: InstrumentId,
1361 client_id: Option<ClientId>,
1362 params: Option<IndexMap<String, String>>,
1363 ) where
1364 Self: 'static + Debug + Sized,
1365 {
1366 DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1367 }
1368
1369 fn unsubscribe_index_prices(
1371 &mut self,
1372 instrument_id: InstrumentId,
1373 client_id: Option<ClientId>,
1374 params: Option<IndexMap<String, String>>,
1375 ) where
1376 Self: 'static + Debug + Sized,
1377 {
1378 DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1379 }
1380
1381 fn unsubscribe_funding_rates(
1383 &mut self,
1384 instrument_id: InstrumentId,
1385 client_id: Option<ClientId>,
1386 params: Option<IndexMap<String, String>>,
1387 ) where
1388 Self: 'static + Debug + Sized,
1389 {
1390 DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1391 }
1392
1393 fn unsubscribe_instrument_status(
1395 &mut self,
1396 instrument_id: InstrumentId,
1397 client_id: Option<ClientId>,
1398 params: Option<IndexMap<String, String>>,
1399 ) where
1400 Self: 'static + Debug + Sized,
1401 {
1402 DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1403 }
1404
1405 fn unsubscribe_instrument_close(
1407 &mut self,
1408 instrument_id: InstrumentId,
1409 client_id: Option<ClientId>,
1410 params: Option<IndexMap<String, String>>,
1411 ) where
1412 Self: 'static + Debug + Sized,
1413 {
1414 DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1415 }
1416
1417 #[cfg(feature = "defi")]
1418 fn unsubscribe_blocks(
1420 &mut self,
1421 chain: Blockchain,
1422 client_id: Option<ClientId>,
1423 params: Option<IndexMap<String, String>>,
1424 ) where
1425 Self: 'static + Debug + Sized,
1426 {
1427 DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1428 }
1429
1430 #[cfg(feature = "defi")]
1431 fn unsubscribe_pool(
1433 &mut self,
1434 instrument_id: InstrumentId,
1435 client_id: Option<ClientId>,
1436 params: Option<IndexMap<String, String>>,
1437 ) where
1438 Self: 'static + Debug + Sized,
1439 {
1440 DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1441 }
1442
1443 #[cfg(feature = "defi")]
1444 fn unsubscribe_pool_swaps(
1446 &mut self,
1447 instrument_id: InstrumentId,
1448 client_id: Option<ClientId>,
1449 params: Option<IndexMap<String, String>>,
1450 ) where
1451 Self: 'static + Debug + Sized,
1452 {
1453 DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1454 }
1455
1456 #[cfg(feature = "defi")]
1457 fn unsubscribe_pool_liquidity_updates(
1459 &mut self,
1460 instrument_id: InstrumentId,
1461 client_id: Option<ClientId>,
1462 params: Option<IndexMap<String, String>>,
1463 ) where
1464 Self: 'static + Debug + Sized,
1465 {
1466 DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1467 }
1468
1469 fn request_data(
1475 &mut self,
1476 data_type: DataType,
1477 client_id: ClientId,
1478 start: Option<DateTime<Utc>>,
1479 end: Option<DateTime<Utc>>,
1480 limit: Option<NonZeroUsize>,
1481 params: Option<IndexMap<String, String>>,
1482 ) -> anyhow::Result<UUID4>
1483 where
1484 Self: 'static + Debug + Sized,
1485 {
1486 let actor_id = self.actor_id().inner();
1487 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1488 move |resp: &CustomDataResponse| {
1489 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1490 },
1491 )));
1492
1493 DataActorCore::request_data(
1494 self, data_type, client_id, start, end, limit, params, handler,
1495 )
1496 }
1497
1498 fn request_instrument(
1504 &mut self,
1505 instrument_id: InstrumentId,
1506 start: Option<DateTime<Utc>>,
1507 end: Option<DateTime<Utc>>,
1508 client_id: Option<ClientId>,
1509 params: Option<IndexMap<String, String>>,
1510 ) -> anyhow::Result<UUID4>
1511 where
1512 Self: 'static + Debug + Sized,
1513 {
1514 let actor_id = self.actor_id().inner();
1515 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1516 move |resp: &InstrumentResponse| {
1517 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1518 },
1519 )));
1520
1521 DataActorCore::request_instrument(
1522 self,
1523 instrument_id,
1524 start,
1525 end,
1526 client_id,
1527 params,
1528 handler,
1529 )
1530 }
1531
1532 fn request_instruments(
1538 &mut self,
1539 venue: Option<Venue>,
1540 start: Option<DateTime<Utc>>,
1541 end: Option<DateTime<Utc>>,
1542 client_id: Option<ClientId>,
1543 params: Option<IndexMap<String, String>>,
1544 ) -> anyhow::Result<UUID4>
1545 where
1546 Self: 'static + Debug + Sized,
1547 {
1548 let actor_id = self.actor_id().inner();
1549 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1550 move |resp: &InstrumentsResponse| {
1551 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1552 },
1553 )));
1554
1555 DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1556 }
1557
1558 fn request_book_snapshot(
1564 &mut self,
1565 instrument_id: InstrumentId,
1566 depth: Option<NonZeroUsize>,
1567 client_id: Option<ClientId>,
1568 params: Option<IndexMap<String, String>>,
1569 ) -> anyhow::Result<UUID4>
1570 where
1571 Self: 'static + Debug + Sized,
1572 {
1573 let actor_id = self.actor_id().inner();
1574 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1575 move |resp: &BookResponse| {
1576 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1577 },
1578 )));
1579
1580 DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1581 }
1582
1583 fn request_quotes(
1589 &mut self,
1590 instrument_id: InstrumentId,
1591 start: Option<DateTime<Utc>>,
1592 end: Option<DateTime<Utc>>,
1593 limit: Option<NonZeroUsize>,
1594 client_id: Option<ClientId>,
1595 params: Option<IndexMap<String, String>>,
1596 ) -> anyhow::Result<UUID4>
1597 where
1598 Self: 'static + Debug + Sized,
1599 {
1600 let actor_id = self.actor_id().inner();
1601 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1602 move |resp: &QuotesResponse| {
1603 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1604 },
1605 )));
1606
1607 DataActorCore::request_quotes(
1608 self,
1609 instrument_id,
1610 start,
1611 end,
1612 limit,
1613 client_id,
1614 params,
1615 handler,
1616 )
1617 }
1618
1619 fn request_trades(
1625 &mut self,
1626 instrument_id: InstrumentId,
1627 start: Option<DateTime<Utc>>,
1628 end: Option<DateTime<Utc>>,
1629 limit: Option<NonZeroUsize>,
1630 client_id: Option<ClientId>,
1631 params: Option<IndexMap<String, String>>,
1632 ) -> anyhow::Result<UUID4>
1633 where
1634 Self: 'static + Debug + Sized,
1635 {
1636 let actor_id = self.actor_id().inner();
1637 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1638 move |resp: &TradesResponse| {
1639 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1640 },
1641 )));
1642
1643 DataActorCore::request_trades(
1644 self,
1645 instrument_id,
1646 start,
1647 end,
1648 limit,
1649 client_id,
1650 params,
1651 handler,
1652 )
1653 }
1654
1655 fn request_bars(
1661 &mut self,
1662 bar_type: BarType,
1663 start: Option<DateTime<Utc>>,
1664 end: Option<DateTime<Utc>>,
1665 limit: Option<NonZeroUsize>,
1666 client_id: Option<ClientId>,
1667 params: Option<IndexMap<String, String>>,
1668 ) -> anyhow::Result<UUID4>
1669 where
1670 Self: 'static + Debug + Sized,
1671 {
1672 let actor_id = self.actor_id().inner();
1673 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1674 move |resp: &BarsResponse| {
1675 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1676 },
1677 )));
1678
1679 DataActorCore::request_bars(
1680 self, bar_type, start, end, limit, client_id, params, handler,
1681 )
1682 }
1683}
1684
1685impl<T> Actor for T
1687where
1688 T: DataActor + Debug + 'static,
1689{
1690 fn id(&self) -> Ustr {
1691 self.actor_id.inner()
1692 }
1693
1694 #[allow(unused_variables)]
1695 fn handle(&mut self, msg: &dyn Any) {
1696 }
1698
1699 fn as_any(&self) -> &dyn Any {
1700 self
1701 }
1702}
1703
1704impl<T> Component for T
1706where
1707 T: DataActor + Debug + 'static,
1708{
1709 fn component_id(&self) -> ComponentId {
1710 ComponentId::new(self.actor_id.inner().as_str())
1711 }
1712
1713 fn state(&self) -> ComponentState {
1714 self.state
1715 }
1716
1717 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1718 self.state = self.state.transition(&trigger)?;
1719 log::info!("{}", self.state.variant_name());
1720 Ok(())
1721 }
1722
1723 fn register(
1724 &mut self,
1725 trader_id: TraderId,
1726 clock: Rc<RefCell<dyn Clock>>,
1727 cache: Rc<RefCell<Cache>>,
1728 ) -> anyhow::Result<()> {
1729 DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1730
1731 let actor_id = self.actor_id().inner();
1733 let callback = TimeEventCallback::Rust(Rc::new(move |event: TimeEvent| {
1734 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1735 actor.handle_time_event(&event);
1736 } else {
1737 log::error!("Actor {actor_id} not found for time event handling");
1738 }
1739 }));
1740
1741 clock.borrow_mut().register_default_handler(callback);
1742
1743 self.initialize()
1744 }
1745
1746 fn on_start(&mut self) -> anyhow::Result<()> {
1747 DataActor::on_start(self)
1748 }
1749
1750 fn on_stop(&mut self) -> anyhow::Result<()> {
1751 DataActor::on_stop(self)
1752 }
1753
1754 fn on_resume(&mut self) -> anyhow::Result<()> {
1755 DataActor::on_resume(self)
1756 }
1757
1758 fn on_degrade(&mut self) -> anyhow::Result<()> {
1759 DataActor::on_degrade(self)
1760 }
1761
1762 fn on_fault(&mut self) -> anyhow::Result<()> {
1763 DataActor::on_fault(self)
1764 }
1765
1766 fn on_reset(&mut self) -> anyhow::Result<()> {
1767 DataActor::on_reset(self)
1768 }
1769
1770 fn on_dispose(&mut self) -> anyhow::Result<()> {
1771 DataActor::on_dispose(self)
1772 }
1773}
1774
1775#[allow(dead_code)] pub struct DataActorCore {
1778 pub actor_id: ActorId,
1780 pub config: DataActorConfig,
1782 trader_id: Option<TraderId>,
1783 clock: Option<Rc<RefCell<dyn Clock>>>, cache: Option<Rc<RefCell<Cache>>>, state: ComponentState,
1786 warning_events: AHashSet<String>, pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
1788 signal_classes: AHashMap<String, String>,
1789 #[cfg(feature = "indicators")]
1790 indicators: Indicators,
1791 topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
1792}
1793
1794impl Debug for DataActorCore {
1795 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1796 f.debug_struct(stringify!(DataActorCore))
1797 .field("actor_id", &self.actor_id)
1798 .field("config", &self.config)
1799 .field("state", &self.state)
1800 .field("trader_id", &self.trader_id)
1801 .finish()
1802 }
1803}
1804
1805impl DataActorCore {
1806 fn add_subscription(&mut self, topic: MStr<Topic>, handler: ShareableMessageHandler) {
1810 if self.topic_handlers.contains_key(&topic) {
1811 log::warn!(
1812 "Actor {} attempted duplicate subscription to topic '{topic}'",
1813 self.actor_id,
1814 );
1815 return;
1816 }
1817
1818 self.topic_handlers.insert(topic, handler.clone());
1819 msgbus::subscribe_topic(topic, handler, None);
1820 }
1821
1822 fn remove_subscription(&mut self, topic: MStr<Topic>) {
1826 if let Some(handler) = self.topic_handlers.remove(&topic) {
1827 msgbus::unsubscribe_topic(topic, handler.clone());
1828 } else {
1829 log::warn!(
1830 "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
1831 self.actor_id,
1832 );
1833 }
1834 }
1835
1836 pub fn new(config: DataActorConfig) -> Self {
1838 let actor_id = config
1839 .actor_id
1840 .unwrap_or_else(|| Self::default_actor_id(&config));
1841
1842 Self {
1843 actor_id,
1844 config,
1845 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
1849 warning_events: AHashSet::new(),
1850 pending_requests: AHashMap::new(),
1851 signal_classes: AHashMap::new(),
1852 #[cfg(feature = "indicators")]
1853 indicators: Indicators::default(),
1854 topic_handlers: AHashMap::new(),
1855 }
1856 }
1857
1858 #[must_use]
1860 pub fn mem_address(&self) -> String {
1861 format!("{self:p}")
1862 }
1863
/// Returns the current component state.
pub fn state(&self) -> ComponentState {
    self.state
}
1868
/// Returns the trader ID, or `None` if the actor has not been registered.
pub fn trader_id(&self) -> Option<TraderId> {
    self.trader_id
}
1873
/// Returns this actor's identifier.
pub fn actor_id(&self) -> ActorId {
    self.actor_id
}
1878
1879 fn default_actor_id(config: &DataActorConfig) -> ActorId {
1880 let memory_address = std::ptr::from_ref(config) as *const _ as usize;
1881 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
1882 }
1883
1884 pub fn timestamp_ns(&self) -> UnixNanos {
1886 self.clock_ref().timestamp_ns()
1887 }
1888
1889 pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
1895 self.clock
1896 .as_ref()
1897 .unwrap_or_else(|| {
1898 panic!(
1899 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
1900 self.actor_id, self.trader_id
1901 )
1902 })
1903 .borrow_mut()
1904 }
1905
1906 pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
1912 self.clock
1913 .as_ref()
1914 .expect("DataActor must be registered before accessing clock")
1915 .clone()
1916 }
1917
1918 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
1919 self.clock
1920 .as_ref()
1921 .unwrap_or_else(|| {
1922 panic!(
1923 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
1924 self.actor_id, self.trader_id
1925 )
1926 })
1927 .borrow()
1928 }
1929
1930 pub fn register(
1939 &mut self,
1940 trader_id: TraderId,
1941 clock: Rc<RefCell<dyn Clock>>,
1942 cache: Rc<RefCell<Cache>>,
1943 ) -> anyhow::Result<()> {
1944 if let Some(existing_trader_id) = self.trader_id {
1945 anyhow::bail!(
1946 "DataActor {} already registered with trader {existing_trader_id}",
1947 self.actor_id
1948 );
1949 }
1950
1951 {
1953 let _timestamp = clock.borrow().timestamp_ns();
1954 }
1955
1956 {
1958 let _cache_borrow = cache.borrow();
1959 }
1960
1961 self.trader_id = Some(trader_id);
1962 self.clock = Some(clock);
1963 self.cache = Some(cache);
1964
1965 if !self.is_properly_registered() {
1967 anyhow::bail!(
1968 "DataActor {} registration incomplete - validation failed",
1969 self.actor_id
1970 );
1971 }
1972
1973 log::info!("Registered {} with trader {trader_id}", self.actor_id);
1974 Ok(())
1975 }
1976
1977 pub fn register_warning_event(&mut self, event_type: &str) {
1979 self.warning_events.insert(event_type.to_string());
1980 log::debug!("Registered event type '{event_type}' for warning logs")
1981 }
1982
1983 pub fn deregister_warning_event(&mut self, event_type: &str) {
1985 self.warning_events.remove(event_type);
1986 log::debug!("Deregistered event type '{event_type}' from warning logs")
1987 }
1988
/// Returns `true` once a trader ID has been set on the core.
pub fn is_registered(&self) -> bool {
    self.trader_id.is_some()
}
1992
1993 fn check_registered(&self) {
1994 assert!(
1995 self.is_registered(),
1996 "Actor has not been registered with a Trader"
1997 );
1998 }
1999
2000 fn is_properly_registered(&self) -> bool {
2002 self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2003 }
2004
2005 fn send_data_cmd(&self, command: DataCommand) {
2006 if self.config.log_commands {
2007 log::info!("{CMD}{SEND} {command:?}");
2008 }
2009
2010 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2011 msgbus::send_any(endpoint, command.as_any())
2012 }
2013
2014 #[allow(dead_code)] fn send_data_req(&self, request: RequestCommand) {
2016 if self.config.log_commands {
2017 log::info!("{REQ}{SEND} {request:?}");
2018 }
2019
2020 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2023 msgbus::send_any(endpoint, request.as_any())
2024 }
2025
2026 pub fn shutdown_system(&self, reason: Option<String>) {
2032 self.check_registered();
2033
2034 let command = ShutdownSystem::new(
2036 self.trader_id().unwrap(),
2037 self.actor_id.inner(),
2038 reason,
2039 UUID4::new(),
2040 self.timestamp_ns(),
2041 );
2042
2043 let endpoint = "command.system.shutdown".into();
2044 msgbus::send_any(endpoint, command.as_any());
2045 }
2046
2047 pub fn subscribe_data(
2055 &mut self,
2056 handler: ShareableMessageHandler,
2057 data_type: DataType,
2058 client_id: Option<ClientId>,
2059 params: Option<IndexMap<String, String>>,
2060 ) {
2061 if !self.is_properly_registered() {
2062 panic!(
2063 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
2064 self.actor_id,
2065 self.trader_id,
2066 self.clock.is_some(),
2067 self.cache.is_some()
2068 );
2069 }
2070
2071 let topic = get_custom_topic(&data_type);
2072 self.add_subscription(topic, handler);
2073
2074 if client_id.is_none() {
2076 return;
2077 }
2078
2079 let command = SubscribeCommand::Data(SubscribeCustomData {
2080 data_type,
2081 client_id,
2082 venue: None,
2083 command_id: UUID4::new(),
2084 ts_init: self.timestamp_ns(),
2085 params,
2086 });
2087
2088 self.send_data_cmd(DataCommand::Subscribe(command));
2089 }
2090
2091 pub fn subscribe_quotes(
2093 &mut self,
2094 topic: MStr<Topic>,
2095 handler: ShareableMessageHandler,
2096 instrument_id: InstrumentId,
2097 client_id: Option<ClientId>,
2098 params: Option<IndexMap<String, String>>,
2099 ) {
2100 self.check_registered();
2101
2102 self.add_subscription(topic, handler);
2103
2104 let command = SubscribeCommand::Quotes(SubscribeQuotes {
2105 instrument_id,
2106 client_id,
2107 venue: Some(instrument_id.venue),
2108 command_id: UUID4::new(),
2109 ts_init: self.timestamp_ns(),
2110 params,
2111 });
2112
2113 self.send_data_cmd(DataCommand::Subscribe(command));
2114 }
2115
2116 pub fn subscribe_instruments(
2118 &mut self,
2119 topic: MStr<Topic>,
2120 handler: ShareableMessageHandler,
2121 venue: Venue,
2122 client_id: Option<ClientId>,
2123 params: Option<IndexMap<String, String>>,
2124 ) {
2125 self.check_registered();
2126
2127 self.add_subscription(topic, handler);
2128
2129 let command = SubscribeCommand::Instruments(SubscribeInstruments {
2130 client_id,
2131 venue,
2132 command_id: UUID4::new(),
2133 ts_init: self.timestamp_ns(),
2134 params,
2135 });
2136
2137 self.send_data_cmd(DataCommand::Subscribe(command));
2138 }
2139
2140 pub fn subscribe_instrument(
2142 &mut self,
2143 topic: MStr<Topic>,
2144 handler: ShareableMessageHandler,
2145 instrument_id: InstrumentId,
2146 client_id: Option<ClientId>,
2147 params: Option<IndexMap<String, String>>,
2148 ) {
2149 self.check_registered();
2150
2151 self.add_subscription(topic, handler);
2152
2153 let command = SubscribeCommand::Instrument(SubscribeInstrument {
2154 instrument_id,
2155 client_id,
2156 venue: Some(instrument_id.venue),
2157 command_id: UUID4::new(),
2158 ts_init: self.timestamp_ns(),
2159 params,
2160 });
2161
2162 self.send_data_cmd(DataCommand::Subscribe(command));
2163 }
2164
2165 #[allow(clippy::too_many_arguments)]
2167 pub fn subscribe_book_deltas(
2168 &mut self,
2169 topic: MStr<Topic>,
2170 handler: ShareableMessageHandler,
2171 instrument_id: InstrumentId,
2172 book_type: BookType,
2173 depth: Option<NonZeroUsize>,
2174 client_id: Option<ClientId>,
2175 managed: bool,
2176 params: Option<IndexMap<String, String>>,
2177 ) {
2178 self.check_registered();
2179
2180 self.add_subscription(topic, handler);
2181
2182 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2183 instrument_id,
2184 book_type,
2185 client_id,
2186 venue: Some(instrument_id.venue),
2187 command_id: UUID4::new(),
2188 ts_init: self.timestamp_ns(),
2189 depth,
2190 managed,
2191 params,
2192 });
2193
2194 self.send_data_cmd(DataCommand::Subscribe(command));
2195 }
2196
2197 #[allow(clippy::too_many_arguments)]
2199 pub fn subscribe_book_at_interval(
2200 &mut self,
2201 topic: MStr<Topic>,
2202 handler: ShareableMessageHandler,
2203 instrument_id: InstrumentId,
2204 book_type: BookType,
2205 depth: Option<NonZeroUsize>,
2206 interval_ms: NonZeroUsize,
2207 client_id: Option<ClientId>,
2208 params: Option<IndexMap<String, String>>,
2209 ) {
2210 self.check_registered();
2211
2212 self.add_subscription(topic, handler);
2213
2214 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2215 instrument_id,
2216 book_type,
2217 client_id,
2218 venue: Some(instrument_id.venue),
2219 command_id: UUID4::new(),
2220 ts_init: self.timestamp_ns(),
2221 depth,
2222 interval_ms,
2223 params,
2224 });
2225
2226 self.send_data_cmd(DataCommand::Subscribe(command));
2227 }
2228
2229 pub fn subscribe_trades(
2231 &mut self,
2232 topic: MStr<Topic>,
2233 handler: ShareableMessageHandler,
2234 instrument_id: InstrumentId,
2235 client_id: Option<ClientId>,
2236 params: Option<IndexMap<String, String>>,
2237 ) {
2238 self.check_registered();
2239
2240 self.add_subscription(topic, handler);
2241
2242 let command = SubscribeCommand::Trades(SubscribeTrades {
2243 instrument_id,
2244 client_id,
2245 venue: Some(instrument_id.venue),
2246 command_id: UUID4::new(),
2247 ts_init: self.timestamp_ns(),
2248 params,
2249 });
2250
2251 self.send_data_cmd(DataCommand::Subscribe(command));
2252 }
2253
2254 pub fn subscribe_bars(
2256 &mut self,
2257 topic: MStr<Topic>,
2258 handler: ShareableMessageHandler,
2259 bar_type: BarType,
2260 client_id: Option<ClientId>,
2261 await_partial: bool,
2262 params: Option<IndexMap<String, String>>,
2263 ) {
2264 self.check_registered();
2265
2266 self.add_subscription(topic, handler);
2267
2268 let command = SubscribeCommand::Bars(SubscribeBars {
2269 bar_type,
2270 client_id,
2271 venue: Some(bar_type.instrument_id().venue),
2272 command_id: UUID4::new(),
2273 ts_init: self.timestamp_ns(),
2274 await_partial,
2275 params,
2276 });
2277
2278 self.send_data_cmd(DataCommand::Subscribe(command));
2279 }
2280
2281 pub fn subscribe_mark_prices(
2283 &mut self,
2284 topic: MStr<Topic>,
2285 handler: ShareableMessageHandler,
2286 instrument_id: InstrumentId,
2287 client_id: Option<ClientId>,
2288 params: Option<IndexMap<String, String>>,
2289 ) {
2290 self.check_registered();
2291
2292 self.add_subscription(topic, handler);
2293
2294 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
2295 instrument_id,
2296 client_id,
2297 venue: Some(instrument_id.venue),
2298 command_id: UUID4::new(),
2299 ts_init: self.timestamp_ns(),
2300 params,
2301 });
2302
2303 self.send_data_cmd(DataCommand::Subscribe(command));
2304 }
2305
2306 pub fn subscribe_index_prices(
2308 &mut self,
2309 topic: MStr<Topic>,
2310 handler: ShareableMessageHandler,
2311 instrument_id: InstrumentId,
2312 client_id: Option<ClientId>,
2313 params: Option<IndexMap<String, String>>,
2314 ) {
2315 self.check_registered();
2316
2317 self.add_subscription(topic, handler);
2318
2319 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
2320 instrument_id,
2321 client_id,
2322 venue: Some(instrument_id.venue),
2323 command_id: UUID4::new(),
2324 ts_init: self.timestamp_ns(),
2325 params,
2326 });
2327
2328 self.send_data_cmd(DataCommand::Subscribe(command));
2329 }
2330
2331 pub fn subscribe_funding_rates(
2333 &mut self,
2334 topic: MStr<Topic>,
2335 handler: ShareableMessageHandler,
2336 instrument_id: InstrumentId,
2337 client_id: Option<ClientId>,
2338 params: Option<IndexMap<String, String>>,
2339 ) {
2340 self.check_registered();
2341
2342 self.add_subscription(topic, handler);
2343
2344 let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
2345 instrument_id,
2346 client_id,
2347 venue: Some(instrument_id.venue),
2348 command_id: UUID4::new(),
2349 ts_init: self.timestamp_ns(),
2350 params,
2351 });
2352
2353 self.send_data_cmd(DataCommand::Subscribe(command));
2354 }
2355
2356 pub fn subscribe_instrument_status(
2358 &mut self,
2359 topic: MStr<Topic>,
2360 handler: ShareableMessageHandler,
2361 instrument_id: InstrumentId,
2362 client_id: Option<ClientId>,
2363 params: Option<IndexMap<String, String>>,
2364 ) {
2365 self.check_registered();
2366
2367 self.add_subscription(topic, handler);
2368
2369 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
2370 instrument_id,
2371 client_id,
2372 venue: Some(instrument_id.venue),
2373 command_id: UUID4::new(),
2374 ts_init: self.timestamp_ns(),
2375 params,
2376 });
2377
2378 self.send_data_cmd(DataCommand::Subscribe(command));
2379 }
2380
2381 pub fn subscribe_instrument_close(
2383 &mut self,
2384 topic: MStr<Topic>,
2385 handler: ShareableMessageHandler,
2386 instrument_id: InstrumentId,
2387 client_id: Option<ClientId>,
2388 params: Option<IndexMap<String, String>>,
2389 ) {
2390 self.check_registered();
2391
2392 self.add_subscription(topic, handler);
2393
2394 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
2395 instrument_id,
2396 client_id,
2397 venue: Some(instrument_id.venue),
2398 command_id: UUID4::new(),
2399 ts_init: self.timestamp_ns(),
2400 params,
2401 });
2402
2403 self.send_data_cmd(DataCommand::Subscribe(command));
2404 }
2405
/// Subscribes to blocks for `chain`.
///
/// Records `handler` under `topic`, then dispatches a `SubscribeBlocks`
/// command as a `DataCommand::DefiSubscribe`; the command is sent even if
/// the topic was already subscribed.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn subscribe_blocks(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiSubscribeCommand, SubscribeBlocks};

    self.check_registered();
    self.add_subscription(topic, handler);

    let request = SubscribeBlocks {
        chain,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    self.send_data_cmd(DataCommand::DefiSubscribe(DefiSubscribeCommand::Blocks(request)));
}
2432
/// Subscribes to pool updates for `instrument_id`.
///
/// Records `handler` under `topic`, then dispatches a `SubscribePool`
/// command as a `DataCommand::DefiSubscribe`; the command is sent even if
/// the topic was already subscribed.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn subscribe_pool(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiSubscribeCommand, SubscribePool};

    self.check_registered();
    self.add_subscription(topic, handler);

    let request = SubscribePool {
        instrument_id,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    self.send_data_cmd(DataCommand::DefiSubscribe(DefiSubscribeCommand::Pool(request)));
}
2459
/// Subscribes to pool swaps for `instrument_id`.
///
/// Records `handler` under `topic`, then dispatches a `SubscribePoolSwaps`
/// command as a `DataCommand::DefiSubscribe`; the command is sent even if
/// the topic was already subscribed.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn subscribe_pool_swaps(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiSubscribeCommand, SubscribePoolSwaps};

    self.check_registered();
    self.add_subscription(topic, handler);

    let request = SubscribePoolSwaps {
        instrument_id,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    self.send_data_cmd(DataCommand::DefiSubscribe(DefiSubscribeCommand::PoolSwaps(request)));
}
2486
/// Subscribes to pool liquidity updates for `instrument_id`.
///
/// Records `handler` under `topic`, then dispatches a
/// `SubscribePoolLiquidityUpdates` command as a
/// `DataCommand::DefiSubscribe`; the command is sent even if the topic was
/// already subscribed.
///
/// # Panics
///
/// Panics if the actor has not been registered with a trader.
#[cfg(feature = "defi")]
pub fn subscribe_pool_liquidity_updates(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) {
    use crate::messages::defi::{DefiSubscribeCommand, SubscribePoolLiquidityUpdates};

    self.check_registered();
    self.add_subscription(topic, handler);

    let request = SubscribePoolLiquidityUpdates {
        instrument_id,
        client_id,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        params,
    };
    self.send_data_cmd(DataCommand::DefiSubscribe(
        DefiSubscribeCommand::PoolLiquidityUpdates(request),
    ));
}
2513
2514 pub fn unsubscribe_data(
2516 &mut self,
2517 data_type: DataType,
2518 client_id: Option<ClientId>,
2519 params: Option<IndexMap<String, String>>,
2520 ) {
2521 self.check_registered();
2522
2523 let topic = get_custom_topic(&data_type);
2524 self.remove_subscription(topic);
2525
2526 if client_id.is_none() {
2527 return;
2528 }
2529
2530 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
2531 data_type,
2532 client_id,
2533 venue: None,
2534 command_id: UUID4::new(),
2535 ts_init: self.timestamp_ns(),
2536 params,
2537 });
2538
2539 self.send_data_cmd(DataCommand::Unsubscribe(command));
2540 }
2541
2542 pub fn unsubscribe_instruments(
2544 &mut self,
2545 venue: Venue,
2546 client_id: Option<ClientId>,
2547 params: Option<IndexMap<String, String>>,
2548 ) {
2549 self.check_registered();
2550
2551 let topic = get_instruments_topic(venue);
2552 self.remove_subscription(topic);
2553
2554 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
2555 client_id,
2556 venue,
2557 command_id: UUID4::new(),
2558 ts_init: self.timestamp_ns(),
2559 params,
2560 });
2561
2562 self.send_data_cmd(DataCommand::Unsubscribe(command));
2563 }
2564
2565 pub fn unsubscribe_instrument(
2567 &mut self,
2568 instrument_id: InstrumentId,
2569 client_id: Option<ClientId>,
2570 params: Option<IndexMap<String, String>>,
2571 ) {
2572 self.check_registered();
2573
2574 let topic = get_instrument_topic(instrument_id);
2575 self.remove_subscription(topic);
2576
2577 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
2578 instrument_id,
2579 client_id,
2580 venue: Some(instrument_id.venue),
2581 command_id: UUID4::new(),
2582 ts_init: self.timestamp_ns(),
2583 params,
2584 });
2585
2586 self.send_data_cmd(DataCommand::Unsubscribe(command));
2587 }
2588
2589 pub fn unsubscribe_book_deltas(
2591 &mut self,
2592 instrument_id: InstrumentId,
2593 client_id: Option<ClientId>,
2594 params: Option<IndexMap<String, String>>,
2595 ) {
2596 self.check_registered();
2597
2598 let topic = get_book_deltas_topic(instrument_id);
2599 self.remove_subscription(topic);
2600
2601 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
2602 instrument_id,
2603 client_id,
2604 venue: Some(instrument_id.venue),
2605 command_id: UUID4::new(),
2606 ts_init: self.timestamp_ns(),
2607 params,
2608 });
2609
2610 self.send_data_cmd(DataCommand::Unsubscribe(command));
2611 }
2612
2613 pub fn unsubscribe_book_at_interval(
2615 &mut self,
2616 instrument_id: InstrumentId,
2617 interval_ms: NonZeroUsize,
2618 client_id: Option<ClientId>,
2619 params: Option<IndexMap<String, String>>,
2620 ) {
2621 self.check_registered();
2622
2623 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
2624 self.remove_subscription(topic);
2625
2626 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
2627 instrument_id,
2628 client_id,
2629 venue: Some(instrument_id.venue),
2630 command_id: UUID4::new(),
2631 ts_init: self.timestamp_ns(),
2632 params,
2633 });
2634
2635 self.send_data_cmd(DataCommand::Unsubscribe(command));
2636 }
2637
2638 pub fn unsubscribe_quotes(
2640 &mut self,
2641 instrument_id: InstrumentId,
2642 client_id: Option<ClientId>,
2643 params: Option<IndexMap<String, String>>,
2644 ) {
2645 self.check_registered();
2646
2647 let topic = get_quotes_topic(instrument_id);
2648 self.remove_subscription(topic);
2649
2650 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
2651 instrument_id,
2652 client_id,
2653 venue: Some(instrument_id.venue),
2654 command_id: UUID4::new(),
2655 ts_init: self.timestamp_ns(),
2656 params,
2657 });
2658
2659 self.send_data_cmd(DataCommand::Unsubscribe(command));
2660 }
2661
2662 pub fn unsubscribe_trades(
2664 &mut self,
2665 instrument_id: InstrumentId,
2666 client_id: Option<ClientId>,
2667 params: Option<IndexMap<String, String>>,
2668 ) {
2669 self.check_registered();
2670
2671 let topic = get_trades_topic(instrument_id);
2672 self.remove_subscription(topic);
2673
2674 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
2675 instrument_id,
2676 client_id,
2677 venue: Some(instrument_id.venue),
2678 command_id: UUID4::new(),
2679 ts_init: self.timestamp_ns(),
2680 params,
2681 });
2682
2683 self.send_data_cmd(DataCommand::Unsubscribe(command));
2684 }
2685
2686 pub fn unsubscribe_bars(
2688 &mut self,
2689 bar_type: BarType,
2690 client_id: Option<ClientId>,
2691 params: Option<IndexMap<String, String>>,
2692 ) {
2693 self.check_registered();
2694
2695 let topic = get_bars_topic(bar_type);
2696 self.remove_subscription(topic);
2697
2698 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
2699 bar_type,
2700 client_id,
2701 venue: Some(bar_type.instrument_id().venue),
2702 command_id: UUID4::new(),
2703 ts_init: self.timestamp_ns(),
2704 params,
2705 });
2706
2707 self.send_data_cmd(DataCommand::Unsubscribe(command));
2708 }
2709
2710 pub fn unsubscribe_mark_prices(
2712 &mut self,
2713 instrument_id: InstrumentId,
2714 client_id: Option<ClientId>,
2715 params: Option<IndexMap<String, String>>,
2716 ) {
2717 self.check_registered();
2718
2719 let topic = get_mark_price_topic(instrument_id);
2720 self.remove_subscription(topic);
2721
2722 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
2723 instrument_id,
2724 client_id,
2725 venue: Some(instrument_id.venue),
2726 command_id: UUID4::new(),
2727 ts_init: self.timestamp_ns(),
2728 params,
2729 });
2730
2731 self.send_data_cmd(DataCommand::Unsubscribe(command));
2732 }
2733
2734 pub fn unsubscribe_index_prices(
2736 &mut self,
2737 instrument_id: InstrumentId,
2738 client_id: Option<ClientId>,
2739 params: Option<IndexMap<String, String>>,
2740 ) {
2741 self.check_registered();
2742
2743 let topic = get_index_price_topic(instrument_id);
2744 self.remove_subscription(topic);
2745
2746 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
2747 instrument_id,
2748 client_id,
2749 venue: Some(instrument_id.venue),
2750 command_id: UUID4::new(),
2751 ts_init: self.timestamp_ns(),
2752 params,
2753 });
2754
2755 self.send_data_cmd(DataCommand::Unsubscribe(command));
2756 }
2757
2758 pub fn unsubscribe_funding_rates(
2760 &mut self,
2761 instrument_id: InstrumentId,
2762 client_id: Option<ClientId>,
2763 params: Option<IndexMap<String, String>>,
2764 ) {
2765 self.check_registered();
2766
2767 let topic = get_funding_rate_topic(instrument_id);
2768 self.remove_subscription(topic);
2769
2770 let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
2771 instrument_id,
2772 client_id,
2773 venue: Some(instrument_id.venue),
2774 command_id: UUID4::new(),
2775 ts_init: self.timestamp_ns(),
2776 params,
2777 });
2778
2779 self.send_data_cmd(DataCommand::Unsubscribe(command));
2780 }
2781
2782 pub fn unsubscribe_instrument_status(
2784 &mut self,
2785 instrument_id: InstrumentId,
2786 client_id: Option<ClientId>,
2787 params: Option<IndexMap<String, String>>,
2788 ) {
2789 self.check_registered();
2790
2791 let topic = get_instrument_status_topic(instrument_id);
2792 self.remove_subscription(topic);
2793
2794 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
2795 instrument_id,
2796 client_id,
2797 venue: Some(instrument_id.venue),
2798 command_id: UUID4::new(),
2799 ts_init: self.timestamp_ns(),
2800 params,
2801 });
2802
2803 self.send_data_cmd(DataCommand::Unsubscribe(command));
2804 }
2805
2806 pub fn unsubscribe_instrument_close(
2808 &mut self,
2809 instrument_id: InstrumentId,
2810 client_id: Option<ClientId>,
2811 params: Option<IndexMap<String, String>>,
2812 ) {
2813 self.check_registered();
2814
2815 let topic = get_instrument_close_topic(instrument_id);
2816 self.remove_subscription(topic);
2817
2818 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
2819 instrument_id,
2820 client_id,
2821 venue: Some(instrument_id.venue),
2822 command_id: UUID4::new(),
2823 ts_init: self.timestamp_ns(),
2824 params,
2825 });
2826
2827 self.send_data_cmd(DataCommand::Unsubscribe(command));
2828 }
2829
2830 #[cfg(feature = "defi")]
2831 pub fn unsubscribe_blocks(
2833 &mut self,
2834 chain: Blockchain,
2835 client_id: Option<ClientId>,
2836 params: Option<IndexMap<String, String>>,
2837 ) {
2838 use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribeBlocks};
2839
2840 self.check_registered();
2841
2842 let topic = get_defi_blocks_topic(chain);
2843 self.remove_subscription(topic);
2844
2845 let command = DefiUnsubscribeCommand::Blocks(UnsubscribeBlocks {
2846 chain,
2847 client_id,
2848 command_id: UUID4::new(),
2849 ts_init: self.timestamp_ns(),
2850 params,
2851 });
2852
2853 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2854 }
2855
2856 #[cfg(feature = "defi")]
2857 pub fn unsubscribe_pool(
2859 &mut self,
2860 instrument_id: InstrumentId,
2861 client_id: Option<ClientId>,
2862 params: Option<IndexMap<String, String>>,
2863 ) {
2864 use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePool};
2865
2866 self.check_registered();
2867
2868 let topic = get_defi_pool_topic(instrument_id);
2869 self.remove_subscription(topic);
2870
2871 let command = DefiUnsubscribeCommand::Pool(UnsubscribePool {
2872 instrument_id,
2873 client_id,
2874 command_id: UUID4::new(),
2875 ts_init: self.timestamp_ns(),
2876 params,
2877 });
2878
2879 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2880 }
2881
2882 #[cfg(feature = "defi")]
2883 pub fn unsubscribe_pool_swaps(
2885 &mut self,
2886 instrument_id: InstrumentId,
2887 client_id: Option<ClientId>,
2888 params: Option<IndexMap<String, String>>,
2889 ) {
2890 use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePoolSwaps};
2891
2892 self.check_registered();
2893
2894 let topic = get_defi_pool_swaps_topic(instrument_id);
2895 self.remove_subscription(topic);
2896
2897 let command = DefiUnsubscribeCommand::PoolSwaps(UnsubscribePoolSwaps {
2898 instrument_id,
2899 client_id,
2900 command_id: UUID4::new(),
2901 ts_init: self.timestamp_ns(),
2902 params,
2903 });
2904
2905 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2906 }
2907
2908 #[cfg(feature = "defi")]
2909 pub fn unsubscribe_pool_liquidity_updates(
2911 &mut self,
2912 instrument_id: InstrumentId,
2913 client_id: Option<ClientId>,
2914 params: Option<IndexMap<String, String>>,
2915 ) {
2916 use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePoolLiquidityUpdates};
2917
2918 self.check_registered();
2919
2920 let topic = get_defi_liquidity_topic(instrument_id);
2921 self.remove_subscription(topic);
2922
2923 let command =
2924 DefiUnsubscribeCommand::PoolLiquidityUpdates(UnsubscribePoolLiquidityUpdates {
2925 instrument_id,
2926 client_id,
2927 command_id: UUID4::new(),
2928 ts_init: self.timestamp_ns(),
2929 params,
2930 });
2931
2932 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2933 }
2934
2935 #[allow(clippy::too_many_arguments)]
2941 pub fn request_data(
2942 &self,
2943 data_type: DataType,
2944 client_id: ClientId,
2945 start: Option<DateTime<Utc>>,
2946 end: Option<DateTime<Utc>>,
2947 limit: Option<NonZeroUsize>,
2948 params: Option<IndexMap<String, String>>,
2949 handler: ShareableMessageHandler,
2950 ) -> anyhow::Result<UUID4> {
2951 self.check_registered();
2952
2953 let now = self.clock_ref().utc_now();
2954 check_timestamps(now, start, end)?;
2955
2956 let request_id = UUID4::new();
2957 let command = RequestCommand::Data(RequestCustomData {
2958 client_id,
2959 data_type,
2960 start,
2961 end,
2962 limit,
2963 request_id,
2964 ts_init: self.timestamp_ns(),
2965 params,
2966 });
2967
2968 get_message_bus()
2969 .borrow_mut()
2970 .register_response_handler(command.request_id(), handler)?;
2971
2972 self.send_data_cmd(DataCommand::Request(command));
2973
2974 Ok(request_id)
2975 }
2976
2977 pub fn request_instrument(
2983 &self,
2984 instrument_id: InstrumentId,
2985 start: Option<DateTime<Utc>>,
2986 end: Option<DateTime<Utc>>,
2987 client_id: Option<ClientId>,
2988 params: Option<IndexMap<String, String>>,
2989 handler: ShareableMessageHandler,
2990 ) -> anyhow::Result<UUID4> {
2991 self.check_registered();
2992
2993 let now = self.clock_ref().utc_now();
2994 check_timestamps(now, start, end)?;
2995
2996 let request_id = UUID4::new();
2997 let command = RequestCommand::Instrument(RequestInstrument {
2998 instrument_id,
2999 start,
3000 end,
3001 client_id,
3002 request_id,
3003 ts_init: now.into(),
3004 params,
3005 });
3006
3007 get_message_bus()
3008 .borrow_mut()
3009 .register_response_handler(command.request_id(), handler)?;
3010
3011 self.send_data_cmd(DataCommand::Request(command));
3012
3013 Ok(request_id)
3014 }
3015
3016 pub fn request_instruments(
3022 &self,
3023 venue: Option<Venue>,
3024 start: Option<DateTime<Utc>>,
3025 end: Option<DateTime<Utc>>,
3026 client_id: Option<ClientId>,
3027 params: Option<IndexMap<String, String>>,
3028 handler: ShareableMessageHandler,
3029 ) -> anyhow::Result<UUID4> {
3030 self.check_registered();
3031
3032 let now = self.clock_ref().utc_now();
3033 check_timestamps(now, start, end)?;
3034
3035 let request_id = UUID4::new();
3036 let command = RequestCommand::Instruments(RequestInstruments {
3037 venue,
3038 start,
3039 end,
3040 client_id,
3041 request_id,
3042 ts_init: now.into(),
3043 params,
3044 });
3045
3046 get_message_bus()
3047 .borrow_mut()
3048 .register_response_handler(command.request_id(), handler)?;
3049
3050 self.send_data_cmd(DataCommand::Request(command));
3051
3052 Ok(request_id)
3053 }
3054
3055 pub fn request_book_snapshot(
3061 &self,
3062 instrument_id: InstrumentId,
3063 depth: Option<NonZeroUsize>,
3064 client_id: Option<ClientId>,
3065 params: Option<IndexMap<String, String>>,
3066 handler: ShareableMessageHandler,
3067 ) -> anyhow::Result<UUID4> {
3068 self.check_registered();
3069
3070 let request_id = UUID4::new();
3071 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3072 instrument_id,
3073 depth,
3074 client_id,
3075 request_id,
3076 ts_init: self.timestamp_ns(),
3077 params,
3078 });
3079
3080 get_message_bus()
3081 .borrow_mut()
3082 .register_response_handler(command.request_id(), handler)?;
3083
3084 self.send_data_cmd(DataCommand::Request(command));
3085
3086 Ok(request_id)
3087 }
3088
3089 #[allow(clippy::too_many_arguments)]
3095 pub fn request_quotes(
3096 &self,
3097 instrument_id: InstrumentId,
3098 start: Option<DateTime<Utc>>,
3099 end: Option<DateTime<Utc>>,
3100 limit: Option<NonZeroUsize>,
3101 client_id: Option<ClientId>,
3102 params: Option<IndexMap<String, String>>,
3103 handler: ShareableMessageHandler,
3104 ) -> anyhow::Result<UUID4> {
3105 self.check_registered();
3106
3107 let now = self.clock_ref().utc_now();
3108 check_timestamps(now, start, end)?;
3109
3110 let request_id = UUID4::new();
3111 let command = RequestCommand::Quotes(RequestQuotes {
3112 instrument_id,
3113 start,
3114 end,
3115 limit,
3116 client_id,
3117 request_id,
3118 ts_init: now.into(),
3119 params,
3120 });
3121
3122 get_message_bus()
3123 .borrow_mut()
3124 .register_response_handler(command.request_id(), handler)?;
3125
3126 self.send_data_cmd(DataCommand::Request(command));
3127
3128 Ok(request_id)
3129 }
3130
3131 #[allow(clippy::too_many_arguments)]
3137 pub fn request_trades(
3138 &self,
3139 instrument_id: InstrumentId,
3140 start: Option<DateTime<Utc>>,
3141 end: Option<DateTime<Utc>>,
3142 limit: Option<NonZeroUsize>,
3143 client_id: Option<ClientId>,
3144 params: Option<IndexMap<String, String>>,
3145 handler: ShareableMessageHandler,
3146 ) -> anyhow::Result<UUID4> {
3147 self.check_registered();
3148
3149 let now = self.clock_ref().utc_now();
3150 check_timestamps(now, start, end)?;
3151
3152 let request_id = UUID4::new();
3153 let command = RequestCommand::Trades(RequestTrades {
3154 instrument_id,
3155 start,
3156 end,
3157 limit,
3158 client_id,
3159 request_id,
3160 ts_init: now.into(),
3161 params,
3162 });
3163
3164 get_message_bus()
3165 .borrow_mut()
3166 .register_response_handler(command.request_id(), handler)?;
3167
3168 self.send_data_cmd(DataCommand::Request(command));
3169
3170 Ok(request_id)
3171 }
3172
3173 #[allow(clippy::too_many_arguments)]
3179 pub fn request_bars(
3180 &self,
3181 bar_type: BarType,
3182 start: Option<DateTime<Utc>>,
3183 end: Option<DateTime<Utc>>,
3184 limit: Option<NonZeroUsize>,
3185 client_id: Option<ClientId>,
3186 params: Option<IndexMap<String, String>>,
3187 handler: ShareableMessageHandler,
3188 ) -> anyhow::Result<UUID4> {
3189 self.check_registered();
3190
3191 let now = self.clock_ref().utc_now();
3192 check_timestamps(now, start, end)?;
3193
3194 let request_id = UUID4::new();
3195 let command = RequestCommand::Bars(RequestBars {
3196 bar_type,
3197 start,
3198 end,
3199 limit,
3200 client_id,
3201 request_id,
3202 ts_init: now.into(),
3203 params,
3204 });
3205
3206 get_message_bus()
3207 .borrow_mut()
3208 .register_response_handler(command.request_id(), handler)?;
3209
3210 self.send_data_cmd(DataCommand::Request(command));
3211
3212 Ok(request_id)
3213 }
3214}
3215
3216fn check_timestamps(
3217 now: DateTime<Utc>,
3218 start: Option<DateTime<Utc>>,
3219 end: Option<DateTime<Utc>>,
3220) -> anyhow::Result<()> {
3221 if let Some(start) = start {
3222 check_predicate_true(start <= now, "start was > now")?
3223 }
3224 if let Some(end) = end {
3225 check_predicate_true(end <= now, "end was > now")?
3226 }
3227
3228 if let (Some(start), Some(end)) = (start, end) {
3229 check_predicate_true(start < end, "start was >= end")?
3230 }
3231
3232 Ok(())
3233}
3234
/// Logs `e` at error level using its `Display` representation.
fn log_error(e: &anyhow::Error) {
    log::error!("{e}");
}
3238
3239fn log_not_running<T>(msg: &T)
3240where
3241 T: Debug,
3242{
3243 log::warn!("Received message when not running - skipping {msg:?}");
3245}
3246
3247fn log_received<T>(msg: &T)
3248where
3249 T: Debug,
3250{
3251 log::debug!("{RECV} {msg:?}");
3252}