1use std::{
17 any::Any,
18 cell::{Ref, RefCell, RefMut},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24 sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33 Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36 data::{
37 Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
39 },
40 enums::BookType,
41 events::order::{canceled::OrderCanceled, filled::OrderFilled},
42 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
43 instruments::InstrumentAny,
44 orderbook::OrderBook,
45};
46use ustr::Ustr;
47
48#[cfg(feature = "indicators")]
49use super::indicators::Indicators;
50use super::{
51 Actor,
52 registry::{get_actor_unchecked, try_get_actor_unchecked},
53};
54#[cfg(feature = "defi")]
55use crate::defi;
56#[cfg(feature = "defi")]
57#[allow(unused_imports)]
58use crate::defi::data_actor as _; use crate::{
60 cache::Cache,
61 clock::Clock,
62 component::Component,
63 enums::{ComponentState, ComponentTrigger},
64 logging::{CMD, RECV, REQ, SEND},
65 messages::{
66 data::{
67 BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
68 InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
69 RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
70 SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
71 SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
72 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
73 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
74 UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
75 UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
76 UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
77 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
78 },
79 system::ShutdownSystem,
80 },
81 msgbus::{
82 self, MStr, Topic, get_message_bus,
83 handler::{ShareableMessageHandler, TypedMessageHandler},
84 switchboard::{
85 MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
86 get_custom_topic, get_funding_rate_topic, get_index_price_topic,
87 get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
88 get_instruments_topic, get_mark_price_topic, get_order_cancels_topic,
89 get_order_fills_topic, get_quotes_topic, get_trades_topic,
90 },
91 },
92 signal::Signal,
93 timer::{TimeEvent, TimeEventCallback},
94};
95
96#[derive(Debug, Clone)]
98#[cfg_attr(
99 feature = "python",
100 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", subclass)
101)]
102pub struct DataActorConfig {
103 pub actor_id: Option<ActorId>,
105 pub log_events: bool,
107 pub log_commands: bool,
109}
110
111impl Default for DataActorConfig {
112 fn default() -> Self {
113 Self {
114 actor_id: None,
115 log_events: true,
116 log_commands: true,
117 }
118 }
119}
120
121#[derive(Debug, Clone)]
123#[cfg_attr(
124 feature = "python",
125 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
126)]
127pub struct ImportableActorConfig {
128 pub actor_path: String,
130 pub config_path: String,
132 pub config: HashMap<String, serde_json::Value>,
134}
135
136type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
137
138pub trait DataActor:
139 Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
140{
141 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
147 Ok(IndexMap::new())
148 }
149
150 #[allow(unused_variables)]
156 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
157 Ok(())
158 }
159
160 fn on_start(&mut self) -> anyhow::Result<()> {
166 log::warn!(
167 "The `on_start` handler was called when not overridden, \
168 it's expected that any actions required when starting the actor \
169 occur here, such as subscribing/requesting data"
170 );
171 Ok(())
172 }
173
174 fn on_stop(&mut self) -> anyhow::Result<()> {
180 log::warn!(
181 "The `on_stop` handler was called when not overridden, \
182 it's expected that any actions required when stopping the actor \
183 occur here, such as unsubscribing from data",
184 );
185 Ok(())
186 }
187
188 fn on_resume(&mut self) -> anyhow::Result<()> {
194 log::warn!(
195 "The `on_resume` handler was called when not overridden, \
196 it's expected that any actions required when resuming the actor \
197 following a stop occur here"
198 );
199 Ok(())
200 }
201
202 fn on_reset(&mut self) -> anyhow::Result<()> {
208 log::warn!(
209 "The `on_reset` handler was called when not overridden, \
210 it's expected that any actions required when resetting the actor \
211 occur here, such as resetting indicators and other state"
212 );
213 Ok(())
214 }
215
216 fn on_dispose(&mut self) -> anyhow::Result<()> {
222 Ok(())
223 }
224
225 fn on_degrade(&mut self) -> anyhow::Result<()> {
231 Ok(())
232 }
233
234 fn on_fault(&mut self) -> anyhow::Result<()> {
240 Ok(())
241 }
242
243 #[allow(unused_variables)]
249 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
250 Ok(())
251 }
252
253 #[allow(unused_variables)]
259 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
260 Ok(())
261 }
262
263 #[allow(unused_variables)]
269 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
270 Ok(())
271 }
272
273 #[allow(unused_variables)]
279 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
280 Ok(())
281 }
282
283 #[allow(unused_variables)]
289 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
290 Ok(())
291 }
292
293 #[allow(unused_variables)]
299 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
300 Ok(())
301 }
302
303 #[allow(unused_variables)]
309 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
310 Ok(())
311 }
312
313 #[allow(unused_variables)]
319 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
320 Ok(())
321 }
322
323 #[allow(unused_variables)]
329 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
330 Ok(())
331 }
332
333 #[allow(unused_variables)]
339 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
340 Ok(())
341 }
342
343 #[allow(unused_variables)]
349 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
350 Ok(())
351 }
352
353 #[allow(unused_variables)]
359 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
360 Ok(())
361 }
362
363 #[allow(unused_variables)]
369 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
370 Ok(())
371 }
372
373 #[allow(unused_variables)]
379 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
380 Ok(())
381 }
382
383 #[allow(unused_variables)]
389 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
390 Ok(())
391 }
392
393 #[allow(unused_variables)]
399 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
400 Ok(())
401 }
402
/// Hook called with each [`Block`] dispatched by `handle_block`
/// (only compiled with the `defi` feature).
///
/// The default implementation ignores the block and succeeds.
///
/// # Errors
///
/// The default never fails; an error returned by an override is logged by the dispatcher.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
    Ok(())
}
413
/// Hook called with each [`Pool`] dispatched by `handle_pool`
/// (only compiled with the `defi` feature).
///
/// The default implementation ignores the pool and succeeds.
///
/// # Errors
///
/// The default never fails; an error returned by an override is logged by the dispatcher.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
    Ok(())
}
424
/// Hook called with each [`PoolSwap`] dispatched by `handle_pool_swap`
/// (only compiled with the `defi` feature).
///
/// The default implementation ignores the swap and succeeds.
///
/// # Errors
///
/// The default never fails; an error returned by an override is logged by the dispatcher.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
    Ok(())
}
435
/// Hook called with each [`PoolLiquidityUpdate`] dispatched by
/// `handle_pool_liquidity_update` (only compiled with the `defi` feature).
///
/// The default implementation ignores the update and succeeds.
///
/// # Errors
///
/// The default never fails; an error returned by an override is logged by the dispatcher.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
    Ok(())
}
446
/// Hook called with each [`PoolFeeCollect`] dispatched by
/// `handle_pool_fee_collect` (only compiled with the `defi` feature).
///
/// The default implementation ignores the event and succeeds.
///
/// # Errors
///
/// The default never fails; an error returned by an override is logged by the dispatcher.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
    Ok(())
}
457
/// Hook called with each [`PoolFlash`] dispatched by `handle_pool_flash`
/// (only compiled with the `defi` feature).
///
/// The default implementation ignores the event and succeeds.
///
/// # Errors
///
/// The default never fails; an error returned by an override is logged by the dispatcher.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
    Ok(())
}
468
469 #[allow(unused_variables)]
475 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
476 Ok(())
477 }
478
479 #[allow(unused_variables)]
485 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
486 Ok(())
487 }
488
489 #[allow(unused_variables)]
495 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
496 Ok(())
497 }
498
499 #[allow(unused_variables)]
505 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
506 Ok(())
507 }
508
509 #[allow(unused_variables)]
515 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
516 Ok(())
517 }
518
519 #[allow(unused_variables)]
525 fn on_historical_index_prices(
526 &mut self,
527 index_prices: &[IndexPriceUpdate],
528 ) -> anyhow::Result<()> {
529 Ok(())
530 }
531
532 fn handle_time_event(&mut self, event: &TimeEvent) {
534 log_received(&event);
535
536 if let Err(e) = DataActor::on_time_event(self, event) {
537 log_error(&e);
538 }
539 }
540
541 fn handle_data(&mut self, data: &dyn Any) {
543 log_received(&data);
544
545 if self.not_running() {
546 log_not_running(&data);
547 return;
548 }
549
550 if let Err(e) = self.on_data(data) {
551 log_error(&e);
552 }
553 }
554
555 fn handle_signal(&mut self, signal: &Signal) {
557 log_received(&signal);
558
559 if self.not_running() {
560 log_not_running(&signal);
561 return;
562 }
563
564 if let Err(e) = self.on_signal(signal) {
565 log_error(&e);
566 }
567 }
568
569 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
571 log_received(&instrument);
572
573 if self.not_running() {
574 log_not_running(&instrument);
575 return;
576 }
577
578 if let Err(e) = self.on_instrument(instrument) {
579 log_error(&e);
580 }
581 }
582
583 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
585 log_received(&deltas);
586
587 if self.not_running() {
588 log_not_running(&deltas);
589 return;
590 }
591
592 if let Err(e) = self.on_book_deltas(deltas) {
593 log_error(&e);
594 }
595 }
596
597 fn handle_book(&mut self, book: &OrderBook) {
599 log_received(&book);
600
601 if self.not_running() {
602 log_not_running(&book);
603 return;
604 }
605
606 if let Err(e) = self.on_book(book) {
607 log_error(&e);
608 };
609 }
610
611 fn handle_quote(&mut self, quote: &QuoteTick) {
613 log_received("e);
614
615 if self.not_running() {
616 log_not_running("e);
617 return;
618 }
619
620 if let Err(e) = self.on_quote(quote) {
621 log_error(&e);
622 }
623 }
624
625 fn handle_trade(&mut self, trade: &TradeTick) {
627 log_received(&trade);
628
629 if self.not_running() {
630 log_not_running(&trade);
631 return;
632 }
633
634 if let Err(e) = self.on_trade(trade) {
635 log_error(&e);
636 }
637 }
638
639 fn handle_bar(&mut self, bar: &Bar) {
641 log_received(&bar);
642
643 if self.not_running() {
644 log_not_running(&bar);
645 return;
646 }
647
648 if let Err(e) = self.on_bar(bar) {
649 log_error(&e);
650 }
651 }
652
653 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
655 log_received(&mark_price);
656
657 if self.not_running() {
658 log_not_running(&mark_price);
659 return;
660 }
661
662 if let Err(e) = self.on_mark_price(mark_price) {
663 log_error(&e);
664 }
665 }
666
667 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
669 log_received(&index_price);
670
671 if self.not_running() {
672 log_not_running(&index_price);
673 return;
674 }
675
676 if let Err(e) = self.on_index_price(index_price) {
677 log_error(&e);
678 }
679 }
680
681 fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
683 log_received(&funding_rate);
684
685 if self.not_running() {
686 log_not_running(&funding_rate);
687 return;
688 }
689
690 if let Err(e) = self.on_funding_rate(funding_rate) {
691 log_error(&e);
692 }
693 }
694
695 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
697 log_received(&status);
698
699 if self.not_running() {
700 log_not_running(&status);
701 return;
702 }
703
704 if let Err(e) = self.on_instrument_status(status) {
705 log_error(&e);
706 }
707 }
708
709 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
711 log_received(&close);
712
713 if self.not_running() {
714 log_not_running(&close);
715 return;
716 }
717
718 if let Err(e) = self.on_instrument_close(close) {
719 log_error(&e);
720 }
721 }
722
723 fn handle_order_filled(&mut self, event: &OrderFilled) {
725 log_received(&event);
726
727 if event.strategy_id.inner() == self.actor_id().inner() {
731 return;
732 }
733
734 if self.not_running() {
735 log_not_running(&event);
736 return;
737 }
738
739 if let Err(e) = self.on_order_filled(event) {
740 log_error(&e);
741 }
742 }
743
744 fn handle_order_canceled(&mut self, event: &OrderCanceled) {
746 log_received(&event);
747
748 if event.strategy_id.inner() == self.actor_id().inner() {
752 return;
753 }
754
755 if self.not_running() {
756 log_not_running(&event);
757 return;
758 }
759
760 if let Err(e) = self.on_order_canceled(event) {
761 log_error(&e);
762 }
763 }
764
/// Dispatches a block to [`Self::on_block`] (only compiled with the `defi` feature).
///
/// The block is first passed to `log_received`. When the actor is not running
/// it is passed to `log_not_running` and dropped; otherwise the hook runs and
/// any error it returns goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_block(&mut self, block: &Block) {
    log_received(&block);

    if self.not_running() {
        log_not_running(&block);
    } else if let Err(err) = self.on_block(block) {
        log_error(&err);
    }
}
779
/// Dispatches a pool to [`Self::on_pool`] (only compiled with the `defi` feature).
///
/// The pool is first passed to `log_received`. When the actor is not running
/// it is passed to `log_not_running` and dropped; otherwise the hook runs and
/// any error it returns goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool(&mut self, pool: &Pool) {
    log_received(&pool);

    if self.not_running() {
        log_not_running(&pool);
    } else if let Err(err) = self.on_pool(pool) {
        log_error(&err);
    }
}
794
/// Dispatches a pool swap to [`Self::on_pool_swap`] (only compiled with the `defi` feature).
///
/// The swap is first passed to `log_received`. When the actor is not running
/// it is passed to `log_not_running` and dropped; otherwise the hook runs and
/// any error it returns goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_swap(&mut self, swap: &PoolSwap) {
    log_received(&swap);

    if self.not_running() {
        log_not_running(&swap);
    } else if let Err(err) = self.on_pool_swap(swap) {
        log_error(&err);
    }
}
809
/// Dispatches a pool liquidity update to [`Self::on_pool_liquidity_update`]
/// (only compiled with the `defi` feature).
///
/// The update is first passed to `log_received`. When the actor is not running
/// it is passed to `log_not_running` and dropped; otherwise the hook runs and
/// any error it returns goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
    log_received(&update);

    if self.not_running() {
        log_not_running(&update);
    } else if let Err(err) = self.on_pool_liquidity_update(update) {
        log_error(&err);
    }
}
824
/// Dispatches a pool fee collect event to [`Self::on_pool_fee_collect`]
/// (only compiled with the `defi` feature).
///
/// The event is first passed to `log_received`. When the actor is not running
/// it is passed to `log_not_running` and dropped; otherwise the hook runs and
/// any error it returns goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
    log_received(&collect);

    if self.not_running() {
        log_not_running(&collect);
    } else if let Err(err) = self.on_pool_fee_collect(collect) {
        log_error(&err);
    }
}
839
/// Dispatches a pool flash event to [`Self::on_pool_flash`]
/// (only compiled with the `defi` feature).
///
/// The event is first passed to `log_received`. When the actor is not running
/// it is passed to `log_not_running` and dropped; otherwise the hook runs and
/// any error it returns goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_flash(&mut self, flash: &PoolFlash) {
    log_received(&flash);

    if self.not_running() {
        log_not_running(&flash);
    } else if let Err(err) = self.on_pool_flash(flash) {
        log_error(&err);
    }
}
854
855 fn handle_historical_data(&mut self, data: &dyn Any) {
857 log_received(&data);
858
859 if let Err(e) = self.on_historical_data(data) {
860 log_error(&e);
861 }
862 }
863
864 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
866 log_received(&resp);
867
868 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
869 log_error(&e);
870 }
871 }
872
873 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
875 log_received(&resp);
876
877 if let Err(e) = self.on_instrument(&resp.data) {
878 log_error(&e);
879 }
880 }
881
882 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
884 log_received(&resp);
885
886 for inst in &resp.data {
887 if let Err(e) = self.on_instrument(inst) {
888 log_error(&e);
889 }
890 }
891 }
892
893 fn handle_book_response(&mut self, resp: &BookResponse) {
895 log_received(&resp);
896
897 if let Err(e) = self.on_book(&resp.data) {
898 log_error(&e);
899 }
900 }
901
902 fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
904 log_received(&resp);
905
906 if let Err(e) = self.on_historical_quotes(&resp.data) {
907 log_error(&e);
908 }
909 }
910
911 fn handle_trades_response(&mut self, resp: &TradesResponse) {
913 log_received(&resp);
914
915 if let Err(e) = self.on_historical_trades(&resp.data) {
916 log_error(&e);
917 }
918 }
919
920 fn handle_bars_response(&mut self, resp: &BarsResponse) {
922 log_received(&resp);
923
924 if let Err(e) = self.on_historical_bars(&resp.data) {
925 log_error(&e);
926 }
927 }
928
929 fn subscribe_data(
931 &mut self,
932 data_type: DataType,
933 client_id: Option<ClientId>,
934 params: Option<IndexMap<String, String>>,
935 ) where
936 Self: 'static + Debug + Sized,
937 {
938 let actor_id = self.actor_id().inner();
939 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
940 move |data: &dyn Any| {
941 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
942 },
943 )));
944
945 DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
946 }
947
948 fn subscribe_quotes(
950 &mut self,
951 instrument_id: InstrumentId,
952 client_id: Option<ClientId>,
953 params: Option<IndexMap<String, String>>,
954 ) where
955 Self: 'static + Debug + Sized,
956 {
957 let actor_id = self.actor_id().inner();
958 let topic = get_quotes_topic(instrument_id);
959
960 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
961 move |quote: &QuoteTick| {
962 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
963 actor.handle_quote(quote);
964 } else {
965 log::error!("Actor {actor_id} not found for quote handling");
966 }
967 },
968 )));
969
970 DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
971 }
972
973 fn subscribe_instruments(
975 &mut self,
976 venue: Venue,
977 client_id: Option<ClientId>,
978 params: Option<IndexMap<String, String>>,
979 ) where
980 Self: 'static + Debug + Sized,
981 {
982 let actor_id = self.actor_id().inner();
983 let topic = get_instruments_topic(venue);
984
985 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
986 move |instrument: &InstrumentAny| {
987 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
988 actor.handle_instrument(instrument);
989 } else {
990 log::error!("Actor {actor_id} not found for instruments handling");
991 }
992 },
993 )));
994
995 DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
996 }
997
998 fn subscribe_instrument(
1000 &mut self,
1001 instrument_id: InstrumentId,
1002 client_id: Option<ClientId>,
1003 params: Option<IndexMap<String, String>>,
1004 ) where
1005 Self: 'static + Debug + Sized,
1006 {
1007 let actor_id = self.actor_id().inner();
1008 let topic = get_instrument_topic(instrument_id);
1009
1010 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1011 move |instrument: &InstrumentAny| {
1012 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1013 actor.handle_instrument(instrument);
1014 } else {
1015 log::error!("Actor {actor_id} not found for instrument handling");
1016 }
1017 },
1018 )));
1019
1020 DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1021 }
1022
1023 fn subscribe_book_deltas(
1025 &mut self,
1026 instrument_id: InstrumentId,
1027 book_type: BookType,
1028 depth: Option<NonZeroUsize>,
1029 client_id: Option<ClientId>,
1030 managed: bool,
1031 params: Option<IndexMap<String, String>>,
1032 ) where
1033 Self: 'static + Debug + Sized,
1034 {
1035 let actor_id = self.actor_id().inner();
1036 let topic = get_book_deltas_topic(instrument_id);
1037
1038 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1039 move |deltas: &OrderBookDeltas| {
1040 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1041 },
1042 )));
1043
1044 DataActorCore::subscribe_book_deltas(
1045 self,
1046 topic,
1047 handler,
1048 instrument_id,
1049 book_type,
1050 depth,
1051 client_id,
1052 managed,
1053 params,
1054 );
1055 }
1056
1057 fn subscribe_book_at_interval(
1059 &mut self,
1060 instrument_id: InstrumentId,
1061 book_type: BookType,
1062 depth: Option<NonZeroUsize>,
1063 interval_ms: NonZeroUsize,
1064 client_id: Option<ClientId>,
1065 params: Option<IndexMap<String, String>>,
1066 ) where
1067 Self: 'static + Debug + Sized,
1068 {
1069 let actor_id = self.actor_id().inner();
1070 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1071
1072 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1073 move |book: &OrderBook| {
1074 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1075 },
1076 )));
1077
1078 DataActorCore::subscribe_book_at_interval(
1079 self,
1080 topic,
1081 handler,
1082 instrument_id,
1083 book_type,
1084 depth,
1085 interval_ms,
1086 client_id,
1087 params,
1088 );
1089 }
1090
1091 fn subscribe_trades(
1093 &mut self,
1094 instrument_id: InstrumentId,
1095 client_id: Option<ClientId>,
1096 params: Option<IndexMap<String, String>>,
1097 ) where
1098 Self: 'static + Debug + Sized,
1099 {
1100 let actor_id = self.actor_id().inner();
1101 let topic = get_trades_topic(instrument_id);
1102
1103 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1104 move |trade: &TradeTick| {
1105 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1106 },
1107 )));
1108
1109 DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1110 }
1111
1112 fn subscribe_bars(
1114 &mut self,
1115 bar_type: BarType,
1116 client_id: Option<ClientId>,
1117 params: Option<IndexMap<String, String>>,
1118 ) where
1119 Self: 'static + Debug + Sized,
1120 {
1121 let actor_id = self.actor_id().inner();
1122 let topic = get_bars_topic(bar_type);
1123
1124 let handler =
1125 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
1126 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1127 })));
1128
1129 DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1130 }
1131
1132 fn subscribe_mark_prices(
1134 &mut self,
1135 instrument_id: InstrumentId,
1136 client_id: Option<ClientId>,
1137 params: Option<IndexMap<String, String>>,
1138 ) where
1139 Self: 'static + Debug + Sized,
1140 {
1141 let actor_id = self.actor_id().inner();
1142 let topic = get_mark_price_topic(instrument_id);
1143
1144 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1145 move |mark_price: &MarkPriceUpdate| {
1146 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1147 },
1148 )));
1149
1150 DataActorCore::subscribe_mark_prices(
1151 self,
1152 topic,
1153 handler,
1154 instrument_id,
1155 client_id,
1156 params,
1157 );
1158 }
1159
1160 fn subscribe_index_prices(
1162 &mut self,
1163 instrument_id: InstrumentId,
1164 client_id: Option<ClientId>,
1165 params: Option<IndexMap<String, String>>,
1166 ) where
1167 Self: 'static + Debug + Sized,
1168 {
1169 let actor_id = self.actor_id().inner();
1170 let topic = get_index_price_topic(instrument_id);
1171
1172 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1173 move |index_price: &IndexPriceUpdate| {
1174 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1175 },
1176 )));
1177
1178 DataActorCore::subscribe_index_prices(
1179 self,
1180 topic,
1181 handler,
1182 instrument_id,
1183 client_id,
1184 params,
1185 );
1186 }
1187
1188 fn subscribe_funding_rates(
1190 &mut self,
1191 instrument_id: InstrumentId,
1192 client_id: Option<ClientId>,
1193 params: Option<IndexMap<String, String>>,
1194 ) where
1195 Self: 'static + Debug + Sized,
1196 {
1197 let actor_id = self.actor_id().inner();
1198 let topic = get_funding_rate_topic(instrument_id);
1199
1200 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1201 move |funding_rate: &FundingRateUpdate| {
1202 get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1203 },
1204 )));
1205
1206 DataActorCore::subscribe_funding_rates(
1207 self,
1208 topic,
1209 handler,
1210 instrument_id,
1211 client_id,
1212 params,
1213 );
1214 }
1215
1216 fn subscribe_instrument_status(
1218 &mut self,
1219 instrument_id: InstrumentId,
1220 client_id: Option<ClientId>,
1221 params: Option<IndexMap<String, String>>,
1222 ) where
1223 Self: 'static + Debug + Sized,
1224 {
1225 let actor_id = self.actor_id().inner();
1226 let topic = get_instrument_status_topic(instrument_id);
1227
1228 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1229 move |status: &InstrumentStatus| {
1230 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1231 },
1232 )));
1233
1234 DataActorCore::subscribe_instrument_status(
1235 self,
1236 topic,
1237 handler,
1238 instrument_id,
1239 client_id,
1240 params,
1241 );
1242 }
1243
1244 fn subscribe_instrument_close(
1246 &mut self,
1247 instrument_id: InstrumentId,
1248 client_id: Option<ClientId>,
1249 params: Option<IndexMap<String, String>>,
1250 ) where
1251 Self: 'static + Debug + Sized,
1252 {
1253 let actor_id = self.actor_id().inner();
1254 let topic = get_instrument_close_topic(instrument_id);
1255
1256 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1257 move |close: &InstrumentClose| {
1258 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1259 },
1260 )));
1261
1262 DataActorCore::subscribe_instrument_close(
1263 self,
1264 topic,
1265 handler,
1266 instrument_id,
1267 client_id,
1268 params,
1269 );
1270 }
1271
1272 fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1274 where
1275 Self: 'static + Debug + Sized,
1276 {
1277 let actor_id = self.actor_id().inner();
1278 let topic = get_order_fills_topic(instrument_id);
1279
1280 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1281 move |event: &OrderFilled| {
1282 get_actor_unchecked::<Self>(&actor_id).handle_order_filled(event);
1283 },
1284 )));
1285
1286 DataActorCore::subscribe_order_fills(self, topic, handler);
1287 }
1288
1289 fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1291 where
1292 Self: 'static + Debug + Sized,
1293 {
1294 let actor_id = self.actor_id().inner();
1295 let topic = get_order_cancels_topic(instrument_id);
1296
1297 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1298 move |event: &OrderCanceled| {
1299 get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(event);
1300 },
1301 )));
1302
1303 DataActorCore::subscribe_order_cancels(self, topic, handler);
1304 }
1305
/// Subscribes to [`Block`] data for `chain` (only compiled with the `defi` feature).
///
/// Builds a message handler that looks this actor up in the actor registry by
/// ID and forwards each block to [`Self::handle_block`], then delegates to
/// [`DataActorCore::subscribe_blocks`] with the DeFi blocks topic for the chain.
///
/// If the actor cannot be found when a block arrives, an error is logged and
/// the block is dropped (matching the quote/instrument subscription handlers).
#[cfg(feature = "defi")]
fn subscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_blocks_topic(chain);

    let handler =
        ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |block: &Block| {
            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
                actor.handle_block(block);
            } else {
                log::error!("Actor {actor_id} not found for block handling");
            }
        })));

    DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
}
1326
/// Subscribes to [`Pool`] data for `instrument_id` (only compiled with the `defi` feature).
///
/// Builds a message handler that looks this actor up in the actor registry by
/// ID and forwards each pool to [`Self::handle_pool`], then delegates to
/// [`DataActorCore::subscribe_pool`] with the DeFi pool topic.
///
/// If the actor cannot be found when a pool arrives, an error is logged and
/// the message is dropped (matching the quote/instrument subscription handlers).
#[cfg(feature = "defi")]
fn subscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_pool_topic(instrument_id);

    let handler =
        ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |pool: &Pool| {
            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
                actor.handle_pool(pool);
            } else {
                log::error!("Actor {actor_id} not found for pool handling");
            }
        })));

    DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
}
1347
/// Subscribes to [`PoolSwap`] data for `instrument_id` (only compiled with the `defi` feature).
///
/// Builds a message handler that looks this actor up in the actor registry by
/// ID and forwards each swap to [`Self::handle_pool_swap`], then delegates to
/// [`DataActorCore::subscribe_pool_swaps`] with the DeFi pool swaps topic.
///
/// If the actor cannot be found when a swap arrives, an error is logged and
/// the swap is dropped (matching the quote/instrument subscription handlers).
#[cfg(feature = "defi")]
fn subscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);

    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |swap: &PoolSwap| {
            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
                actor.handle_pool_swap(swap);
            } else {
                log::error!("Actor {actor_id} not found for pool swap handling");
            }
        },
    )));

    DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
}
1369
/// Subscribes to [`PoolLiquidityUpdate`] data for `instrument_id`
/// (only compiled with the `defi` feature).
///
/// Builds a message handler that looks this actor up in the actor registry by
/// ID and forwards each update to [`Self::handle_pool_liquidity_update`], then
/// delegates to [`DataActorCore::subscribe_pool_liquidity_updates`] with the
/// DeFi liquidity topic.
///
/// If the actor cannot be found when an update arrives, an error is logged and
/// the update is dropped (matching the quote/instrument subscription handlers).
#[cfg(feature = "defi")]
fn subscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);

    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |update: &PoolLiquidityUpdate| {
            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
                actor.handle_pool_liquidity_update(update);
            } else {
                log::error!("Actor {actor_id} not found for pool liquidity update handling");
            }
        },
    )));

    DataActorCore::subscribe_pool_liquidity_updates(
        self,
        topic,
        handler,
        instrument_id,
        client_id,
        params,
    );
}
1398
/// Subscribes to [`PoolFeeCollect`] data for `instrument_id`
/// (only compiled with the `defi` feature).
///
/// Builds a message handler that looks this actor up in the actor registry by
/// ID and forwards each event to [`Self::handle_pool_fee_collect`], then
/// delegates to [`DataActorCore::subscribe_pool_fee_collects`] with the DeFi
/// collect topic.
///
/// If the actor cannot be found when an event arrives, an error is logged and
/// the event is dropped (matching the quote/instrument subscription handlers).
#[cfg(feature = "defi")]
fn subscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_collect_topic(instrument_id);

    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |collect: &PoolFeeCollect| {
            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
                actor.handle_pool_fee_collect(collect);
            } else {
                log::error!("Actor {actor_id} not found for pool fee collect handling");
            }
        },
    )));

    DataActorCore::subscribe_pool_fee_collects(
        self,
        topic,
        handler,
        instrument_id,
        client_id,
        params,
    );
}
1427
/// Subscribes to [`PoolFlash`] data for `instrument_id`
/// (only compiled with the `defi` feature).
///
/// Builds a message handler that looks this actor up in the actor registry by
/// ID and forwards each event to [`Self::handle_pool_flash`], then delegates
/// to [`DataActorCore::subscribe_pool_flash_events`] with the DeFi flash topic.
///
/// If the actor cannot be found when an event arrives, an error is logged and
/// the event is dropped (matching the quote/instrument subscription handlers).
#[cfg(feature = "defi")]
fn subscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_flash_topic(instrument_id);

    let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
        move |flash: &PoolFlash| {
            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
                actor.handle_pool_flash(flash);
            } else {
                log::error!("Actor {actor_id} not found for pool flash handling");
            }
        },
    )));

    DataActorCore::subscribe_pool_flash_events(
        self,
        topic,
        handler,
        instrument_id,
        client_id,
        params,
    );
}
1456
1457 fn unsubscribe_data(
1459 &mut self,
1460 data_type: DataType,
1461 client_id: Option<ClientId>,
1462 params: Option<IndexMap<String, String>>,
1463 ) where
1464 Self: 'static + Debug + Sized,
1465 {
1466 DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1467 }
1468
1469 fn unsubscribe_instruments(
1471 &mut self,
1472 venue: Venue,
1473 client_id: Option<ClientId>,
1474 params: Option<IndexMap<String, String>>,
1475 ) where
1476 Self: 'static + Debug + Sized,
1477 {
1478 DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1479 }
1480
1481 fn unsubscribe_instrument(
1483 &mut self,
1484 instrument_id: InstrumentId,
1485 client_id: Option<ClientId>,
1486 params: Option<IndexMap<String, String>>,
1487 ) where
1488 Self: 'static + Debug + Sized,
1489 {
1490 DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1491 }
1492
1493 fn unsubscribe_book_deltas(
1495 &mut self,
1496 instrument_id: InstrumentId,
1497 client_id: Option<ClientId>,
1498 params: Option<IndexMap<String, String>>,
1499 ) where
1500 Self: 'static + Debug + Sized,
1501 {
1502 DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1503 }
1504
1505 fn unsubscribe_book_at_interval(
1507 &mut self,
1508 instrument_id: InstrumentId,
1509 interval_ms: NonZeroUsize,
1510 client_id: Option<ClientId>,
1511 params: Option<IndexMap<String, String>>,
1512 ) where
1513 Self: 'static + Debug + Sized,
1514 {
1515 DataActorCore::unsubscribe_book_at_interval(
1516 self,
1517 instrument_id,
1518 interval_ms,
1519 client_id,
1520 params,
1521 );
1522 }
1523
1524 fn unsubscribe_quotes(
1526 &mut self,
1527 instrument_id: InstrumentId,
1528 client_id: Option<ClientId>,
1529 params: Option<IndexMap<String, String>>,
1530 ) where
1531 Self: 'static + Debug + Sized,
1532 {
1533 DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1534 }
1535
1536 fn unsubscribe_trades(
1538 &mut self,
1539 instrument_id: InstrumentId,
1540 client_id: Option<ClientId>,
1541 params: Option<IndexMap<String, String>>,
1542 ) where
1543 Self: 'static + Debug + Sized,
1544 {
1545 DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1546 }
1547
1548 fn unsubscribe_bars(
1550 &mut self,
1551 bar_type: BarType,
1552 client_id: Option<ClientId>,
1553 params: Option<IndexMap<String, String>>,
1554 ) where
1555 Self: 'static + Debug + Sized,
1556 {
1557 DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1558 }
1559
1560 fn unsubscribe_mark_prices(
1562 &mut self,
1563 instrument_id: InstrumentId,
1564 client_id: Option<ClientId>,
1565 params: Option<IndexMap<String, String>>,
1566 ) where
1567 Self: 'static + Debug + Sized,
1568 {
1569 DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1570 }
1571
1572 fn unsubscribe_index_prices(
1574 &mut self,
1575 instrument_id: InstrumentId,
1576 client_id: Option<ClientId>,
1577 params: Option<IndexMap<String, String>>,
1578 ) where
1579 Self: 'static + Debug + Sized,
1580 {
1581 DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1582 }
1583
1584 fn unsubscribe_funding_rates(
1586 &mut self,
1587 instrument_id: InstrumentId,
1588 client_id: Option<ClientId>,
1589 params: Option<IndexMap<String, String>>,
1590 ) where
1591 Self: 'static + Debug + Sized,
1592 {
1593 DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1594 }
1595
1596 fn unsubscribe_instrument_status(
1598 &mut self,
1599 instrument_id: InstrumentId,
1600 client_id: Option<ClientId>,
1601 params: Option<IndexMap<String, String>>,
1602 ) where
1603 Self: 'static + Debug + Sized,
1604 {
1605 DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1606 }
1607
1608 fn unsubscribe_instrument_close(
1610 &mut self,
1611 instrument_id: InstrumentId,
1612 client_id: Option<ClientId>,
1613 params: Option<IndexMap<String, String>>,
1614 ) where
1615 Self: 'static + Debug + Sized,
1616 {
1617 DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1618 }
1619
1620 fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1622 where
1623 Self: 'static + Debug + Sized,
1624 {
1625 DataActorCore::unsubscribe_order_fills(self, instrument_id);
1626 }
1627
1628 fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1630 where
1631 Self: 'static + Debug + Sized,
1632 {
1633 DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1634 }
1635
1636 #[cfg(feature = "defi")]
1637 fn unsubscribe_blocks(
1639 &mut self,
1640 chain: Blockchain,
1641 client_id: Option<ClientId>,
1642 params: Option<IndexMap<String, String>>,
1643 ) where
1644 Self: 'static + Debug + Sized,
1645 {
1646 DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1647 }
1648
1649 #[cfg(feature = "defi")]
1650 fn unsubscribe_pool(
1652 &mut self,
1653 instrument_id: InstrumentId,
1654 client_id: Option<ClientId>,
1655 params: Option<IndexMap<String, String>>,
1656 ) where
1657 Self: 'static + Debug + Sized,
1658 {
1659 DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1660 }
1661
1662 #[cfg(feature = "defi")]
1663 fn unsubscribe_pool_swaps(
1665 &mut self,
1666 instrument_id: InstrumentId,
1667 client_id: Option<ClientId>,
1668 params: Option<IndexMap<String, String>>,
1669 ) where
1670 Self: 'static + Debug + Sized,
1671 {
1672 DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1673 }
1674
1675 #[cfg(feature = "defi")]
1676 fn unsubscribe_pool_liquidity_updates(
1678 &mut self,
1679 instrument_id: InstrumentId,
1680 client_id: Option<ClientId>,
1681 params: Option<IndexMap<String, String>>,
1682 ) where
1683 Self: 'static + Debug + Sized,
1684 {
1685 DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1686 }
1687
1688 #[cfg(feature = "defi")]
1689 fn unsubscribe_pool_fee_collects(
1691 &mut self,
1692 instrument_id: InstrumentId,
1693 client_id: Option<ClientId>,
1694 params: Option<IndexMap<String, String>>,
1695 ) where
1696 Self: 'static + Debug + Sized,
1697 {
1698 DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1699 }
1700
1701 #[cfg(feature = "defi")]
1702 fn unsubscribe_pool_flash_events(
1704 &mut self,
1705 instrument_id: InstrumentId,
1706 client_id: Option<ClientId>,
1707 params: Option<IndexMap<String, String>>,
1708 ) where
1709 Self: 'static + Debug + Sized,
1710 {
1711 DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1712 }
1713
1714 fn request_data(
1720 &mut self,
1721 data_type: DataType,
1722 client_id: ClientId,
1723 start: Option<DateTime<Utc>>,
1724 end: Option<DateTime<Utc>>,
1725 limit: Option<NonZeroUsize>,
1726 params: Option<IndexMap<String, String>>,
1727 ) -> anyhow::Result<UUID4>
1728 where
1729 Self: 'static + Debug + Sized,
1730 {
1731 let actor_id = self.actor_id().inner();
1732 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1733 move |resp: &CustomDataResponse| {
1734 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1735 },
1736 )));
1737
1738 DataActorCore::request_data(
1739 self, data_type, client_id, start, end, limit, params, handler,
1740 )
1741 }
1742
1743 fn request_instrument(
1749 &mut self,
1750 instrument_id: InstrumentId,
1751 start: Option<DateTime<Utc>>,
1752 end: Option<DateTime<Utc>>,
1753 client_id: Option<ClientId>,
1754 params: Option<IndexMap<String, String>>,
1755 ) -> anyhow::Result<UUID4>
1756 where
1757 Self: 'static + Debug + Sized,
1758 {
1759 let actor_id = self.actor_id().inner();
1760 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1761 move |resp: &InstrumentResponse| {
1762 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1763 },
1764 )));
1765
1766 DataActorCore::request_instrument(
1767 self,
1768 instrument_id,
1769 start,
1770 end,
1771 client_id,
1772 params,
1773 handler,
1774 )
1775 }
1776
1777 fn request_instruments(
1783 &mut self,
1784 venue: Option<Venue>,
1785 start: Option<DateTime<Utc>>,
1786 end: Option<DateTime<Utc>>,
1787 client_id: Option<ClientId>,
1788 params: Option<IndexMap<String, String>>,
1789 ) -> anyhow::Result<UUID4>
1790 where
1791 Self: 'static + Debug + Sized,
1792 {
1793 let actor_id = self.actor_id().inner();
1794 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1795 move |resp: &InstrumentsResponse| {
1796 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1797 },
1798 )));
1799
1800 DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1801 }
1802
1803 fn request_book_snapshot(
1809 &mut self,
1810 instrument_id: InstrumentId,
1811 depth: Option<NonZeroUsize>,
1812 client_id: Option<ClientId>,
1813 params: Option<IndexMap<String, String>>,
1814 ) -> anyhow::Result<UUID4>
1815 where
1816 Self: 'static + Debug + Sized,
1817 {
1818 let actor_id = self.actor_id().inner();
1819 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1820 move |resp: &BookResponse| {
1821 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1822 },
1823 )));
1824
1825 DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1826 }
1827
1828 fn request_quotes(
1834 &mut self,
1835 instrument_id: InstrumentId,
1836 start: Option<DateTime<Utc>>,
1837 end: Option<DateTime<Utc>>,
1838 limit: Option<NonZeroUsize>,
1839 client_id: Option<ClientId>,
1840 params: Option<IndexMap<String, String>>,
1841 ) -> anyhow::Result<UUID4>
1842 where
1843 Self: 'static + Debug + Sized,
1844 {
1845 let actor_id = self.actor_id().inner();
1846 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1847 move |resp: &QuotesResponse| {
1848 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1849 },
1850 )));
1851
1852 DataActorCore::request_quotes(
1853 self,
1854 instrument_id,
1855 start,
1856 end,
1857 limit,
1858 client_id,
1859 params,
1860 handler,
1861 )
1862 }
1863
1864 fn request_trades(
1870 &mut self,
1871 instrument_id: InstrumentId,
1872 start: Option<DateTime<Utc>>,
1873 end: Option<DateTime<Utc>>,
1874 limit: Option<NonZeroUsize>,
1875 client_id: Option<ClientId>,
1876 params: Option<IndexMap<String, String>>,
1877 ) -> anyhow::Result<UUID4>
1878 where
1879 Self: 'static + Debug + Sized,
1880 {
1881 let actor_id = self.actor_id().inner();
1882 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1883 move |resp: &TradesResponse| {
1884 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1885 },
1886 )));
1887
1888 DataActorCore::request_trades(
1889 self,
1890 instrument_id,
1891 start,
1892 end,
1893 limit,
1894 client_id,
1895 params,
1896 handler,
1897 )
1898 }
1899
1900 fn request_bars(
1906 &mut self,
1907 bar_type: BarType,
1908 start: Option<DateTime<Utc>>,
1909 end: Option<DateTime<Utc>>,
1910 limit: Option<NonZeroUsize>,
1911 client_id: Option<ClientId>,
1912 params: Option<IndexMap<String, String>>,
1913 ) -> anyhow::Result<UUID4>
1914 where
1915 Self: 'static + Debug + Sized,
1916 {
1917 let actor_id = self.actor_id().inner();
1918 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1919 move |resp: &BarsResponse| {
1920 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1921 },
1922 )));
1923
1924 DataActorCore::request_bars(
1925 self, bar_type, start, end, limit, client_id, params, handler,
1926 )
1927 }
1928}
1929
1930impl<T> Actor for T
1932where
1933 T: DataActor + Debug + 'static,
1934{
1935 fn id(&self) -> Ustr {
1936 self.actor_id.inner()
1937 }
1938
1939 #[allow(unused_variables)]
1940 fn handle(&mut self, msg: &dyn Any) {
1941 }
1943
1944 fn as_any(&self) -> &dyn Any {
1945 self
1946 }
1947}
1948
1949impl<T> Component for T
1951where
1952 T: DataActor + Debug + 'static,
1953{
1954 fn component_id(&self) -> ComponentId {
1955 ComponentId::new(self.actor_id.inner().as_str())
1956 }
1957
1958 fn state(&self) -> ComponentState {
1959 self.state
1960 }
1961
1962 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1963 self.state = self.state.transition(&trigger)?;
1964 log::info!("{}", self.state.variant_name());
1965 Ok(())
1966 }
1967
1968 fn register(
1969 &mut self,
1970 trader_id: TraderId,
1971 clock: Rc<RefCell<dyn Clock>>,
1972 cache: Rc<RefCell<Cache>>,
1973 ) -> anyhow::Result<()> {
1974 DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1975
1976 let actor_id = self.actor_id().inner();
1978 let callback = TimeEventCallback::from(move |event: TimeEvent| {
1979 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1980 actor.handle_time_event(&event);
1981 } else {
1982 log::error!("Actor {actor_id} not found for time event handling");
1983 }
1984 });
1985
1986 clock.borrow_mut().register_default_handler(callback);
1987
1988 self.initialize()
1989 }
1990
1991 fn on_start(&mut self) -> anyhow::Result<()> {
1992 DataActor::on_start(self)
1993 }
1994
1995 fn on_stop(&mut self) -> anyhow::Result<()> {
1996 DataActor::on_stop(self)
1997 }
1998
1999 fn on_resume(&mut self) -> anyhow::Result<()> {
2000 DataActor::on_resume(self)
2001 }
2002
2003 fn on_degrade(&mut self) -> anyhow::Result<()> {
2004 DataActor::on_degrade(self)
2005 }
2006
2007 fn on_fault(&mut self) -> anyhow::Result<()> {
2008 DataActor::on_fault(self)
2009 }
2010
2011 fn on_reset(&mut self) -> anyhow::Result<()> {
2012 DataActor::on_reset(self)
2013 }
2014
2015 fn on_dispose(&mut self) -> anyhow::Result<()> {
2016 DataActor::on_dispose(self)
2017 }
2018}
2019
/// Core state shared by data actors: identity, configuration, registration
/// handles (trader ID, clock, cache), component state and subscription
/// bookkeeping.
#[derive(Clone)]
#[allow(
    dead_code,
    reason = "TODO: Under development (pending_requests, signal_classes)"
)]
pub struct DataActorCore {
    /// The actor's identifier (taken from the config, or generated in `new`).
    pub actor_id: ActorId,
    /// The configuration this core was created with.
    pub config: DataActorConfig,
    /// The trader this actor is registered with; `None` until `register` succeeds.
    trader_id: Option<TraderId>,
    /// The clock handed over in `register`; `None` until then.
    clock: Option<Rc<RefCell<dyn Clock>>>,
    /// The cache handed over in `register`; `None` until then.
    cache: Option<Rc<RefCell<Cache>>>,
    /// The current component state.
    state: ComponentState,
    /// Handlers for the topics this actor is subscribed to, keyed by topic
    /// (maintained by `add_subscription` / `remove_subscription`).
    topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
    /// Event type names registered via `register_warning_event`.
    warning_events: AHashSet<String>,
    /// Under development (see the `allow` reason above).
    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
    /// Under development (see the `allow` reason above).
    signal_classes: AHashMap<String, String>,
    /// Indicator state, only present with the `indicators` feature.
    #[cfg(feature = "indicators")]
    indicators: Indicators,
}
2042
2043impl Debug for DataActorCore {
2044 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2045 f.debug_struct(stringify!(DataActorCore))
2046 .field("actor_id", &self.actor_id)
2047 .field("config", &self.config)
2048 .field("state", &self.state)
2049 .field("trader_id", &self.trader_id)
2050 .finish()
2051 }
2052}
2053
2054impl DataActorCore {
2055 pub(crate) fn add_subscription(
2059 &mut self,
2060 topic: MStr<Topic>,
2061 handler: ShareableMessageHandler,
2062 ) {
2063 if self.topic_handlers.contains_key(&topic) {
2064 log::warn!(
2065 "Actor {} attempted duplicate subscription to topic '{topic}'",
2066 self.actor_id,
2067 );
2068 return;
2069 }
2070
2071 self.topic_handlers.insert(topic, handler.clone());
2072 msgbus::subscribe_topic(topic, handler, None);
2073 }
2074
2075 pub(crate) fn remove_subscription(&mut self, topic: MStr<Topic>) {
2079 if let Some(handler) = self.topic_handlers.remove(&topic) {
2080 msgbus::unsubscribe_topic(topic, handler);
2081 } else {
2082 log::warn!(
2083 "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2084 self.actor_id,
2085 );
2086 }
2087 }
2088
2089 pub fn new(config: DataActorConfig) -> Self {
2091 let actor_id = config
2092 .actor_id
2093 .unwrap_or_else(|| Self::default_actor_id(&config));
2094
2095 Self {
2096 actor_id,
2097 config,
2098 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
2102 topic_handlers: AHashMap::new(),
2103 warning_events: AHashSet::new(),
2104 pending_requests: AHashMap::new(),
2105 signal_classes: AHashMap::new(),
2106 #[cfg(feature = "indicators")]
2107 indicators: Indicators::default(),
2108 }
2109 }
2110
2111 #[must_use]
2113 pub fn mem_address(&self) -> String {
2114 format!("{self:p}")
2115 }
2116
    /// Returns the actor's current component state.
    pub fn state(&self) -> ComponentState {
        self.state
    }
2121
    /// Returns the trader ID this actor is registered with, or `None` if it has
    /// not been registered yet.
    pub fn trader_id(&self) -> Option<TraderId> {
        self.trader_id
    }
2126
    /// Returns the actor's ID.
    pub fn actor_id(&self) -> ActorId {
        self.actor_id
    }
2131
2132 fn default_actor_id(config: &DataActorConfig) -> ActorId {
2133 let memory_address = std::ptr::from_ref(config) as *const _ as usize;
2134 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2135 }
2136
    /// Returns the current timestamp in nanoseconds as reported by the
    /// registered clock.
    ///
    /// # Panics
    ///
    /// Panics if the actor has not been registered (no clock is set), since it
    /// reads the clock through `clock_ref`.
    pub fn timestamp_ns(&self) -> UnixNanos {
        self.clock_ref().timestamp_ns()
    }
2141
2142 pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2148 self.clock
2149 .as_ref()
2150 .unwrap_or_else(|| {
2151 panic!(
2152 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2153 self.actor_id, self.trader_id
2154 )
2155 })
2156 .borrow_mut()
2157 }
2158
2159 pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2165 self.clock
2166 .as_ref()
2167 .expect("DataActor must be registered before accessing clock")
2168 .clone()
2169 }
2170
2171 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2172 self.clock
2173 .as_ref()
2174 .unwrap_or_else(|| {
2175 panic!(
2176 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2177 self.actor_id, self.trader_id
2178 )
2179 })
2180 .borrow()
2181 }
2182
2183 pub fn cache(&self) -> Ref<'_, Cache> {
2189 self.cache
2190 .as_ref()
2191 .expect("DataActor must be registered before accessing cache")
2192 .borrow()
2193 }
2194
2195 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2201 self.cache
2202 .as_ref()
2203 .expect("DataActor must be registered before accessing cache")
2204 .clone()
2205 }
2206
2207 pub fn register(
2216 &mut self,
2217 trader_id: TraderId,
2218 clock: Rc<RefCell<dyn Clock>>,
2219 cache: Rc<RefCell<Cache>>,
2220 ) -> anyhow::Result<()> {
2221 if let Some(existing_trader_id) = self.trader_id {
2222 anyhow::bail!(
2223 "DataActor {} already registered with trader {existing_trader_id}",
2224 self.actor_id
2225 );
2226 }
2227
2228 {
2230 let _timestamp = clock.borrow().timestamp_ns();
2231 }
2232
2233 {
2235 let _cache_borrow = cache.borrow();
2236 }
2237
2238 self.trader_id = Some(trader_id);
2239 self.clock = Some(clock);
2240 self.cache = Some(cache);
2241
2242 if !self.is_properly_registered() {
2244 anyhow::bail!(
2245 "DataActor {} registration incomplete - validation failed",
2246 self.actor_id
2247 );
2248 }
2249
2250 log::debug!("Registered {} with trader {trader_id}", self.actor_id);
2251 Ok(())
2252 }
2253
2254 pub fn register_warning_event(&mut self, event_type: &str) {
2256 self.warning_events.insert(event_type.to_string());
2257 log::debug!("Registered event type '{event_type}' for warning logs");
2258 }
2259
    /// Removes `event_type` from the set of event types registered for warning
    /// logs.
    ///
    /// The debug message is logged whether or not the event type was present.
    pub fn deregister_warning_event(&mut self, event_type: &str) {
        self.warning_events.remove(event_type);
        log::debug!("Deregistered event type '{event_type}' from warning logs");
    }
