1use std::{
17 any::Any,
18 cell::{Ref, RefCell, RefMut},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24};
25
26use ahash::{AHashMap, AHashSet};
27use chrono::{DateTime, Utc};
28use indexmap::IndexMap;
29use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
30#[cfg(feature = "defi")]
31use nautilus_model::defi::{
32 Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
33};
34use nautilus_model::{
35 data::{
36 Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
37 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
38 },
39 enums::BookType,
40 events::order::filled::OrderFilled,
41 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
42 instruments::InstrumentAny,
43 orderbook::OrderBook,
44};
45use ustr::Ustr;
46
47#[cfg(feature = "indicators")]
48use super::indicators::Indicators;
49use super::{
50 Actor,
51 registry::{get_actor_unchecked, try_get_actor_unchecked},
52};
53#[cfg(feature = "defi")]
54use crate::defi;
55#[cfg(feature = "defi")]
56#[allow(unused_imports)]
57use crate::defi::data_actor as _; use crate::{
59 cache::Cache,
60 clock::Clock,
61 component::Component,
62 enums::{ComponentState, ComponentTrigger},
63 logging::{CMD, RECV, REQ, SEND},
64 messages::{
65 data::{
66 BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
67 InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
68 RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
69 SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
70 SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
71 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
72 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
73 UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
74 UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
75 UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
76 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
77 },
78 system::ShutdownSystem,
79 },
80 msgbus::{
81 self, MStr, Topic, get_message_bus,
82 handler::{ShareableMessageHandler, TypedMessageHandler},
83 switchboard::{
84 MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
85 get_custom_topic, get_funding_rate_topic, get_index_price_topic,
86 get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
87 get_instruments_topic, get_mark_price_topic, get_order_fills_topic, get_quotes_topic,
88 get_trades_topic,
89 },
90 },
91 signal::Signal,
92 timer::{TimeEvent, TimeEventCallback},
93};
94
95#[derive(Debug, Clone)]
97#[cfg_attr(
98 feature = "python",
99 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", subclass)
100)]
101pub struct DataActorConfig {
102 pub actor_id: Option<ActorId>,
104 pub log_events: bool,
106 pub log_commands: bool,
108}
109
110impl Default for DataActorConfig {
111 fn default() -> Self {
112 Self {
113 actor_id: None,
114 log_events: true,
115 log_commands: true,
116 }
117 }
118}
119
120#[derive(Debug, Clone)]
122#[cfg_attr(
123 feature = "python",
124 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
125)]
126pub struct ImportableActorConfig {
127 pub actor_path: String,
129 pub config_path: String,
131 pub config: HashMap<String, serde_json::Value>,
133}
134
135type RequestCallback = Box<dyn Fn(UUID4) + Send + Sync>; pub trait DataActor:
138 Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
139{
140 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
146 Ok(IndexMap::new())
147 }
148
149 #[allow(unused_variables)]
155 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
156 Ok(())
157 }
158
159 fn on_start(&mut self) -> anyhow::Result<()> {
165 log::warn!(
166 "The `on_start` handler was called when not overridden, \
167 it's expected that any actions required when starting the actor \
168 occur here, such as subscribing/requesting data"
169 );
170 Ok(())
171 }
172
173 fn on_stop(&mut self) -> anyhow::Result<()> {
179 log::warn!(
180 "The `on_stop` handler was called when not overridden, \
181 it's expected that any actions required when stopping the actor \
182 occur here, such as unsubscribing from data",
183 );
184 Ok(())
185 }
186
187 fn on_resume(&mut self) -> anyhow::Result<()> {
193 log::warn!(
194 "The `on_resume` handler was called when not overridden, \
195 it's expected that any actions required when resuming the actor \
196 following a stop occur here"
197 );
198 Ok(())
199 }
200
201 fn on_reset(&mut self) -> anyhow::Result<()> {
207 log::warn!(
208 "The `on_reset` handler was called when not overridden, \
209 it's expected that any actions required when resetting the actor \
210 occur here, such as resetting indicators and other state"
211 );
212 Ok(())
213 }
214
215 fn on_dispose(&mut self) -> anyhow::Result<()> {
221 Ok(())
222 }
223
224 fn on_degrade(&mut self) -> anyhow::Result<()> {
230 Ok(())
231 }
232
233 fn on_fault(&mut self) -> anyhow::Result<()> {
239 Ok(())
240 }
241
242 #[allow(unused_variables)]
248 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
249 Ok(())
250 }
251
252 #[allow(unused_variables)]
258 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
259 Ok(())
260 }
261
262 #[allow(unused_variables)]
268 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
269 Ok(())
270 }
271
272 #[allow(unused_variables)]
278 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
279 Ok(())
280 }
281
282 #[allow(unused_variables)]
288 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
289 Ok(())
290 }
291
292 #[allow(unused_variables)]
298 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
299 Ok(())
300 }
301
302 #[allow(unused_variables)]
308 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
309 Ok(())
310 }
311
312 #[allow(unused_variables)]
318 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
319 Ok(())
320 }
321
322 #[allow(unused_variables)]
328 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
329 Ok(())
330 }
331
332 #[allow(unused_variables)]
338 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
339 Ok(())
340 }
341
342 #[allow(unused_variables)]
348 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
349 Ok(())
350 }
351
352 #[allow(unused_variables)]
358 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
359 Ok(())
360 }
361
362 #[allow(unused_variables)]
368 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
369 Ok(())
370 }
371
372 #[allow(unused_variables)]
378 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
379 Ok(())
380 }
381
382 #[allow(unused_variables)]
388 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
389 Ok(())
390 }
391
392 #[cfg(feature = "defi")]
393 #[allow(unused_variables)]
399 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
400 Ok(())
401 }
402
403 #[cfg(feature = "defi")]
404 #[allow(unused_variables)]
410 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
411 Ok(())
412 }
413
414 #[cfg(feature = "defi")]
415 #[allow(unused_variables)]
421 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
422 Ok(())
423 }
424
425 #[cfg(feature = "defi")]
426 #[allow(unused_variables)]
432 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
433 Ok(())
434 }
435
436 #[cfg(feature = "defi")]
437 #[allow(unused_variables)]
443 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
444 Ok(())
445 }
446
447 #[cfg(feature = "defi")]
448 #[allow(unused_variables)]
454 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
455 Ok(())
456 }
457
458 #[allow(unused_variables)]
464 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
465 Ok(())
466 }
467
468 #[allow(unused_variables)]
474 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
475 Ok(())
476 }
477
478 #[allow(unused_variables)]
484 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
485 Ok(())
486 }
487
488 #[allow(unused_variables)]
494 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
495 Ok(())
496 }
497
498 #[allow(unused_variables)]
504 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
505 Ok(())
506 }
507
508 #[allow(unused_variables)]
514 fn on_historical_index_prices(
515 &mut self,
516 index_prices: &[IndexPriceUpdate],
517 ) -> anyhow::Result<()> {
518 Ok(())
519 }
520
521 fn handle_time_event(&mut self, event: &TimeEvent) {
523 log_received(&event);
524
525 if let Err(e) = DataActor::on_time_event(self, event) {
526 log_error(&e);
527 }
528 }
529
530 fn handle_data(&mut self, data: &dyn Any) {
532 log_received(&data);
533
534 if self.not_running() {
535 log_not_running(&data);
536 return;
537 }
538
539 if let Err(e) = self.on_data(data) {
540 log_error(&e);
541 }
542 }
543
544 fn handle_signal(&mut self, signal: &Signal) {
546 log_received(&signal);
547
548 if self.not_running() {
549 log_not_running(&signal);
550 return;
551 }
552
553 if let Err(e) = self.on_signal(signal) {
554 log_error(&e);
555 }
556 }
557
558 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
560 log_received(&instrument);
561
562 if self.not_running() {
563 log_not_running(&instrument);
564 return;
565 }
566
567 if let Err(e) = self.on_instrument(instrument) {
568 log_error(&e);
569 }
570 }
571
572 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
574 log_received(&deltas);
575
576 if self.not_running() {
577 log_not_running(&deltas);
578 return;
579 }
580
581 if let Err(e) = self.on_book_deltas(deltas) {
582 log_error(&e);
583 }
584 }
585
586 fn handle_book(&mut self, book: &OrderBook) {
588 log_received(&book);
589
590 if self.not_running() {
591 log_not_running(&book);
592 return;
593 }
594
595 if let Err(e) = self.on_book(book) {
596 log_error(&e);
597 };
598 }
599
600 fn handle_quote(&mut self, quote: &QuoteTick) {
602 log_received("e);
603
604 if self.not_running() {
605 log_not_running("e);
606 return;
607 }
608
609 if let Err(e) = self.on_quote(quote) {
610 log_error(&e);
611 }
612 }
613
614 fn handle_trade(&mut self, trade: &TradeTick) {
616 log_received(&trade);
617
618 if self.not_running() {
619 log_not_running(&trade);
620 return;
621 }
622
623 if let Err(e) = self.on_trade(trade) {
624 log_error(&e);
625 }
626 }
627
628 fn handle_bar(&mut self, bar: &Bar) {
630 log_received(&bar);
631
632 if self.not_running() {
633 log_not_running(&bar);
634 return;
635 }
636
637 if let Err(e) = self.on_bar(bar) {
638 log_error(&e);
639 }
640 }
641
642 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
644 log_received(&mark_price);
645
646 if self.not_running() {
647 log_not_running(&mark_price);
648 return;
649 }
650
651 if let Err(e) = self.on_mark_price(mark_price) {
652 log_error(&e);
653 }
654 }
655
656 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
658 log_received(&index_price);
659
660 if self.not_running() {
661 log_not_running(&index_price);
662 return;
663 }
664
665 if let Err(e) = self.on_index_price(index_price) {
666 log_error(&e);
667 }
668 }
669
670 fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
672 log_received(&funding_rate);
673
674 if self.not_running() {
675 log_not_running(&funding_rate);
676 return;
677 }
678
679 if let Err(e) = self.on_funding_rate(funding_rate) {
680 log_error(&e);
681 }
682 }
683
684 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
686 log_received(&status);
687
688 if self.not_running() {
689 log_not_running(&status);
690 return;
691 }
692
693 if let Err(e) = self.on_instrument_status(status) {
694 log_error(&e);
695 }
696 }
697
698 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
700 log_received(&close);
701
702 if self.not_running() {
703 log_not_running(&close);
704 return;
705 }
706
707 if let Err(e) = self.on_instrument_close(close) {
708 log_error(&e);
709 }
710 }
711
712 fn handle_order_filled(&mut self, event: &OrderFilled) {
714 log_received(&event);
715
716 if event.strategy_id.inner() == self.actor_id().inner() {
720 return;
721 }
722
723 if self.not_running() {
724 log_not_running(&event);
725 return;
726 }
727
728 if let Err(e) = self.on_order_filled(event) {
729 log_error(&e);
730 }
731 }
732
733 #[cfg(feature = "defi")]
734 fn handle_block(&mut self, block: &Block) {
736 log_received(&block);
737
738 if self.not_running() {
739 log_not_running(&block);
740 return;
741 }
742
743 if let Err(e) = self.on_block(block) {
744 log_error(&e);
745 }
746 }
747
748 #[cfg(feature = "defi")]
749 fn handle_pool(&mut self, pool: &Pool) {
751 log_received(&pool);
752
753 if self.not_running() {
754 log_not_running(&pool);
755 return;
756 }
757
758 if let Err(e) = self.on_pool(pool) {
759 log_error(&e);
760 }
761 }
762
763 #[cfg(feature = "defi")]
764 fn handle_pool_swap(&mut self, swap: &PoolSwap) {
766 log_received(&swap);
767
768 if self.not_running() {
769 log_not_running(&swap);
770 return;
771 }
772
773 if let Err(e) = self.on_pool_swap(swap) {
774 log_error(&e);
775 }
776 }
777
778 #[cfg(feature = "defi")]
779 fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
781 log_received(&update);
782
783 if self.not_running() {
784 log_not_running(&update);
785 return;
786 }
787
788 if let Err(e) = self.on_pool_liquidity_update(update) {
789 log_error(&e);
790 }
791 }
792
793 #[cfg(feature = "defi")]
794 fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
796 log_received(&collect);
797
798 if self.not_running() {
799 log_not_running(&collect);
800 return;
801 }
802
803 if let Err(e) = self.on_pool_fee_collect(collect) {
804 log_error(&e);
805 }
806 }
807
808 #[cfg(feature = "defi")]
809 fn handle_pool_flash(&mut self, flash: &PoolFlash) {
811 log_received(&flash);
812
813 if self.not_running() {
814 log_not_running(&flash);
815 return;
816 }
817
818 if let Err(e) = self.on_pool_flash(flash) {
819 log_error(&e);
820 }
821 }
822
823 fn handle_historical_data(&mut self, data: &dyn Any) {
825 log_received(&data);
826
827 if let Err(e) = self.on_historical_data(data) {
828 log_error(&e);
829 }
830 }
831
832 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
834 log_received(&resp);
835
836 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
837 log_error(&e);
838 }
839 }
840
841 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
843 log_received(&resp);
844
845 if let Err(e) = self.on_instrument(&resp.data) {
846 log_error(&e);
847 }
848 }
849
850 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
852 log_received(&resp);
853
854 for inst in &resp.data {
855 if let Err(e) = self.on_instrument(inst) {
856 log_error(&e);
857 }
858 }
859 }
860
861 fn handle_book_response(&mut self, resp: &BookResponse) {
863 log_received(&resp);
864
865 if let Err(e) = self.on_book(&resp.data) {
866 log_error(&e);
867 }
868 }
869
870 fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
872 log_received(&resp);
873
874 if let Err(e) = self.on_historical_quotes(&resp.data) {
875 log_error(&e);
876 }
877 }
878
879 fn handle_trades_response(&mut self, resp: &TradesResponse) {
881 log_received(&resp);
882
883 if let Err(e) = self.on_historical_trades(&resp.data) {
884 log_error(&e);
885 }
886 }
887
888 fn handle_bars_response(&mut self, resp: &BarsResponse) {
890 log_received(&resp);
891
892 if let Err(e) = self.on_historical_bars(&resp.data) {
893 log_error(&e);
894 }
895 }
896
897 fn subscribe_data(
899 &mut self,
900 data_type: DataType,
901 client_id: Option<ClientId>,
902 params: Option<IndexMap<String, String>>,
903 ) where
904 Self: 'static + Debug + Sized,
905 {
906 let actor_id = self.actor_id().inner();
907 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
908 move |data: &dyn Any| {
909 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
910 },
911 )));
912
913 DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
914 }
915
916 fn subscribe_quotes(
918 &mut self,
919 instrument_id: InstrumentId,
920 client_id: Option<ClientId>,
921 params: Option<IndexMap<String, String>>,
922 ) where
923 Self: 'static + Debug + Sized,
924 {
925 let actor_id = self.actor_id().inner();
926 let topic = get_quotes_topic(instrument_id);
927
928 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
929 move |quote: &QuoteTick| {
930 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
931 actor.handle_quote(quote);
932 } else {
933 log::error!("Actor {actor_id} not found for quote handling");
934 }
935 },
936 )));
937
938 DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
939 }
940
941 fn subscribe_instruments(
943 &mut self,
944 venue: Venue,
945 client_id: Option<ClientId>,
946 params: Option<IndexMap<String, String>>,
947 ) where
948 Self: 'static + Debug + Sized,
949 {
950 let actor_id = self.actor_id().inner();
951 let topic = get_instruments_topic(venue);
952
953 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
954 move |instrument: &InstrumentAny| {
955 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
956 actor.handle_instrument(instrument);
957 } else {
958 log::error!("Actor {actor_id} not found for instruments handling");
959 }
960 },
961 )));
962
963 DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
964 }
965
966 fn subscribe_instrument(
968 &mut self,
969 instrument_id: InstrumentId,
970 client_id: Option<ClientId>,
971 params: Option<IndexMap<String, String>>,
972 ) where
973 Self: 'static + Debug + Sized,
974 {
975 let actor_id = self.actor_id().inner();
976 let topic = get_instrument_topic(instrument_id);
977
978 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
979 move |instrument: &InstrumentAny| {
980 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
981 actor.handle_instrument(instrument);
982 } else {
983 log::error!("Actor {actor_id} not found for instrument handling");
984 }
985 },
986 )));
987
988 DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
989 }
990
991 fn subscribe_book_deltas(
993 &mut self,
994 instrument_id: InstrumentId,
995 book_type: BookType,
996 depth: Option<NonZeroUsize>,
997 client_id: Option<ClientId>,
998 managed: bool,
999 params: Option<IndexMap<String, String>>,
1000 ) where
1001 Self: 'static + Debug + Sized,
1002 {
1003 let actor_id = self.actor_id().inner();
1004 let topic = get_book_deltas_topic(instrument_id);
1005
1006 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1007 move |deltas: &OrderBookDeltas| {
1008 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1009 },
1010 )));
1011
1012 DataActorCore::subscribe_book_deltas(
1013 self,
1014 topic,
1015 handler,
1016 instrument_id,
1017 book_type,
1018 depth,
1019 client_id,
1020 managed,
1021 params,
1022 );
1023 }
1024
1025 fn subscribe_book_at_interval(
1027 &mut self,
1028 instrument_id: InstrumentId,
1029 book_type: BookType,
1030 depth: Option<NonZeroUsize>,
1031 interval_ms: NonZeroUsize,
1032 client_id: Option<ClientId>,
1033 params: Option<IndexMap<String, String>>,
1034 ) where
1035 Self: 'static + Debug + Sized,
1036 {
1037 let actor_id = self.actor_id().inner();
1038 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1039
1040 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1041 move |book: &OrderBook| {
1042 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1043 },
1044 )));
1045
1046 DataActorCore::subscribe_book_at_interval(
1047 self,
1048 topic,
1049 handler,
1050 instrument_id,
1051 book_type,
1052 depth,
1053 interval_ms,
1054 client_id,
1055 params,
1056 );
1057 }
1058
1059 fn subscribe_trades(
1061 &mut self,
1062 instrument_id: InstrumentId,
1063 client_id: Option<ClientId>,
1064 params: Option<IndexMap<String, String>>,
1065 ) where
1066 Self: 'static + Debug + Sized,
1067 {
1068 let actor_id = self.actor_id().inner();
1069 let topic = get_trades_topic(instrument_id);
1070
1071 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1072 move |trade: &TradeTick| {
1073 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1074 },
1075 )));
1076
1077 DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1078 }
1079
1080 fn subscribe_bars(
1082 &mut self,
1083 bar_type: BarType,
1084 client_id: Option<ClientId>,
1085 params: Option<IndexMap<String, String>>,
1086 ) where
1087 Self: 'static + Debug + Sized,
1088 {
1089 let actor_id = self.actor_id().inner();
1090 let topic = get_bars_topic(bar_type);
1091
1092 let handler =
1093 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
1094 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1095 })));
1096
1097 DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1098 }
1099
1100 fn subscribe_mark_prices(
1102 &mut self,
1103 instrument_id: InstrumentId,
1104 client_id: Option<ClientId>,
1105 params: Option<IndexMap<String, String>>,
1106 ) where
1107 Self: 'static + Debug + Sized,
1108 {
1109 let actor_id = self.actor_id().inner();
1110 let topic = get_mark_price_topic(instrument_id);
1111
1112 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1113 move |mark_price: &MarkPriceUpdate| {
1114 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1115 },
1116 )));
1117
1118 DataActorCore::subscribe_mark_prices(
1119 self,
1120 topic,
1121 handler,
1122 instrument_id,
1123 client_id,
1124 params,
1125 );
1126 }
1127
1128 fn subscribe_index_prices(
1130 &mut self,
1131 instrument_id: InstrumentId,
1132 client_id: Option<ClientId>,
1133 params: Option<IndexMap<String, String>>,
1134 ) where
1135 Self: 'static + Debug + Sized,
1136 {
1137 let actor_id = self.actor_id().inner();
1138 let topic = get_index_price_topic(instrument_id);
1139
1140 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1141 move |index_price: &IndexPriceUpdate| {
1142 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1143 },
1144 )));
1145
1146 DataActorCore::subscribe_index_prices(
1147 self,
1148 topic,
1149 handler,
1150 instrument_id,
1151 client_id,
1152 params,
1153 );
1154 }
1155
1156 fn subscribe_funding_rates(
1158 &mut self,
1159 instrument_id: InstrumentId,
1160 client_id: Option<ClientId>,
1161 params: Option<IndexMap<String, String>>,
1162 ) where
1163 Self: 'static + Debug + Sized,
1164 {
1165 let actor_id = self.actor_id().inner();
1166 let topic = get_funding_rate_topic(instrument_id);
1167
1168 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1169 move |funding_rate: &FundingRateUpdate| {
1170 get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1171 },
1172 )));
1173
1174 DataActorCore::subscribe_funding_rates(
1175 self,
1176 topic,
1177 handler,
1178 instrument_id,
1179 client_id,
1180 params,
1181 );
1182 }
1183
1184 fn subscribe_instrument_status(
1186 &mut self,
1187 instrument_id: InstrumentId,
1188 client_id: Option<ClientId>,
1189 params: Option<IndexMap<String, String>>,
1190 ) where
1191 Self: 'static + Debug + Sized,
1192 {
1193 let actor_id = self.actor_id().inner();
1194 let topic = get_instrument_status_topic(instrument_id);
1195
1196 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1197 move |status: &InstrumentStatus| {
1198 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1199 },
1200 )));
1201
1202 DataActorCore::subscribe_instrument_status(
1203 self,
1204 topic,
1205 handler,
1206 instrument_id,
1207 client_id,
1208 params,
1209 );
1210 }
1211
1212 fn subscribe_instrument_close(
1214 &mut self,
1215 instrument_id: InstrumentId,
1216 client_id: Option<ClientId>,
1217 params: Option<IndexMap<String, String>>,
1218 ) where
1219 Self: 'static + Debug + Sized,
1220 {
1221 let actor_id = self.actor_id().inner();
1222 let topic = get_instrument_close_topic(instrument_id);
1223
1224 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1225 move |close: &InstrumentClose| {
1226 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1227 },
1228 )));
1229
1230 DataActorCore::subscribe_instrument_close(
1231 self,
1232 topic,
1233 handler,
1234 instrument_id,
1235 client_id,
1236 params,
1237 );
1238 }
1239
1240 fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1242 where
1243 Self: 'static + Debug + Sized,
1244 {
1245 let actor_id = self.actor_id().inner();
1246 let topic = get_order_fills_topic(instrument_id);
1247
1248 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1249 move |event: &OrderFilled| {
1250 get_actor_unchecked::<Self>(&actor_id).handle_order_filled(event);
1251 },
1252 )));
1253
1254 DataActorCore::subscribe_order_fills(self, topic, handler);
1255 }
1256
1257 #[cfg(feature = "defi")]
1258 fn subscribe_blocks(
1260 &mut self,
1261 chain: Blockchain,
1262 client_id: Option<ClientId>,
1263 params: Option<IndexMap<String, String>>,
1264 ) where
1265 Self: 'static + Debug + Sized,
1266 {
1267 let actor_id = self.actor_id().inner();
1268 let topic = defi::switchboard::get_defi_blocks_topic(chain);
1269
1270 let handler =
1271 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |block: &Block| {
1272 get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1273 })));
1274
1275 DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1276 }
1277
1278 #[cfg(feature = "defi")]
1279 fn subscribe_pool(
1281 &mut self,
1282 instrument_id: InstrumentId,
1283 client_id: Option<ClientId>,
1284 params: Option<IndexMap<String, String>>,
1285 ) where
1286 Self: 'static + Debug + Sized,
1287 {
1288 let actor_id = self.actor_id().inner();
1289 let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1290
1291 let handler =
1292 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |pool: &Pool| {
1293 get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1294 })));
1295
1296 DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1297 }
1298
1299 #[cfg(feature = "defi")]
1300 fn subscribe_pool_swaps(
1302 &mut self,
1303 instrument_id: InstrumentId,
1304 client_id: Option<ClientId>,
1305 params: Option<IndexMap<String, String>>,
1306 ) where
1307 Self: 'static + Debug + Sized,
1308 {
1309 let actor_id = self.actor_id().inner();
1310 let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1311
1312 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1313 move |swap: &PoolSwap| {
1314 get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1315 },
1316 )));
1317
1318 DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1319 }
1320
1321 #[cfg(feature = "defi")]
1322 fn subscribe_pool_liquidity_updates(
1324 &mut self,
1325 instrument_id: InstrumentId,
1326 client_id: Option<ClientId>,
1327 params: Option<IndexMap<String, String>>,
1328 ) where
1329 Self: 'static + Debug + Sized,
1330 {
1331 let actor_id = self.actor_id().inner();
1332 let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1333
1334 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1335 move |update: &PoolLiquidityUpdate| {
1336 get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1337 },
1338 )));
1339
1340 DataActorCore::subscribe_pool_liquidity_updates(
1341 self,
1342 topic,
1343 handler,
1344 instrument_id,
1345 client_id,
1346 params,
1347 );
1348 }
1349
1350 #[cfg(feature = "defi")]
1351 fn subscribe_pool_fee_collects(
1353 &mut self,
1354 instrument_id: InstrumentId,
1355 client_id: Option<ClientId>,
1356 params: Option<IndexMap<String, String>>,
1357 ) where
1358 Self: 'static + Debug + Sized,
1359 {
1360 let actor_id = self.actor_id().inner();
1361 let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1362
1363 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1364 move |collect: &PoolFeeCollect| {
1365 get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1366 },
1367 )));
1368
1369 DataActorCore::subscribe_pool_fee_collects(
1370 self,
1371 topic,
1372 handler,
1373 instrument_id,
1374 client_id,
1375 params,
1376 );
1377 }
1378
1379 #[cfg(feature = "defi")]
1380 fn subscribe_pool_flash_events(
1382 &mut self,
1383 instrument_id: InstrumentId,
1384 client_id: Option<ClientId>,
1385 params: Option<IndexMap<String, String>>,
1386 ) where
1387 Self: 'static + Debug + Sized,
1388 {
1389 let actor_id = self.actor_id().inner();
1390 let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1391
1392 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1393 move |flash: &PoolFlash| {
1394 get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1395 },
1396 )));
1397
1398 DataActorCore::subscribe_pool_flash_events(
1399 self,
1400 topic,
1401 handler,
1402 instrument_id,
1403 client_id,
1404 params,
1405 );
1406 }
1407
1408 fn unsubscribe_data(
1410 &mut self,
1411 data_type: DataType,
1412 client_id: Option<ClientId>,
1413 params: Option<IndexMap<String, String>>,
1414 ) where
1415 Self: 'static + Debug + Sized,
1416 {
1417 DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1418 }
1419
1420 fn unsubscribe_instruments(
1422 &mut self,
1423 venue: Venue,
1424 client_id: Option<ClientId>,
1425 params: Option<IndexMap<String, String>>,
1426 ) where
1427 Self: 'static + Debug + Sized,
1428 {
1429 DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1430 }
1431
1432 fn unsubscribe_instrument(
1434 &mut self,
1435 instrument_id: InstrumentId,
1436 client_id: Option<ClientId>,
1437 params: Option<IndexMap<String, String>>,
1438 ) where
1439 Self: 'static + Debug + Sized,
1440 {
1441 DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1442 }
1443
1444 fn unsubscribe_book_deltas(
1446 &mut self,
1447 instrument_id: InstrumentId,
1448 client_id: Option<ClientId>,
1449 params: Option<IndexMap<String, String>>,
1450 ) where
1451 Self: 'static + Debug + Sized,
1452 {
1453 DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1454 }
1455
1456 fn unsubscribe_book_at_interval(
1458 &mut self,
1459 instrument_id: InstrumentId,
1460 interval_ms: NonZeroUsize,
1461 client_id: Option<ClientId>,
1462 params: Option<IndexMap<String, String>>,
1463 ) where
1464 Self: 'static + Debug + Sized,
1465 {
1466 DataActorCore::unsubscribe_book_at_interval(
1467 self,
1468 instrument_id,
1469 interval_ms,
1470 client_id,
1471 params,
1472 );
1473 }
1474
1475 fn unsubscribe_quotes(
1477 &mut self,
1478 instrument_id: InstrumentId,
1479 client_id: Option<ClientId>,
1480 params: Option<IndexMap<String, String>>,
1481 ) where
1482 Self: 'static + Debug + Sized,
1483 {
1484 DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1485 }
1486
1487 fn unsubscribe_trades(
1489 &mut self,
1490 instrument_id: InstrumentId,
1491 client_id: Option<ClientId>,
1492 params: Option<IndexMap<String, String>>,
1493 ) where
1494 Self: 'static + Debug + Sized,
1495 {
1496 DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1497 }
1498
1499 fn unsubscribe_bars(
1501 &mut self,
1502 bar_type: BarType,
1503 client_id: Option<ClientId>,
1504 params: Option<IndexMap<String, String>>,
1505 ) where
1506 Self: 'static + Debug + Sized,
1507 {
1508 DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1509 }
1510
1511 fn unsubscribe_mark_prices(
1513 &mut self,
1514 instrument_id: InstrumentId,
1515 client_id: Option<ClientId>,
1516 params: Option<IndexMap<String, String>>,
1517 ) where
1518 Self: 'static + Debug + Sized,
1519 {
1520 DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1521 }
1522
1523 fn unsubscribe_index_prices(
1525 &mut self,
1526 instrument_id: InstrumentId,
1527 client_id: Option<ClientId>,
1528 params: Option<IndexMap<String, String>>,
1529 ) where
1530 Self: 'static + Debug + Sized,
1531 {
1532 DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1533 }
1534
1535 fn unsubscribe_funding_rates(
1537 &mut self,
1538 instrument_id: InstrumentId,
1539 client_id: Option<ClientId>,
1540 params: Option<IndexMap<String, String>>,
1541 ) where
1542 Self: 'static + Debug + Sized,
1543 {
1544 DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1545 }
1546
1547 fn unsubscribe_instrument_status(
1549 &mut self,
1550 instrument_id: InstrumentId,
1551 client_id: Option<ClientId>,
1552 params: Option<IndexMap<String, String>>,
1553 ) where
1554 Self: 'static + Debug + Sized,
1555 {
1556 DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1557 }
1558
1559 fn unsubscribe_instrument_close(
1561 &mut self,
1562 instrument_id: InstrumentId,
1563 client_id: Option<ClientId>,
1564 params: Option<IndexMap<String, String>>,
1565 ) where
1566 Self: 'static + Debug + Sized,
1567 {
1568 DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1569 }
1570
1571 fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1573 where
1574 Self: 'static + Debug + Sized,
1575 {
1576 DataActorCore::unsubscribe_order_fills(self, instrument_id);
1577 }
1578
1579 #[cfg(feature = "defi")]
1580 fn unsubscribe_blocks(
1582 &mut self,
1583 chain: Blockchain,
1584 client_id: Option<ClientId>,
1585 params: Option<IndexMap<String, String>>,
1586 ) where
1587 Self: 'static + Debug + Sized,
1588 {
1589 DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1590 }
1591
1592 #[cfg(feature = "defi")]
1593 fn unsubscribe_pool(
1595 &mut self,
1596 instrument_id: InstrumentId,
1597 client_id: Option<ClientId>,
1598 params: Option<IndexMap<String, String>>,
1599 ) where
1600 Self: 'static + Debug + Sized,
1601 {
1602 DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1603 }
1604
1605 #[cfg(feature = "defi")]
1606 fn unsubscribe_pool_swaps(
1608 &mut self,
1609 instrument_id: InstrumentId,
1610 client_id: Option<ClientId>,
1611 params: Option<IndexMap<String, String>>,
1612 ) where
1613 Self: 'static + Debug + Sized,
1614 {
1615 DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1616 }
1617
1618 #[cfg(feature = "defi")]
1619 fn unsubscribe_pool_liquidity_updates(
1621 &mut self,
1622 instrument_id: InstrumentId,
1623 client_id: Option<ClientId>,
1624 params: Option<IndexMap<String, String>>,
1625 ) where
1626 Self: 'static + Debug + Sized,
1627 {
1628 DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1629 }
1630
1631 #[cfg(feature = "defi")]
1632 fn unsubscribe_pool_fee_collects(
1634 &mut self,
1635 instrument_id: InstrumentId,
1636 client_id: Option<ClientId>,
1637 params: Option<IndexMap<String, String>>,
1638 ) where
1639 Self: 'static + Debug + Sized,
1640 {
1641 DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1642 }
1643
1644 #[cfg(feature = "defi")]
1645 fn unsubscribe_pool_flash_events(
1647 &mut self,
1648 instrument_id: InstrumentId,
1649 client_id: Option<ClientId>,
1650 params: Option<IndexMap<String, String>>,
1651 ) where
1652 Self: 'static + Debug + Sized,
1653 {
1654 DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1655 }
1656
1657 fn request_data(
1663 &mut self,
1664 data_type: DataType,
1665 client_id: ClientId,
1666 start: Option<DateTime<Utc>>,
1667 end: Option<DateTime<Utc>>,
1668 limit: Option<NonZeroUsize>,
1669 params: Option<IndexMap<String, String>>,
1670 ) -> anyhow::Result<UUID4>
1671 where
1672 Self: 'static + Debug + Sized,
1673 {
1674 let actor_id = self.actor_id().inner();
1675 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1676 move |resp: &CustomDataResponse| {
1677 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1678 },
1679 )));
1680
1681 DataActorCore::request_data(
1682 self, data_type, client_id, start, end, limit, params, handler,
1683 )
1684 }
1685
1686 fn request_instrument(
1692 &mut self,
1693 instrument_id: InstrumentId,
1694 start: Option<DateTime<Utc>>,
1695 end: Option<DateTime<Utc>>,
1696 client_id: Option<ClientId>,
1697 params: Option<IndexMap<String, String>>,
1698 ) -> anyhow::Result<UUID4>
1699 where
1700 Self: 'static + Debug + Sized,
1701 {
1702 let actor_id = self.actor_id().inner();
1703 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1704 move |resp: &InstrumentResponse| {
1705 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1706 },
1707 )));
1708
1709 DataActorCore::request_instrument(
1710 self,
1711 instrument_id,
1712 start,
1713 end,
1714 client_id,
1715 params,
1716 handler,
1717 )
1718 }
1719
1720 fn request_instruments(
1726 &mut self,
1727 venue: Option<Venue>,
1728 start: Option<DateTime<Utc>>,
1729 end: Option<DateTime<Utc>>,
1730 client_id: Option<ClientId>,
1731 params: Option<IndexMap<String, String>>,
1732 ) -> anyhow::Result<UUID4>
1733 where
1734 Self: 'static + Debug + Sized,
1735 {
1736 let actor_id = self.actor_id().inner();
1737 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1738 move |resp: &InstrumentsResponse| {
1739 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1740 },
1741 )));
1742
1743 DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1744 }
1745
1746 fn request_book_snapshot(
1752 &mut self,
1753 instrument_id: InstrumentId,
1754 depth: Option<NonZeroUsize>,
1755 client_id: Option<ClientId>,
1756 params: Option<IndexMap<String, String>>,
1757 ) -> anyhow::Result<UUID4>
1758 where
1759 Self: 'static + Debug + Sized,
1760 {
1761 let actor_id = self.actor_id().inner();
1762 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1763 move |resp: &BookResponse| {
1764 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1765 },
1766 )));
1767
1768 DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1769 }
1770
1771 fn request_quotes(
1777 &mut self,
1778 instrument_id: InstrumentId,
1779 start: Option<DateTime<Utc>>,
1780 end: Option<DateTime<Utc>>,
1781 limit: Option<NonZeroUsize>,
1782 client_id: Option<ClientId>,
1783 params: Option<IndexMap<String, String>>,
1784 ) -> anyhow::Result<UUID4>
1785 where
1786 Self: 'static + Debug + Sized,
1787 {
1788 let actor_id = self.actor_id().inner();
1789 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1790 move |resp: &QuotesResponse| {
1791 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1792 },
1793 )));
1794
1795 DataActorCore::request_quotes(
1796 self,
1797 instrument_id,
1798 start,
1799 end,
1800 limit,
1801 client_id,
1802 params,
1803 handler,
1804 )
1805 }
1806
1807 fn request_trades(
1813 &mut self,
1814 instrument_id: InstrumentId,
1815 start: Option<DateTime<Utc>>,
1816 end: Option<DateTime<Utc>>,
1817 limit: Option<NonZeroUsize>,
1818 client_id: Option<ClientId>,
1819 params: Option<IndexMap<String, String>>,
1820 ) -> anyhow::Result<UUID4>
1821 where
1822 Self: 'static + Debug + Sized,
1823 {
1824 let actor_id = self.actor_id().inner();
1825 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1826 move |resp: &TradesResponse| {
1827 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1828 },
1829 )));
1830
1831 DataActorCore::request_trades(
1832 self,
1833 instrument_id,
1834 start,
1835 end,
1836 limit,
1837 client_id,
1838 params,
1839 handler,
1840 )
1841 }
1842
1843 fn request_bars(
1849 &mut self,
1850 bar_type: BarType,
1851 start: Option<DateTime<Utc>>,
1852 end: Option<DateTime<Utc>>,
1853 limit: Option<NonZeroUsize>,
1854 client_id: Option<ClientId>,
1855 params: Option<IndexMap<String, String>>,
1856 ) -> anyhow::Result<UUID4>
1857 where
1858 Self: 'static + Debug + Sized,
1859 {
1860 let actor_id = self.actor_id().inner();
1861 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1862 move |resp: &BarsResponse| {
1863 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1864 },
1865 )));
1866
1867 DataActorCore::request_bars(
1868 self, bar_type, start, end, limit, client_id, params, handler,
1869 )
1870 }
1871}
1872
1873impl<T> Actor for T
1875where
1876 T: DataActor + Debug + 'static,
1877{
1878 fn id(&self) -> Ustr {
1879 self.actor_id.inner()
1880 }
1881
1882 #[allow(unused_variables)]
1883 fn handle(&mut self, msg: &dyn Any) {
1884 }
1886
1887 fn as_any(&self) -> &dyn Any {
1888 self
1889 }
1890}
1891
1892impl<T> Component for T
1894where
1895 T: DataActor + Debug + 'static,
1896{
1897 fn component_id(&self) -> ComponentId {
1898 ComponentId::new(self.actor_id.inner().as_str())
1899 }
1900
1901 fn state(&self) -> ComponentState {
1902 self.state
1903 }
1904
1905 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1906 self.state = self.state.transition(&trigger)?;
1907 log::info!("{}", self.state.variant_name());
1908 Ok(())
1909 }
1910
1911 fn register(
1912 &mut self,
1913 trader_id: TraderId,
1914 clock: Rc<RefCell<dyn Clock>>,
1915 cache: Rc<RefCell<Cache>>,
1916 ) -> anyhow::Result<()> {
1917 DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1918
1919 let actor_id = self.actor_id().inner();
1921 let callback = TimeEventCallback::from(move |event: TimeEvent| {
1922 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1923 actor.handle_time_event(&event);
1924 } else {
1925 log::error!("Actor {actor_id} not found for time event handling");
1926 }
1927 });
1928
1929 clock.borrow_mut().register_default_handler(callback);
1930
1931 self.initialize()
1932 }
1933
1934 fn on_start(&mut self) -> anyhow::Result<()> {
1935 DataActor::on_start(self)
1936 }
1937
1938 fn on_stop(&mut self) -> anyhow::Result<()> {
1939 DataActor::on_stop(self)
1940 }
1941
1942 fn on_resume(&mut self) -> anyhow::Result<()> {
1943 DataActor::on_resume(self)
1944 }
1945
1946 fn on_degrade(&mut self) -> anyhow::Result<()> {
1947 DataActor::on_degrade(self)
1948 }
1949
1950 fn on_fault(&mut self) -> anyhow::Result<()> {
1951 DataActor::on_fault(self)
1952 }
1953
1954 fn on_reset(&mut self) -> anyhow::Result<()> {
1955 DataActor::on_reset(self)
1956 }
1957
1958 fn on_dispose(&mut self) -> anyhow::Result<()> {
1959 DataActor::on_dispose(self)
1960 }
1961}
1962
1963#[allow(
1965 dead_code,
1966 reason = "TODO: Under development (pending_requests, signal_classes)"
1967)]
1968pub struct DataActorCore {
1969 pub actor_id: ActorId,
1971 pub config: DataActorConfig,
1973 trader_id: Option<TraderId>,
1974 clock: Option<Rc<RefCell<dyn Clock>>>, cache: Option<Rc<RefCell<Cache>>>, state: ComponentState,
1977 topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
1978 warning_events: AHashSet<String>, pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
1980 signal_classes: AHashMap<String, String>,
1981 #[cfg(feature = "indicators")]
1982 indicators: Indicators,
1983}
1984
1985impl Debug for DataActorCore {
1986 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1987 f.debug_struct(stringify!(DataActorCore))
1988 .field("actor_id", &self.actor_id)
1989 .field("config", &self.config)
1990 .field("state", &self.state)
1991 .field("trader_id", &self.trader_id)
1992 .finish()
1993 }
1994}
1995
1996impl DataActorCore {
1997 pub(crate) fn add_subscription(
2001 &mut self,
2002 topic: MStr<Topic>,
2003 handler: ShareableMessageHandler,
2004 ) {
2005 if self.topic_handlers.contains_key(&topic) {
2006 log::warn!(
2007 "Actor {} attempted duplicate subscription to topic '{topic}'",
2008 self.actor_id,
2009 );
2010 return;
2011 }
2012
2013 self.topic_handlers.insert(topic, handler.clone());
2014 msgbus::subscribe_topic(topic, handler, None);
2015 }
2016
2017 pub(crate) fn remove_subscription(&mut self, topic: MStr<Topic>) {
2021 if let Some(handler) = self.topic_handlers.remove(&topic) {
2022 msgbus::unsubscribe_topic(topic, handler.clone());
2023 } else {
2024 log::warn!(
2025 "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2026 self.actor_id,
2027 );
2028 }
2029 }
2030
2031 pub fn new(config: DataActorConfig) -> Self {
2033 let actor_id = config
2034 .actor_id
2035 .unwrap_or_else(|| Self::default_actor_id(&config));
2036
2037 Self {
2038 actor_id,
2039 config,
2040 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
2044 topic_handlers: AHashMap::new(),
2045 warning_events: AHashSet::new(),
2046 pending_requests: AHashMap::new(),
2047 signal_classes: AHashMap::new(),
2048 #[cfg(feature = "indicators")]
2049 indicators: Indicators::default(),
2050 }
2051 }
2052
2053 #[must_use]
2055 pub fn mem_address(&self) -> String {
2056 format!("{self:p}")
2057 }
2058
2059 pub fn state(&self) -> ComponentState {
2061 self.state
2062 }
2063
2064 pub fn trader_id(&self) -> Option<TraderId> {
2066 self.trader_id
2067 }
2068
2069 pub fn actor_id(&self) -> ActorId {
2071 self.actor_id
2072 }
2073
2074 fn default_actor_id(config: &DataActorConfig) -> ActorId {
2075 let memory_address = std::ptr::from_ref(config) as *const _ as usize;
2076 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2077 }
2078
2079 pub fn timestamp_ns(&self) -> UnixNanos {
2081 self.clock_ref().timestamp_ns()
2082 }
2083
2084 pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2090 self.clock
2091 .as_ref()
2092 .unwrap_or_else(|| {
2093 panic!(
2094 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2095 self.actor_id, self.trader_id
2096 )
2097 })
2098 .borrow_mut()
2099 }
2100
2101 pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2107 self.clock
2108 .as_ref()
2109 .expect("DataActor must be registered before accessing clock")
2110 .clone()
2111 }
2112
2113 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2114 self.clock
2115 .as_ref()
2116 .unwrap_or_else(|| {
2117 panic!(
2118 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2119 self.actor_id, self.trader_id
2120 )
2121 })
2122 .borrow()
2123 }
2124
2125 pub fn cache(&self) -> Ref<'_, Cache> {
2131 self.cache
2132 .as_ref()
2133 .expect("DataActor must be registered before accessing cache")
2134 .borrow()
2135 }
2136
2137 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2143 self.cache
2144 .as_ref()
2145 .expect("DataActor must be registered before accessing cache")
2146 .clone()
2147 }
2148
2149 pub fn register(
2158 &mut self,
2159 trader_id: TraderId,
2160 clock: Rc<RefCell<dyn Clock>>,
2161 cache: Rc<RefCell<Cache>>,
2162 ) -> anyhow::Result<()> {
2163 if let Some(existing_trader_id) = self.trader_id {
2164 anyhow::bail!(
2165 "DataActor {} already registered with trader {existing_trader_id}",
2166 self.actor_id
2167 );
2168 }
2169
2170 {
2172 let _timestamp = clock.borrow().timestamp_ns();
2173 }
2174
2175 {
2177 let _cache_borrow = cache.borrow();
2178 }
2179
2180 self.trader_id = Some(trader_id);
2181 self.clock = Some(clock);
2182 self.cache = Some(cache);
2183
2184 if !self.is_properly_registered() {
2186 anyhow::bail!(
2187 "DataActor {} registration incomplete - validation failed",
2188 self.actor_id
2189 );
2190 }
2191
2192 log::info!("Registered {} with trader {trader_id}", self.actor_id);
2193 Ok(())
2194 }
2195
2196 pub fn register_warning_event(&mut self, event_type: &str) {
2198 self.warning_events.insert(event_type.to_string());
2199 log::debug!("Registered event type '{event_type}' for warning logs")
2200 }
2201
2202 pub fn deregister_warning_event(&mut self, event_type: &str) {
2204 self.warning_events.remove(event_type);
2205 log::debug!("Deregistered event type '{event_type}' from warning logs")
2206 }
2207
2208 pub fn is_registered(&self) -> bool {
2209 self.trader_id.is_some()
2210 }
2211
2212 pub(crate) fn check_registered(&self) {
2213 assert!(
2214 self.is_registered(),
2215 "Actor has not been registered with a Trader"
2216 );
2217 }
2218
2219 fn is_properly_registered(&self) -> bool {
2221 self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2222 }
2223
2224 pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2225 if self.config.log_commands {
2226 log::info!("{CMD}{SEND} {command:?}");
2227 }
2228
2229 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2230 msgbus::send_any(endpoint, command.as_any())
2231 }
2232
2233 #[allow(dead_code, reason = "TODO: Under development")]
2234 fn send_data_req(&self, request: RequestCommand) {
2235 if self.config.log_commands {
2236 log::info!("{REQ}{SEND} {request:?}");
2237 }
2238
2239 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2242 msgbus::send_any(endpoint, request.as_any())
2243 }
2244
2245 pub fn shutdown_system(&self, reason: Option<String>) {
2251 self.check_registered();
2252
2253 let command = ShutdownSystem::new(
2255 self.trader_id().unwrap(),
2256 self.actor_id.inner(),
2257 reason,
2258 UUID4::new(),
2259 self.timestamp_ns(),
2260 );
2261
2262 let endpoint = "command.system.shutdown".into();
2263 msgbus::send_any(endpoint, command.as_any());
2264 }
2265
2266 pub fn subscribe_data(
2274 &mut self,
2275 handler: ShareableMessageHandler,
2276 data_type: DataType,
2277 client_id: Option<ClientId>,
2278 params: Option<IndexMap<String, String>>,
2279 ) {
2280 if !self.is_properly_registered() {
2281 panic!(
2282 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
2283 self.actor_id,
2284 self.trader_id,
2285 self.clock.is_some(),
2286 self.cache.is_some()
2287 );
2288 }
2289
2290 let topic = get_custom_topic(&data_type);
2291 self.add_subscription(topic, handler);
2292
2293 if client_id.is_none() {
2295 return;
2296 }
2297
2298 let command = SubscribeCommand::Data(SubscribeCustomData {
2299 data_type,
2300 client_id,
2301 venue: None,
2302 command_id: UUID4::new(),
2303 ts_init: self.timestamp_ns(),
2304 params,
2305 });
2306
2307 self.send_data_cmd(DataCommand::Subscribe(command));
2308 }
2309
2310 pub fn subscribe_quotes(
2312 &mut self,
2313 topic: MStr<Topic>,
2314 handler: ShareableMessageHandler,
2315 instrument_id: InstrumentId,
2316 client_id: Option<ClientId>,
2317 params: Option<IndexMap<String, String>>,
2318 ) {
2319 self.check_registered();
2320
2321 self.add_subscription(topic, handler);
2322
2323 let command = SubscribeCommand::Quotes(SubscribeQuotes {
2324 instrument_id,
2325 client_id,
2326 venue: Some(instrument_id.venue),
2327 command_id: UUID4::new(),
2328 ts_init: self.timestamp_ns(),
2329 params,
2330 });
2331
2332 self.send_data_cmd(DataCommand::Subscribe(command));
2333 }
2334
2335 pub fn subscribe_instruments(
2337 &mut self,
2338 topic: MStr<Topic>,
2339 handler: ShareableMessageHandler,
2340 venue: Venue,
2341 client_id: Option<ClientId>,
2342 params: Option<IndexMap<String, String>>,
2343 ) {
2344 self.check_registered();
2345
2346 self.add_subscription(topic, handler);
2347
2348 let command = SubscribeCommand::Instruments(SubscribeInstruments {
2349 client_id,
2350 venue,
2351 command_id: UUID4::new(),
2352 ts_init: self.timestamp_ns(),
2353 params,
2354 });
2355
2356 self.send_data_cmd(DataCommand::Subscribe(command));
2357 }
2358
2359 pub fn subscribe_instrument(
2361 &mut self,
2362 topic: MStr<Topic>,
2363 handler: ShareableMessageHandler,
2364 instrument_id: InstrumentId,
2365 client_id: Option<ClientId>,
2366 params: Option<IndexMap<String, String>>,
2367 ) {
2368 self.check_registered();
2369
2370 self.add_subscription(topic, handler);
2371
2372 let command = SubscribeCommand::Instrument(SubscribeInstrument {
2373 instrument_id,
2374 client_id,
2375 venue: Some(instrument_id.venue),
2376 command_id: UUID4::new(),
2377 ts_init: self.timestamp_ns(),
2378 params,
2379 });
2380
2381 self.send_data_cmd(DataCommand::Subscribe(command));
2382 }
2383
2384 #[allow(clippy::too_many_arguments)]
2386 pub fn subscribe_book_deltas(
2387 &mut self,
2388 topic: MStr<Topic>,
2389 handler: ShareableMessageHandler,
2390 instrument_id: InstrumentId,
2391 book_type: BookType,
2392 depth: Option<NonZeroUsize>,
2393 client_id: Option<ClientId>,
2394 managed: bool,
2395 params: Option<IndexMap<String, String>>,
2396 ) {
2397 self.check_registered();
2398
2399 self.add_subscription(topic, handler);
2400
2401 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2402 instrument_id,
2403 book_type,
2404 client_id,
2405 venue: Some(instrument_id.venue),
2406 command_id: UUID4::new(),
2407 ts_init: self.timestamp_ns(),
2408 depth,
2409 managed,
2410 params,
2411 });
2412
2413 self.send_data_cmd(DataCommand::Subscribe(command));
2414 }
2415
2416 #[allow(clippy::too_many_arguments)]
2418 pub fn subscribe_book_at_interval(
2419 &mut self,
2420 topic: MStr<Topic>,
2421 handler: ShareableMessageHandler,
2422 instrument_id: InstrumentId,
2423 book_type: BookType,
2424 depth: Option<NonZeroUsize>,
2425 interval_ms: NonZeroUsize,
2426 client_id: Option<ClientId>,
2427 params: Option<IndexMap<String, String>>,
2428 ) {
2429 self.check_registered();
2430
2431 self.add_subscription(topic, handler);
2432
2433 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2434 instrument_id,
2435 book_type,
2436 client_id,
2437 venue: Some(instrument_id.venue),
2438 command_id: UUID4::new(),
2439 ts_init: self.timestamp_ns(),
2440 depth,
2441 interval_ms,
2442 params,
2443 });
2444
2445 self.send_data_cmd(DataCommand::Subscribe(command));
2446 }
2447
2448 pub fn subscribe_trades(
2450 &mut self,
2451 topic: MStr<Topic>,
2452 handler: ShareableMessageHandler,
2453 instrument_id: InstrumentId,
2454 client_id: Option<ClientId>,
2455 params: Option<IndexMap<String, String>>,
2456 ) {
2457 self.check_registered();
2458
2459 self.add_subscription(topic, handler);
2460
2461 let command = SubscribeCommand::Trades(SubscribeTrades {
2462 instrument_id,
2463 client_id,
2464 venue: Some(instrument_id.venue),
2465 command_id: UUID4::new(),
2466 ts_init: self.timestamp_ns(),
2467 params,
2468 });
2469
2470 self.send_data_cmd(DataCommand::Subscribe(command));
2471 }
2472
2473 pub fn subscribe_bars(
2475 &mut self,
2476 topic: MStr<Topic>,
2477 handler: ShareableMessageHandler,
2478 bar_type: BarType,
2479 client_id: Option<ClientId>,
2480 params: Option<IndexMap<String, String>>,
2481 ) {
2482 self.check_registered();
2483
2484 self.add_subscription(topic, handler);
2485
2486 let command = SubscribeCommand::Bars(SubscribeBars {
2487 bar_type,
2488 client_id,
2489 venue: Some(bar_type.instrument_id().venue),
2490 command_id: UUID4::new(),
2491 ts_init: self.timestamp_ns(),
2492 params,
2493 });
2494
2495 self.send_data_cmd(DataCommand::Subscribe(command));
2496 }
2497
2498 pub fn subscribe_mark_prices(
2500 &mut self,
2501 topic: MStr<Topic>,
2502 handler: ShareableMessageHandler,
2503 instrument_id: InstrumentId,
2504 client_id: Option<ClientId>,
2505 params: Option<IndexMap<String, String>>,
2506 ) {
2507 self.check_registered();
2508
2509 self.add_subscription(topic, handler);
2510
2511 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
2512 instrument_id,
2513 client_id,
2514 venue: Some(instrument_id.venue),
2515 command_id: UUID4::new(),
2516 ts_init: self.timestamp_ns(),
2517 params,
2518 });
2519
2520 self.send_data_cmd(DataCommand::Subscribe(command));
2521 }
2522
2523 pub fn subscribe_index_prices(
2525 &mut self,
2526 topic: MStr<Topic>,
2527 handler: ShareableMessageHandler,
2528 instrument_id: InstrumentId,
2529 client_id: Option<ClientId>,
2530 params: Option<IndexMap<String, String>>,
2531 ) {
2532 self.check_registered();
2533
2534 self.add_subscription(topic, handler);
2535
2536 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
2537 instrument_id,
2538 client_id,
2539 venue: Some(instrument_id.venue),
2540 command_id: UUID4::new(),
2541 ts_init: self.timestamp_ns(),
2542 params,
2543 });
2544
2545 self.send_data_cmd(DataCommand::Subscribe(command));
2546 }
2547
2548 pub fn subscribe_funding_rates(
2550 &mut self,
2551 topic: MStr<Topic>,
2552 handler: ShareableMessageHandler,
2553 instrument_id: InstrumentId,
2554 client_id: Option<ClientId>,
2555 params: Option<IndexMap<String, String>>,
2556 ) {
2557 self.check_registered();
2558
2559 self.add_subscription(topic, handler);
2560
2561 let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
2562 instrument_id,
2563 client_id,
2564 venue: Some(instrument_id.venue),
2565 command_id: UUID4::new(),
2566 ts_init: self.timestamp_ns(),
2567 params,
2568 });
2569
2570 self.send_data_cmd(DataCommand::Subscribe(command));
2571 }
2572
2573 pub fn subscribe_instrument_status(
2575 &mut self,
2576 topic: MStr<Topic>,
2577 handler: ShareableMessageHandler,
2578 instrument_id: InstrumentId,
2579 client_id: Option<ClientId>,
2580 params: Option<IndexMap<String, String>>,
2581 ) {
2582 self.check_registered();
2583
2584 self.add_subscription(topic, handler);
2585
2586 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
2587 instrument_id,
2588 client_id,
2589 venue: Some(instrument_id.venue),
2590 command_id: UUID4::new(),
2591 ts_init: self.timestamp_ns(),
2592 params,
2593 });
2594
2595 self.send_data_cmd(DataCommand::Subscribe(command));
2596 }
2597
2598 pub fn subscribe_instrument_close(
2600 &mut self,
2601 topic: MStr<Topic>,
2602 handler: ShareableMessageHandler,
2603 instrument_id: InstrumentId,
2604 client_id: Option<ClientId>,
2605 params: Option<IndexMap<String, String>>,
2606 ) {
2607 self.check_registered();
2608
2609 self.add_subscription(topic, handler);
2610
2611 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
2612 instrument_id,
2613 client_id,
2614 venue: Some(instrument_id.venue),
2615 command_id: UUID4::new(),
2616 ts_init: self.timestamp_ns(),
2617 params,
2618 });
2619
2620 self.send_data_cmd(DataCommand::Subscribe(command));
2621 }
2622
2623 pub fn subscribe_order_fills(&mut self, topic: MStr<Topic>, handler: ShareableMessageHandler) {
2625 self.check_registered();
2626 self.add_subscription(topic, handler);
2627 }
2628
2629 pub fn unsubscribe_data(
2631 &mut self,
2632 data_type: DataType,
2633 client_id: Option<ClientId>,
2634 params: Option<IndexMap<String, String>>,
2635 ) {
2636 self.check_registered();
2637
2638 let topic = get_custom_topic(&data_type);
2639 self.remove_subscription(topic);
2640
2641 if client_id.is_none() {
2642 return;
2643 }
2644
2645 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
2646 data_type,
2647 client_id,
2648 venue: None,
2649 command_id: UUID4::new(),
2650 ts_init: self.timestamp_ns(),
2651 params,
2652 });
2653
2654 self.send_data_cmd(DataCommand::Unsubscribe(command));
2655 }
2656
2657 pub fn unsubscribe_instruments(
2659 &mut self,
2660 venue: Venue,
2661 client_id: Option<ClientId>,
2662 params: Option<IndexMap<String, String>>,
2663 ) {
2664 self.check_registered();
2665
2666 let topic = get_instruments_topic(venue);
2667 self.remove_subscription(topic);
2668
2669 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
2670 client_id,
2671 venue,
2672 command_id: UUID4::new(),
2673 ts_init: self.timestamp_ns(),
2674 params,
2675 });
2676
2677 self.send_data_cmd(DataCommand::Unsubscribe(command));
2678 }
2679
2680 pub fn unsubscribe_instrument(
2682 &mut self,
2683 instrument_id: InstrumentId,
2684 client_id: Option<ClientId>,
2685 params: Option<IndexMap<String, String>>,
2686 ) {
2687 self.check_registered();
2688
2689 let topic = get_instrument_topic(instrument_id);
2690 self.remove_subscription(topic);
2691
2692 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
2693 instrument_id,
2694 client_id,
2695 venue: Some(instrument_id.venue),
2696 command_id: UUID4::new(),
2697 ts_init: self.timestamp_ns(),
2698 params,
2699 });
2700
2701 self.send_data_cmd(DataCommand::Unsubscribe(command));
2702 }
2703
2704 pub fn unsubscribe_book_deltas(
2706 &mut self,
2707 instrument_id: InstrumentId,
2708 client_id: Option<ClientId>,
2709 params: Option<IndexMap<String, String>>,
2710 ) {
2711 self.check_registered();
2712
2713 let topic = get_book_deltas_topic(instrument_id);
2714 self.remove_subscription(topic);
2715
2716 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
2717 instrument_id,
2718 client_id,
2719 venue: Some(instrument_id.venue),
2720 command_id: UUID4::new(),
2721 ts_init: self.timestamp_ns(),
2722 params,
2723 });
2724
2725 self.send_data_cmd(DataCommand::Unsubscribe(command));
2726 }
2727
2728 pub fn unsubscribe_book_at_interval(
2730 &mut self,
2731 instrument_id: InstrumentId,
2732 interval_ms: NonZeroUsize,
2733 client_id: Option<ClientId>,
2734 params: Option<IndexMap<String, String>>,
2735 ) {
2736 self.check_registered();
2737
2738 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
2739 self.remove_subscription(topic);
2740
2741 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
2742 instrument_id,
2743 client_id,
2744 venue: Some(instrument_id.venue),
2745 command_id: UUID4::new(),
2746 ts_init: self.timestamp_ns(),
2747 params,
2748 });
2749
2750 self.send_data_cmd(DataCommand::Unsubscribe(command));
2751 }
2752
2753 pub fn unsubscribe_quotes(
2755 &mut self,
2756 instrument_id: InstrumentId,
2757 client_id: Option<ClientId>,
2758 params: Option<IndexMap<String, String>>,
2759 ) {
2760 self.check_registered();
2761
2762 let topic = get_quotes_topic(instrument_id);
2763 self.remove_subscription(topic);
2764
2765 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
2766 instrument_id,
2767 client_id,
2768 venue: Some(instrument_id.venue),
2769 command_id: UUID4::new(),
2770 ts_init: self.timestamp_ns(),
2771 params,
2772 });
2773
2774 self.send_data_cmd(DataCommand::Unsubscribe(command));
2775 }
2776
2777 pub fn unsubscribe_trades(
2779 &mut self,
2780 instrument_id: InstrumentId,
2781 client_id: Option<ClientId>,
2782 params: Option<IndexMap<String, String>>,
2783 ) {
2784 self.check_registered();
2785
2786 let topic = get_trades_topic(instrument_id);
2787 self.remove_subscription(topic);
2788
2789 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
2790 instrument_id,
2791 client_id,
2792 venue: Some(instrument_id.venue),
2793 command_id: UUID4::new(),
2794 ts_init: self.timestamp_ns(),
2795 params,
2796 });
2797
2798 self.send_data_cmd(DataCommand::Unsubscribe(command));
2799 }
2800
2801 pub fn unsubscribe_bars(
2803 &mut self,
2804 bar_type: BarType,
2805 client_id: Option<ClientId>,
2806 params: Option<IndexMap<String, String>>,
2807 ) {
2808 self.check_registered();
2809
2810 let topic = get_bars_topic(bar_type);
2811 self.remove_subscription(topic);
2812
2813 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
2814 bar_type,
2815 client_id,
2816 venue: Some(bar_type.instrument_id().venue),
2817 command_id: UUID4::new(),
2818 ts_init: self.timestamp_ns(),
2819 params,
2820 });
2821
2822 self.send_data_cmd(DataCommand::Unsubscribe(command));
2823 }
2824
2825 pub fn unsubscribe_mark_prices(
2827 &mut self,
2828 instrument_id: InstrumentId,
2829 client_id: Option<ClientId>,
2830 params: Option<IndexMap<String, String>>,
2831 ) {
2832 self.check_registered();
2833
2834 let topic = get_mark_price_topic(instrument_id);
2835 self.remove_subscription(topic);
2836
2837 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
2838 instrument_id,
2839 client_id,
2840 venue: Some(instrument_id.venue),
2841 command_id: UUID4::new(),
2842 ts_init: self.timestamp_ns(),
2843 params,
2844 });
2845
2846 self.send_data_cmd(DataCommand::Unsubscribe(command));
2847 }
2848
2849 pub fn unsubscribe_index_prices(
2851 &mut self,
2852 instrument_id: InstrumentId,
2853 client_id: Option<ClientId>,
2854 params: Option<IndexMap<String, String>>,
2855 ) {
2856 self.check_registered();
2857
2858 let topic = get_index_price_topic(instrument_id);
2859 self.remove_subscription(topic);
2860
2861 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
2862 instrument_id,
2863 client_id,
2864 venue: Some(instrument_id.venue),
2865 command_id: UUID4::new(),
2866 ts_init: self.timestamp_ns(),
2867 params,
2868 });
2869
2870 self.send_data_cmd(DataCommand::Unsubscribe(command));
2871 }
2872
2873 pub fn unsubscribe_funding_rates(
2875 &mut self,
2876 instrument_id: InstrumentId,
2877 client_id: Option<ClientId>,
2878 params: Option<IndexMap<String, String>>,
2879 ) {
2880 self.check_registered();
2881
2882 let topic = get_funding_rate_topic(instrument_id);
2883 self.remove_subscription(topic);
2884
2885 let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
2886 instrument_id,
2887 client_id,
2888 venue: Some(instrument_id.venue),
2889 command_id: UUID4::new(),
2890 ts_init: self.timestamp_ns(),
2891 params,
2892 });
2893
2894 self.send_data_cmd(DataCommand::Unsubscribe(command));
2895 }
2896
2897 pub fn unsubscribe_instrument_status(
2899 &mut self,
2900 instrument_id: InstrumentId,
2901 client_id: Option<ClientId>,
2902 params: Option<IndexMap<String, String>>,
2903 ) {
2904 self.check_registered();
2905
2906 let topic = get_instrument_status_topic(instrument_id);
2907 self.remove_subscription(topic);
2908
2909 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
2910 instrument_id,
2911 client_id,
2912 venue: Some(instrument_id.venue),
2913 command_id: UUID4::new(),
2914 ts_init: self.timestamp_ns(),
2915 params,
2916 });
2917
2918 self.send_data_cmd(DataCommand::Unsubscribe(command));
2919 }
2920
2921 pub fn unsubscribe_instrument_close(
2923 &mut self,
2924 instrument_id: InstrumentId,
2925 client_id: Option<ClientId>,
2926 params: Option<IndexMap<String, String>>,
2927 ) {
2928 self.check_registered();
2929
2930 let topic = get_instrument_close_topic(instrument_id);
2931 self.remove_subscription(topic);
2932
2933 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
2934 instrument_id,
2935 client_id,
2936 venue: Some(instrument_id.venue),
2937 command_id: UUID4::new(),
2938 ts_init: self.timestamp_ns(),
2939 params,
2940 });
2941
2942 self.send_data_cmd(DataCommand::Unsubscribe(command));
2943 }
2944
2945 pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
2947 self.check_registered();
2948
2949 let topic = get_order_fills_topic(instrument_id);
2950 self.remove_subscription(topic);
2951 }
2952
2953 #[allow(clippy::too_many_arguments)]
2959 pub fn request_data(
2960 &self,
2961 data_type: DataType,
2962 client_id: ClientId,
2963 start: Option<DateTime<Utc>>,
2964 end: Option<DateTime<Utc>>,
2965 limit: Option<NonZeroUsize>,
2966 params: Option<IndexMap<String, String>>,
2967 handler: ShareableMessageHandler,
2968 ) -> anyhow::Result<UUID4> {
2969 self.check_registered();
2970
2971 let now = self.clock_ref().utc_now();
2972 check_timestamps(now, start, end)?;
2973
2974 let request_id = UUID4::new();
2975 let command = RequestCommand::Data(RequestCustomData {
2976 client_id,
2977 data_type,
2978 start,
2979 end,
2980 limit,
2981 request_id,
2982 ts_init: self.timestamp_ns(),
2983 params,
2984 });
2985
2986 get_message_bus()
2987 .borrow_mut()
2988 .register_response_handler(command.request_id(), handler)?;
2989
2990 self.send_data_cmd(DataCommand::Request(command));
2991
2992 Ok(request_id)
2993 }
2994
2995 pub fn request_instrument(
3001 &self,
3002 instrument_id: InstrumentId,
3003 start: Option<DateTime<Utc>>,
3004 end: Option<DateTime<Utc>>,
3005 client_id: Option<ClientId>,
3006 params: Option<IndexMap<String, String>>,
3007 handler: ShareableMessageHandler,
3008 ) -> anyhow::Result<UUID4> {
3009 self.check_registered();
3010
3011 let now = self.clock_ref().utc_now();
3012 check_timestamps(now, start, end)?;
3013
3014 let request_id = UUID4::new();
3015 let command = RequestCommand::Instrument(RequestInstrument {
3016 instrument_id,
3017 start,
3018 end,
3019 client_id,
3020 request_id,
3021 ts_init: now.into(),
3022 params,
3023 });
3024
3025 get_message_bus()
3026 .borrow_mut()
3027 .register_response_handler(command.request_id(), handler)?;
3028
3029 self.send_data_cmd(DataCommand::Request(command));
3030
3031 Ok(request_id)
3032 }
3033
3034 pub fn request_instruments(
3040 &self,
3041 venue: Option<Venue>,
3042 start: Option<DateTime<Utc>>,
3043 end: Option<DateTime<Utc>>,
3044 client_id: Option<ClientId>,
3045 params: Option<IndexMap<String, String>>,
3046 handler: ShareableMessageHandler,
3047 ) -> anyhow::Result<UUID4> {
3048 self.check_registered();
3049
3050 let now = self.clock_ref().utc_now();
3051 check_timestamps(now, start, end)?;
3052
3053 let request_id = UUID4::new();
3054 let command = RequestCommand::Instruments(RequestInstruments {
3055 venue,
3056 start,
3057 end,
3058 client_id,
3059 request_id,
3060 ts_init: now.into(),
3061 params,
3062 });
3063
3064 get_message_bus()
3065 .borrow_mut()
3066 .register_response_handler(command.request_id(), handler)?;
3067
3068 self.send_data_cmd(DataCommand::Request(command));
3069
3070 Ok(request_id)
3071 }
3072
3073 pub fn request_book_snapshot(
3079 &self,
3080 instrument_id: InstrumentId,
3081 depth: Option<NonZeroUsize>,
3082 client_id: Option<ClientId>,
3083 params: Option<IndexMap<String, String>>,
3084 handler: ShareableMessageHandler,
3085 ) -> anyhow::Result<UUID4> {
3086 self.check_registered();
3087
3088 let request_id = UUID4::new();
3089 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3090 instrument_id,
3091 depth,
3092 client_id,
3093 request_id,
3094 ts_init: self.timestamp_ns(),
3095 params,
3096 });
3097
3098 get_message_bus()
3099 .borrow_mut()
3100 .register_response_handler(command.request_id(), handler)?;
3101
3102 self.send_data_cmd(DataCommand::Request(command));
3103
3104 Ok(request_id)
3105 }
3106
3107 #[allow(clippy::too_many_arguments)]
3113 pub fn request_quotes(
3114 &self,
3115 instrument_id: InstrumentId,
3116 start: Option<DateTime<Utc>>,
3117 end: Option<DateTime<Utc>>,
3118 limit: Option<NonZeroUsize>,
3119 client_id: Option<ClientId>,
3120 params: Option<IndexMap<String, String>>,
3121 handler: ShareableMessageHandler,
3122 ) -> anyhow::Result<UUID4> {
3123 self.check_registered();
3124
3125 let now = self.clock_ref().utc_now();
3126 check_timestamps(now, start, end)?;
3127
3128 let request_id = UUID4::new();
3129 let command = RequestCommand::Quotes(RequestQuotes {
3130 instrument_id,
3131 start,
3132 end,
3133 limit,
3134 client_id,
3135 request_id,
3136 ts_init: now.into(),
3137 params,
3138 });
3139
3140 get_message_bus()
3141 .borrow_mut()
3142 .register_response_handler(command.request_id(), handler)?;
3143
3144 self.send_data_cmd(DataCommand::Request(command));
3145
3146 Ok(request_id)
3147 }
3148
3149 #[allow(clippy::too_many_arguments)]
3155 pub fn request_trades(
3156 &self,
3157 instrument_id: InstrumentId,
3158 start: Option<DateTime<Utc>>,
3159 end: Option<DateTime<Utc>>,
3160 limit: Option<NonZeroUsize>,
3161 client_id: Option<ClientId>,
3162 params: Option<IndexMap<String, String>>,
3163 handler: ShareableMessageHandler,
3164 ) -> anyhow::Result<UUID4> {
3165 self.check_registered();
3166
3167 let now = self.clock_ref().utc_now();
3168 check_timestamps(now, start, end)?;
3169
3170 let request_id = UUID4::new();
3171 let command = RequestCommand::Trades(RequestTrades {
3172 instrument_id,
3173 start,
3174 end,
3175 limit,
3176 client_id,
3177 request_id,
3178 ts_init: now.into(),
3179 params,
3180 });
3181
3182 get_message_bus()
3183 .borrow_mut()
3184 .register_response_handler(command.request_id(), handler)?;
3185
3186 self.send_data_cmd(DataCommand::Request(command));
3187
3188 Ok(request_id)
3189 }
3190
3191 #[allow(clippy::too_many_arguments)]
3197 pub fn request_bars(
3198 &self,
3199 bar_type: BarType,
3200 start: Option<DateTime<Utc>>,
3201 end: Option<DateTime<Utc>>,
3202 limit: Option<NonZeroUsize>,
3203 client_id: Option<ClientId>,
3204 params: Option<IndexMap<String, String>>,
3205 handler: ShareableMessageHandler,
3206 ) -> anyhow::Result<UUID4> {
3207 self.check_registered();
3208
3209 let now = self.clock_ref().utc_now();
3210 check_timestamps(now, start, end)?;
3211
3212 let request_id = UUID4::new();
3213 let command = RequestCommand::Bars(RequestBars {
3214 bar_type,
3215 start,
3216 end,
3217 limit,
3218 client_id,
3219 request_id,
3220 ts_init: now.into(),
3221 params,
3222 });
3223
3224 get_message_bus()
3225 .borrow_mut()
3226 .register_response_handler(command.request_id(), handler)?;
3227
3228 self.send_data_cmd(DataCommand::Request(command));
3229
3230 Ok(request_id)
3231 }
3232}
3233
3234fn check_timestamps(
3235 now: DateTime<Utc>,
3236 start: Option<DateTime<Utc>>,
3237 end: Option<DateTime<Utc>>,
3238) -> anyhow::Result<()> {
3239 if let Some(start) = start {
3240 check_predicate_true(start <= now, "start was > now")?
3241 }
3242 if let Some(end) = end {
3243 check_predicate_true(end <= now, "end was > now")?
3244 }
3245
3246 if let (Some(start), Some(end)) = (start, end) {
3247 check_predicate_true(start < end, "start was >= end")?
3248 }
3249
3250 Ok(())
3251}
3252
3253fn log_error(e: &anyhow::Error) {
3254 log::error!("{e}");
3255}
3256
3257fn log_not_running<T>(msg: &T)
3258where
3259 T: Debug,
3260{
3261 log::warn!("Received message when not running - skipping {msg:?}");
3263}
3264
3265fn log_received<T>(msg: &T)
3266where
3267 T: Debug,
3268{
3269 log::debug!("{RECV} {msg:?}");
3270}