1#![allow(dead_code)]
18#![allow(unused_variables)]
19#![allow(unused_imports)]
20
21use std::{
22 any::{Any, TypeId},
23 cell::{RefCell, UnsafeCell},
24 collections::HashSet,
25 fmt::Debug,
26 num::NonZeroUsize,
27 ops::{Deref, DerefMut},
28 rc::Rc,
29 sync::Arc,
30};
31
32use ahash::{AHashMap, AHashSet};
33use chrono::{DateTime, Utc};
34use indexmap::IndexMap;
35use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
36use nautilus_model::{
37 data::{
38 Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
39 OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
40 },
41 enums::BookType,
42 identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
43 instruments::{Instrument, InstrumentAny},
44 orderbook::OrderBook,
45};
46use ustr::Ustr;
47use uuid::Uuid;
48
49#[cfg(feature = "indicators")]
50use super::indicators::Indicators;
51use super::{Actor, registry::get_actor_unchecked};
52use crate::{
53 cache::Cache,
54 clock::Clock,
55 enums::{ComponentState, ComponentTrigger},
56 logging::{CMD, RECV, REQ, SEND},
57 messages::{
58 data::{
59 BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
60 InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
61 RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
62 SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
63 SubscribeCustomData, SubscribeIndexPrices, SubscribeInstrument,
64 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
65 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
66 UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
67 UnsubscribeCustomData, UnsubscribeIndexPrices, UnsubscribeInstrument,
68 UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
69 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
70 },
71 system::ShutdownSystem,
72 },
73 msgbus::{
74 self, MStr, Pattern, Topic, get_message_bus,
75 handler::{MessageHandler, ShareableMessageHandler, TypedMessageHandler},
76 switchboard::{
77 self, MessagingSwitchboard, get_bars_topic, get_book_deltas_topic,
78 get_book_snapshots_topic, get_custom_topic, get_index_price_topic,
79 get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
80 get_instruments_topic, get_mark_price_topic, get_quotes_topic, get_trades_topic,
81 },
82 },
83 signal::Signal,
84 timer::TimeEvent,
85};
86
87#[derive(Debug, Clone)]
89pub struct DataActorConfig {
90 pub actor_id: Option<ActorId>,
92 pub log_events: bool,
94 pub log_commands: bool,
96}
97
98impl Default for DataActorConfig {
99 fn default() -> Self {
100 Self {
101 actor_id: None,
102 log_events: true,
103 log_commands: true,
104 }
105 }
106}
107
108type RequestCallback = Box<dyn Fn(UUID4) + Send + Sync>; impl Actor for DataActorCore {
111 fn id(&self) -> Ustr {
112 self.actor_id.inner()
113 }
114
115 fn handle(&mut self, msg: &dyn Any) {}
116
117 fn as_any(&self) -> &dyn Any {
118 self
119 }
120}
121
122pub trait DataActor: Actor {
123 fn state(&self) -> ComponentState;
125
126 fn is_ready(&self) -> bool {
128 self.state() == ComponentState::Ready
129 }
130
131 fn is_running(&self) -> bool {
133 self.state() == ComponentState::Running
134 }
135
136 fn is_stopped(&self) -> bool {
138 self.state() == ComponentState::Stopped
139 }
140
141 fn is_disposed(&self) -> bool {
143 self.state() == ComponentState::Disposed
144 }
145
146 fn is_degraded(&self) -> bool {
148 self.state() == ComponentState::Degraded
149 }
150
151 fn is_faulted(&self) -> bool {
153 self.state() == ComponentState::Faulted
154 }
155
156 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
162 Ok(IndexMap::new())
163 }
164
165 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
171 Ok(())
172 }
173
174 fn on_start(&mut self) -> anyhow::Result<()> {
180 log::warn!(
181 "The `on_start` handler was called when not overridden, \
182 it's expected that any actions required when starting the actor \
183 occur here, such as subscribing/requesting data"
184 );
185 Ok(())
186 }
187
188 fn on_stop(&mut self) -> anyhow::Result<()> {
194 log::warn!(
195 "The `on_stop` handler was called when not overridden, \
196 it's expected that any actions required when stopping the actor \
197 occur here, such as unsubscribing from data",
198 );
199 Ok(())
200 }
201
202 fn on_resume(&mut self) -> anyhow::Result<()> {
208 log::warn!(
209 "The `on_resume` handler was called when not overridden, \
210 it's expected that any actions required when resuming the actor \
211 following a stop occur here"
212 );
213 Ok(())
214 }
215
216 fn on_reset(&mut self) -> anyhow::Result<()> {
222 log::warn!(
223 "The `on_reset` handler was called when not overridden, \
224 it's expected that any actions required when resetting the actor \
225 occur here, such as resetting indicators and other state"
226 );
227 Ok(())
228 }
229
230 fn on_dispose(&mut self) -> anyhow::Result<()> {
236 Ok(())
237 }
238
239 fn on_degrade(&mut self) -> anyhow::Result<()> {
245 Ok(())
246 }
247
248 fn on_fault(&mut self) -> anyhow::Result<()> {
254 Ok(())
255 }
256
257 fn on_event(&mut self, event: &dyn Any) -> anyhow::Result<()> {
263 Ok(())
265 }
266
267 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
273 Ok(())
274 }
275
276 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
282 Ok(())
283 }
284
285 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
291 Ok(())
292 }
293
294 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
300 Ok(())
301 }
302
303 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
309 Ok(())
310 }
311
312 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
318 Ok(())
319 }
320
321 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
327 Ok(())
328 }
329
330 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
336 Ok(())
337 }
338
339 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
345 Ok(())
346 }
347
348 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
354 Ok(())
355 }
356
357 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
363 Ok(())
364 }
365
366 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
372 Ok(())
373 }
374
375 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
381 Ok(())
382 }
383
384 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
390 Ok(())
391 }
392
393 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
399 Ok(())
400 }
401
402 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
408 Ok(())
409 }
410
411 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
417 Ok(())
418 }
419
420 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
426 Ok(())
427 }
428
429 fn on_historical_index_prices(
435 &mut self,
436 index_prices: &[IndexPriceUpdate],
437 ) -> anyhow::Result<()> {
438 Ok(())
439 }
440
441 fn handle_data(&mut self, data: &dyn Any) {
443 log_received(&data);
444
445 if !self.is_running() {
446 log_not_running(&data);
447 return;
448 }
449
450 if let Err(e) = self.on_data(data) {
451 log_error(&e);
452 }
453 }
454
455 fn handle_signal(&mut self, signal: &Signal) {
457 log_received(&signal);
458
459 if !self.is_running() {
460 log_not_running(&signal);
461 return;
462 }
463
464 if let Err(e) = self.on_signal(signal) {
465 log_error(&e);
466 }
467 }
468
469 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
471 log_received(&instrument);
472
473 if !self.is_running() {
474 log_not_running(&instrument);
475 return;
476 }
477
478 if let Err(e) = self.on_instrument(instrument) {
479 log_error(&e);
480 }
481 }
482
483 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
485 log_received(&deltas);
486
487 if !self.is_running() {
488 log_not_running(&deltas);
489 return;
490 }
491
492 if let Err(e) = self.on_book_deltas(deltas) {
493 log_error(&e);
494 }
495 }
496
497 fn handle_book(&mut self, book: &OrderBook) {
499 log_received(&book);
500
501 if !self.is_running() {
502 log_not_running(&book);
503 return;
504 }
505
506 if let Err(e) = self.on_book(book) {
507 log_error(&e);
508 };
509 }
510
511 fn handle_quote(&mut self, quote: &QuoteTick) {
513 log_received("e);
514
515 if !self.is_running() {
516 log_not_running("e);
517 return;
518 }
519
520 if let Err(e) = self.on_quote(quote) {
521 log_error(&e);
522 }
523 }
524
525 fn handle_trade(&mut self, trade: &TradeTick) {
527 log_received(&trade);
528
529 if !self.is_running() {
530 log_not_running(&trade);
531 return;
532 }
533
534 if let Err(e) = self.on_trade(trade) {
535 log_error(&e);
536 }
537 }
538
539 fn handle_bar(&mut self, bar: &Bar) {
541 log_received(&bar);
542
543 if !self.is_running() {
544 log_not_running(&bar);
545 return;
546 }
547
548 if let Err(e) = self.on_bar(bar) {
549 log_error(&e);
550 }
551 }
552
553 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
555 log_received(&mark_price);
556
557 if !self.is_running() {
558 log_not_running(&mark_price);
559 return;
560 }
561
562 if let Err(e) = self.on_mark_price(mark_price) {
563 log_error(&e);
564 }
565 }
566
567 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
569 log_received(&index_price);
570
571 if !self.is_running() {
572 log_not_running(&index_price);
573 return;
574 }
575
576 if let Err(e) = self.on_index_price(index_price) {
577 log_error(&e);
578 }
579 }
580
581 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
583 log_received(&status);
584
585 if !self.is_running() {
586 log_not_running(&status);
587 return;
588 }
589
590 if let Err(e) = self.on_instrument_status(status) {
591 log_error(&e);
592 }
593 }
594
595 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
597 log_received(&close);
598
599 if !self.is_running() {
600 log_not_running(&close);
601 return;
602 }
603
604 if let Err(e) = self.on_instrument_close(close) {
605 log_error(&e);
606 }
607 }
608
609 fn handle_historical_data(&mut self, data: &dyn Any) {
611 log_received(&data);
612
613 if let Err(e) = self.on_historical_data(data) {
614 log_error(&e);
615 }
616 }
617
618 fn handle_time_event(&mut self, event: &TimeEvent) {
620 log_received(&event);
621
622 if let Err(e) = self.on_time_event(event) {
623 log_error(&e);
624 }
625 }
626
627 fn handle_event(&mut self, event: &dyn Any) {
629 log_received(&event);
630
631 if let Err(e) = self.on_event(event) {
632 log_error(&e);
633 }
634 }
635
636 fn handle_data_response(&mut self, response: &CustomDataResponse) {
638 log_received(&response);
639
640 if let Err(e) = self.on_historical_data(response.data.as_ref()) {
641 log_error(&e);
642 }
643 }
644
645 fn handle_instrument_response(&mut self, response: &InstrumentResponse) {
647 log_received(&response);
648
649 if let Err(e) = self.on_instrument(&response.data) {
650 log_error(&e);
651 }
652 }
653
654 fn handle_instruments_response(&mut self, response: &InstrumentsResponse) {
656 log_received(&response);
657
658 for inst in &response.data {
659 if let Err(e) = self.on_instrument(inst) {
660 log_error(&e);
661 }
662 }
663 }
664
665 fn handle_book_response(&mut self, response: &BookResponse) {
667 log_received(&response);
668
669 if let Err(e) = self.on_book(&response.data) {
670 log_error(&e);
671 }
672 }
673
674 fn handle_quotes_response(&mut self, response: &QuotesResponse) {
676 log_received(&response);
677
678 if let Err(e) = self.on_historical_quotes(&response.data) {
679 log_error(&e);
680 }
681 }
682
683 fn handle_trades_response(&mut self, response: &TradesResponse) {
685 log_received(&response);
686
687 if let Err(e) = self.on_historical_trades(&response.data) {
688 log_error(&e);
689 }
690 }
691
692 fn handle_bars_response(&mut self, response: &BarsResponse) {
694 log_received(&response);
695
696 if let Err(e) = self.on_historical_bars(&response.data) {
697 log_error(&e);
698 }
699 }
700}
701
702pub struct DataActorCore {
704 pub actor_id: ActorId,
706 pub config: DataActorConfig,
708 pub clock: Rc<RefCell<dyn Clock>>,
710 pub cache: Rc<RefCell<Cache>>,
712 state: ComponentState,
713 trader_id: Option<TraderId>,
714 warning_events: AHashSet<String>, pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
716 signal_classes: AHashMap<String, String>,
717 #[cfg(feature = "indicators")]
718 indicators: Indicators,
719 topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
720}
721
722impl Debug for DataActorCore {
723 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
724 f.debug_struct(stringify!(DataActorCore))
725 .field("actor_id", &self.actor_id)
726 .field("config", &self.config)
727 .field("state", &self.state)
728 .field("trader_id", &self.trader_id)
729 .finish()
730 }
731}
732
733impl DataActor for DataActorCore {
734 fn state(&self) -> ComponentState {
735 self.state
736 }
737}
738
739impl DataActorCore {
740 pub fn new(
742 config: DataActorConfig,
743 cache: Rc<RefCell<Cache>>,
744 clock: Rc<RefCell<dyn Clock>>,
745 ) -> Self {
746 let actor_id = config
747 .actor_id
748 .unwrap_or_else(|| Self::default_actor_id(&config));
749
750 Self {
751 actor_id,
752 config,
753 clock,
754 cache,
755 state: ComponentState::default(),
756 trader_id: None, warning_events: AHashSet::new(),
758 pending_requests: AHashMap::new(),
759 signal_classes: AHashMap::new(),
760 #[cfg(feature = "indicators")]
761 indicators: Indicators::default(),
762 topic_handlers: AHashMap::new(),
763 }
764 }
765
766 fn default_actor_id(config: &DataActorConfig) -> ActorId {
767 let memory_address = std::ptr::from_ref(config) as *const _ as usize;
768 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
769 }
770
771 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
772 self.state = self.state.transition(&trigger)?;
773 log::info!("{}", self.state);
774 Ok(())
775 }
776
777 pub fn initialize(&mut self) -> anyhow::Result<()> {
785 self.transition_state(ComponentTrigger::Initialize)
786 }
787
788 pub fn trader_id(&self) -> Option<TraderId> {
790 self.trader_id
791 }
792
793 pub fn state(&self) -> ComponentState {
795 self.state
796 }
797
798 pub fn register_warning_event(&mut self, event_type: &str) {
802 self.warning_events.insert(event_type.to_string());
803 }
804
805 pub fn deregister_warning_event(&mut self, event_type: &str) {
807 self.warning_events.remove(event_type);
808 }
810
811 pub(crate) fn set_trader_id(&mut self, trader_id: TraderId) {
817 if let Some(existing_trader_id) = self.trader_id {
818 panic!("trader_id {existing_trader_id} already set");
819 }
820
821 self.trader_id = Some(trader_id)
822 }
823
824 fn check_registered(&self) {
825 assert!(
826 self.trader_id.is_some(),
827 "Actor has not been registered with a Trader"
828 );
829 }
830
831 fn generate_ts_init(&self) -> UnixNanos {
832 self.clock.borrow().timestamp_ns()
833 }
834
835 fn send_data_cmd(&self, command: DataCommand) {
836 if self.config.log_commands {
837 log::info!("{CMD}{SEND} {command:?}");
838 }
839
840 let endpoint = MessagingSwitchboard::data_engine_execute();
841 msgbus::send(endpoint, command.as_any())
842 }
843
844 fn send_data_req<A: DataActor>(&self, request: RequestCommand) {
845 if self.config.log_commands {
846 log::info!("{REQ}{SEND} {request:?}");
847 }
848
849 let actor_id = self.actor_id.inner();
850 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
851 move |response: &CustomDataResponse| {
852 get_actor_unchecked::<A>(&actor_id).handle_data_response(response);
853 },
854 )));
855
856 let msgbus = get_message_bus()
857 .borrow_mut()
858 .register_response_handler(request.request_id(), handler);
859
860 let endpoint = MessagingSwitchboard::data_engine_execute();
861 msgbus::send(endpoint, request.as_any())
862 }
863
864 pub fn start(&mut self) -> anyhow::Result<()> {
870 self.transition_state(ComponentTrigger::Start)?; if let Err(e) = self.on_start() {
873 log_error(&e);
874 return Err(e); }
876
877 self.transition_state(ComponentTrigger::StartCompleted)?;
878
879 Ok(())
880 }
881
882 pub fn stop(&mut self) -> anyhow::Result<()> {
888 self.transition_state(ComponentTrigger::Stop)?; if let Err(e) = self.on_stop() {
891 log_error(&e);
892 return Err(e); }
894
895 self.transition_state(ComponentTrigger::StopCompleted)?;
896
897 Ok(())
898 }
899
900 pub fn resume(&mut self) -> anyhow::Result<()> {
906 self.transition_state(ComponentTrigger::Resume)?; if let Err(e) = self.on_stop() {
909 log_error(&e);
910 return Err(e); }
912
913 self.transition_state(ComponentTrigger::ResumeCompleted)?;
914
915 Ok(())
916 }
917
918 pub fn reset(&mut self) -> anyhow::Result<()> {
924 self.transition_state(ComponentTrigger::Reset)?; if let Err(e) = self.on_reset() {
927 log_error(&e);
928 return Err(e); }
930
931 self.transition_state(ComponentTrigger::ResetCompleted)?;
932
933 Ok(())
934 }
935
936 pub fn dispose(&mut self) -> anyhow::Result<()> {
942 self.transition_state(ComponentTrigger::Dispose)?; if let Err(e) = self.on_dispose() {
945 log_error(&e);
946 return Err(e); }
948
949 self.transition_state(ComponentTrigger::DisposeCompleted)?;
950
951 Ok(())
952 }
953
954 pub fn degrade(&mut self) -> anyhow::Result<()> {
960 self.transition_state(ComponentTrigger::Degrade)?; if let Err(e) = self.on_degrade() {
963 log_error(&e);
964 return Err(e); }
966
967 self.transition_state(ComponentTrigger::DegradeCompleted)?;
968
969 Ok(())
970 }
971
972 pub fn fault(&mut self) -> anyhow::Result<()> {
978 self.transition_state(ComponentTrigger::Fault)?; if let Err(e) = self.on_fault() {
981 log_error(&e);
982 return Err(e); }
984
985 self.transition_state(ComponentTrigger::FaultCompleted)?;
986
987 Ok(())
988 }
989
990 pub fn shutdown_system(&self, reason: Option<String>) {
996 self.check_registered();
997
998 let command = ShutdownSystem::new(
1000 self.trader_id().unwrap(),
1001 self.actor_id.inner(),
1002 reason,
1003 UUID4::new(),
1004 self.clock.borrow().timestamp_ns(),
1005 );
1006
1007 let endpoint = "command.system.shutdown".into();
1008 msgbus::send(endpoint, command.as_any());
1009 }
1010
1011 fn get_or_create_handler_for_topic<F>(
1014 &mut self,
1015 topic: MStr<Topic>,
1016 create_handler: F,
1017 ) -> ShareableMessageHandler
1018 where
1019 F: FnOnce() -> ShareableMessageHandler,
1020 {
1021 if let Some(existing_handler) = self.topic_handlers.get(&topic) {
1022 existing_handler.clone()
1023 } else {
1024 let new_handler = create_handler();
1025 self.topic_handlers.insert(topic, new_handler.clone());
1026 new_handler
1027 }
1028 }
1029
1030 fn get_handler_for_topic(&self, topic: MStr<Topic>) -> Option<ShareableMessageHandler> {
1031 self.topic_handlers.get(&topic).cloned()
1032 }
1033
1034 pub fn subscribe_data<A: DataActor>(
1036 &mut self,
1037 data_type: DataType,
1038 client_id: Option<ClientId>,
1039 params: Option<IndexMap<String, String>>,
1040 ) {
1041 self.check_registered();
1042
1043 let topic = get_custom_topic(&data_type);
1044 let actor_id = self.actor_id.inner();
1045 let handler = if let Some(existing_handler) = self.topic_handlers.get(&topic) {
1046 existing_handler.clone()
1047 } else {
1048 let new_handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
1049 move |data: &dyn Any| {
1050 get_actor_unchecked::<A>(&actor_id).handle_data(data);
1051 },
1052 )));
1053
1054 self.topic_handlers.insert(topic, new_handler.clone());
1055 new_handler
1056 };
1057
1058 msgbus::subscribe_topic(topic, handler, None);
1059
1060 if client_id.is_none() {
1061 return;
1063 }
1064
1065 let command = SubscribeCommand::Data(SubscribeCustomData {
1066 data_type,
1067 client_id,
1068 venue: None,
1069 command_id: UUID4::new(),
1070 ts_init: self.generate_ts_init(),
1071 params,
1072 });
1073
1074 self.send_data_cmd(DataCommand::Subscribe(command));
1075 }
1076
1077 pub fn subscribe_instruments<A: DataActor>(
1079 &mut self,
1080 venue: Venue,
1081 client_id: Option<ClientId>,
1082 params: Option<IndexMap<String, String>>,
1083 ) {
1084 self.check_registered();
1085
1086 let topic = get_instruments_topic(venue);
1087 let actor_id = self.actor_id.inner();
1088 let handler = self.get_or_create_handler_for_topic(topic, || {
1089 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1090 move |instrument: &InstrumentAny| {
1091 get_actor_unchecked::<A>(&actor_id).handle_instrument(instrument);
1092 },
1093 )))
1094 });
1095
1096 msgbus::subscribe_topic(topic, handler, None);
1097
1098 let command = SubscribeCommand::Instruments(SubscribeInstruments {
1099 client_id,
1100 venue,
1101 command_id: UUID4::new(),
1102 ts_init: self.generate_ts_init(),
1103 params,
1104 });
1105
1106 self.send_data_cmd(DataCommand::Subscribe(command));
1107 }
1108
1109 pub fn subscribe_instrument<A: DataActor>(
1111 &mut self,
1112 instrument_id: InstrumentId,
1113 client_id: Option<ClientId>,
1114 params: Option<IndexMap<String, String>>,
1115 ) {
1116 self.check_registered();
1117
1118 let topic = get_instrument_topic(instrument_id);
1119 let actor_id = self.actor_id.inner();
1120 let handler = self.get_or_create_handler_for_topic(topic, || {
1121 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1122 move |instrument: &InstrumentAny| {
1123 get_actor_unchecked::<A>(&actor_id).handle_instrument(instrument);
1124 },
1125 )))
1126 });
1127
1128 msgbus::subscribe_topic(topic, handler, None);
1129
1130 let command = SubscribeCommand::Instrument(SubscribeInstrument {
1131 instrument_id,
1132 client_id,
1133 venue: Some(instrument_id.venue),
1134 command_id: UUID4::new(),
1135 ts_init: self.generate_ts_init(),
1136 params,
1137 });
1138
1139 self.send_data_cmd(DataCommand::Subscribe(command));
1140 }
1141
1142 pub fn subscribe_book_deltas<A: DataActor>(
1147 &mut self,
1148 instrument_id: InstrumentId,
1149 book_type: BookType,
1150 depth: Option<NonZeroUsize>,
1151 client_id: Option<ClientId>,
1152 managed: bool,
1153 params: Option<IndexMap<String, String>>,
1154 ) {
1155 self.check_registered();
1156
1157 let topic = get_book_deltas_topic(instrument_id);
1158 let actor_id = self.actor_id.inner();
1159 let handler = self.get_or_create_handler_for_topic(topic, || {
1160 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1161 move |deltas: &OrderBookDeltas| {
1162 get_actor_unchecked::<A>(&actor_id).handle_book_deltas(deltas);
1163 },
1164 )))
1165 });
1166
1167 msgbus::subscribe_topic(topic, handler, None);
1168
1169 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
1170 instrument_id,
1171 book_type,
1172 client_id,
1173 venue: Some(instrument_id.venue),
1174 command_id: UUID4::new(),
1175 ts_init: self.generate_ts_init(),
1176 depth,
1177 managed,
1178 params,
1179 });
1180
1181 self.send_data_cmd(DataCommand::Subscribe(command));
1182 }
1183
1184 pub fn subscribe_book_at_interval<A: DataActor>(
1193 &mut self,
1194 instrument_id: InstrumentId,
1195 book_type: BookType,
1196 depth: Option<NonZeroUsize>,
1197 interval_ms: NonZeroUsize,
1198 client_id: Option<ClientId>,
1199 params: Option<IndexMap<String, String>>,
1200 ) {
1201 self.check_registered();
1202
1203 if book_type == BookType::L1_MBP && depth.is_some_and(|d| d.get() > 1) {
1204 log::error!(
1205 "Cannot subscribe to order book snapshots: L1 MBP book subscription depth > 1, was {:?}",
1206 depth,
1207 );
1208 return;
1209 }
1210
1211 let topic = get_book_snapshots_topic(instrument_id);
1212 let actor_id = self.actor_id.inner();
1213 let handler = self.get_or_create_handler_for_topic(topic, || {
1214 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1215 move |book: &OrderBook| {
1216 get_actor_unchecked::<A>(&actor_id).handle_book(book);
1217 },
1218 )))
1219 });
1220
1221 msgbus::subscribe_topic(topic, handler, None);
1222
1223 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
1224 instrument_id,
1225 book_type,
1226 client_id,
1227 venue: Some(instrument_id.venue),
1228 command_id: UUID4::new(),
1229 ts_init: self.generate_ts_init(),
1230 depth,
1231 interval_ms,
1232 params,
1233 });
1234
1235 self.send_data_cmd(DataCommand::Subscribe(command));
1236 }
1237
1238 pub fn subscribe_quotes<A: DataActor>(
1240 &mut self,
1241 instrument_id: InstrumentId,
1242 client_id: Option<ClientId>,
1243 params: Option<IndexMap<String, String>>,
1244 ) {
1245 self.check_registered();
1246
1247 let topic = get_quotes_topic(instrument_id);
1248 let actor_id = self.actor_id.inner();
1249 let handler = self.get_or_create_handler_for_topic(topic, || {
1250 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1251 move |quote: &QuoteTick| {
1252 get_actor_unchecked::<A>(&actor_id).handle_quote(quote);
1253 },
1254 )))
1255 });
1256
1257 msgbus::subscribe_topic(topic, handler, None);
1258
1259 let command = SubscribeCommand::Quotes(SubscribeQuotes {
1260 instrument_id,
1261 client_id,
1262 venue: Some(instrument_id.venue),
1263 command_id: UUID4::new(),
1264 ts_init: self.generate_ts_init(),
1265 params,
1266 });
1267
1268 self.send_data_cmd(DataCommand::Subscribe(command));
1269 }
1270
1271 pub fn subscribe_trades<A: DataActor>(
1273 &mut self,
1274 instrument_id: InstrumentId,
1275 client_id: Option<ClientId>,
1276 params: Option<IndexMap<String, String>>,
1277 ) {
1278 self.check_registered();
1279
1280 let topic = get_trades_topic(instrument_id);
1281 let actor_id = self.actor_id.inner();
1282 let handler = self.get_or_create_handler_for_topic(topic, || {
1283 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1284 move |trade: &TradeTick| {
1285 get_actor_unchecked::<A>(&actor_id).handle_trade(trade);
1286 },
1287 )))
1288 });
1289
1290 msgbus::subscribe_topic(topic, handler, None);
1291
1292 let command = SubscribeCommand::Trades(SubscribeTrades {
1293 instrument_id,
1294 client_id,
1295 venue: Some(instrument_id.venue),
1296 command_id: UUID4::new(),
1297 ts_init: self.generate_ts_init(),
1298 params,
1299 });
1300
1301 self.send_data_cmd(DataCommand::Subscribe(command));
1302 }
1303
1304 pub fn subscribe_bars<A: DataActor>(
1309 &mut self,
1310 bar_type: BarType,
1311 client_id: Option<ClientId>,
1312 await_partial: bool,
1313 params: Option<IndexMap<String, String>>,
1314 ) {
1315 self.check_registered();
1316
1317 let topic = get_bars_topic(bar_type);
1318 let actor_id = self.actor_id.inner();
1319 let handler = self.get_or_create_handler_for_topic(topic, || {
1320 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
1321 get_actor_unchecked::<A>(&actor_id).handle_bar(bar);
1322 })))
1323 });
1324
1325 msgbus::subscribe_topic(topic, handler, None);
1326
1327 let command = SubscribeCommand::Bars(SubscribeBars {
1328 bar_type,
1329 client_id,
1330 venue: Some(bar_type.instrument_id().venue),
1331 command_id: UUID4::new(),
1332 ts_init: self.generate_ts_init(),
1333 await_partial,
1334 params,
1335 });
1336
1337 self.send_data_cmd(DataCommand::Subscribe(command));
1338 }
1339
1340 pub fn subscribe_mark_prices<A: DataActor>(
1345 &mut self,
1346 instrument_id: InstrumentId,
1347 client_id: Option<ClientId>,
1348 params: Option<IndexMap<String, String>>,
1349 ) {
1350 self.check_registered();
1351
1352 let topic = get_mark_price_topic(instrument_id);
1353 let actor_id = self.actor_id.inner();
1354 let handler = self.get_or_create_handler_for_topic(topic, || {
1355 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1356 move |mark_price: &MarkPriceUpdate| {
1357 get_actor_unchecked::<A>(&actor_id).handle_mark_price(mark_price);
1358 },
1359 )))
1360 });
1361
1362 msgbus::subscribe_topic(topic, handler, None);
1363
1364 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
1365 instrument_id,
1366 client_id,
1367 venue: Some(instrument_id.venue),
1368 command_id: UUID4::new(),
1369 ts_init: self.generate_ts_init(),
1370 params,
1371 });
1372
1373 self.send_data_cmd(DataCommand::Subscribe(command));
1374 }
1375
1376 pub fn subscribe_index_prices<A: DataActor>(
1381 &mut self,
1382 instrument_id: InstrumentId,
1383 client_id: Option<ClientId>,
1384 params: Option<IndexMap<String, String>>,
1385 ) {
1386 self.check_registered();
1387
1388 let topic = get_index_price_topic(instrument_id);
1389 let actor_id = self.actor_id.inner();
1390 let handler = self.get_or_create_handler_for_topic(topic, || {
1391 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1392 move |index_price: &IndexPriceUpdate| {
1393 get_actor_unchecked::<A>(&actor_id).handle_index_price(index_price);
1394 },
1395 )))
1396 });
1397
1398 msgbus::subscribe_topic(topic, handler, None);
1399
1400 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
1401 instrument_id,
1402 client_id,
1403 venue: Some(instrument_id.venue),
1404 command_id: UUID4::new(),
1405 ts_init: self.generate_ts_init(),
1406 params,
1407 });
1408
1409 self.send_data_cmd(DataCommand::Subscribe(command));
1410 }
1411
1412 pub fn subscribe_instrument_status<A: DataActor>(
1417 &mut self,
1418 instrument_id: InstrumentId,
1419 client_id: Option<ClientId>,
1420 params: Option<IndexMap<String, String>>,
1421 ) {
1422 self.check_registered();
1423
1424 let topic = get_instrument_status_topic(instrument_id);
1425 let actor_id = self.actor_id.inner();
1426 let handler = self.get_or_create_handler_for_topic(topic, || {
1427 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1428 move |status: &InstrumentStatus| {
1429 get_actor_unchecked::<A>(&actor_id).handle_instrument_status(status);
1430 },
1431 )))
1432 });
1433
1434 msgbus::subscribe_topic(topic, handler, None);
1435
1436 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
1437 instrument_id,
1438 client_id,
1439 venue: Some(instrument_id.venue),
1440 command_id: UUID4::new(),
1441 ts_init: self.generate_ts_init(),
1442 params,
1443 });
1444
1445 self.send_data_cmd(DataCommand::Subscribe(command));
1446 }
1447
1448 pub fn subscribe_instrument_close<A: DataActor>(
1453 &mut self,
1454 instrument_id: InstrumentId,
1455 client_id: Option<ClientId>,
1456 params: Option<IndexMap<String, String>>,
1457 ) {
1458 self.check_registered();
1459
1460 let topic = get_instrument_close_topic(instrument_id);
1461 let actor_id = self.actor_id.inner();
1462 let handler = self.get_or_create_handler_for_topic(topic, || {
1463 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1464 move |close: &InstrumentClose| {
1465 get_actor_unchecked::<A>(&actor_id).handle_instrument_close(close);
1466 },
1467 )))
1468 });
1469
1470 msgbus::subscribe_topic(topic, handler, None);
1471
1472 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
1473 instrument_id,
1474 client_id,
1475 venue: Some(instrument_id.venue),
1476 command_id: UUID4::new(),
1477 ts_init: self.generate_ts_init(),
1478 params,
1479 });
1480
1481 self.send_data_cmd(DataCommand::Subscribe(command));
1482 }
1483
1484 pub fn unsubscribe_data<A: DataActor>(
1486 &self,
1487 data_type: DataType,
1488 client_id: Option<ClientId>,
1489 params: Option<IndexMap<String, String>>,
1490 ) {
1491 self.check_registered();
1492
1493 let topic = get_custom_topic(&data_type);
1494 if let Some(handler) = self.topic_handlers.get(&topic) {
1495 msgbus::unsubscribe_topic(topic, handler.clone());
1496 };
1497
1498 if client_id.is_none() {
1499 return;
1500 }
1501
1502 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
1503 data_type,
1504 client_id,
1505 venue: None,
1506 command_id: UUID4::new(),
1507 ts_init: self.generate_ts_init(),
1508 params,
1509 });
1510
1511 self.send_data_cmd(DataCommand::Unsubscribe(command));
1512 }
1513
1514 pub fn unsubscribe_instruments<A: DataActor>(
1516 &self,
1517 venue: Venue,
1518 client_id: Option<ClientId>,
1519 params: Option<IndexMap<String, String>>,
1520 ) {
1521 self.check_registered();
1522
1523 let topic = get_instruments_topic(venue);
1524 if let Some(handler) = self.topic_handlers.get(&topic) {
1525 msgbus::unsubscribe_topic(topic, handler.clone());
1526 };
1527
1528 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
1529 client_id,
1530 venue,
1531 command_id: UUID4::new(),
1532 ts_init: self.generate_ts_init(),
1533 params,
1534 });
1535
1536 self.send_data_cmd(DataCommand::Unsubscribe(command));
1537 }
1538
1539 pub fn unsubscribe_instrument<A: DataActor>(
1541 &self,
1542 instrument_id: InstrumentId,
1543 client_id: Option<ClientId>,
1544 params: Option<IndexMap<String, String>>,
1545 ) {
1546 self.check_registered();
1547
1548 let topic = get_instrument_topic(instrument_id);
1549 if let Some(handler) = self.topic_handlers.get(&topic) {
1550 msgbus::unsubscribe_topic(topic, handler.clone());
1551 };
1552
1553 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
1554 instrument_id,
1555 client_id,
1556 venue: Some(instrument_id.venue),
1557 command_id: UUID4::new(),
1558 ts_init: self.generate_ts_init(),
1559 params,
1560 });
1561
1562 self.send_data_cmd(DataCommand::Unsubscribe(command));
1563 }
1564
1565 pub fn unsubscribe_book_deltas<A: DataActor>(
1567 &self,
1568 instrument_id: InstrumentId,
1569 client_id: Option<ClientId>,
1570 params: Option<IndexMap<String, String>>,
1571 ) {
1572 self.check_registered();
1573
1574 let topic = get_book_deltas_topic(instrument_id);
1575 if let Some(handler) = self.topic_handlers.get(&topic) {
1576 msgbus::unsubscribe_topic(topic, handler.clone());
1577 };
1578
1579 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
1580 instrument_id,
1581 client_id,
1582 venue: Some(instrument_id.venue),
1583 command_id: UUID4::new(),
1584 ts_init: self.generate_ts_init(),
1585 params,
1586 });
1587
1588 self.send_data_cmd(DataCommand::Unsubscribe(command));
1589 }
1590
1591 pub fn unsubscribe_book_at_interval<A: DataActor>(
1595 &mut self,
1596 instrument_id: InstrumentId,
1597 interval_ms: NonZeroUsize,
1598 client_id: Option<ClientId>,
1599 params: Option<IndexMap<String, String>>,
1600 ) {
1601 self.check_registered();
1602
1603 let topic = get_book_snapshots_topic(instrument_id);
1604 if let Some(handler) = self.topic_handlers.get(&topic) {
1605 msgbus::unsubscribe_topic(topic, handler.clone());
1606 };
1607
1608 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
1609 instrument_id,
1610 client_id,
1611 venue: Some(instrument_id.venue),
1612 command_id: UUID4::new(),
1613 ts_init: self.generate_ts_init(),
1614 params,
1615 });
1616
1617 self.send_data_cmd(DataCommand::Unsubscribe(command));
1618 }
1619
1620 pub fn unsubscribe_quotes<A: DataActor>(
1622 &self,
1623 instrument_id: InstrumentId,
1624 client_id: Option<ClientId>,
1625 params: Option<IndexMap<String, String>>,
1626 ) {
1627 self.check_registered();
1628
1629 let topic = get_quotes_topic(instrument_id);
1630 if let Some(handler) = self.topic_handlers.get(&topic) {
1631 msgbus::unsubscribe_topic(topic, handler.clone());
1632 };
1633
1634 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
1635 instrument_id,
1636 client_id,
1637 venue: Some(instrument_id.venue),
1638 command_id: UUID4::new(),
1639 ts_init: self.generate_ts_init(),
1640 params,
1641 });
1642
1643 self.send_data_cmd(DataCommand::Unsubscribe(command));
1644 }
1645
1646 pub fn unsubscribe_trades<A: DataActor>(
1648 &self,
1649 instrument_id: InstrumentId,
1650 client_id: Option<ClientId>,
1651 params: Option<IndexMap<String, String>>,
1652 ) {
1653 self.check_registered();
1654
1655 let topic = get_trades_topic(instrument_id);
1656 if let Some(handler) = self.topic_handlers.get(&topic) {
1657 msgbus::unsubscribe_topic(topic, handler.clone());
1658 };
1659
1660 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
1661 instrument_id,
1662 client_id,
1663 venue: Some(instrument_id.venue),
1664 command_id: UUID4::new(),
1665 ts_init: self.generate_ts_init(),
1666 params,
1667 });
1668
1669 self.send_data_cmd(DataCommand::Unsubscribe(command));
1670 }
1671
1672 pub fn unsubscribe_bars<A: DataActor>(
1674 &mut self,
1675 bar_type: BarType,
1676 client_id: Option<ClientId>,
1677 params: Option<IndexMap<String, String>>,
1678 ) {
1679 self.check_registered();
1680
1681 let topic = get_bars_topic(bar_type);
1682 if let Some(handler) = self.topic_handlers.get(&topic) {
1683 msgbus::unsubscribe_topic(topic, handler.clone());
1684 };
1685
1686 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
1687 bar_type,
1688 client_id,
1689 venue: Some(bar_type.instrument_id().venue),
1690 command_id: UUID4::new(),
1691 ts_init: self.generate_ts_init(),
1692 params,
1693 });
1694
1695 self.send_data_cmd(DataCommand::Unsubscribe(command));
1696 }
1697
1698 pub fn unsubscribe_mark_prices<A: DataActor>(
1700 &self,
1701 instrument_id: InstrumentId,
1702 client_id: Option<ClientId>,
1703 params: Option<IndexMap<String, String>>,
1704 ) {
1705 self.check_registered();
1706
1707 let topic = get_mark_price_topic(instrument_id);
1708 if let Some(handler) = self.topic_handlers.get(&topic) {
1709 msgbus::unsubscribe_topic(topic, handler.clone());
1710 };
1711
1712 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
1713 instrument_id,
1714 client_id,
1715 venue: Some(instrument_id.venue),
1716 command_id: UUID4::new(),
1717 ts_init: self.generate_ts_init(),
1718 params,
1719 });
1720
1721 self.send_data_cmd(DataCommand::Unsubscribe(command));
1722 }
1723
1724 pub fn unsubscribe_index_prices<A: DataActor>(
1726 &self,
1727 instrument_id: InstrumentId,
1728 client_id: Option<ClientId>,
1729 params: Option<IndexMap<String, String>>,
1730 ) {
1731 self.check_registered();
1732
1733 let topic = get_index_price_topic(instrument_id);
1734 if let Some(handler) = self.topic_handlers.get(&topic) {
1735 msgbus::unsubscribe_topic(topic, handler.clone());
1736 };
1737
1738 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
1739 instrument_id,
1740 client_id,
1741 venue: Some(instrument_id.venue),
1742 command_id: UUID4::new(),
1743 ts_init: self.generate_ts_init(),
1744 params,
1745 });
1746
1747 self.send_data_cmd(DataCommand::Unsubscribe(command));
1748 }
1749
1750 pub fn unsubscribe_instrument_status<A: DataActor>(
1752 &self,
1753 instrument_id: InstrumentId,
1754 client_id: Option<ClientId>,
1755 params: Option<IndexMap<String, String>>,
1756 ) {
1757 self.check_registered();
1758
1759 let topic = get_instrument_status_topic(instrument_id);
1760 if let Some(handler) = self.topic_handlers.get(&topic) {
1761 msgbus::unsubscribe_topic(topic, handler.clone());
1762 };
1763
1764 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
1765 instrument_id,
1766 client_id,
1767 venue: Some(instrument_id.venue),
1768 command_id: UUID4::new(),
1769 ts_init: self.generate_ts_init(),
1770 params,
1771 });
1772
1773 self.send_data_cmd(DataCommand::Unsubscribe(command));
1774 }
1775
1776 pub fn unsubscribe_instrument_close<A: DataActor>(
1778 &self,
1779 instrument_id: InstrumentId,
1780 client_id: Option<ClientId>,
1781 params: Option<IndexMap<String, String>>,
1782 ) {
1783 self.check_registered();
1784
1785 let topic = get_instrument_close_topic(instrument_id);
1786 if let Some(handler) = self.topic_handlers.get(&topic) {
1787 msgbus::unsubscribe_topic(topic, handler.clone());
1788 };
1789
1790 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
1791 instrument_id,
1792 client_id,
1793 venue: Some(instrument_id.venue),
1794 command_id: UUID4::new(),
1795 ts_init: self.generate_ts_init(),
1796 params,
1797 });
1798
1799 self.send_data_cmd(DataCommand::Unsubscribe(command));
1800 }
1801
1802 pub fn request_data<A: DataActor>(
1812 &self,
1813 data_type: DataType,
1814 client_id: ClientId,
1815 start: Option<DateTime<Utc>>,
1816 end: Option<DateTime<Utc>>,
1817 limit: Option<NonZeroUsize>,
1818 params: Option<IndexMap<String, String>>,
1819 ) -> anyhow::Result<UUID4> {
1820 self.check_registered();
1821
1822 let now = self.clock.borrow().utc_now();
1823 check_timestamps(now, start, end)?;
1824
1825 let request_id = UUID4::new();
1826 let command = RequestCommand::Data(RequestCustomData {
1827 client_id,
1828 data_type,
1829 request_id,
1830 ts_init: self.generate_ts_init(),
1831 params,
1832 });
1833
1834 let actor_id = self.actor_id.inner();
1835 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1836 move |response: &CustomDataResponse| {
1837 get_actor_unchecked::<A>(&actor_id).handle_data_response(response);
1838 },
1839 )));
1840
1841 let msgbus = get_message_bus()
1842 .borrow_mut()
1843 .register_response_handler(command.request_id(), handler);
1844
1845 self.send_data_cmd(DataCommand::Request(command));
1846
1847 Ok(request_id)
1848 }
1849
1850 pub fn request_instrument<A: DataActor>(
1858 &self,
1859 instrument_id: InstrumentId,
1860 start: Option<DateTime<Utc>>,
1861 end: Option<DateTime<Utc>>,
1862 client_id: Option<ClientId>,
1863 params: Option<IndexMap<String, String>>,
1864 ) -> anyhow::Result<UUID4> {
1865 self.check_registered();
1866
1867 let now = self.clock.borrow().utc_now();
1868 check_timestamps(now, start, end)?;
1869
1870 let request_id = UUID4::new();
1871 let command = RequestCommand::Instrument(RequestInstrument {
1872 instrument_id,
1873 start,
1874 end,
1875 client_id,
1876 request_id,
1877 ts_init: now.into(),
1878 params,
1879 });
1880
1881 let actor_id = self.actor_id.inner();
1882 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1883 move |response: &InstrumentResponse| {
1884 get_actor_unchecked::<A>(&actor_id).handle_instrument_response(response);
1885 },
1886 )));
1887
1888 let msgbus = get_message_bus()
1889 .borrow_mut()
1890 .register_response_handler(command.request_id(), handler);
1891
1892 self.send_data_cmd(DataCommand::Request(command));
1893
1894 Ok(request_id)
1895 }
1896
1897 pub fn request_instruments<A: DataActor>(
1905 &self,
1906 venue: Option<Venue>,
1907 start: Option<DateTime<Utc>>,
1908 end: Option<DateTime<Utc>>,
1909 client_id: Option<ClientId>,
1910 params: Option<IndexMap<String, String>>,
1911 ) -> anyhow::Result<UUID4> {
1912 self.check_registered();
1913
1914 let now = self.clock.borrow().utc_now();
1915 check_timestamps(now, start, end)?;
1916
1917 let request_id = UUID4::new();
1918 let command = RequestCommand::Instruments(RequestInstruments {
1919 venue,
1920 start,
1921 end,
1922 client_id,
1923 request_id,
1924 ts_init: now.into(),
1925 params,
1926 });
1927
1928 let actor_id = self.actor_id.inner();
1929 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1930 move |response: &InstrumentsResponse| {
1931 get_actor_unchecked::<A>(&actor_id).handle_instruments_response(response);
1932 },
1933 )));
1934
1935 let msgbus = get_message_bus()
1936 .borrow_mut()
1937 .register_response_handler(command.request_id(), handler);
1938
1939 self.send_data_cmd(DataCommand::Request(command));
1940
1941 Ok(request_id)
1942 }
1943
1944 pub fn request_book_snapshot<A: DataActor>(
1952 &self,
1953 instrument_id: InstrumentId,
1954 depth: Option<NonZeroUsize>,
1955 client_id: Option<ClientId>,
1956 params: Option<IndexMap<String, String>>,
1957 ) -> anyhow::Result<UUID4> {
1958 self.check_registered();
1959
1960 let request_id = UUID4::new();
1961 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
1962 instrument_id,
1963 depth,
1964 client_id,
1965 request_id,
1966 ts_init: self.generate_ts_init(),
1967 params,
1968 });
1969
1970 let actor_id = self.actor_id.inner();
1971 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1972 move |response: &BookResponse| {
1973 get_actor_unchecked::<A>(&actor_id).handle_book_response(response);
1974 },
1975 )));
1976
1977 let msgbus = get_message_bus()
1978 .borrow_mut()
1979 .register_response_handler(command.request_id(), handler);
1980
1981 self.send_data_cmd(DataCommand::Request(command));
1982
1983 Ok(request_id)
1984 }
1985
1986 pub fn request_quotes<A: DataActor>(
1994 &self,
1995 instrument_id: InstrumentId,
1996 start: Option<DateTime<Utc>>,
1997 end: Option<DateTime<Utc>>,
1998 limit: Option<NonZeroUsize>,
1999 client_id: Option<ClientId>,
2000 params: Option<IndexMap<String, String>>,
2001 ) -> anyhow::Result<UUID4> {
2002 self.check_registered();
2003
2004 let now = self.clock.borrow().utc_now();
2005 check_timestamps(now, start, end)?;
2006
2007 let request_id = UUID4::new();
2008 let command = RequestCommand::Quotes(RequestQuotes {
2009 instrument_id,
2010 start,
2011 end,
2012 limit,
2013 client_id,
2014 request_id,
2015 ts_init: now.into(),
2016 params,
2017 });
2018
2019 let actor_id = self.actor_id.inner();
2020 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
2021 move |response: &QuotesResponse| {
2022 get_actor_unchecked::<A>(&actor_id).handle_quotes_response(response);
2023 },
2024 )));
2025
2026 let msgbus = get_message_bus()
2027 .borrow_mut()
2028 .register_response_handler(command.request_id(), handler);
2029
2030 self.send_data_cmd(DataCommand::Request(command));
2031
2032 Ok(request_id)
2033 }
2034
2035 pub fn request_trades<A: DataActor>(
2043 &self,
2044 instrument_id: InstrumentId,
2045 start: Option<DateTime<Utc>>,
2046 end: Option<DateTime<Utc>>,
2047 limit: Option<NonZeroUsize>,
2048 client_id: Option<ClientId>,
2049 params: Option<IndexMap<String, String>>,
2050 ) -> anyhow::Result<UUID4> {
2051 self.check_registered();
2052
2053 let now = self.clock.borrow().utc_now();
2054 check_timestamps(now, start, end)?;
2055
2056 let request_id = UUID4::new();
2057 let command = RequestCommand::Trades(RequestTrades {
2058 instrument_id,
2059 start,
2060 end,
2061 limit,
2062 client_id,
2063 request_id,
2064 ts_init: now.into(),
2065 params,
2066 });
2067
2068 let actor_id = self.actor_id.inner();
2069 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
2070 move |response: &TradesResponse| {
2071 get_actor_unchecked::<A>(&actor_id).handle_trades_response(response);
2072 },
2073 )));
2074
2075 let msgbus = get_message_bus()
2076 .borrow_mut()
2077 .register_response_handler(command.request_id(), handler);
2078
2079 self.send_data_cmd(DataCommand::Request(command));
2080
2081 Ok(request_id)
2082 }
2083
2084 pub fn request_bars<A: DataActor>(
2092 &self,
2093 bar_type: BarType,
2094 start: Option<DateTime<Utc>>,
2095 end: Option<DateTime<Utc>>,
2096 limit: Option<NonZeroUsize>,
2097 client_id: Option<ClientId>,
2098 params: Option<IndexMap<String, String>>,
2099 ) -> anyhow::Result<UUID4> {
2100 self.check_registered();
2101
2102 let now = self.clock.borrow().utc_now();
2103 check_timestamps(now, start, end)?;
2104
2105 let request_id = UUID4::new();
2106 let command = RequestCommand::Bars(RequestBars {
2107 bar_type,
2108 start,
2109 end,
2110 limit,
2111 client_id,
2112 request_id,
2113 ts_init: now.into(),
2114 params,
2115 });
2116
2117 let actor_id = self.actor_id.inner();
2118 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
2119 move |response: &BarsResponse| {
2120 get_actor_unchecked::<A>(&actor_id).handle_bars_response(response);
2121 },
2122 )));
2123
2124 let msgbus = get_message_bus()
2125 .borrow_mut()
2126 .register_response_handler(command.request_id(), handler);
2127
2128 self.send_data_cmd(DataCommand::Request(command));
2129
2130 Ok(request_id)
2131 }
2132}
2133
2134fn check_timestamps(
2135 now: DateTime<Utc>,
2136 start: Option<DateTime<Utc>>,
2137 end: Option<DateTime<Utc>>,
2138) -> anyhow::Result<()> {
2139 if let Some(start) = start {
2140 check_predicate_true(start <= now, "start was > now")?
2141 }
2142 if let Some(end) = end {
2143 check_predicate_true(end <= now, "end was > now")?
2144 }
2145
2146 if let (Some(start), Some(end)) = (start, end) {
2147 check_predicate_true(start < end, "start was >= end")?
2148 }
2149
2150 Ok(())
2151}
2152
2153fn log_error(e: &anyhow::Error) {
2154 log::error!("{e}");
2155}
2156
2157fn log_not_running<T>(msg: &T)
2158where
2159 T: Debug,
2160{
2161 log::warn!("Received message when not running - skipping {msg:?}");
2163}
2164
2165fn log_received<T>(msg: &T)
2166where
2167 T: Debug,
2168{
2169 log::debug!("{RECV} {msg:?}");
2170}