nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19#![allow(unused_imports)]
20
21use std::{
22    any::{Any, TypeId},
23    cell::{RefCell, UnsafeCell},
24    collections::{HashMap, HashSet},
25    num::NonZeroUsize,
26    ops::{Deref, DerefMut},
27    rc::Rc,
28    sync::Arc,
29};
30
31use nautilus_core::{UUID4, UnixNanos};
32use nautilus_model::{
33    data::{
34        Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
35        OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
36    },
37    enums::BookType,
38    identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
39    instruments::{Instrument, InstrumentAny},
40    orderbook::OrderBook,
41};
42use ustr::Ustr;
43use uuid::Uuid;
44
45use super::{
46    Actor, executor::ActorExecutor, indicators::Indicators, registry::get_actor_unchecked,
47};
48use crate::{
49    cache::Cache,
50    clock::Clock,
51    enums::{ComponentState, ComponentTrigger},
52    logging::{CMD, RECV, SENT},
53    messages::{
54        data::{
55            DataCommand, DataRequest, DataResponse, RequestBars, RequestInstrument,
56            RequestInstruments, RequestOrderBookSnapshot, RequestQuoteTicks, RequestTradeTicks,
57            SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
58            SubscribeData, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
59            SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
60            SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
61            UnsubscribeCommand, UnsubscribeData, UnsubscribeIndexPrices, UnsubscribeInstrument,
62            UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
63            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
64        },
65        system::ShutdownSystem,
66    },
67    msgbus::{
68        self, get_message_bus,
69        handler::{MessageHandler, ShareableMessageHandler, TypedMessageHandler},
70        switchboard::{
71            self, MessagingSwitchboard, get_bars_topic, get_book_deltas_topic,
72            get_book_snapshots_topic, get_custom_topic, get_index_price_topic,
73            get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
74            get_instruments_topic, get_mark_price_topic, get_quotes_topic, get_trades_topic,
75        },
76    },
77    signal::Signal,
78};
79
80/// Configuration for Actor components.
81#[derive(Debug, Clone)]
82pub struct DataActorConfig {
83    /// The custom identifier for the Actor.
84    pub actor_id: Option<ActorId>,
85    /// Whether to log events.
86    pub log_events: bool,
87    /// Whether to log commands.
88    pub log_commands: bool,
89}
90
91impl Default for DataActorConfig {
92    fn default() -> Self {
93        Self {
94            actor_id: None,
95            log_events: true,
96            log_commands: true,
97        }
98    }
99}
100
101type RequestCallback = Box<dyn Fn(UUID4) + Send + Sync>; // TODO: TBD
102
103impl Actor for DataActorCore {
104    fn id(&self) -> Ustr {
105        self.actor_id.inner()
106    }
107
108    fn handle(&mut self, msg: &dyn Any) {}
109
110    fn as_any(&self) -> &dyn Any {
111        self
112    }
113}
114
115pub trait DataActor: Actor {
116    /// Actions to be performed when the actor state is saved.
117    fn on_save(&self) -> anyhow::Result<HashMap<String, Vec<u8>>> {
118        Ok(HashMap::new())
119    }
120
121    /// Actions to be performed when the actor state is loaded.
122    fn on_load(&mut self, state: HashMap<String, Vec<u8>>) -> anyhow::Result<()> {
123        Ok(())
124    }
125
126    /// Actions to be performed on start.
127    fn on_start(&mut self) -> anyhow::Result<()> {
128        Ok(())
129    }
130
131    /// Actions to be performed on stop.
132    fn on_stop(&mut self) -> anyhow::Result<()> {
133        Ok(())
134    }
135
136    /// Actions to be performed on resume.
137    fn on_resume(&mut self) -> anyhow::Result<()> {
138        Ok(())
139    }
140
141    /// Actions to be performed on reset.
142    fn on_reset(&mut self) -> anyhow::Result<()> {
143        Ok(())
144    }
145
146    /// Actions to be performed on dispose.
147    fn on_dispose(&mut self) -> anyhow::Result<()> {
148        Ok(())
149    }
150
151    /// Actions to be performed on degrade.
152    fn on_degrade(&mut self) -> anyhow::Result<()> {
153        Ok(())
154    }
155
156    /// Actions to be performed on fault.
157    fn on_fault(&mut self) -> anyhow::Result<()> {
158        Ok(())
159    }
160
161    // Actions to be performed when receiving an event.
162    // pub fn on_event(&mut self, event: &i Event) {  // TODO: TBD
163    //     // Default empty implementation
164    // }
165    //
166    fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
167        Ok(())
168    }
169
170    /// Actions to be performed when receiving a signal.
171    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
172        Ok(())
173    }
174
175    /// Actions to be performed when receiving an instrument.
176    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
177        Ok(())
178    }
179
180    /// Actions to be performed when receiving order book deltas.
181    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
182        Ok(())
183    }
184
185    /// Actions to be performed when receiving an order book.
186    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
187        Ok(())
188    }
189
190    /// Actions to be performed when receiving a quote.
191    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
192        Ok(())
193    }
194
195    /// Actions to be performed when receiving a trade.
196    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
197        Ok(())
198    }
199
200    /// Actions to be performed when receiving a bar.
201    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
202        Ok(())
203    }
204
205    /// Actions to be performed when receiving a mark price update.
206    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
207        Ok(())
208    }
209
210    /// Actions to be performed when receiving an index price update.
211    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
212        Ok(())
213    }
214
215    /// Actions to be performed when receiving an instrument status update.
216    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
217        Ok(())
218    }
219
220    /// Actions to be performed when receiving an instrument close update.
221    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
222        Ok(())
223    }
224
225    /// Actions to be performed when receiving historical data.
226    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
227        // TODO: Probably break this down into more granular methods
228        Ok(())
229    }
230}
231
232/// Core functionality for all actors.
233pub struct DataActorCore {
234    /// The component ID for the actor.
235    pub actor_id: ActorId,
236    /// The actors configuration.
237    pub config: DataActorConfig,
238    /// The actors clock.
239    pub clock: Rc<RefCell<dyn Clock>>,
240    /// The read-only cache for the actor.
241    pub cache: Rc<RefCell<Cache>>,
242    state: ComponentState,
243    trader_id: Option<TraderId>,
244    executor: Option<Arc<dyn ActorExecutor>>, // TODO: TBD
245    warning_events: HashSet<String>,          // TODO: TBD
246    pending_requests: HashMap<UUID4, Option<RequestCallback>>,
247    signal_classes: HashMap<String, String>,
248    #[cfg(feature = "indicators")]
249    indicators: Indicators,
250}
251
252impl DataActor for DataActorCore {}
253
254impl DataActorCore {
255    /// Creates a new [`Actor`] instance.
256    pub fn new(
257        config: DataActorConfig,
258        cache: Rc<RefCell<Cache>>,
259        clock: Rc<RefCell<dyn Clock>>,
260        switchboard: Arc<MessagingSwitchboard>,
261    ) -> Self {
262        let actor_id = config.actor_id.unwrap_or(ActorId::new("DataActor")); // TODO: Determine default ID
263
264        Self {
265            actor_id,
266            config,
267            clock,
268            cache,
269            state: ComponentState::default(),
270            trader_id: None, // None until registered
271            executor: None,
272            warning_events: HashSet::new(),
273            pending_requests: HashMap::new(),
274            signal_classes: HashMap::new(),
275            #[cfg(feature = "indicators")]
276            indicators: Indicators::default(),
277        }
278    }
279
280    /// Returns the trader ID this actor is registered to.
281    pub fn trader_id(&self) -> Option<TraderId> {
282        self.trader_id
283    }
284
285    // TODO: Extract this common state logic and handling out to some component module
286    pub fn state(&self) -> ComponentState {
287        self.state
288    }
289
290    pub fn is_ready(&self) -> bool {
291        self.state == ComponentState::Ready
292    }
293
294    pub fn is_running(&self) -> bool {
295        self.state == ComponentState::Running
296    }
297
298    pub fn is_stopped(&self) -> bool {
299        self.state == ComponentState::Stopped
300    }
301
302    pub fn is_disposed(&self) -> bool {
303        self.state == ComponentState::Disposed
304    }
305
306    pub fn is_degraded(&self) -> bool {
307        self.state == ComponentState::Degraded
308    }
309
310    pub fn is_faulting(&self) -> bool {
311        self.state == ComponentState::Faulted
312    }
313
314    // -- REGISTRATION ----------------------------------------------------------------------------
315
316    /// Register an executor for the actor.
317    pub fn register_executor(&mut self, executor: Arc<dyn ActorExecutor>) {
318        self.executor = Some(executor);
319        // TODO: Log registration
320    }
321
322    /// Register an event type for warning log levels.
323    pub fn register_warning_event(&mut self, event_type: &str) {
324        self.warning_events.insert(event_type.to_string());
325    }
326
327    /// Deregister an event type from warning log levels.
328    pub fn deregister_warning_event(&mut self, event_type: &str) {
329        self.warning_events.remove(event_type);
330        // TODO: Log deregistration
331    }
332
333    fn check_registered(&self) {
334        assert!(
335            self.trader_id.is_some(),
336            "Actor has not been registered with a Trader"
337        );
338    }
339
340    fn generate_ts_init(&self) -> UnixNanos {
341        self.clock.borrow().timestamp_ns()
342    }
343
344    fn send_data_cmd(&self, command: DataCommand) {
345        if self.config.log_commands {
346            log::info!("{CMD}{SENT} {command:?}");
347        }
348
349        let endpoint = MessagingSwitchboard::data_engine_execute();
350        msgbus::send(&endpoint, command.as_any())
351    }
352
353    pub fn start(&mut self) -> anyhow::Result<()> {
354        self.state.transition(&ComponentTrigger::Start)?; // -> Starting
355
356        if let Err(e) = self.on_start() {
357            log_error(&e);
358            return Err(e); // Halt state transition
359        }
360
361        self.state.transition(&ComponentTrigger::StartCompleted)?;
362
363        Ok(())
364    }
365
366    pub fn stop(&mut self) -> anyhow::Result<()> {
367        self.state.transition(&ComponentTrigger::Stop)?; // -> Stopping
368
369        if let Err(e) = self.on_stop() {
370            log_error(&e);
371            return Err(e); // Halt state transition
372        }
373
374        self.state.transition(&ComponentTrigger::StopCompleted)?;
375
376        Ok(())
377    }
378
379    pub fn resume(&mut self) -> anyhow::Result<()> {
380        self.state.transition(&ComponentTrigger::Resume)?; // -> Resuming
381
382        if let Err(e) = self.on_stop() {
383            log_error(&e);
384            return Err(e); // Halt state transition
385        }
386
387        self.state.transition(&ComponentTrigger::ResumeCompleted)?;
388
389        Ok(())
390    }
391
392    pub fn reset(&mut self) -> anyhow::Result<()> {
393        self.state.transition(&ComponentTrigger::Reset)?; // -> Resetting
394
395        if let Err(e) = self.on_reset() {
396            log_error(&e);
397            return Err(e); // Halt state transition
398        }
399
400        self.state.transition(&ComponentTrigger::ResetCompleted)?;
401
402        Ok(())
403    }
404
405    pub fn dispose(&mut self) -> anyhow::Result<()> {
406        self.state.transition(&ComponentTrigger::Dispose)?; // -> Disposing
407
408        if let Err(e) = self.on_dispose() {
409            log_error(&e);
410            return Err(e); // Halt state transition
411        }
412
413        self.state.transition(&ComponentTrigger::DisposeCompleted)?;
414
415        Ok(())
416    }
417
418    pub fn degrade(&mut self) -> anyhow::Result<()> {
419        self.state.transition(&ComponentTrigger::Degrade)?; // -> Degrading
420
421        if let Err(e) = self.on_degrade() {
422            log_error(&e);
423            return Err(e); // Halt state transition
424        }
425
426        self.state.transition(&ComponentTrigger::DegradeCompleted)?;
427
428        Ok(())
429    }
430
431    pub fn fault(&mut self) -> anyhow::Result<()> {
432        self.state.transition(&ComponentTrigger::Fault)?; // -> Faulting
433
434        if let Err(e) = self.on_fault() {
435            log_error(&e);
436            return Err(e); // Halt state transition
437        }
438
439        self.state.transition(&ComponentTrigger::FaultCompleted)?;
440
441        Ok(())
442    }
443
444    pub fn shutdown_system(&self, reason: Option<String>) {
445        self.check_registered();
446
447        // SAFETY: Checked registered before unwrapping trader ID
448        let command = ShutdownSystem::new(
449            self.trader_id().unwrap(),
450            self.actor_id.inner(),
451            reason,
452            UUID4::new(),
453            self.clock.borrow().timestamp_ns(),
454        );
455
456        let topic = Ustr::from("command.system.shutdown");
457        msgbus::send(&topic, command.as_any());
458    }
459
460    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
461
462    /// Subscribe to streaming data of the given data type.
463    pub fn subscribe_data(
464        &self,
465        data_type: DataType,
466        client_id: Option<ClientId>,
467        params: Option<HashMap<String, String>>,
468    ) {
469        self.check_registered();
470
471        let actor_id = self.actor_id.inner();
472        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
473            move |data: &dyn Any| {
474                get_actor_unchecked::<DataActorCore>(&actor_id).handle(data);
475            },
476        )));
477
478        let topic = get_custom_topic(&data_type);
479        msgbus::subscribe(topic, handler, None);
480
481        if client_id.is_none() {
482            // If no client ID specified, just subscribe to the topic
483            return;
484        }
485
486        let command = SubscribeCommand::Data(SubscribeData {
487            data_type,
488            client_id,
489            venue: None,
490            command_id: UUID4::new(),
491            ts_init: self.generate_ts_init(),
492            params,
493        });
494
495        self.send_data_cmd(DataCommand::Subscribe(command));
496    }
497
498    /// Subscribe to streaming [`Instrument`] data for the given venue.
499    pub fn subscribe_instruments(
500        &self,
501        venue: Venue,
502        client_id: Option<ClientId>,
503        params: Option<HashMap<String, String>>,
504    ) {
505        self.check_registered();
506
507        let actor_id = self.actor_id.inner();
508        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
509            move |instrument: &InstrumentAny| {
510                get_actor_unchecked::<DataActorCore>(&actor_id).handle_instrument(instrument);
511            },
512        )));
513
514        let topic = get_instruments_topic(venue);
515        msgbus::subscribe(topic, handler, None);
516
517        let command = SubscribeCommand::Instruments(SubscribeInstruments {
518            client_id,
519            venue,
520            command_id: UUID4::new(),
521            ts_init: self.generate_ts_init(),
522            params,
523        });
524
525        self.send_data_cmd(DataCommand::Subscribe(command));
526    }
527
528    /// Subscribe to streaming [`InstrumentAny`] data for the given instrument ID.
529    pub fn subscribe_instrument(
530        &self,
531        instrument_id: InstrumentId,
532        client_id: Option<ClientId>,
533        params: Option<HashMap<String, String>>,
534    ) {
535        self.check_registered();
536
537        let actor_id = self.actor_id.inner();
538        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
539            move |instrument: &InstrumentAny| {
540                get_actor_unchecked::<DataActorCore>(&actor_id).handle_instrument(instrument);
541            },
542        )));
543
544        let topic = get_instrument_topic(instrument_id);
545        msgbus::subscribe(topic, handler, None);
546
547        let command = SubscribeCommand::Instrument(SubscribeInstrument {
548            instrument_id,
549            client_id,
550            venue: Some(instrument_id.venue),
551            command_id: UUID4::new(),
552            ts_init: self.generate_ts_init(),
553            params,
554        });
555
556        self.send_data_cmd(DataCommand::Subscribe(command));
557    }
558
559    /// Subscribe to streaming [`OrderBookDeltas`] data for the given instrument ID.
560    ///
561    /// Once subscribed, any matching order book deltas published on the message bus is forwarded
562    /// to the `on_book_deltas` handler.
563    pub fn subscribe_book_deltas(
564        &self,
565        instrument_id: InstrumentId,
566        book_type: BookType,
567        depth: Option<NonZeroUsize>,
568        client_id: Option<ClientId>,
569        managed: bool,
570        params: Option<HashMap<String, String>>,
571    ) {
572        self.check_registered();
573
574        let actor_id = self.actor_id.inner();
575        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
576            move |deltas: &OrderBookDeltas| {
577                get_actor_unchecked::<DataActorCore>(&actor_id).handle_book_deltas(deltas);
578            },
579        )));
580
581        let topic = get_book_deltas_topic(instrument_id);
582        msgbus::subscribe(topic, handler, None);
583
584        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
585            instrument_id,
586            book_type,
587            client_id,
588            venue: Some(instrument_id.venue),
589            command_id: UUID4::new(),
590            ts_init: self.generate_ts_init(),
591            depth,
592            managed,
593            params,
594        });
595
596        self.send_data_cmd(DataCommand::Subscribe(command));
597    }
598
599    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the given instrument ID.
600    ///
601    /// Once subscribed, any matching order book snapshots published on the message bus are forwarded
602    /// to the `on_book` handler.
603    ///
604    /// # Warnings
605    ///
606    /// Consider subscribing to order book deltas if you need intervals less than 100 milliseconds.
607    pub fn subscribe_book_snapshots(
608        &self,
609        instrument_id: InstrumentId,
610        book_type: BookType,
611        depth: Option<NonZeroUsize>,
612        interval_ms: NonZeroUsize,
613        client_id: Option<ClientId>,
614        params: Option<HashMap<String, String>>,
615    ) {
616        self.check_registered();
617
618        if book_type == BookType::L1_MBP && depth.is_some_and(|d| d.get() > 1) {
619            log::error!(
620                "Cannot subscribe to order book snapshots: L1 MBP book subscription depth > 1, was {:?}",
621                depth,
622            );
623            return;
624        }
625
626        let actor_id = self.actor_id.inner();
627        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
628            move |book: &OrderBook| {
629                get_actor_unchecked::<DataActorCore>(&actor_id).handle_book(book);
630            },
631        )));
632
633        let topic = get_book_snapshots_topic(instrument_id);
634        msgbus::subscribe(topic, handler, None);
635
636        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
637            instrument_id,
638            book_type,
639            client_id,
640            venue: Some(instrument_id.venue),
641            command_id: UUID4::new(),
642            ts_init: self.generate_ts_init(),
643            depth,
644            interval_ms,
645            params,
646        });
647
648        self.send_data_cmd(DataCommand::Subscribe(command));
649    }
650
651    /// Subscribe to streaming [`QuoteTick`] data for the given instrument ID.
652    pub fn subscribe_quotes(
653        &self,
654        instrument_id: InstrumentId,
655        client_id: Option<ClientId>,
656        params: Option<HashMap<String, String>>,
657    ) {
658        self.check_registered();
659
660        let actor_id = self.actor_id.inner();
661        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
662            move |quote: &QuoteTick| {
663                get_actor_unchecked::<DataActorCore>(&actor_id).handle_quote(quote);
664            },
665        )));
666
667        let topic = get_trades_topic(instrument_id);
668        msgbus::subscribe(topic, handler, None);
669
670        let command = SubscribeCommand::Quotes(SubscribeQuotes {
671            instrument_id,
672            client_id,
673            venue: Some(instrument_id.venue),
674            command_id: UUID4::new(),
675            ts_init: self.generate_ts_init(),
676            params,
677        });
678
679        self.send_data_cmd(DataCommand::Subscribe(command));
680    }
681
682    /// Subscribe to streaming [`TradeTick`] data for the given instrument ID.
683    pub fn subscribe_trades(
684        &self,
685        instrument_id: InstrumentId,
686        client_id: Option<ClientId>,
687        params: Option<HashMap<String, String>>,
688    ) {
689        self.check_registered();
690
691        let actor_id = self.actor_id.inner();
692        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
693            move |trade: &TradeTick| {
694                get_actor_unchecked::<DataActorCore>(&actor_id).handle_trade(trade);
695            },
696        )));
697
698        let topic = get_trades_topic(instrument_id);
699        msgbus::subscribe(topic, handler, None);
700
701        let command = SubscribeCommand::Trades(SubscribeTrades {
702            instrument_id,
703            client_id,
704            venue: Some(instrument_id.venue),
705            command_id: UUID4::new(),
706            ts_init: self.generate_ts_init(),
707            params,
708        });
709
710        self.send_data_cmd(DataCommand::Subscribe(command));
711    }
712
713    /// Subscribe to streaming [`Bar`] data for the given bar type.
714    ///
715    /// Once subscribed, any matching bar data published on the message bus is forwarded
716    /// to the `on_bar` handler.
717    pub fn subscribe_bars(
718        &self,
719        bar_type: BarType,
720        client_id: Option<ClientId>,
721        await_partial: bool,
722        params: Option<HashMap<String, String>>,
723    ) {
724        self.check_registered();
725
726        let actor_id = self.actor_id.inner();
727        let handler =
728            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
729                get_actor_unchecked::<DataActorCore>(&actor_id).handle_bar(bar);
730            })));
731
732        let topic = get_bars_topic(bar_type);
733        msgbus::subscribe(topic, handler, None);
734
735        let command = SubscribeCommand::Bars(SubscribeBars {
736            bar_type,
737            client_id,
738            venue: Some(bar_type.instrument_id().venue),
739            command_id: UUID4::new(),
740            ts_init: self.generate_ts_init(),
741            await_partial,
742            params,
743        });
744
745        self.send_data_cmd(DataCommand::Subscribe(command));
746    }
747
748    /// Subscribe to streaming [`MarkPriceUpdate`] data for the given instrument ID.
749    ///
750    /// Once subscribed, any matching mark price updates published on the message bus are forwarded
751    /// to the `on_mark_price` handler.
752    pub fn subscribe_mark_prices(
753        &self,
754        instrument_id: InstrumentId,
755        client_id: Option<ClientId>,
756        params: Option<HashMap<String, String>>,
757    ) {
758        self.check_registered();
759
760        let actor_id = self.actor_id.inner();
761        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
762            move |mark_price: &MarkPriceUpdate| {
763                get_actor_unchecked::<DataActorCore>(&actor_id).handle_mark_price(mark_price);
764            },
765        )));
766
767        let topic = get_mark_price_topic(instrument_id);
768        msgbus::subscribe(topic, handler, None);
769
770        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
771            instrument_id,
772            client_id,
773            venue: Some(instrument_id.venue),
774            command_id: UUID4::new(),
775            ts_init: self.generate_ts_init(),
776            params,
777        });
778
779        self.send_data_cmd(DataCommand::Subscribe(command));
780    }
781
782    /// Subscribe to streaming [`IndexPriceUpdate`] data for the given instrument ID.
783    ///
784    /// Once subscribed, any matching index price updates published on the message bus are forwarded
785    /// to the `on_index_price` handler.
786    pub fn subscribe_index_prices(
787        &self,
788        instrument_id: InstrumentId,
789        client_id: Option<ClientId>,
790        params: Option<HashMap<String, String>>,
791    ) {
792        self.check_registered();
793
794        let actor_id = self.actor_id.inner();
795        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
796            move |index_price: &IndexPriceUpdate| {
797                get_actor_unchecked::<DataActorCore>(&actor_id).handle_index_price(index_price);
798            },
799        )));
800
801        let topic = get_index_price_topic(instrument_id);
802        msgbus::subscribe(topic, handler, None);
803
804        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
805            instrument_id,
806            client_id,
807            venue: Some(instrument_id.venue),
808            command_id: UUID4::new(),
809            ts_init: self.generate_ts_init(),
810            params,
811        });
812
813        self.send_data_cmd(DataCommand::Subscribe(command));
814    }
815
816    /// Subscribe to streaming [`InstrumentStatus`] data for the given instrument ID.
817    ///
818    /// Once subscribed, any matching bar data published on the message bus is forwarded
819    /// to the `on_bar` handler.
820    pub fn subscribe_instrument_status(
821        &self,
822        instrument_id: InstrumentId,
823        client_id: Option<ClientId>,
824        params: Option<HashMap<String, String>>,
825    ) {
826        self.check_registered();
827
828        let actor_id = self.actor_id.inner();
829        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
830            move |status: &InstrumentStatus| {
831                get_actor_unchecked::<DataActorCore>(&actor_id).handle_instrument_status(status);
832            },
833        )));
834
835        let topic = get_instrument_status_topic(instrument_id);
836        msgbus::subscribe(topic, handler, None);
837
838        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
839            instrument_id,
840            client_id,
841            venue: Some(instrument_id.venue),
842            command_id: UUID4::new(),
843            ts_init: self.generate_ts_init(),
844            params,
845        });
846
847        self.send_data_cmd(DataCommand::Subscribe(command));
848    }
849
850    /// Subscribe to streaming [`InstrumentClose`] data for the given instrument ID.
851    ///
852    /// Once subscribed, any matching instrument close data published on the message bus is forwarded
853    /// to the `on_instrument_close` handler.
854    pub fn subscribe_instrument_close(
855        &self,
856        instrument_id: InstrumentId,
857        client_id: Option<ClientId>,
858        params: Option<HashMap<String, String>>,
859    ) {
860        self.check_registered();
861
862        let actor_id = self.actor_id.inner();
863        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
864            move |close: &InstrumentClose| {
865                get_actor_unchecked::<DataActorCore>(&actor_id).handle_instrument_close(close);
866            },
867        )));
868
869        // Topic may need to be adjusted to match Python implementation
870        let topic = get_instrument_close_topic(instrument_id);
871        msgbus::subscribe(topic, handler, None);
872
873        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
874            instrument_id,
875            client_id,
876            venue: Some(instrument_id.venue),
877            command_id: UUID4::new(),
878            ts_init: self.generate_ts_init(),
879            params,
880        });
881
882        self.send_data_cmd(DataCommand::Subscribe(command));
883    }
884
885    /// Unsubscribe from data of the given data type.
886    pub fn unsubscribe_data(
887        &self,
888        data_type: DataType,
889        client_id: Option<ClientId>,
890        params: Option<HashMap<String, String>>,
891    ) {
892        self.check_registered();
893
894        let topic = get_custom_topic(&data_type);
895        // msgbus::unsubscribe(&topic, self.handle_data);  // TODO
896
897        if client_id.is_none() {
898            return;
899        }
900
901        let command = UnsubscribeCommand::Data(UnsubscribeData {
902            data_type,
903            client_id,
904            venue: None,
905            command_id: UUID4::new(),
906            ts_init: self.generate_ts_init(),
907            params,
908        });
909
910        self.send_data_cmd(DataCommand::Unsubscribe(command));
911    }
912
913    /// Unsubscribe from update `Instrument` data for the given venue.
914    pub fn unsubscribe_instruments(
915        &self,
916        venue: Venue,
917        client_id: Option<ClientId>,
918        params: Option<HashMap<String, String>>,
919    ) {
920        self.check_registered();
921
922        let topic = get_instruments_topic(venue);
923        // msgbus::unsubscribe(&topic, self.handle_instruments);  // TODO!
924
925        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
926            client_id,
927            venue,
928            command_id: UUID4::new(),
929            ts_init: self.generate_ts_init(),
930            params,
931        });
932
933        self.send_data_cmd(DataCommand::Unsubscribe(command));
934    }
935
936    pub fn unsubscribe_instrument(
937        &self,
938        instrument_id: InstrumentId,
939        client_id: Option<ClientId>,
940        params: Option<HashMap<String, String>>,
941    ) {
942        self.check_registered();
943
944        let topic = get_instrument_topic(instrument_id);
945        // msgbus::unsubscribe(&topic, self.handle_instrument);  // TODO
946
947        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
948            instrument_id,
949            client_id,
950            venue: Some(instrument_id.venue),
951            command_id: UUID4::new(),
952            ts_init: self.generate_ts_init(),
953            params,
954        });
955
956        self.send_data_cmd(DataCommand::Unsubscribe(command));
957    }
958
959    pub fn unsubscribe_book_deltas(
960        &self,
961        instrument_id: InstrumentId,
962        client_id: Option<ClientId>,
963        params: Option<HashMap<String, String>>,
964    ) {
965        self.check_registered();
966
967        let topic = get_book_deltas_topic(instrument_id);
968        // msgbus::unsubscribe(&topic, self.handle_book_deltas);
969
970        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
971            instrument_id,
972            client_id,
973            venue: Some(instrument_id.venue),
974            command_id: UUID4::new(),
975            ts_init: self.generate_ts_init(),
976            params,
977        });
978
979        self.send_data_cmd(DataCommand::Unsubscribe(command));
980    }
981
982    /// Unsubscribe from order book snapshots for the given instrument ID.
983    pub fn unsubscribe_book_snapshots(
984        &self,
985        instrument_id: InstrumentId,
986        client_id: Option<ClientId>,
987        params: Option<HashMap<String, String>>,
988    ) {
989        self.check_registered();
990
991        let topic = get_book_snapshots_topic(instrument_id);
992        // msgbus::unsubscribe(&topic, self.handle_book);  // TODO
993
994        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
995            instrument_id,
996            client_id,
997            venue: Some(instrument_id.venue),
998            command_id: UUID4::new(),
999            ts_init: self.generate_ts_init(),
1000            params,
1001        });
1002
1003        self.send_data_cmd(DataCommand::Unsubscribe(command));
1004    }
1005
1006    /// Unsubscribe from streaming `QuoteTick` data for the given instrument ID.
1007    pub fn unsubscribe_quote_ticks(
1008        &self,
1009        instrument_id: InstrumentId,
1010        client_id: Option<ClientId>,
1011        params: Option<HashMap<String, String>>,
1012    ) {
1013        self.check_registered();
1014
1015        let topic = get_quotes_topic(instrument_id);
1016        // msgbus::unsubscribe(&topic, self.handle_quote);  // TODO
1017
1018        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
1019            instrument_id,
1020            client_id,
1021            venue: Some(instrument_id.venue),
1022            command_id: UUID4::new(),
1023            ts_init: self.generate_ts_init(),
1024            params,
1025        });
1026
1027        self.send_data_cmd(DataCommand::Unsubscribe(command));
1028    }
1029
1030    /// Unsubscribe from streaming `TradeTick` data for the given instrument ID.
1031    pub fn unsubscribe_trade_ticks(
1032        &self,
1033        instrument_id: InstrumentId,
1034        client_id: Option<ClientId>,
1035        params: Option<HashMap<String, String>>,
1036    ) {
1037        self.check_registered();
1038
1039        let topic = get_trades_topic(instrument_id);
1040        // msgbus::unsubscribe(&topic, self.handle_trade);  // TODO
1041
1042        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
1043            instrument_id,
1044            client_id,
1045            venue: Some(instrument_id.venue),
1046            command_id: UUID4::new(),
1047            ts_init: self.generate_ts_init(),
1048            params,
1049        });
1050
1051        self.send_data_cmd(DataCommand::Unsubscribe(command));
1052    }
1053
1054    /// Unsubscribe from streaming `Bar` data for the given bar type.
1055    pub fn unsubscribe_bars(
1056        &self,
1057        bar_type: BarType,
1058        client_id: Option<ClientId>,
1059        params: Option<HashMap<String, String>>,
1060    ) {
1061        self.check_registered();
1062
1063        let topic = get_bars_topic(bar_type);
1064        // msgbus::unsubscribe(&topic, self.handle_bar);  // TODO
1065
1066        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
1067            bar_type,
1068            client_id,
1069            venue: Some(bar_type.instrument_id().venue),
1070            command_id: UUID4::new(),
1071            ts_init: self.generate_ts_init(),
1072            params,
1073        });
1074
1075        self.send_data_cmd(DataCommand::Unsubscribe(command));
1076    }
1077
1078    /// Unsubscribe from streaming `MarkPriceUpdate` data for the given instrument ID.
1079    pub fn unsubscribe_mark_prices(
1080        &self,
1081        instrument_id: InstrumentId,
1082        client_id: Option<ClientId>,
1083        params: Option<HashMap<String, String>>,
1084    ) {
1085        self.check_registered();
1086
1087        let topic = get_mark_price_topic(instrument_id);
1088        // msgbus::unsubscribe(&topic, self.handle_mark_price);  // TODO
1089
1090        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
1091            instrument_id,
1092            client_id,
1093            venue: Some(instrument_id.venue),
1094            command_id: UUID4::new(),
1095            ts_init: self.generate_ts_init(),
1096            params,
1097        });
1098
1099        self.send_data_cmd(DataCommand::Unsubscribe(command));
1100    }
1101
1102    /// Unsubscribe from streaming `IndexPriceUpdate` data for the given instrument ID.
1103    pub fn unsubscribe_index_prices(
1104        &self,
1105        instrument_id: InstrumentId,
1106        client_id: Option<ClientId>,
1107        params: Option<HashMap<String, String>>,
1108    ) {
1109        self.check_registered();
1110
1111        let topic = get_index_price_topic(instrument_id);
1112        // msgbus::unsubscribe(&topic, self.handle_index_price);  // TODO
1113
1114        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
1115            instrument_id,
1116            client_id,
1117            venue: Some(instrument_id.venue),
1118            command_id: UUID4::new(),
1119            ts_init: self.generate_ts_init(),
1120            params,
1121        });
1122
1123        self.send_data_cmd(DataCommand::Unsubscribe(command));
1124    }
1125
1126    /// Unsubscribe from instrument status updates for the given instrument ID.
1127    pub fn unsubscribe_instrument_status(
1128        &self,
1129        instrument_id: InstrumentId,
1130        client_id: Option<ClientId>,
1131        params: Option<HashMap<String, String>>,
1132    ) {
1133        self.check_registered();
1134
1135        let topic = get_instrument_status_topic(instrument_id);
1136        // msgbus::unsubscribe(&topic, self.handle_instrument_status);  // TODO
1137
1138        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
1139            instrument_id,
1140            client_id,
1141            venue: Some(instrument_id.venue),
1142            command_id: UUID4::new(),
1143            ts_init: self.generate_ts_init(),
1144            params,
1145        });
1146
1147        self.send_data_cmd(DataCommand::Unsubscribe(command));
1148    }
1149
1150    /// Unsubscribe from instrument close updates for the given instrument ID.
1151    pub fn unsubscribe_instrument_close(
1152        &self,
1153        instrument_id: InstrumentId,
1154        client_id: Option<ClientId>,
1155        params: Option<HashMap<String, String>>,
1156    ) {
1157        self.check_registered();
1158
1159        let topic = get_instrument_close_topic(instrument_id);
1160        // msgbus::unsubscribe(&topic, self.handle_instrument_close);  // TODO
1161
1162        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
1163            instrument_id,
1164            client_id,
1165            venue: Some(instrument_id.venue),
1166            command_id: UUID4::new(),
1167            ts_init: self.generate_ts_init(),
1168            params,
1169        });
1170
1171        self.send_data_cmd(DataCommand::Unsubscribe(command));
1172    }
1173
1174    // -- HANDLERS --------------------------------------------------------------------------------
1175
1176    /// Handles a received custom/generic data point.
1177    pub fn handle_data(&mut self, data: &dyn Any) {
1178        log_received(&data);
1179
1180        if !self.is_running() {
1181            return;
1182        }
1183
1184        if let Err(e) = self.on_data(data) {
1185            log_error(&e);
1186        }
1187    }
1188
1189    /// Handles a received instrument.
1190    pub(crate) fn handle_instrument(&mut self, instrument: &InstrumentAny) {
1191        log_received(&instrument);
1192
1193        if !self.is_running() {
1194            return;
1195        }
1196
1197        if let Err(e) = self.on_instrument(instrument) {
1198            log_error(&e);
1199        }
1200    }
1201
1202    /// Handles received order book deltas.
1203    pub(crate) fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
1204        log_received(&deltas);
1205
1206        if !self.is_running() {
1207            return;
1208        }
1209
1210        if let Err(e) = self.on_book_deltas(deltas) {
1211            log_error(&e);
1212        }
1213    }
1214
1215    /// Handle a received order book reference.
1216    pub(crate) fn handle_book(&mut self, book: &OrderBook) {
1217        log_received(&book);
1218
1219        if !self.is_running() {
1220            return;
1221        }
1222
1223        if let Err(e) = self.on_book(book) {
1224            log_error(&e);
1225        };
1226    }
1227
1228    /// Handles a received quote.
1229    pub(crate) fn handle_quote(&mut self, quote: &QuoteTick) {
1230        log_received(&quote);
1231
1232        if !self.is_running() {
1233            return;
1234        }
1235
1236        if let Err(e) = self.on_quote(quote) {
1237            log_error(&e);
1238        }
1239    }
1240
1241    /// Handles a received trade.
1242    pub(crate) fn handle_trade(&mut self, trade: &TradeTick) {
1243        log_received(&trade);
1244
1245        if !self.is_running() {
1246            return;
1247        }
1248
1249        if let Err(e) = self.on_trade(trade) {
1250            log_error(&e);
1251        }
1252    }
1253
1254    /// Handles a receiving bar.
1255    pub(crate) fn handle_bar(&mut self, bar: &Bar) {
1256        log_received(&bar);
1257
1258        if !self.is_running() {
1259            return;
1260        }
1261
1262        if let Err(e) = self.on_bar(bar) {
1263            log_error(&e);
1264        }
1265    }
1266
1267    /// Handles a received mark price update.
1268    pub(crate) fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
1269        log_received(&mark_price);
1270
1271        if !self.is_running() {
1272            return;
1273        }
1274
1275        if let Err(e) = self.on_mark_price(mark_price) {
1276            log_error(&e);
1277        }
1278    }
1279
1280    /// Handles a received index price update.
1281    pub(crate) fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
1282        log_received(&index_price);
1283
1284        if !self.is_running() {
1285            return;
1286        }
1287
1288        if let Err(e) = self.on_index_price(index_price) {
1289            log_error(&e);
1290        }
1291    }
1292
1293    /// Handles a received instrument status.
1294    pub(crate) fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
1295        log_received(&status);
1296
1297        if !self.is_running() {
1298            return;
1299        }
1300
1301        if let Err(e) = self.on_instrument_status(status) {
1302            log_error(&e);
1303        }
1304    }
1305
1306    /// Handles a received instrument close.
1307    pub(crate) fn handle_instrument_close(&mut self, close: &InstrumentClose) {
1308        log_received(&close);
1309
1310        if !self.is_running() {
1311            return;
1312        }
1313
1314        if let Err(e) = self.on_instrument_close(close) {
1315            log_error(&e);
1316        }
1317    }
1318
1319    /// Handles multiple received instruments.
1320    pub(crate) fn handle_instruments(&mut self, instruments: &Vec<InstrumentAny>) {
1321        for instrument in instruments {
1322            self.handle_instrument(instrument);
1323        }
1324    }
1325
1326    /// Handles multiple received quote ticks.
1327    pub(crate) fn handle_quotes(&mut self, quotes: &Vec<QuoteTick>) {
1328        for quote in quotes {
1329            self.handle_quote(quote);
1330        }
1331    }
1332
1333    /// Handles multiple received trade ticks.
1334    pub(crate) fn handle_trades(&mut self, trades: &Vec<TradeTick>) {
1335        for trade in trades {
1336            self.handle_trade(trade);
1337        }
1338    }
1339
1340    /// Handles multiple received bars.
1341    pub(crate) fn handle_bars(&mut self, bars: &Vec<Bar>) {
1342        for bar in bars {
1343            self.handle_bar(bar);
1344        }
1345    }
1346
1347    /// Handles a received historical data.
1348    pub(crate) fn handle_historical_data(&mut self, data: &dyn Any) {
1349        log_received(&data);
1350
1351        if let Err(e) = self.on_historical_data(data) {
1352            log_error(&e);
1353        }
1354    }
1355
1356    /// Handles a received signal.
1357    pub(crate) fn handle_signal(&mut self, signal: &Signal) {
1358        log_received(&signal);
1359
1360        if !self.is_running() {
1361            return;
1362        }
1363
1364        if let Err(e) = self.on_signal(signal) {
1365            log_error(&e);
1366        }
1367    }
1368}
1369
1370fn log_error(e: &anyhow::Error) {
1371    log::error!("{e}");
1372}
1373
1374fn log_received<T>(msg: &T)
1375where
1376    T: std::fmt::Debug,
1377{
1378    log::debug!("{} {:?}", RECV, msg);
1379}
1380
1381///////////////////////////////////////////////////////////////////////////////////////////////////
1382// Tests
1383///////////////////////////////////////////////////////////////////////////////////////////////////
1384#[cfg(test)]
1385mod tests {
1386    use std::{
1387        any::Any,
1388        cell::{RefCell, UnsafeCell},
1389        ops::{Deref, DerefMut},
1390        rc::Rc,
1391        sync::Arc,
1392    };
1393
1394    use nautilus_model::{
1395        data::{QuoteTick, TradeTick},
1396        identifiers::ActorId,
1397        instruments::CurrencyPair,
1398        orderbook::OrderBook,
1399    };
1400    use rstest::{fixture, rstest};
1401    use ustr::Ustr;
1402
1403    use super::{Actor, DataActor, DataActorConfig, DataActorCore};
1404    use crate::{
1405        actor::registry::{get_actor_unchecked, register_actor},
1406        cache::Cache,
1407        clock::{Clock, TestClock},
1408        msgbus::{
1409            self,
1410            switchboard::{MessagingSwitchboard, get_quotes_topic, get_trades_topic},
1411        },
1412    };
1413
1414    struct TestDataActor {
1415        core: DataActorCore,
1416        pub received_quotes: Vec<TradeTick>,
1417        pub received_trades: Vec<TradeTick>,
1418    }
1419
1420    impl Deref for TestDataActor {
1421        type Target = DataActorCore;
1422
1423        fn deref(&self) -> &Self::Target {
1424            &self.core
1425        }
1426    }
1427
1428    impl DerefMut for TestDataActor {
1429        fn deref_mut(&mut self) -> &mut Self::Target {
1430            &mut self.core
1431        }
1432    }
1433
1434    impl Actor for TestDataActor {
1435        fn id(&self) -> Ustr {
1436            self.core.actor_id.inner()
1437        }
1438
1439        fn handle(&mut self, msg: &dyn Any) {
1440            // Let the core handle message routing
1441            self.core.handle(msg);
1442        }
1443
1444        fn as_any(&self) -> &dyn Any {
1445            self
1446        }
1447    }
1448
1449    // Implement DataActor trait overriding handlers are required
1450    impl DataActor for TestDataActor {
1451        fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
1452            Ok(())
1453        }
1454
1455        fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
1456            Ok(())
1457        }
1458
1459        fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
1460            Ok(())
1461        }
1462
1463        fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
1464            self.received_trades.push(*trade);
1465            Ok(())
1466        }
1467    }
1468
1469    // Custom functionality as required
1470    impl TestDataActor {
1471        pub fn new(
1472            config: DataActorConfig,
1473            cache: Rc<RefCell<Cache>>,
1474            clock: Rc<RefCell<dyn Clock>>,
1475            switchboard: Arc<MessagingSwitchboard>,
1476        ) -> Self {
1477            Self {
1478                core: DataActorCore::new(config, cache, clock, switchboard),
1479                received_quotes: Vec::new(),
1480                received_trades: Vec::new(),
1481            }
1482        }
1483        pub fn custom_function(&mut self) {}
1484    }
1485
1486    #[fixture]
1487    pub fn clock() -> Rc<RefCell<TestClock>> {
1488        Rc::new(RefCell::new(TestClock::new()))
1489    }
1490
1491    #[fixture]
1492    pub fn cache() -> Rc<RefCell<Cache>> {
1493        Rc::new(RefCell::new(Cache::new(None, None)))
1494    }
1495
1496    #[fixture]
1497    pub fn switchboard() -> Arc<MessagingSwitchboard> {
1498        Arc::new(MessagingSwitchboard::default())
1499    }
1500
1501    fn register_data_actor(
1502        clock: Rc<RefCell<TestClock>>,
1503        cache: Rc<RefCell<Cache>>,
1504        switchboard: Arc<MessagingSwitchboard>,
1505    ) {
1506        let config = DataActorConfig::default();
1507        let actor = TestDataActor::new(config, cache, clock, switchboard);
1508        let actor_rc = Rc::new(UnsafeCell::new(actor));
1509        register_actor(actor_rc);
1510    }
1511
1512    fn test_subscribe_and_receive_quotes(
1513        clock: Rc<RefCell<TestClock>>,
1514        cache: Rc<RefCell<Cache>>,
1515        switchboard: Arc<MessagingSwitchboard>,
1516        audusd_sim: CurrencyPair,
1517    ) {
1518        register_data_actor(clock.clone(), cache.clone(), switchboard.clone());
1519
1520        let actor_id = ActorId::new("DataActor").inner(); // TODO: Determine default ID
1521        let actor = get_actor_unchecked::<TestDataActor>(&actor_id);
1522        actor.subscribe_quotes(audusd_sim.id, None, None);
1523
1524        let topic = get_quotes_topic(audusd_sim.id);
1525        let trade = QuoteTick::default();
1526        msgbus::publish(&topic, &trade);
1527        msgbus::publish(&topic, &trade);
1528
1529        assert_eq!(actor.received_quotes.len(), 2);
1530    }
1531
1532    fn test_subscribe_and_receive_trades(
1533        clock: Rc<RefCell<TestClock>>,
1534        cache: Rc<RefCell<Cache>>,
1535        switchboard: Arc<MessagingSwitchboard>,
1536        audusd_sim: CurrencyPair,
1537    ) {
1538        register_data_actor(clock.clone(), cache.clone(), switchboard.clone());
1539
1540        let actor_id = ActorId::new("DataActor").inner(); // TODO: Determine default ID
1541        let actor = get_actor_unchecked::<TestDataActor>(&actor_id);
1542        actor.subscribe_trades(audusd_sim.id, None, None);
1543
1544        let topic = get_trades_topic(audusd_sim.id);
1545        let trade = TradeTick::default();
1546        msgbus::publish(&topic, &trade);
1547        msgbus::publish(&topic, &trade);
1548
1549        assert_eq!(actor.received_trades.len(), 2);
1550    }
1551}