2265
    /// Returns `true` if a trader ID has been set, i.e. the actor was registered.
    pub fn is_registered(&self) -> bool {
        self.trader_id.is_some()
    }
2269
2270 pub(crate) fn check_registered(&self) {
2271 assert!(
2272 self.is_registered(),
2273 "Actor has not been registered with a Trader"
2274 );
2275 }
2276
    /// Returns `true` only when trader ID, clock and cache are all set.
    fn is_properly_registered(&self) -> bool {
        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
    }
2281
2282 pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2283 if self.config.log_commands {
2284 log::info!("{CMD}{SEND} {command:?}");
2285 }
2286
2287 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2288 msgbus::send_any(endpoint, command.as_any());
2289 }
2290
2291 #[allow(dead_code)]
2292 fn send_data_req(&self, request: RequestCommand) {
2293 if self.config.log_commands {
2294 log::info!("{REQ}{SEND} {request:?}");
2295 }
2296
2297 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2300 msgbus::send_any(endpoint, request.as_any());
2301 }
2302
2303 pub fn shutdown_system(&self, reason: Option<String>) {
2309 self.check_registered();
2310
2311 let command = ShutdownSystem::new(
2313 self.trader_id().unwrap(),
2314 self.actor_id.inner(),
2315 reason,
2316 UUID4::new(),
2317 self.timestamp_ns(),
2318 );
2319
2320 let endpoint = "command.system.shutdown".into();
2321 msgbus::send_any(endpoint, command.as_any());
2322 }
2323
2324 pub fn subscribe_data(
2332 &mut self,
2333 handler: ShareableMessageHandler,
2334 data_type: DataType,
2335 client_id: Option<ClientId>,
2336 params: Option<IndexMap<String, String>>,
2337 ) {
2338 assert!(
2339 self.is_properly_registered(),
2340 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
2341 self.actor_id,
2342 self.trader_id,
2343 self.clock.is_some(),
2344 self.cache.is_some()
2345 );
2346
2347 let topic = get_custom_topic(&data_type);
2348 self.add_subscription(topic, handler);
2349
2350 if client_id.is_none() {
2352 return;
2353 }
2354
2355 let command = SubscribeCommand::Data(SubscribeCustomData {
2356 data_type,
2357 client_id,
2358 venue: None,
2359 command_id: UUID4::new(),
2360 ts_init: self.timestamp_ns(),
2361 correlation_id: None,
2362 params,
2363 });
2364
2365 self.send_data_cmd(DataCommand::Subscribe(command));
2366 }
2367
2368 pub fn subscribe_quotes(
2370 &mut self,
2371 topic: MStr<Topic>,
2372 handler: ShareableMessageHandler,
2373 instrument_id: InstrumentId,
2374 client_id: Option<ClientId>,
2375 params: Option<IndexMap<String, String>>,
2376 ) {
2377 self.check_registered();
2378
2379 self.add_subscription(topic, handler);
2380
2381 let command = SubscribeCommand::Quotes(SubscribeQuotes {
2382 instrument_id,
2383 client_id,
2384 venue: Some(instrument_id.venue),
2385 command_id: UUID4::new(),
2386 ts_init: self.timestamp_ns(),
2387 correlation_id: None,
2388 params,
2389 });
2390
2391 self.send_data_cmd(DataCommand::Subscribe(command));
2392 }
2393
2394 pub fn subscribe_instruments(
2396 &mut self,
2397 topic: MStr<Topic>,
2398 handler: ShareableMessageHandler,
2399 venue: Venue,
2400 client_id: Option<ClientId>,
2401 params: Option<IndexMap<String, String>>,
2402 ) {
2403 self.check_registered();
2404
2405 self.add_subscription(topic, handler);
2406
2407 let command = SubscribeCommand::Instruments(SubscribeInstruments {
2408 client_id,
2409 venue,
2410 command_id: UUID4::new(),
2411 ts_init: self.timestamp_ns(),
2412 correlation_id: None,
2413 params,
2414 });
2415
2416 self.send_data_cmd(DataCommand::Subscribe(command));
2417 }
2418
2419 pub fn subscribe_instrument(
2421 &mut self,
2422 topic: MStr<Topic>,
2423 handler: ShareableMessageHandler,
2424 instrument_id: InstrumentId,
2425 client_id: Option<ClientId>,
2426 params: Option<IndexMap<String, String>>,
2427 ) {
2428 self.check_registered();
2429
2430 self.add_subscription(topic, handler);
2431
2432 let command = SubscribeCommand::Instrument(SubscribeInstrument {
2433 instrument_id,
2434 client_id,
2435 venue: Some(instrument_id.venue),
2436 command_id: UUID4::new(),
2437 ts_init: self.timestamp_ns(),
2438 correlation_id: None,
2439 params,
2440 });
2441
2442 self.send_data_cmd(DataCommand::Subscribe(command));
2443 }
2444
2445 #[allow(clippy::too_many_arguments)]
2447 pub fn subscribe_book_deltas(
2448 &mut self,
2449 topic: MStr<Topic>,
2450 handler: ShareableMessageHandler,
2451 instrument_id: InstrumentId,
2452 book_type: BookType,
2453 depth: Option<NonZeroUsize>,
2454 client_id: Option<ClientId>,
2455 managed: bool,
2456 params: Option<IndexMap<String, String>>,
2457 ) {
2458 self.check_registered();
2459
2460 self.add_subscription(topic, handler);
2461
2462 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2463 instrument_id,
2464 book_type,
2465 client_id,
2466 venue: Some(instrument_id.venue),
2467 command_id: UUID4::new(),
2468 ts_init: self.timestamp_ns(),
2469 depth,
2470 managed,
2471 correlation_id: None,
2472 params,
2473 });
2474
2475 self.send_data_cmd(DataCommand::Subscribe(command));
2476 }
2477
2478 #[allow(clippy::too_many_arguments)]
2480 pub fn subscribe_book_at_interval(
2481 &mut self,
2482 topic: MStr<Topic>,
2483 handler: ShareableMessageHandler,
2484 instrument_id: InstrumentId,
2485 book_type: BookType,
2486 depth: Option<NonZeroUsize>,
2487 interval_ms: NonZeroUsize,
2488 client_id: Option<ClientId>,
2489 params: Option<IndexMap<String, String>>,
2490 ) {
2491 self.check_registered();
2492
2493 self.add_subscription(topic, handler);
2494
2495 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2496 instrument_id,
2497 book_type,
2498 client_id,
2499 venue: Some(instrument_id.venue),
2500 command_id: UUID4::new(),
2501 ts_init: self.timestamp_ns(),
2502 depth,
2503 interval_ms,
2504 correlation_id: None,
2505 params,
2506 });
2507
2508 self.send_data_cmd(DataCommand::Subscribe(command));
2509 }
2510
2511 pub fn subscribe_trades(
2513 &mut self,
2514 topic: MStr<Topic>,
2515 handler: ShareableMessageHandler,
2516 instrument_id: InstrumentId,
2517 client_id: Option<ClientId>,
2518 params: Option<IndexMap<String, String>>,
2519 ) {
2520 self.check_registered();
2521
2522 self.add_subscription(topic, handler);
2523
2524 let command = SubscribeCommand::Trades(SubscribeTrades {
2525 instrument_id,
2526 client_id,
2527 venue: Some(instrument_id.venue),
2528 command_id: UUID4::new(),
2529 ts_init: self.timestamp_ns(),
2530 correlation_id: None,
2531 params,
2532 });
2533
2534 self.send_data_cmd(DataCommand::Subscribe(command));
2535 }
2536
2537 pub fn subscribe_bars(
2539 &mut self,
2540 topic: MStr<Topic>,
2541 handler: ShareableMessageHandler,
2542 bar_type: BarType,
2543 client_id: Option<ClientId>,
2544 params: Option<IndexMap<String, String>>,
2545 ) {
2546 self.check_registered();
2547
2548 self.add_subscription(topic, handler);
2549
2550 let command = SubscribeCommand::Bars(SubscribeBars {
2551 bar_type,
2552 client_id,
2553 venue: Some(bar_type.instrument_id().venue),
2554 command_id: UUID4::new(),
2555 ts_init: self.timestamp_ns(),
2556 correlation_id: None,
2557 params,
2558 });
2559
2560 self.send_data_cmd(DataCommand::Subscribe(command));
2561 }
2562
2563 pub fn subscribe_mark_prices(
2565 &mut self,
2566 topic: MStr<Topic>,
2567 handler: ShareableMessageHandler,
2568 instrument_id: InstrumentId,
2569 client_id: Option<ClientId>,
2570 params: Option<IndexMap<String, String>>,
2571 ) {
2572 self.check_registered();
2573
2574 self.add_subscription(topic, handler);
2575
2576 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
2577 instrument_id,
2578 client_id,
2579 venue: Some(instrument_id.venue),
2580 command_id: UUID4::new(),
2581 ts_init: self.timestamp_ns(),
2582 correlation_id: None,
2583 params,
2584 });
2585
2586 self.send_data_cmd(DataCommand::Subscribe(command));
2587 }
2588
2589 pub fn subscribe_index_prices(
2591 &mut self,
2592 topic: MStr<Topic>,
2593 handler: ShareableMessageHandler,
2594 instrument_id: InstrumentId,
2595 client_id: Option<ClientId>,
2596 params: Option<IndexMap<String, String>>,
2597 ) {
2598 self.check_registered();
2599
2600 self.add_subscription(topic, handler);
2601
2602 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
2603 instrument_id,
2604 client_id,
2605 venue: Some(instrument_id.venue),
2606 command_id: UUID4::new(),
2607 ts_init: self.timestamp_ns(),
2608 correlation_id: None,
2609 params,
2610 });
2611
2612 self.send_data_cmd(DataCommand::Subscribe(command));
2613 }
2614
2615 pub fn subscribe_funding_rates(
2617 &mut self,
2618 topic: MStr<Topic>,
2619 handler: ShareableMessageHandler,
2620 instrument_id: InstrumentId,
2621 client_id: Option<ClientId>,
2622 params: Option<IndexMap<String, String>>,
2623 ) {
2624 self.check_registered();
2625
2626 self.add_subscription(topic, handler);
2627
2628 let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
2629 instrument_id,
2630 client_id,
2631 venue: Some(instrument_id.venue),
2632 command_id: UUID4::new(),
2633 ts_init: self.timestamp_ns(),
2634 correlation_id: None,
2635 params,
2636 });
2637
2638 self.send_data_cmd(DataCommand::Subscribe(command));
2639 }
2640
2641 pub fn subscribe_instrument_status(
2643 &mut self,
2644 topic: MStr<Topic>,
2645 handler: ShareableMessageHandler,
2646 instrument_id: InstrumentId,
2647 client_id: Option<ClientId>,
2648 params: Option<IndexMap<String, String>>,
2649 ) {
2650 self.check_registered();
2651
2652 self.add_subscription(topic, handler);
2653
2654 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
2655 instrument_id,
2656 client_id,
2657 venue: Some(instrument_id.venue),
2658 command_id: UUID4::new(),
2659 ts_init: self.timestamp_ns(),
2660 correlation_id: None,
2661 params,
2662 });
2663
2664 self.send_data_cmd(DataCommand::Subscribe(command));
2665 }
2666
2667 pub fn subscribe_instrument_close(
2669 &mut self,
2670 topic: MStr<Topic>,
2671 handler: ShareableMessageHandler,
2672 instrument_id: InstrumentId,
2673 client_id: Option<ClientId>,
2674 params: Option<IndexMap<String, String>>,
2675 ) {
2676 self.check_registered();
2677
2678 self.add_subscription(topic, handler);
2679
2680 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
2681 instrument_id,
2682 client_id,
2683 venue: Some(instrument_id.venue),
2684 command_id: UUID4::new(),
2685 ts_init: self.timestamp_ns(),
2686 correlation_id: None,
2687 params,
2688 });
2689
2690 self.send_data_cmd(DataCommand::Subscribe(command));
2691 }
2692
    /// Subscribes `handler` to the order fills `topic` on the message bus.
    ///
    /// Unlike the market data subscriptions, no command is sent to the data
    /// engine; only the local topic subscription is added.
    ///
    /// # Panics
    ///
    /// Panics if the actor has not been registered with a trader.
    pub fn subscribe_order_fills(&mut self, topic: MStr<Topic>, handler: ShareableMessageHandler) {
        self.check_registered();
        self.add_subscription(topic, handler);
    }
