1#![allow(dead_code)]
18#![allow(unused_variables)]
19#![allow(unused_imports)]
20
21use std::{
22 any::{Any, TypeId},
23 cell::{RefCell, UnsafeCell},
24 collections::{HashMap, HashSet},
25 num::NonZeroUsize,
26 ops::{Deref, DerefMut},
27 rc::Rc,
28 sync::Arc,
29};
30
31use nautilus_core::{UUID4, UnixNanos};
32use nautilus_model::{
33 data::{
34 Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
35 OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
36 },
37 enums::BookType,
38 identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
39 instruments::{Instrument, InstrumentAny},
40 orderbook::OrderBook,
41};
42use ustr::Ustr;
43use uuid::Uuid;
44
45use super::{
46 Actor, executor::ActorExecutor, indicators::Indicators, registry::get_actor_unchecked,
47};
48use crate::{
49 cache::Cache,
50 clock::Clock,
51 enums::{ComponentState, ComponentTrigger},
52 logging::{CMD, RECV, SENT},
53 messages::{
54 data::{
55 DataCommand, DataRequest, DataResponse, RequestBars, RequestInstrument,
56 RequestInstruments, RequestOrderBookSnapshot, RequestQuoteTicks, RequestTradeTicks,
57 SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
58 SubscribeData, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
59 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
60 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
61 UnsubscribeCommand, UnsubscribeData, UnsubscribeIndexPrices, UnsubscribeInstrument,
62 UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
63 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
64 },
65 system::ShutdownSystem,
66 },
67 msgbus::{
68 self, get_message_bus,
69 handler::{MessageHandler, ShareableMessageHandler, TypedMessageHandler},
70 switchboard::{
71 self, MessagingSwitchboard, get_bars_topic, get_book_deltas_topic,
72 get_book_snapshots_topic, get_custom_topic, get_index_price_topic,
73 get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
74 get_instruments_topic, get_mark_price_topic, get_quotes_topic, get_trades_topic,
75 },
76 },
77 signal::Signal,
78};
79
80#[derive(Debug, Clone)]
82pub struct DataActorConfig {
83 pub actor_id: Option<ActorId>,
85 pub log_events: bool,
87 pub log_commands: bool,
89}
90
91impl Default for DataActorConfig {
92 fn default() -> Self {
93 Self {
94 actor_id: None,
95 log_events: true,
96 log_commands: true,
97 }
98 }
99}
100
101type RequestCallback = Box<dyn Fn(UUID4) + Send + Sync>; impl Actor for DataActorCore {
104 fn id(&self) -> Ustr {
105 self.actor_id.inner()
106 }
107
108 fn handle(&mut self, msg: &dyn Any) {}
109
110 fn as_any(&self) -> &dyn Any {
111 self
112 }
113}
114
115pub trait DataActor: Actor {
116 fn on_save(&self) -> anyhow::Result<HashMap<String, Vec<u8>>> {
118 Ok(HashMap::new())
119 }
120
121 fn on_load(&mut self, state: HashMap<String, Vec<u8>>) -> anyhow::Result<()> {
123 Ok(())
124 }
125
126 fn on_start(&mut self) -> anyhow::Result<()> {
128 Ok(())
129 }
130
131 fn on_stop(&mut self) -> anyhow::Result<()> {
133 Ok(())
134 }
135
136 fn on_resume(&mut self) -> anyhow::Result<()> {
138 Ok(())
139 }
140
141 fn on_reset(&mut self) -> anyhow::Result<()> {
143 Ok(())
144 }
145
146 fn on_dispose(&mut self) -> anyhow::Result<()> {
148 Ok(())
149 }
150
151 fn on_degrade(&mut self) -> anyhow::Result<()> {
153 Ok(())
154 }
155
156 fn on_fault(&mut self) -> anyhow::Result<()> {
158 Ok(())
159 }
160
161 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
167 Ok(())
168 }
169
170 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
172 Ok(())
173 }
174
175 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
177 Ok(())
178 }
179
180 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
182 Ok(())
183 }
184
185 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
187 Ok(())
188 }
189
190 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
192 Ok(())
193 }
194
195 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
197 Ok(())
198 }
199
200 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
202 Ok(())
203 }
204
205 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
207 Ok(())
208 }
209
210 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
212 Ok(())
213 }
214
215 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
217 Ok(())
218 }
219
220 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
222 Ok(())
223 }
224
225 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
227 Ok(())
229 }
230}
231
232pub struct DataActorCore {
234 pub actor_id: ActorId,
236 pub config: DataActorConfig,
238 pub clock: Rc<RefCell<dyn Clock>>,
240 pub cache: Rc<RefCell<Cache>>,
242 state: ComponentState,
243 trader_id: Option<TraderId>,
244 executor: Option<Arc<dyn ActorExecutor>>, warning_events: HashSet<String>, pending_requests: HashMap<UUID4, Option<RequestCallback>>,
247 signal_classes: HashMap<String, String>,
248 #[cfg(feature = "indicators")]
249 indicators: Indicators,
250}
251
252impl DataActor for DataActorCore {}
253
254impl DataActorCore {
255 pub fn new(
257 config: DataActorConfig,
258 cache: Rc<RefCell<Cache>>,
259 clock: Rc<RefCell<dyn Clock>>,
260 switchboard: Arc<MessagingSwitchboard>,
261 ) -> Self {
262 let actor_id = config.actor_id.unwrap_or(ActorId::new("DataActor")); Self {
265 actor_id,
266 config,
267 clock,
268 cache,
269 state: ComponentState::default(),
270 trader_id: None, executor: None,
272 warning_events: HashSet::new(),
273 pending_requests: HashMap::new(),
274 signal_classes: HashMap::new(),
275 #[cfg(feature = "indicators")]
276 indicators: Indicators::default(),
277 }
278 }
279
280 pub fn trader_id(&self) -> Option<TraderId> {
282 self.trader_id
283 }
284
285 pub fn state(&self) -> ComponentState {
287 self.state
288 }
289
290 pub fn is_ready(&self) -> bool {
291 self.state == ComponentState::Ready
292 }
293
294 pub fn is_running(&self) -> bool {
295 self.state == ComponentState::Running
296 }
297
298 pub fn is_stopped(&self) -> bool {
299 self.state == ComponentState::Stopped
300 }
301
302 pub fn is_disposed(&self) -> bool {
303 self.state == ComponentState::Disposed
304 }
305
306 pub fn is_degraded(&self) -> bool {
307 self.state == ComponentState::Degraded
308 }
309
310 pub fn is_faulting(&self) -> bool {
311 self.state == ComponentState::Faulted
312 }
313
314 pub fn register_executor(&mut self, executor: Arc<dyn ActorExecutor>) {
318 self.executor = Some(executor);
319 }
321
322 pub fn register_warning_event(&mut self, event_type: &str) {
324 self.warning_events.insert(event_type.to_string());
325 }
326
327 pub fn deregister_warning_event(&mut self, event_type: &str) {
329 self.warning_events.remove(event_type);
330 }
332
333 fn check_registered(&self) {
334 assert!(
335 self.trader_id.is_some(),
336 "Actor has not been registered with a Trader"
337 );
338 }
339
340 fn generate_ts_init(&self) -> UnixNanos {
341 self.clock.borrow().timestamp_ns()
342 }
343
344 fn send_data_cmd(&self, command: DataCommand) {
345 if self.config.log_commands {
346 log::info!("{CMD}{SENT} {command:?}");
347 }
348
349 let endpoint = MessagingSwitchboard::data_engine_execute();
350 msgbus::send(&endpoint, command.as_any())
351 }
352
353 pub fn start(&mut self) -> anyhow::Result<()> {
354 self.state.transition(&ComponentTrigger::Start)?; if let Err(e) = self.on_start() {
357 log_error(&e);
358 return Err(e); }
360
361 self.state.transition(&ComponentTrigger::StartCompleted)?;
362
363 Ok(())
364 }
365
366 pub fn stop(&mut self) -> anyhow::Result<()> {
367 self.state.transition(&ComponentTrigger::Stop)?; if let Err(e) = self.on_stop() {
370 log_error(&e);
371 return Err(e); }
373
374 self.state.transition(&ComponentTrigger::StopCompleted)?;
375
376 Ok(())
377 }
378
379 pub fn resume(&mut self) -> anyhow::Result<()> {
380 self.state.transition(&ComponentTrigger::Resume)?; if let Err(e) = self.on_stop() {
383 log_error(&e);
384 return Err(e); }
386
387 self.state.transition(&ComponentTrigger::ResumeCompleted)?;
388
389 Ok(())
390 }
391
392 pub fn reset(&mut self) -> anyhow::Result<()> {
393 self.state.transition(&ComponentTrigger::Reset)?; if let Err(e) = self.on_reset() {
396 log_error(&e);
397 return Err(e); }
399
400 self.state.transition(&ComponentTrigger::ResetCompleted)?;
401
402 Ok(())
403 }
404
405 pub fn dispose(&mut self) -> anyhow::Result<()> {
406 self.state.transition(&ComponentTrigger::Dispose)?; if let Err(e) = self.on_dispose() {
409 log_error(&e);
410 return Err(e); }
412
413 self.state.transition(&ComponentTrigger::DisposeCompleted)?;
414
415 Ok(())
416 }
417
418 pub fn degrade(&mut self) -> anyhow::Result<()> {
419 self.state.transition(&ComponentTrigger::Degrade)?; if let Err(e) = self.on_degrade() {
422 log_error(&e);
423 return Err(e); }
425
426 self.state.transition(&ComponentTrigger::DegradeCompleted)?;
427
428 Ok(())
429 }
430
431 pub fn fault(&mut self) -> anyhow::Result<()> {
432 self.state.transition(&ComponentTrigger::Fault)?; if let Err(e) = self.on_fault() {
435 log_error(&e);
436 return Err(e); }
438
439 self.state.transition(&ComponentTrigger::FaultCompleted)?;
440
441 Ok(())
442 }
443
444 pub fn shutdown_system(&self, reason: Option<String>) {
445 self.check_registered();
446
447 let command = ShutdownSystem::new(
449 self.trader_id().unwrap(),
450 self.actor_id.inner(),
451 reason,
452 UUID4::new(),
453 self.clock.borrow().timestamp_ns(),
454 );
455
456 let topic = Ustr::from("command.system.shutdown");
457 msgbus::send(&topic, command.as_any());
458 }
459
460 pub fn subscribe_data(
464 &self,
465 data_type: DataType,
466 client_id: Option<ClientId>,
467 params: Option<HashMap<String, String>>,
468 ) {
469 self.check_registered();
470
471 let actor_id = self.actor_id.inner();
472 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
473 move |data: &dyn Any| {
474 get_actor_unchecked::<DataActorCore>(&actor_id).handle(data);
475 },
476 )));
477
478 let topic = get_custom_topic(&data_type);
479 msgbus::subscribe(topic, handler, None);
480
481 if client_id.is_none() {
482 return;
484 }
485
486 let command = SubscribeCommand::Data(SubscribeData {
487 data_type,
488 client_id,
489 venue: None,
490 command_id: UUID4::new(),
491 ts_init: self.generate_ts_init(),
492 params,
493 });
494
495 self.send_data_cmd(DataCommand::Subscribe(command));
496 }
497
498 pub fn subscribe_instruments(
500 &self,
501 venue: Venue,
502 client_id: Option<ClientId>,
503 params: Option<HashMap<String, String>>,
504 ) {
505 self.check_registered();
506
507 let actor_id = self.actor_id.inner();
508 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
509 move |instrument: &InstrumentAny| {
510 get_actor_unchecked::<DataActorCore>(&actor_id).handle_instrument(instrument);
511 },
512 )));
513
514 let topic = get_instruments_topic(venue);
515 msgbus::subscribe(topic, handler, None);
516
517 let command = SubscribeCommand::Instruments(SubscribeInstruments {
518 client_id,
519 venue,
520 command_id: UUID4::new(),
521 ts_init: self.generate_ts_init(),
522 params,
523 });
524
525 self.send_data_cmd(DataCommand::Subscribe(command));
526 }
527
528 pub fn subscribe_instrument(
530 &self,
531 instrument_id: InstrumentId,
532 client_id: Option<ClientId>,
533 params: Option<HashMap<String, String>>,
534 ) {
535 self.check_registered();
536
537 let actor_id = self.actor_id.inner();
538 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
539 move |instrument: &InstrumentAny| {
540 get_actor_unchecked::<DataActorCore>(&actor_id).handle_instrument(instrument);
541 },
542 )));
543
544 let topic = get_instrument_topic(instrument_id);
545 msgbus::subscribe(topic, handler, None);
546
547 let command = SubscribeCommand::Instrument(SubscribeInstrument {
548 instrument_id,
549 client_id,
550 venue: Some(instrument_id.venue),
551 command_id: UUID4::new(),
552 ts_init: self.generate_ts_init(),
553 params,
554 });
555
556 self.send_data_cmd(DataCommand::Subscribe(command));
557 }
558
559 pub fn subscribe_book_deltas(
564 &self,
565 instrument_id: InstrumentId,
566 book_type: BookType,
567 depth: Option<NonZeroUsize>,
568 client_id: Option<ClientId>,
569 managed: bool,
570 params: Option<HashMap<String, String>>,
571 ) {
572 self.check_registered();
573
574 let actor_id = self.actor_id.inner();
575 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
576 move |deltas: &OrderBookDeltas| {
577 get_actor_unchecked::<DataActorCore>(&actor_id).handle_book_deltas(deltas);
578 },
579 )));
580
581 let topic = get_book_deltas_topic(instrument_id);
582 msgbus::subscribe(topic, handler, None);
583
584 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
585 instrument_id,
586 book_type,
587 client_id,
588 venue: Some(instrument_id.venue),
589 command_id: UUID4::new(),
590 ts_init: self.generate_ts_init(),
591 depth,
592 managed,
593 params,
594 });
595
596 self.send_data_cmd(DataCommand::Subscribe(command));
597 }
598
599 pub fn subscribe_book_snapshots(
608 &self,
609 instrument_id: InstrumentId,
610 book_type: BookType,
611 depth: Option<NonZeroUsize>,
612 interval_ms: NonZeroUsize,
613 client_id: Option<ClientId>,
614 params: Option<HashMap<String, String>>,
615 ) {
616 self.check_registered();
617
618 if book_type == BookType::L1_MBP && depth.is_some_and(|d| d.get() > 1) {
619 log::error!(
620 "Cannot subscribe to order book snapshots: L1 MBP book subscription depth > 1, was {:?}",
621 depth,
622 );
623 return;
624 }
625
626 let actor_id = self.actor_id.inner();
627 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
628 move |book: &OrderBook| {
629 get_actor_unchecked::<DataActorCore>(&actor_id).handle_book(book);
630 },
631 )));
632
633 let topic = get_book_snapshots_topic(instrument_id);
634 msgbus::subscribe(topic, handler, None);
635
636 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
637 instrument_id,
638 book_type,
639 client_id,
640 venue: Some(instrument_id.venue),
641 command_id: UUID4::new(),
642 ts_init: self.generate_ts_init(),
643 depth,
644 interval_ms,
645 params,
646 });
647
648 self.send_data_cmd(DataCommand::Subscribe(command));
649 }
650
651 pub fn subscribe_quotes(
653 &self,
654 instrument_id: InstrumentId,
655 client_id: Option<ClientId>,
656 params: Option<HashMap<String, String>>,
657 ) {
658 self.check_registered();
659
660 let actor_id = self.actor_id.inner();
661 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
662 move |quote: &QuoteTick| {
663 get_actor_unchecked::<DataActorCore>(&actor_id).handle_quote(quote);
664 },
665 )));
666
667 let topic = get_trades_topic(instrument_id);
668 msgbus::subscribe(topic, handler, None);
669
670 let command = SubscribeCommand::Quotes(SubscribeQuotes {
671 instrument_id,
672 client_id,
673 venue: Some(instrument_id.venue),
674 command_id: UUID4::new(),
675 ts_init: self.generate_ts_init(),
676 params,
677 });
678
679 self.send_data_cmd(DataCommand::Subscribe(command));
680 }
681
682 pub fn subscribe_trades(
684 &self,
685 instrument_id: InstrumentId,
686 client_id: Option<ClientId>,
687 params: Option<HashMap<String, String>>,
688 ) {
689 self.check_registered();
690
691 let actor_id = self.actor_id.inner();
692 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
693 move |trade: &TradeTick| {
694 get_actor_unchecked::<DataActorCore>(&actor_id).handle_trade(trade);
695 },
696 )));
697
698 let topic = get_trades_topic(instrument_id);
699 msgbus::subscribe(topic, handler, None);
700
701 let command = SubscribeCommand::Trades(SubscribeTrades {
702 instrument_id,
703 client_id,
704 venue: Some(instrument_id.venue),
705 command_id: UUID4::new(),
706 ts_init: self.generate_ts_init(),
707 params,
708 });
709
710 self.send_data_cmd(DataCommand::Subscribe(command));
711 }
712
713 pub fn subscribe_bars(
718 &self,
719 bar_type: BarType,
720 client_id: Option<ClientId>,
721 await_partial: bool,
722 params: Option<HashMap<String, String>>,
723 ) {
724 self.check_registered();
725
726 let actor_id = self.actor_id.inner();
727 let handler =
728 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
729 get_actor_unchecked::<DataActorCore>(&actor_id).handle_bar(bar);
730 })));
731
732 let topic = get_bars_topic(bar_type);
733 msgbus::subscribe(topic, handler, None);
734
735 let command = SubscribeCommand::Bars(SubscribeBars {
736 bar_type,
737 client_id,
738 venue: Some(bar_type.instrument_id().venue),
739 command_id: UUID4::new(),
740 ts_init: self.generate_ts_init(),
741 await_partial,
742 params,
743 });
744
745 self.send_data_cmd(DataCommand::Subscribe(command));
746 }
747
748 pub fn subscribe_mark_prices(
753 &self,
754 instrument_id: InstrumentId,
755 client_id: Option<ClientId>,
756 params: Option<HashMap<String, String>>,
757 ) {
758 self.check_registered();
759
760 let actor_id = self.actor_id.inner();
761 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
762 move |mark_price: &MarkPriceUpdate| {
763 get_actor_unchecked::<DataActorCore>(&actor_id).handle_mark_price(mark_price);
764 },
765 )));
766
767 let topic = get_mark_price_topic(instrument_id);
768 msgbus::subscribe(topic, handler, None);
769
770 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
771 instrument_id,
772 client_id,
773 venue: Some(instrument_id.venue),
774 command_id: UUID4::new(),
775 ts_init: self.generate_ts_init(),
776 params,
777 });
778
779 self.send_data_cmd(DataCommand::Subscribe(command));
780 }
781
782 pub fn subscribe_index_prices(
787 &self,
788 instrument_id: InstrumentId,
789 client_id: Option<ClientId>,
790 params: Option<HashMap<String, String>>,
791 ) {
792 self.check_registered();
793
794 let actor_id = self.actor_id.inner();
795 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
796 move |index_price: &IndexPriceUpdate| {
797 get_actor_unchecked::<DataActorCore>(&actor_id).handle_index_price(index_price);
798 },
799 )));
800
801 let topic = get_index_price_topic(instrument_id);
802 msgbus::subscribe(topic, handler, None);
803
804 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
805 instrument_id,
806 client_id,
807 venue: Some(instrument_id.venue),
808 command_id: UUID4::new(),
809 ts_init: self.generate_ts_init(),
810 params,
811 });
812
813 self.send_data_cmd(DataCommand::Subscribe(command));
814 }
815
816 pub fn subscribe_instrument_status(
821 &self,
822 instrument_id: InstrumentId,
823 client_id: Option<ClientId>,
824 params: Option<HashMap<String, String>>,
825 ) {
826 self.check_registered();
827
828 let actor_id = self.actor_id.inner();
829 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
830 move |status: &InstrumentStatus| {
831 get_actor_unchecked::<DataActorCore>(&actor_id).handle_instrument_status(status);
832 },
833 )));
834
835 let topic = get_instrument_status_topic(instrument_id);
836 msgbus::subscribe(topic, handler, None);
837
838 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
839 instrument_id,
840 client_id,
841 venue: Some(instrument_id.venue),
842 command_id: UUID4::new(),
843 ts_init: self.generate_ts_init(),
844 params,
845 });
846
847 self.send_data_cmd(DataCommand::Subscribe(command));
848 }
849
850 pub fn subscribe_instrument_close(
855 &self,
856 instrument_id: InstrumentId,
857 client_id: Option<ClientId>,
858 params: Option<HashMap<String, String>>,
859 ) {
860 self.check_registered();
861
862 let actor_id = self.actor_id.inner();
863 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
864 move |close: &InstrumentClose| {
865 get_actor_unchecked::<DataActorCore>(&actor_id).handle_instrument_close(close);
866 },
867 )));
868
869 let topic = get_instrument_close_topic(instrument_id);
871 msgbus::subscribe(topic, handler, None);
872
873 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
874 instrument_id,
875 client_id,
876 venue: Some(instrument_id.venue),
877 command_id: UUID4::new(),
878 ts_init: self.generate_ts_init(),
879 params,
880 });
881
882 self.send_data_cmd(DataCommand::Subscribe(command));
883 }
884
885 pub fn unsubscribe_data(
887 &self,
888 data_type: DataType,
889 client_id: Option<ClientId>,
890 params: Option<HashMap<String, String>>,
891 ) {
892 self.check_registered();
893
894 let topic = get_custom_topic(&data_type);
895 if client_id.is_none() {
898 return;
899 }
900
901 let command = UnsubscribeCommand::Data(UnsubscribeData {
902 data_type,
903 client_id,
904 venue: None,
905 command_id: UUID4::new(),
906 ts_init: self.generate_ts_init(),
907 params,
908 });
909
910 self.send_data_cmd(DataCommand::Unsubscribe(command));
911 }
912
913 pub fn unsubscribe_instruments(
915 &self,
916 venue: Venue,
917 client_id: Option<ClientId>,
918 params: Option<HashMap<String, String>>,
919 ) {
920 self.check_registered();
921
922 let topic = get_instruments_topic(venue);
923 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
926 client_id,
927 venue,
928 command_id: UUID4::new(),
929 ts_init: self.generate_ts_init(),
930 params,
931 });
932
933 self.send_data_cmd(DataCommand::Unsubscribe(command));
934 }
935
936 pub fn unsubscribe_instrument(
937 &self,
938 instrument_id: InstrumentId,
939 client_id: Option<ClientId>,
940 params: Option<HashMap<String, String>>,
941 ) {
942 self.check_registered();
943
944 let topic = get_instrument_topic(instrument_id);
945 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
948 instrument_id,
949 client_id,
950 venue: Some(instrument_id.venue),
951 command_id: UUID4::new(),
952 ts_init: self.generate_ts_init(),
953 params,
954 });
955
956 self.send_data_cmd(DataCommand::Unsubscribe(command));
957 }
958
959 pub fn unsubscribe_book_deltas(
960 &self,
961 instrument_id: InstrumentId,
962 client_id: Option<ClientId>,
963 params: Option<HashMap<String, String>>,
964 ) {
965 self.check_registered();
966
967 let topic = get_book_deltas_topic(instrument_id);
968 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
971 instrument_id,
972 client_id,
973 venue: Some(instrument_id.venue),
974 command_id: UUID4::new(),
975 ts_init: self.generate_ts_init(),
976 params,
977 });
978
979 self.send_data_cmd(DataCommand::Unsubscribe(command));
980 }
981
982 pub fn unsubscribe_book_snapshots(
984 &self,
985 instrument_id: InstrumentId,
986 client_id: Option<ClientId>,
987 params: Option<HashMap<String, String>>,
988 ) {
989 self.check_registered();
990
991 let topic = get_book_snapshots_topic(instrument_id);
992 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
995 instrument_id,
996 client_id,
997 venue: Some(instrument_id.venue),
998 command_id: UUID4::new(),
999 ts_init: self.generate_ts_init(),
1000 params,
1001 });
1002
1003 self.send_data_cmd(DataCommand::Unsubscribe(command));
1004 }
1005
1006 pub fn unsubscribe_quote_ticks(
1008 &self,
1009 instrument_id: InstrumentId,
1010 client_id: Option<ClientId>,
1011 params: Option<HashMap<String, String>>,
1012 ) {
1013 self.check_registered();
1014
1015 let topic = get_quotes_topic(instrument_id);
1016 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
1019 instrument_id,
1020 client_id,
1021 venue: Some(instrument_id.venue),
1022 command_id: UUID4::new(),
1023 ts_init: self.generate_ts_init(),
1024 params,
1025 });
1026
1027 self.send_data_cmd(DataCommand::Unsubscribe(command));
1028 }
1029
1030 pub fn unsubscribe_trade_ticks(
1032 &self,
1033 instrument_id: InstrumentId,
1034 client_id: Option<ClientId>,
1035 params: Option<HashMap<String, String>>,
1036 ) {
1037 self.check_registered();
1038
1039 let topic = get_trades_topic(instrument_id);
1040 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
1043 instrument_id,
1044 client_id,
1045 venue: Some(instrument_id.venue),
1046 command_id: UUID4::new(),
1047 ts_init: self.generate_ts_init(),
1048 params,
1049 });
1050
1051 self.send_data_cmd(DataCommand::Unsubscribe(command));
1052 }
1053
1054 pub fn unsubscribe_bars(
1056 &self,
1057 bar_type: BarType,
1058 client_id: Option<ClientId>,
1059 params: Option<HashMap<String, String>>,
1060 ) {
1061 self.check_registered();
1062
1063 let topic = get_bars_topic(bar_type);
1064 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
1067 bar_type,
1068 client_id,
1069 venue: Some(bar_type.instrument_id().venue),
1070 command_id: UUID4::new(),
1071 ts_init: self.generate_ts_init(),
1072 params,
1073 });
1074
1075 self.send_data_cmd(DataCommand::Unsubscribe(command));
1076 }
1077
1078 pub fn unsubscribe_mark_prices(
1080 &self,
1081 instrument_id: InstrumentId,
1082 client_id: Option<ClientId>,
1083 params: Option<HashMap<String, String>>,
1084 ) {
1085 self.check_registered();
1086
1087 let topic = get_mark_price_topic(instrument_id);
1088 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
1091 instrument_id,
1092 client_id,
1093 venue: Some(instrument_id.venue),
1094 command_id: UUID4::new(),
1095 ts_init: self.generate_ts_init(),
1096 params,
1097 });
1098
1099 self.send_data_cmd(DataCommand::Unsubscribe(command));
1100 }
1101
1102 pub fn unsubscribe_index_prices(
1104 &self,
1105 instrument_id: InstrumentId,
1106 client_id: Option<ClientId>,
1107 params: Option<HashMap<String, String>>,
1108 ) {
1109 self.check_registered();
1110
1111 let topic = get_index_price_topic(instrument_id);
1112 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
1115 instrument_id,
1116 client_id,
1117 venue: Some(instrument_id.venue),
1118 command_id: UUID4::new(),
1119 ts_init: self.generate_ts_init(),
1120 params,
1121 });
1122
1123 self.send_data_cmd(DataCommand::Unsubscribe(command));
1124 }
1125
1126 pub fn unsubscribe_instrument_status(
1128 &self,
1129 instrument_id: InstrumentId,
1130 client_id: Option<ClientId>,
1131 params: Option<HashMap<String, String>>,
1132 ) {
1133 self.check_registered();
1134
1135 let topic = get_instrument_status_topic(instrument_id);
1136 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
1139 instrument_id,
1140 client_id,
1141 venue: Some(instrument_id.venue),
1142 command_id: UUID4::new(),
1143 ts_init: self.generate_ts_init(),
1144 params,
1145 });
1146
1147 self.send_data_cmd(DataCommand::Unsubscribe(command));
1148 }
1149
1150 pub fn unsubscribe_instrument_close(
1152 &self,
1153 instrument_id: InstrumentId,
1154 client_id: Option<ClientId>,
1155 params: Option<HashMap<String, String>>,
1156 ) {
1157 self.check_registered();
1158
1159 let topic = get_instrument_close_topic(instrument_id);
1160 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
1163 instrument_id,
1164 client_id,
1165 venue: Some(instrument_id.venue),
1166 command_id: UUID4::new(),
1167 ts_init: self.generate_ts_init(),
1168 params,
1169 });
1170
1171 self.send_data_cmd(DataCommand::Unsubscribe(command));
1172 }
1173
1174 pub fn handle_data(&mut self, data: &dyn Any) {
1178 log_received(&data);
1179
1180 if !self.is_running() {
1181 return;
1182 }
1183
1184 if let Err(e) = self.on_data(data) {
1185 log_error(&e);
1186 }
1187 }
1188
1189 pub(crate) fn handle_instrument(&mut self, instrument: &InstrumentAny) {
1191 log_received(&instrument);
1192
1193 if !self.is_running() {
1194 return;
1195 }
1196
1197 if let Err(e) = self.on_instrument(instrument) {
1198 log_error(&e);
1199 }
1200 }
1201
1202 pub(crate) fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
1204 log_received(&deltas);
1205
1206 if !self.is_running() {
1207 return;
1208 }
1209
1210 if let Err(e) = self.on_book_deltas(deltas) {
1211 log_error(&e);
1212 }
1213 }
1214
1215 pub(crate) fn handle_book(&mut self, book: &OrderBook) {
1217 log_received(&book);
1218
1219 if !self.is_running() {
1220 return;
1221 }
1222
1223 if let Err(e) = self.on_book(book) {
1224 log_error(&e);
1225 };
1226 }
1227
1228 pub(crate) fn handle_quote(&mut self, quote: &QuoteTick) {
1230 log_received("e);
1231
1232 if !self.is_running() {
1233 return;
1234 }
1235
1236 if let Err(e) = self.on_quote(quote) {
1237 log_error(&e);
1238 }
1239 }
1240
1241 pub(crate) fn handle_trade(&mut self, trade: &TradeTick) {
1243 log_received(&trade);
1244
1245 if !self.is_running() {
1246 return;
1247 }
1248
1249 if let Err(e) = self.on_trade(trade) {
1250 log_error(&e);
1251 }
1252 }
1253
1254 pub(crate) fn handle_bar(&mut self, bar: &Bar) {
1256 log_received(&bar);
1257
1258 if !self.is_running() {
1259 return;
1260 }
1261
1262 if let Err(e) = self.on_bar(bar) {
1263 log_error(&e);
1264 }
1265 }
1266
1267 pub(crate) fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
1269 log_received(&mark_price);
1270
1271 if !self.is_running() {
1272 return;
1273 }
1274
1275 if let Err(e) = self.on_mark_price(mark_price) {
1276 log_error(&e);
1277 }
1278 }
1279
1280 pub(crate) fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
1282 log_received(&index_price);
1283
1284 if !self.is_running() {
1285 return;
1286 }
1287
1288 if let Err(e) = self.on_index_price(index_price) {
1289 log_error(&e);
1290 }
1291 }
1292
1293 pub(crate) fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
1295 log_received(&status);
1296
1297 if !self.is_running() {
1298 return;
1299 }
1300
1301 if let Err(e) = self.on_instrument_status(status) {
1302 log_error(&e);
1303 }
1304 }
1305
1306 pub(crate) fn handle_instrument_close(&mut self, close: &InstrumentClose) {
1308 log_received(&close);
1309
1310 if !self.is_running() {
1311 return;
1312 }
1313
1314 if let Err(e) = self.on_instrument_close(close) {
1315 log_error(&e);
1316 }
1317 }
1318
1319 pub(crate) fn handle_instruments(&mut self, instruments: &Vec<InstrumentAny>) {
1321 for instrument in instruments {
1322 self.handle_instrument(instrument);
1323 }
1324 }
1325
1326 pub(crate) fn handle_quotes(&mut self, quotes: &Vec<QuoteTick>) {
1328 for quote in quotes {
1329 self.handle_quote(quote);
1330 }
1331 }
1332
1333 pub(crate) fn handle_trades(&mut self, trades: &Vec<TradeTick>) {
1335 for trade in trades {
1336 self.handle_trade(trade);
1337 }
1338 }
1339
1340 pub(crate) fn handle_bars(&mut self, bars: &Vec<Bar>) {
1342 for bar in bars {
1343 self.handle_bar(bar);
1344 }
1345 }
1346
1347 pub(crate) fn handle_historical_data(&mut self, data: &dyn Any) {
1349 log_received(&data);
1350
1351 if let Err(e) = self.on_historical_data(data) {
1352 log_error(&e);
1353 }
1354 }
1355
1356 pub(crate) fn handle_signal(&mut self, signal: &Signal) {
1358 log_received(&signal);
1359
1360 if !self.is_running() {
1361 return;
1362 }
1363
1364 if let Err(e) = self.on_signal(signal) {
1365 log_error(&e);
1366 }
1367 }
1368}
1369
1370fn log_error(e: &anyhow::Error) {
1371 log::error!("{e}");
1372}
1373
1374fn log_received<T>(msg: &T)
1375where
1376 T: std::fmt::Debug,
1377{
1378 log::debug!("{} {:?}", RECV, msg);
1379}
1380
1381#[cfg(test)]
1385mod tests {
1386 use std::{
1387 any::Any,
1388 cell::{RefCell, UnsafeCell},
1389 ops::{Deref, DerefMut},
1390 rc::Rc,
1391 sync::Arc,
1392 };
1393
1394 use nautilus_model::{
1395 data::{QuoteTick, TradeTick},
1396 identifiers::ActorId,
1397 instruments::CurrencyPair,
1398 orderbook::OrderBook,
1399 };
1400 use rstest::{fixture, rstest};
1401 use ustr::Ustr;
1402
1403 use super::{Actor, DataActor, DataActorConfig, DataActorCore};
1404 use crate::{
1405 actor::registry::{get_actor_unchecked, register_actor},
1406 cache::Cache,
1407 clock::{Clock, TestClock},
1408 msgbus::{
1409 self,
1410 switchboard::{MessagingSwitchboard, get_quotes_topic, get_trades_topic},
1411 },
1412 };
1413
1414 struct TestDataActor {
1415 core: DataActorCore,
1416 pub received_quotes: Vec<TradeTick>,
1417 pub received_trades: Vec<TradeTick>,
1418 }
1419
1420 impl Deref for TestDataActor {
1421 type Target = DataActorCore;
1422
1423 fn deref(&self) -> &Self::Target {
1424 &self.core
1425 }
1426 }
1427
1428 impl DerefMut for TestDataActor {
1429 fn deref_mut(&mut self) -> &mut Self::Target {
1430 &mut self.core
1431 }
1432 }
1433
1434 impl Actor for TestDataActor {
1435 fn id(&self) -> Ustr {
1436 self.core.actor_id.inner()
1437 }
1438
1439 fn handle(&mut self, msg: &dyn Any) {
1440 self.core.handle(msg);
1442 }
1443
1444 fn as_any(&self) -> &dyn Any {
1445 self
1446 }
1447 }
1448
1449 impl DataActor for TestDataActor {
1451 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
1452 Ok(())
1453 }
1454
1455 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
1456 Ok(())
1457 }
1458
1459 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
1460 Ok(())
1461 }
1462
1463 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
1464 self.received_trades.push(*trade);
1465 Ok(())
1466 }
1467 }
1468
1469 impl TestDataActor {
1471 pub fn new(
1472 config: DataActorConfig,
1473 cache: Rc<RefCell<Cache>>,
1474 clock: Rc<RefCell<dyn Clock>>,
1475 switchboard: Arc<MessagingSwitchboard>,
1476 ) -> Self {
1477 Self {
1478 core: DataActorCore::new(config, cache, clock, switchboard),
1479 received_quotes: Vec::new(),
1480 received_trades: Vec::new(),
1481 }
1482 }
1483 pub fn custom_function(&mut self) {}
1484 }
1485
1486 #[fixture]
1487 pub fn clock() -> Rc<RefCell<TestClock>> {
1488 Rc::new(RefCell::new(TestClock::new()))
1489 }
1490
1491 #[fixture]
1492 pub fn cache() -> Rc<RefCell<Cache>> {
1493 Rc::new(RefCell::new(Cache::new(None, None)))
1494 }
1495
1496 #[fixture]
1497 pub fn switchboard() -> Arc<MessagingSwitchboard> {
1498 Arc::new(MessagingSwitchboard::default())
1499 }
1500
1501 fn register_data_actor(
1502 clock: Rc<RefCell<TestClock>>,
1503 cache: Rc<RefCell<Cache>>,
1504 switchboard: Arc<MessagingSwitchboard>,
1505 ) {
1506 let config = DataActorConfig::default();
1507 let actor = TestDataActor::new(config, cache, clock, switchboard);
1508 let actor_rc = Rc::new(UnsafeCell::new(actor));
1509 register_actor(actor_rc);
1510 }
1511
1512 fn test_subscribe_and_receive_quotes(
1513 clock: Rc<RefCell<TestClock>>,
1514 cache: Rc<RefCell<Cache>>,
1515 switchboard: Arc<MessagingSwitchboard>,
1516 audusd_sim: CurrencyPair,
1517 ) {
1518 register_data_actor(clock.clone(), cache.clone(), switchboard.clone());
1519
1520 let actor_id = ActorId::new("DataActor").inner(); let actor = get_actor_unchecked::<TestDataActor>(&actor_id);
1522 actor.subscribe_quotes(audusd_sim.id, None, None);
1523
1524 let topic = get_quotes_topic(audusd_sim.id);
1525 let trade = QuoteTick::default();
1526 msgbus::publish(&topic, &trade);
1527 msgbus::publish(&topic, &trade);
1528
1529 assert_eq!(actor.received_quotes.len(), 2);
1530 }
1531
1532 fn test_subscribe_and_receive_trades(
1533 clock: Rc<RefCell<TestClock>>,
1534 cache: Rc<RefCell<Cache>>,
1535 switchboard: Arc<MessagingSwitchboard>,
1536 audusd_sim: CurrencyPair,
1537 ) {
1538 register_data_actor(clock.clone(), cache.clone(), switchboard.clone());
1539
1540 let actor_id = ActorId::new("DataActor").inner(); let actor = get_actor_unchecked::<TestDataActor>(&actor_id);
1542 actor.subscribe_trades(audusd_sim.id, None, None);
1543
1544 let topic = get_trades_topic(audusd_sim.id);
1545 let trade = TradeTick::default();
1546 msgbus::publish(&topic, &trade);
1547 msgbus::publish(&topic, &trade);
1548
1549 assert_eq!(actor.received_trades.len(), 2);
1550 }
1551}