2698
    /// Subscribes `handler` to the order cancels `topic` on the message bus.
    ///
    /// Unlike the market data subscriptions, no command is sent to the data
    /// engine; only the local topic subscription is added.
    ///
    /// # Panics
    ///
    /// Panics if the actor has not been registered with a trader.
    pub fn subscribe_order_cancels(
        &mut self,
        topic: MStr<Topic>,
        handler: ShareableMessageHandler,
    ) {
        self.check_registered();
        self.add_subscription(topic, handler);
    }
2708
2709 pub fn unsubscribe_data(
2711 &mut self,
2712 data_type: DataType,
2713 client_id: Option<ClientId>,
2714 params: Option<IndexMap<String, String>>,
2715 ) {
2716 self.check_registered();
2717
2718 let topic = get_custom_topic(&data_type);
2719 self.remove_subscription(topic);
2720
2721 if client_id.is_none() {
2722 return;
2723 }
2724
2725 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
2726 data_type,
2727 client_id,
2728 venue: None,
2729 command_id: UUID4::new(),
2730 ts_init: self.timestamp_ns(),
2731 correlation_id: None,
2732 params,
2733 });
2734
2735 self.send_data_cmd(DataCommand::Unsubscribe(command));
2736 }
2737
2738 pub fn unsubscribe_instruments(
2740 &mut self,
2741 venue: Venue,
2742 client_id: Option<ClientId>,
2743 params: Option<IndexMap<String, String>>,
2744 ) {
2745 self.check_registered();
2746
2747 let topic = get_instruments_topic(venue);
2748 self.remove_subscription(topic);
2749
2750 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
2751 client_id,
2752 venue,
2753 command_id: UUID4::new(),
2754 ts_init: self.timestamp_ns(),
2755 correlation_id: None,
2756 params,
2757 });
2758
2759 self.send_data_cmd(DataCommand::Unsubscribe(command));
2760 }
2761
2762 pub fn unsubscribe_instrument(
2764 &mut self,
2765 instrument_id: InstrumentId,
2766 client_id: Option<ClientId>,
2767 params: Option<IndexMap<String, String>>,
2768 ) {
2769 self.check_registered();
2770
2771 let topic = get_instrument_topic(instrument_id);
2772 self.remove_subscription(topic);
2773
2774 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
2775 instrument_id,
2776 client_id,
2777 venue: Some(instrument_id.venue),
2778 command_id: UUID4::new(),
2779 ts_init: self.timestamp_ns(),
2780 correlation_id: None,
2781 params,
2782 });
2783
2784 self.send_data_cmd(DataCommand::Unsubscribe(command));
2785 }
2786
2787 pub fn unsubscribe_book_deltas(
2789 &mut self,
2790 instrument_id: InstrumentId,
2791 client_id: Option<ClientId>,
2792 params: Option<IndexMap<String, String>>,
2793 ) {
2794 self.check_registered();
2795
2796 let topic = get_book_deltas_topic(instrument_id);
2797 self.remove_subscription(topic);
2798
2799 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
2800 instrument_id,
2801 client_id,
2802 venue: Some(instrument_id.venue),
2803 command_id: UUID4::new(),
2804 ts_init: self.timestamp_ns(),
2805 correlation_id: None,
2806 params,
2807 });
2808
2809 self.send_data_cmd(DataCommand::Unsubscribe(command));
2810 }
2811
2812 pub fn unsubscribe_book_at_interval(
2814 &mut self,
2815 instrument_id: InstrumentId,
2816 interval_ms: NonZeroUsize,
2817 client_id: Option<ClientId>,
2818 params: Option<IndexMap<String, String>>,
2819 ) {
2820 self.check_registered();
2821
2822 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
2823 self.remove_subscription(topic);
2824
2825 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
2826 instrument_id,
2827 client_id,
2828 venue: Some(instrument_id.venue),
2829 command_id: UUID4::new(),
2830 ts_init: self.timestamp_ns(),
2831 correlation_id: None,
2832 params,
2833 });
2834
2835 self.send_data_cmd(DataCommand::Unsubscribe(command));
2836 }
2837
2838 pub fn unsubscribe_quotes(
2840 &mut self,
2841 instrument_id: InstrumentId,
2842 client_id: Option<ClientId>,
2843 params: Option<IndexMap<String, String>>,
2844 ) {
2845 self.check_registered();
2846
2847 let topic = get_quotes_topic(instrument_id);
2848 self.remove_subscription(topic);
2849
2850 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
2851 instrument_id,
2852 client_id,
2853 venue: Some(instrument_id.venue),
2854 command_id: UUID4::new(),
2855 ts_init: self.timestamp_ns(),
2856 correlation_id: None,
2857 params,
2858 });
2859
2860 self.send_data_cmd(DataCommand::Unsubscribe(command));
2861 }
2862
2863 pub fn unsubscribe_trades(
2865 &mut self,
2866 instrument_id: InstrumentId,
2867 client_id: Option<ClientId>,
2868 params: Option<IndexMap<String, String>>,
2869 ) {
2870 self.check_registered();
2871
2872 let topic = get_trades_topic(instrument_id);
2873 self.remove_subscription(topic);
2874
2875 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
2876 instrument_id,
2877 client_id,
2878 venue: Some(instrument_id.venue),
2879 command_id: UUID4::new(),
2880 ts_init: self.timestamp_ns(),
2881 correlation_id: None,
2882 params,
2883 });
2884
2885 self.send_data_cmd(DataCommand::Unsubscribe(command));
2886 }
2887
2888 pub fn unsubscribe_bars(
2890 &mut self,
2891 bar_type: BarType,
2892 client_id: Option<ClientId>,
2893 params: Option<IndexMap<String, String>>,
2894 ) {
2895 self.check_registered();
2896
2897 let topic = get_bars_topic(bar_type);
2898 self.remove_subscription(topic);
2899
2900 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
2901 bar_type,
2902 client_id,
2903 venue: Some(bar_type.instrument_id().venue),
2904 command_id: UUID4::new(),
2905 ts_init: self.timestamp_ns(),
2906 correlation_id: None,
2907 params,
2908 });
2909
2910 self.send_data_cmd(DataCommand::Unsubscribe(command));
2911 }
2912
2913 pub fn unsubscribe_mark_prices(
2915 &mut self,
2916 instrument_id: InstrumentId,
2917 client_id: Option<ClientId>,
2918 params: Option<IndexMap<String, String>>,
2919 ) {
2920 self.check_registered();
2921
2922 let topic = get_mark_price_topic(instrument_id);
2923 self.remove_subscription(topic);
2924
2925 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
2926 instrument_id,
2927 client_id,
2928 venue: Some(instrument_id.venue),
2929 command_id: UUID4::new(),
2930 ts_init: self.timestamp_ns(),
2931 correlation_id: None,
2932 params,
2933 });
2934
2935 self.send_data_cmd(DataCommand::Unsubscribe(command));
2936 }
2937
2938 pub fn unsubscribe_index_prices(
2940 &mut self,
2941 instrument_id: InstrumentId,
2942 client_id: Option<ClientId>,
2943 params: Option<IndexMap<String, String>>,
2944 ) {
2945 self.check_registered();
2946
2947 let topic = get_index_price_topic(instrument_id);
2948 self.remove_subscription(topic);
2949
2950 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
2951 instrument_id,
2952 client_id,
2953 venue: Some(instrument_id.venue),
2954 command_id: UUID4::new(),
2955 ts_init: self.timestamp_ns(),
2956 correlation_id: None,
2957 params,
2958 });
2959
2960 self.send_data_cmd(DataCommand::Unsubscribe(command));
2961 }
2962
2963 pub fn unsubscribe_funding_rates(
2965 &mut self,
2966 instrument_id: InstrumentId,
2967 client_id: Option<ClientId>,
2968 params: Option<IndexMap<String, String>>,
2969 ) {
2970 self.check_registered();
2971
2972 let topic = get_funding_rate_topic(instrument_id);
2973 self.remove_subscription(topic);
2974
2975 let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
2976 instrument_id,
2977 client_id,
2978 venue: Some(instrument_id.venue),
2979 command_id: UUID4::new(),
2980 ts_init: self.timestamp_ns(),
2981 correlation_id: None,
2982 params,
2983 });
2984
2985 self.send_data_cmd(DataCommand::Unsubscribe(command));
2986 }
2987
2988 pub fn unsubscribe_instrument_status(
2990 &mut self,
2991 instrument_id: InstrumentId,
2992 client_id: Option<ClientId>,
2993 params: Option<IndexMap<String, String>>,
2994 ) {
2995 self.check_registered();
2996
2997 let topic = get_instrument_status_topic(instrument_id);
2998 self.remove_subscription(topic);
2999
3000 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
3001 instrument_id,
3002 client_id,
3003 venue: Some(instrument_id.venue),
3004 command_id: UUID4::new(),
3005 ts_init: self.timestamp_ns(),
3006 correlation_id: None,
3007 params,
3008 });
3009
3010 self.send_data_cmd(DataCommand::Unsubscribe(command));
3011 }
3012
3013 pub fn unsubscribe_instrument_close(
3015 &mut self,
3016 instrument_id: InstrumentId,
3017 client_id: Option<ClientId>,
3018 params: Option<IndexMap<String, String>>,
3019 ) {
3020 self.check_registered();
3021
3022 let topic = get_instrument_close_topic(instrument_id);
3023 self.remove_subscription(topic);
3024
3025 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3026 instrument_id,
3027 client_id,
3028 venue: Some(instrument_id.venue),
3029 command_id: UUID4::new(),
3030 ts_init: self.timestamp_ns(),
3031 correlation_id: None,
3032 params,
3033 });
3034
3035 self.send_data_cmd(DataCommand::Unsubscribe(command));
3036 }
3037
3038 pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
3040 self.check_registered();
3041
3042 let topic = get_order_fills_topic(instrument_id);
3043 self.remove_subscription(topic);
3044 }
3045
3046 pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
3048 self.check_registered();
3049
3050 let topic = get_order_cancels_topic(instrument_id);
3051 self.remove_subscription(topic);
3052 }
3053
3054 #[allow(clippy::too_many_arguments)]
3060 pub fn request_data(
3061 &self,
3062 data_type: DataType,
3063 client_id: ClientId,
3064 start: Option<DateTime<Utc>>,
3065 end: Option<DateTime<Utc>>,
3066 limit: Option<NonZeroUsize>,
3067 params: Option<IndexMap<String, String>>,
3068 handler: ShareableMessageHandler,
3069 ) -> anyhow::Result<UUID4> {
3070 self.check_registered();
3071
3072 let now = self.clock_ref().utc_now();
3073 check_timestamps(now, start, end)?;
3074
3075 let request_id = UUID4::new();
3076 let command = RequestCommand::Data(RequestCustomData {
3077 client_id,
3078 data_type,
3079 start,
3080 end,
3081 limit,
3082 request_id,
3083 ts_init: self.timestamp_ns(),
3084 params,
3085 });
3086
3087 get_message_bus()
3088 .borrow_mut()
3089 .register_response_handler(command.request_id(), handler)?;
3090
3091 self.send_data_cmd(DataCommand::Request(command));
3092
3093 Ok(request_id)
3094 }
3095
3096 pub fn request_instrument(
3102 &self,
3103 instrument_id: InstrumentId,
3104 start: Option<DateTime<Utc>>,
3105 end: Option<DateTime<Utc>>,
3106 client_id: Option<ClientId>,
3107 params: Option<IndexMap<String, String>>,
3108 handler: ShareableMessageHandler,
3109 ) -> anyhow::Result<UUID4> {
3110 self.check_registered();
3111
3112 let now = self.clock_ref().utc_now();
3113 check_timestamps(now, start, end)?;
3114
3115 let request_id = UUID4::new();
3116 let command = RequestCommand::Instrument(RequestInstrument {
3117 instrument_id,
3118 start,
3119 end,
3120 client_id,
3121 request_id,
3122 ts_init: now.into(),
3123 params,
3124 });
3125
3126 get_message_bus()
3127 .borrow_mut()
3128 .register_response_handler(command.request_id(), handler)?;
3129
3130 self.send_data_cmd(DataCommand::Request(command));
3131
3132 Ok(request_id)
3133 }
3134
3135 pub fn request_instruments(
3141 &self,
3142 venue: Option<Venue>,
3143 start: Option<DateTime<Utc>>,
3144 end: Option<DateTime<Utc>>,
3145 client_id: Option<ClientId>,
3146 params: Option<IndexMap<String, String>>,
3147 handler: ShareableMessageHandler,
3148 ) -> anyhow::Result<UUID4> {
3149 self.check_registered();
3150
3151 let now = self.clock_ref().utc_now();
3152 check_timestamps(now, start, end)?;
3153
3154 let request_id = UUID4::new();
3155 let command = RequestCommand::Instruments(RequestInstruments {
3156 venue,
3157 start,
3158 end,
3159 client_id,
3160 request_id,
3161 ts_init: now.into(),
3162 params,
3163 });
3164
3165 get_message_bus()
3166 .borrow_mut()
3167 .register_response_handler(command.request_id(), handler)?;
3168
3169 self.send_data_cmd(DataCommand::Request(command));
3170
3171 Ok(request_id)
3172 }
3173
3174 pub fn request_book_snapshot(
3180 &self,
3181 instrument_id: InstrumentId,
3182 depth: Option<NonZeroUsize>,
3183 client_id: Option<ClientId>,
3184 params: Option<IndexMap<String, String>>,
3185 handler: ShareableMessageHandler,
3186 ) -> anyhow::Result<UUID4> {
3187 self.check_registered();
3188
3189 let request_id = UUID4::new();
3190 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3191 instrument_id,
3192 depth,
3193 client_id,
3194 request_id,
3195 ts_init: self.timestamp_ns(),
3196 params,
3197 });
3198
3199 get_message_bus()
3200 .borrow_mut()
3201 .register_response_handler(command.request_id(), handler)?;
3202
3203 self.send_data_cmd(DataCommand::Request(command));
3204
3205 Ok(request_id)
3206 }
3207
3208 #[allow(clippy::too_many_arguments)]
3214 pub fn request_quotes(
3215 &self,
3216 instrument_id: InstrumentId,
3217 start: Option<DateTime<Utc>>,
3218 end: Option<DateTime<Utc>>,
3219 limit: Option<NonZeroUsize>,
3220 client_id: Option<ClientId>,
3221 params: Option<IndexMap<String, String>>,
3222 handler: ShareableMessageHandler,
3223 ) -> anyhow::Result<UUID4> {
3224 self.check_registered();
3225
3226 let now = self.clock_ref().utc_now();
3227 check_timestamps(now, start, end)?;
3228
3229 let request_id = UUID4::new();
3230 let command = RequestCommand::Quotes(RequestQuotes {
3231 instrument_id,
3232 start,
3233 end,
3234 limit,
3235 client_id,
3236 request_id,
3237 ts_init: now.into(),
3238 params,
3239 });
3240
3241 get_message_bus()
3242 .borrow_mut()
3243 .register_response_handler(command.request_id(), handler)?;
3244
3245 self.send_data_cmd(DataCommand::Request(command));
3246
3247 Ok(request_id)
3248 }
3249
3250 #[allow(clippy::too_many_arguments)]
3256 pub fn request_trades(
3257 &self,
3258 instrument_id: InstrumentId,
3259 start: Option<DateTime<Utc>>,
3260 end: Option<DateTime<Utc>>,
3261 limit: Option<NonZeroUsize>,
3262 client_id: Option<ClientId>,
3263 params: Option<IndexMap<String, String>>,
3264 handler: ShareableMessageHandler,
3265 ) -> anyhow::Result<UUID4> {
3266 self.check_registered();
3267
3268 let now = self.clock_ref().utc_now();
3269 check_timestamps(now, start, end)?;
3270
3271 let request_id = UUID4::new();
3272 let command = RequestCommand::Trades(RequestTrades {
3273 instrument_id,
3274 start,
3275 end,
3276 limit,
3277 client_id,
3278 request_id,
3279 ts_init: now.into(),
3280 params,
3281 });
3282
3283 get_message_bus()
3284 .borrow_mut()
3285 .register_response_handler(command.request_id(), handler)?;
3286
3287 self.send_data_cmd(DataCommand::Request(command));
3288
3289 Ok(request_id)
3290 }
3291
3292 #[allow(clippy::too_many_arguments)]
3298 pub fn request_bars(
3299 &self,
3300 bar_type: BarType,
3301 start: Option<DateTime<Utc>>,
3302 end: Option<DateTime<Utc>>,
3303 limit: Option<NonZeroUsize>,
3304 client_id: Option<ClientId>,
3305 params: Option<IndexMap<String, String>>,
3306 handler: ShareableMessageHandler,
3307 ) -> anyhow::Result<UUID4> {
3308 self.check_registered();
3309
3310 let now = self.clock_ref().utc_now();
3311 check_timestamps(now, start, end)?;
3312
3313 let request_id = UUID4::new();
3314 let command = RequestCommand::Bars(RequestBars {
3315 bar_type,
3316 start,
3317 end,
3318 limit,
3319 client_id,
3320 request_id,
3321 ts_init: now.into(),
3322 params,
3323 });
3324
3325 get_message_bus()
3326 .borrow_mut()
3327 .register_response_handler(command.request_id(), handler)?;
3328
3329 self.send_data_cmd(DataCommand::Request(command));
3330
3331 Ok(request_id)
3332 }
3333}
3334
3335fn check_timestamps(
3336 now: DateTime<Utc>,
3337 start: Option<DateTime<Utc>>,
3338 end: Option<DateTime<Utc>>,
3339) -> anyhow::Result<()> {
3340 if let Some(start) = start {
3341 check_predicate_true(start <= now, "start was > now")?;
3342 }
3343 if let Some(end) = end {
3344 check_predicate_true(end <= now, "end was > now")?;
3345 }
3346
3347 if let (Some(start), Some(end)) = (start, end) {
3348 check_predicate_true(start < end, "start was >= end")?;
3349 }
3350
3351 Ok(())
3352}
3353
3354fn log_error(e: &anyhow::Error) {
3355 log::error!("{e}");
3356}
3357
3358fn log_not_running<T>(msg: &T)
3359where
3360 T: Debug,
3361{
3362 log::warn!("Received message when not running - skipping {msg:?}");
3364}
3365
3366fn log_received<T>(msg: &T)
3367where
3368 T: Debug,
3369{
3370 log::debug!("{RECV} {msg:?}");
3371}