nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19#![allow(unused_imports)]
20
21use std::{
22    any::{Any, TypeId},
23    cell::{RefCell, UnsafeCell},
24    collections::HashSet,
25    fmt::Debug,
26    num::NonZeroUsize,
27    ops::{Deref, DerefMut},
28    rc::Rc,
29    sync::Arc,
30};
31
32use ahash::{AHashMap, AHashSet};
33use chrono::{DateTime, Utc};
34use indexmap::IndexMap;
35use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
36use nautilus_model::{
37    data::{
38        Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
39        OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
40    },
41    enums::BookType,
42    identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
43    instruments::{Instrument, InstrumentAny},
44    orderbook::OrderBook,
45};
46use ustr::Ustr;
47use uuid::Uuid;
48
49#[cfg(feature = "indicators")]
50use super::indicators::Indicators;
51use super::{Actor, registry::get_actor_unchecked};
52use crate::{
53    cache::Cache,
54    clock::Clock,
55    enums::{ComponentState, ComponentTrigger},
56    logging::{CMD, RECV, REQ, SEND},
57    messages::{
58        data::{
59            BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
60            InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
61            RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
62            SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
63            SubscribeCustomData, SubscribeIndexPrices, SubscribeInstrument,
64            SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
65            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
66            UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
67            UnsubscribeCustomData, UnsubscribeIndexPrices, UnsubscribeInstrument,
68            UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
69            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
70        },
71        system::ShutdownSystem,
72    },
73    msgbus::{
74        self, MStr, Pattern, Topic, get_message_bus,
75        handler::{MessageHandler, ShareableMessageHandler, TypedMessageHandler},
76        switchboard::{
77            self, MessagingSwitchboard, get_bars_topic, get_book_deltas_topic,
78            get_book_snapshots_topic, get_custom_topic, get_index_price_topic,
79            get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
80            get_instruments_topic, get_mark_price_topic, get_quotes_topic, get_trades_topic,
81        },
82    },
83    signal::Signal,
84    timer::TimeEvent,
85};
86
87/// Common configuration for [`DataActor`] based components.
88#[derive(Debug, Clone)]
89pub struct DataActorConfig {
90    /// The custom identifier for the Actor.
91    pub actor_id: Option<ActorId>,
92    /// If events should be logged.
93    pub log_events: bool,
94    /// If commands should be logged.
95    pub log_commands: bool,
96}
97
98impl Default for DataActorConfig {
99    fn default() -> Self {
100        Self {
101            actor_id: None,
102            log_events: true,
103            log_commands: true,
104        }
105    }
106}
107
108type RequestCallback = Box<dyn Fn(UUID4) + Send + Sync>; // TODO: TBD
109
110impl Actor for DataActorCore {
111    fn id(&self) -> Ustr {
112        self.actor_id.inner()
113    }
114
115    fn handle(&mut self, msg: &dyn Any) {}
116
117    fn as_any(&self) -> &dyn Any {
118        self
119    }
120}
121
122pub trait DataActor: Actor {
123    /// Returns the [`ComponentState`] of the actor.
124    fn state(&self) -> ComponentState;
125
126    /// Returns `true` if the actor is in a `Ready` state.
127    fn is_ready(&self) -> bool {
128        self.state() == ComponentState::Ready
129    }
130
131    /// Returns `true` if the actor is in a `Running` state.
132    fn is_running(&self) -> bool {
133        self.state() == ComponentState::Running
134    }
135
136    /// Returns `true` if the actor is in a `Stopped` state.
137    fn is_stopped(&self) -> bool {
138        self.state() == ComponentState::Stopped
139    }
140
141    /// Returns `true` if the actor is in a `Disposed` state.
142    fn is_disposed(&self) -> bool {
143        self.state() == ComponentState::Disposed
144    }
145
146    /// Returns `true` if the actor is in a `Degraded` state.
147    fn is_degraded(&self) -> bool {
148        self.state() == ComponentState::Degraded
149    }
150
151    /// Returns `true` if the actor is in a `Faulted` state.
152    fn is_faulted(&self) -> bool {
153        self.state() == ComponentState::Faulted
154    }
155
156    /// Actions to be performed when the actor state is saved.
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if saving the actor state fails.
161    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
162        Ok(IndexMap::new())
163    }
164
165    /// Actions to be performed when the actor state is loaded.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if loading the actor state fails.
170    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
171        Ok(())
172    }
173
174    /// Actions to be performed on start.
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if starting the actor fails.
179    fn on_start(&mut self) -> anyhow::Result<()> {
180        log::warn!(
181            "The `on_start` handler was called when not overridden, \
182            it's expected that any actions required when starting the actor \
183            occur here, such as subscribing/requesting data"
184        );
185        Ok(())
186    }
187
188    /// Actions to be performed on stop.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if stopping the actor fails.
193    fn on_stop(&mut self) -> anyhow::Result<()> {
194        log::warn!(
195            "The `on_stop` handler was called when not overridden, \
196            it's expected that any actions required when stopping the actor \
197            occur here, such as unsubscribing from data",
198        );
199        Ok(())
200    }
201
202    /// Actions to be performed on resume.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if resuming the actor fails.
207    fn on_resume(&mut self) -> anyhow::Result<()> {
208        log::warn!(
209            "The `on_resume` handler was called when not overridden, \
210            it's expected that any actions required when resuming the actor \
211            following a stop occur here"
212        );
213        Ok(())
214    }
215
216    /// Actions to be performed on reset.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if resetting the actor fails.
221    fn on_reset(&mut self) -> anyhow::Result<()> {
222        log::warn!(
223            "The `on_reset` handler was called when not overridden, \
224            it's expected that any actions required when resetting the actor \
225            occur here, such as resetting indicators and other state"
226        );
227        Ok(())
228    }
229
230    /// Actions to be performed on dispose.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if disposing the actor fails.
235    fn on_dispose(&mut self) -> anyhow::Result<()> {
236        Ok(())
237    }
238
239    /// Actions to be performed on degrade.
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if degrading the actor fails.
244    fn on_degrade(&mut self) -> anyhow::Result<()> {
245        Ok(())
246    }
247
248    /// Actions to be performed on fault.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if faulting the actor fails.
253    fn on_fault(&mut self) -> anyhow::Result<()> {
254        Ok(())
255    }
256
257    /// Actions to be performed when receiving an event.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if handling the event fails.
262    fn on_event(&mut self, event: &dyn Any) -> anyhow::Result<()> {
263        // TODO: Implement `Event` enum?
264        Ok(())
265    }
266
267    /// Actions to be performed when receiving a time event.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if handling the time event fails.
272    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
273        Ok(())
274    }
275
276    /// Actions to be performed when receiving custom data.
277    ///
278    /// # Errors
279    ///
280    /// Returns an error if handling the data fails.
281    fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
282        Ok(())
283    }
284
285    /// Actions to be performed when receiving a signal.
286    ///
287    /// # Errors
288    ///
289    /// Returns an error if handling the signal fails.
290    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
291        Ok(())
292    }
293
294    /// Actions to be performed when receiving an instrument.
295    ///
296    /// # Errors
297    ///
298    /// Returns an error if handling the instrument fails.
299    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
300        Ok(())
301    }
302
303    /// Actions to be performed when receiving order book deltas.
304    ///
305    /// # Errors
306    ///
307    /// Returns an error if handling the book deltas fails.
308    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
309        Ok(())
310    }
311
312    /// Actions to be performed when receiving an order book.
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if handling the book fails.
317    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
318        Ok(())
319    }
320
321    /// Actions to be performed when receiving a quote.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if handling the quote fails.
326    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
327        Ok(())
328    }
329
330    /// Actions to be performed when receiving a trade.
331    ///
332    /// # Errors
333    ///
334    /// Returns an error if handling the trade fails.
335    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
336        Ok(())
337    }
338
339    /// Actions to be performed when receiving a bar.
340    ///
341    /// # Errors
342    ///
343    /// Returns an error if handling the bar fails.
344    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
345        Ok(())
346    }
347
348    /// Actions to be performed when receiving a mark price update.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if handling the mark price update fails.
353    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
354        Ok(())
355    }
356
357    /// Actions to be performed when receiving an index price update.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if handling the index price update fails.
362    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
363        Ok(())
364    }
365
366    /// Actions to be performed when receiving an instrument status update.
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if handling the instrument status update fails.
371    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
372        Ok(())
373    }
374
375    /// Actions to be performed when receiving an instrument close update.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if handling the instrument close update fails.
380    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
381        Ok(())
382    }
383
384    /// Actions to be performed when receiving historical data.
385    ///
386    /// # Errors
387    ///
388    /// Returns an error if handling the historical data fails.
389    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
390        Ok(())
391    }
392
393    /// Actions to be performed when receiving historical quotes.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if handling the historical quotes fails.
398    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
399        Ok(())
400    }
401
402    /// Actions to be performed when receiving historical trades.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if handling the historical trades fails.
407    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
408        Ok(())
409    }
410
411    /// Actions to be performed when receiving historical bars.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if handling the historical bars fails.
416    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
417        Ok(())
418    }
419
420    /// Actions to be performed when receiving historical mark prices.
421    ///
422    /// # Errors
423    ///
424    /// Returns an error if handling the historical mark prices fails.
425    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
426        Ok(())
427    }
428
429    /// Actions to be performed when receiving historical index prices.
430    ///
431    /// # Errors
432    ///
433    /// Returns an error if handling the historical index prices fails.
434    fn on_historical_index_prices(
435        &mut self,
436        index_prices: &[IndexPriceUpdate],
437    ) -> anyhow::Result<()> {
438        Ok(())
439    }
440
441    /// Handles a received custom data point.
442    fn handle_data(&mut self, data: &dyn Any) {
443        log_received(&data);
444
445        if !self.is_running() {
446            log_not_running(&data);
447            return;
448        }
449
450        if let Err(e) = self.on_data(data) {
451            log_error(&e);
452        }
453    }
454
455    /// Handles a received signal.
456    fn handle_signal(&mut self, signal: &Signal) {
457        log_received(&signal);
458
459        if !self.is_running() {
460            log_not_running(&signal);
461            return;
462        }
463
464        if let Err(e) = self.on_signal(signal) {
465            log_error(&e);
466        }
467    }
468
469    /// Handles a received instrument.
470    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
471        log_received(&instrument);
472
473        if !self.is_running() {
474            log_not_running(&instrument);
475            return;
476        }
477
478        if let Err(e) = self.on_instrument(instrument) {
479            log_error(&e);
480        }
481    }
482
483    /// Handles received order book deltas.
484    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
485        log_received(&deltas);
486
487        if !self.is_running() {
488            log_not_running(&deltas);
489            return;
490        }
491
492        if let Err(e) = self.on_book_deltas(deltas) {
493            log_error(&e);
494        }
495    }
496
497    /// Handles a received order book reference.
498    fn handle_book(&mut self, book: &OrderBook) {
499        log_received(&book);
500
501        if !self.is_running() {
502            log_not_running(&book);
503            return;
504        }
505
506        if let Err(e) = self.on_book(book) {
507            log_error(&e);
508        };
509    }
510
511    /// Handles a received quote.
512    fn handle_quote(&mut self, quote: &QuoteTick) {
513        log_received(&quote);
514
515        if !self.is_running() {
516            log_not_running(&quote);
517            return;
518        }
519
520        if let Err(e) = self.on_quote(quote) {
521            log_error(&e);
522        }
523    }
524
525    /// Handles a received trade.
526    fn handle_trade(&mut self, trade: &TradeTick) {
527        log_received(&trade);
528
529        if !self.is_running() {
530            log_not_running(&trade);
531            return;
532        }
533
534        if let Err(e) = self.on_trade(trade) {
535            log_error(&e);
536        }
537    }
538
539    /// Handles a receiving bar.
540    fn handle_bar(&mut self, bar: &Bar) {
541        log_received(&bar);
542
543        if !self.is_running() {
544            log_not_running(&bar);
545            return;
546        }
547
548        if let Err(e) = self.on_bar(bar) {
549            log_error(&e);
550        }
551    }
552
553    /// Handles a received mark price update.
554    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
555        log_received(&mark_price);
556
557        if !self.is_running() {
558            log_not_running(&mark_price);
559            return;
560        }
561
562        if let Err(e) = self.on_mark_price(mark_price) {
563            log_error(&e);
564        }
565    }
566
567    /// Handles a received index price update.
568    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
569        log_received(&index_price);
570
571        if !self.is_running() {
572            log_not_running(&index_price);
573            return;
574        }
575
576        if let Err(e) = self.on_index_price(index_price) {
577            log_error(&e);
578        }
579    }
580
581    /// Handles a received instrument status.
582    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
583        log_received(&status);
584
585        if !self.is_running() {
586            log_not_running(&status);
587            return;
588        }
589
590        if let Err(e) = self.on_instrument_status(status) {
591            log_error(&e);
592        }
593    }
594
595    /// Handles a received instrument close.
596    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
597        log_received(&close);
598
599        if !self.is_running() {
600            log_not_running(&close);
601            return;
602        }
603
604        if let Err(e) = self.on_instrument_close(close) {
605            log_error(&e);
606        }
607    }
608
609    /// Handles received historical data.
610    fn handle_historical_data(&mut self, data: &dyn Any) {
611        log_received(&data);
612
613        if let Err(e) = self.on_historical_data(data) {
614            log_error(&e);
615        }
616    }
617
618    /// Handles a received time event.
619    fn handle_time_event(&mut self, event: &TimeEvent) {
620        log_received(&event);
621
622        if let Err(e) = self.on_time_event(event) {
623            log_error(&e);
624        }
625    }
626
627    /// Handles a received event.
628    fn handle_event(&mut self, event: &dyn Any) {
629        log_received(&event);
630
631        if let Err(e) = self.on_event(event) {
632            log_error(&e);
633        }
634    }
635
636    /// Handles a data response.
637    fn handle_data_response(&mut self, response: &CustomDataResponse) {
638        log_received(&response);
639
640        if let Err(e) = self.on_historical_data(response.data.as_ref()) {
641            log_error(&e);
642        }
643    }
644
645    /// Handles an instrument response.
646    fn handle_instrument_response(&mut self, response: &InstrumentResponse) {
647        log_received(&response);
648
649        if let Err(e) = self.on_instrument(&response.data) {
650            log_error(&e);
651        }
652    }
653
654    /// Handles an instruments response.
655    fn handle_instruments_response(&mut self, response: &InstrumentsResponse) {
656        log_received(&response);
657
658        for inst in &response.data {
659            if let Err(e) = self.on_instrument(inst) {
660                log_error(&e);
661            }
662        }
663    }
664
665    /// Handles a book response.
666    fn handle_book_response(&mut self, response: &BookResponse) {
667        log_received(&response);
668
669        if let Err(e) = self.on_book(&response.data) {
670            log_error(&e);
671        }
672    }
673
674    /// Handles a quotes response.
675    fn handle_quotes_response(&mut self, response: &QuotesResponse) {
676        log_received(&response);
677
678        if let Err(e) = self.on_historical_quotes(&response.data) {
679            log_error(&e);
680        }
681    }
682
683    /// Handles a trades response.
684    fn handle_trades_response(&mut self, response: &TradesResponse) {
685        log_received(&response);
686
687        if let Err(e) = self.on_historical_trades(&response.data) {
688            log_error(&e);
689        }
690    }
691
692    /// Handles a bars response.
693    fn handle_bars_response(&mut self, response: &BarsResponse) {
694        log_received(&response);
695
696        if let Err(e) = self.on_historical_bars(&response.data) {
697            log_error(&e);
698        }
699    }
700}
701
702/// Core functionality for all actors.
703pub struct DataActorCore {
704    /// The actor identifier.
705    pub actor_id: ActorId,
706    /// The actors configuration.
707    pub config: DataActorConfig,
708    /// The actors clock.
709    pub clock: Rc<RefCell<dyn Clock>>,
710    /// The cache for the actor.
711    pub cache: Rc<RefCell<Cache>>,
712    state: ComponentState,
713    trader_id: Option<TraderId>,
714    warning_events: AHashSet<String>, // TODO: TBD
715    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
716    signal_classes: AHashMap<String, String>,
717    #[cfg(feature = "indicators")]
718    indicators: Indicators,
719    topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
720}
721
722impl Debug for DataActorCore {
723    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
724        f.debug_struct(stringify!(DataActorCore))
725            .field("actor_id", &self.actor_id)
726            .field("config", &self.config)
727            .field("state", &self.state)
728            .field("trader_id", &self.trader_id)
729            .finish()
730    }
731}
732
733impl DataActor for DataActorCore {
734    fn state(&self) -> ComponentState {
735        self.state
736    }
737}
738
739impl DataActorCore {
740    /// Creates a new [`DataActorCore`] instance.
741    pub fn new(
742        config: DataActorConfig,
743        cache: Rc<RefCell<Cache>>,
744        clock: Rc<RefCell<dyn Clock>>,
745    ) -> Self {
746        let actor_id = config
747            .actor_id
748            .unwrap_or_else(|| Self::default_actor_id(&config));
749
750        Self {
751            actor_id,
752            config,
753            clock,
754            cache,
755            state: ComponentState::default(),
756            trader_id: None, // None until registered
757            warning_events: AHashSet::new(),
758            pending_requests: AHashMap::new(),
759            signal_classes: AHashMap::new(),
760            #[cfg(feature = "indicators")]
761            indicators: Indicators::default(),
762            topic_handlers: AHashMap::new(),
763        }
764    }
765
766    fn default_actor_id(config: &DataActorConfig) -> ActorId {
767        let memory_address = std::ptr::from_ref(config) as *const _ as usize;
768        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
769    }
770
771    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
772        self.state = self.state.transition(&trigger)?;
773        log::info!("{}", self.state);
774        Ok(())
775    }
776
777    // TODO: TBD initialization flow
778
779    /// Initializes the actor.
780    ///
781    /// # Errors
782    ///
783    /// Returns an error if the initialization state transition fails.
784    pub fn initialize(&mut self) -> anyhow::Result<()> {
785        self.transition_state(ComponentTrigger::Initialize)
786    }
787
788    /// Returns the trader ID this actor is registered to.
789    pub fn trader_id(&self) -> Option<TraderId> {
790        self.trader_id
791    }
792
793    // TODO: Extract this common state logic and handling out to some component module
794    pub fn state(&self) -> ComponentState {
795        self.state
796    }
797
798    // -- REGISTRATION ----------------------------------------------------------------------------
799
800    /// Register an event type for warning log levels.
801    pub fn register_warning_event(&mut self, event_type: &str) {
802        self.warning_events.insert(event_type.to_string());
803    }
804
805    /// Deregister an event type from warning log levels.
806    pub fn deregister_warning_event(&mut self, event_type: &str) {
807        self.warning_events.remove(event_type);
808        // TODO: Log deregistration
809    }
810
811    /// Sets the trader ID for the actor.
812    ///
813    /// # Panics
814    ///
815    /// Panics if a trader ID has already been set.
816    pub(crate) fn set_trader_id(&mut self, trader_id: TraderId) {
817        if let Some(existing_trader_id) = self.trader_id {
818            panic!("trader_id {existing_trader_id} already set");
819        }
820
821        self.trader_id = Some(trader_id)
822    }
823
824    fn check_registered(&self) {
825        assert!(
826            self.trader_id.is_some(),
827            "Actor has not been registered with a Trader"
828        );
829    }
830
831    fn generate_ts_init(&self) -> UnixNanos {
832        self.clock.borrow().timestamp_ns()
833    }
834
835    fn send_data_cmd(&self, command: DataCommand) {
836        if self.config.log_commands {
837            log::info!("{CMD}{SEND} {command:?}");
838        }
839
840        let endpoint = MessagingSwitchboard::data_engine_execute();
841        msgbus::send(endpoint, command.as_any())
842    }
843
844    fn send_data_req<A: DataActor>(&self, request: RequestCommand) {
845        if self.config.log_commands {
846            log::info!("{REQ}{SEND} {request:?}");
847        }
848
849        let actor_id = self.actor_id.inner();
850        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
851            move |response: &CustomDataResponse| {
852                get_actor_unchecked::<A>(&actor_id).handle_data_response(response);
853            },
854        )));
855
856        let msgbus = get_message_bus()
857            .borrow_mut()
858            .register_response_handler(request.request_id(), handler);
859
860        let endpoint = MessagingSwitchboard::data_engine_execute();
861        msgbus::send(endpoint, request.as_any())
862    }
863
864    /// Starts the actor.
865    ///
866    /// # Errors
867    ///
868    /// Returns an error if starting the actor fails.
869    pub fn start(&mut self) -> anyhow::Result<()> {
870        self.transition_state(ComponentTrigger::Start)?; // -> Starting
871
872        if let Err(e) = self.on_start() {
873            log_error(&e);
874            return Err(e); // Halt state transition
875        }
876
877        self.transition_state(ComponentTrigger::StartCompleted)?;
878
879        Ok(())
880    }
881
882    /// Stops the actor.
883    ///
884    /// # Errors
885    ///
886    /// Returns an error if stopping the actor fails.
887    pub fn stop(&mut self) -> anyhow::Result<()> {
888        self.transition_state(ComponentTrigger::Stop)?; // -> Stopping
889
890        if let Err(e) = self.on_stop() {
891            log_error(&e);
892            return Err(e); // Halt state transition
893        }
894
895        self.transition_state(ComponentTrigger::StopCompleted)?;
896
897        Ok(())
898    }
899
900    /// Resumes the actor.
901    ///
902    /// # Errors
903    ///
904    /// Returns an error if resuming the actor fails.
905    pub fn resume(&mut self) -> anyhow::Result<()> {
906        self.transition_state(ComponentTrigger::Resume)?; // -> Resuming
907
908        if let Err(e) = self.on_stop() {
909            log_error(&e);
910            return Err(e); // Halt state transition
911        }
912
913        self.transition_state(ComponentTrigger::ResumeCompleted)?;
914
915        Ok(())
916    }
917
918    /// Resets the actor.
919    ///
920    /// # Errors
921    ///
922    /// Returns an error if resetting the actor fails.
923    pub fn reset(&mut self) -> anyhow::Result<()> {
924        self.transition_state(ComponentTrigger::Reset)?; // -> Resetting
925
926        if let Err(e) = self.on_reset() {
927            log_error(&e);
928            return Err(e); // Halt state transition
929        }
930
931        self.transition_state(ComponentTrigger::ResetCompleted)?;
932
933        Ok(())
934    }
935
936    /// Disposes the actor.
937    ///
938    /// # Errors
939    ///
940    /// Returns an error if disposing the actor fails.
941    pub fn dispose(&mut self) -> anyhow::Result<()> {
942        self.transition_state(ComponentTrigger::Dispose)?; // -> Disposing
943
944        if let Err(e) = self.on_dispose() {
945            log_error(&e);
946            return Err(e); // Halt state transition
947        }
948
949        self.transition_state(ComponentTrigger::DisposeCompleted)?;
950
951        Ok(())
952    }
953
954    /// Degrades the actor.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if degrading the actor fails.
959    pub fn degrade(&mut self) -> anyhow::Result<()> {
960        self.transition_state(ComponentTrigger::Degrade)?; // -> Degrading
961
962        if let Err(e) = self.on_degrade() {
963            log_error(&e);
964            return Err(e); // Halt state transition
965        }
966
967        self.transition_state(ComponentTrigger::DegradeCompleted)?;
968
969        Ok(())
970    }
971
972    /// Faults the actor.
973    ///
974    /// # Errors
975    ///
976    /// Returns an error if faulting the actor fails.
977    pub fn fault(&mut self) -> anyhow::Result<()> {
978        self.transition_state(ComponentTrigger::Fault)?; // -> Faulting
979
980        if let Err(e) = self.on_fault() {
981            log_error(&e);
982            return Err(e); // Halt state transition
983        }
984
985        self.transition_state(ComponentTrigger::FaultCompleted)?;
986
987        Ok(())
988    }
989
990    /// Sends a shutdown command to the system with an optional reason.
991    ///
992    /// # Panics
993    ///
994    /// Panics if the actor is not registered or has no trader ID.
995    pub fn shutdown_system(&self, reason: Option<String>) {
996        self.check_registered();
997
998        // SAFETY: Checked registered before unwrapping trader ID
999        let command = ShutdownSystem::new(
1000            self.trader_id().unwrap(),
1001            self.actor_id.inner(),
1002            reason,
1003            UUID4::new(),
1004            self.clock.borrow().timestamp_ns(),
1005        );
1006
1007        let endpoint = "command.system.shutdown".into();
1008        msgbus::send(endpoint, command.as_any());
1009    }
1010
1011    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
1012
1013    fn get_or_create_handler_for_topic<F>(
1014        &mut self,
1015        topic: MStr<Topic>,
1016        create_handler: F,
1017    ) -> ShareableMessageHandler
1018    where
1019        F: FnOnce() -> ShareableMessageHandler,
1020    {
1021        if let Some(existing_handler) = self.topic_handlers.get(&topic) {
1022            existing_handler.clone()
1023        } else {
1024            let new_handler = create_handler();
1025            self.topic_handlers.insert(topic, new_handler.clone());
1026            new_handler
1027        }
1028    }
1029
1030    fn get_handler_for_topic(&self, topic: MStr<Topic>) -> Option<ShareableMessageHandler> {
1031        self.topic_handlers.get(&topic).cloned()
1032    }
1033
1034    /// Subscribe to streaming `data_type` data.
1035    pub fn subscribe_data<A: DataActor>(
1036        &mut self,
1037        data_type: DataType,
1038        client_id: Option<ClientId>,
1039        params: Option<IndexMap<String, String>>,
1040    ) {
1041        self.check_registered();
1042
1043        let topic = get_custom_topic(&data_type);
1044        let actor_id = self.actor_id.inner();
1045        let handler = if let Some(existing_handler) = self.topic_handlers.get(&topic) {
1046            existing_handler.clone()
1047        } else {
1048            let new_handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
1049                move |data: &dyn Any| {
1050                    get_actor_unchecked::<A>(&actor_id).handle_data(data);
1051                },
1052            )));
1053
1054            self.topic_handlers.insert(topic, new_handler.clone());
1055            new_handler
1056        };
1057
1058        msgbus::subscribe_topic(topic, handler, None);
1059
1060        if client_id.is_none() {
1061            // If no client ID specified, just subscribe to the topic
1062            return;
1063        }
1064
1065        let command = SubscribeCommand::Data(SubscribeCustomData {
1066            data_type,
1067            client_id,
1068            venue: None,
1069            command_id: UUID4::new(),
1070            ts_init: self.generate_ts_init(),
1071            params,
1072        });
1073
1074        self.send_data_cmd(DataCommand::Subscribe(command));
1075    }
1076
1077    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
1078    pub fn subscribe_instruments<A: DataActor>(
1079        &mut self,
1080        venue: Venue,
1081        client_id: Option<ClientId>,
1082        params: Option<IndexMap<String, String>>,
1083    ) {
1084        self.check_registered();
1085
1086        let topic = get_instruments_topic(venue);
1087        let actor_id = self.actor_id.inner();
1088        let handler = self.get_or_create_handler_for_topic(topic, || {
1089            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1090                move |instrument: &InstrumentAny| {
1091                    get_actor_unchecked::<A>(&actor_id).handle_instrument(instrument);
1092                },
1093            )))
1094        });
1095
1096        msgbus::subscribe_topic(topic, handler, None);
1097
1098        let command = SubscribeCommand::Instruments(SubscribeInstruments {
1099            client_id,
1100            venue,
1101            command_id: UUID4::new(),
1102            ts_init: self.generate_ts_init(),
1103            params,
1104        });
1105
1106        self.send_data_cmd(DataCommand::Subscribe(command));
1107    }
1108
1109    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
1110    pub fn subscribe_instrument<A: DataActor>(
1111        &mut self,
1112        instrument_id: InstrumentId,
1113        client_id: Option<ClientId>,
1114        params: Option<IndexMap<String, String>>,
1115    ) {
1116        self.check_registered();
1117
1118        let topic = get_instrument_topic(instrument_id);
1119        let actor_id = self.actor_id.inner();
1120        let handler = self.get_or_create_handler_for_topic(topic, || {
1121            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1122                move |instrument: &InstrumentAny| {
1123                    get_actor_unchecked::<A>(&actor_id).handle_instrument(instrument);
1124                },
1125            )))
1126        });
1127
1128        msgbus::subscribe_topic(topic, handler, None);
1129
1130        let command = SubscribeCommand::Instrument(SubscribeInstrument {
1131            instrument_id,
1132            client_id,
1133            venue: Some(instrument_id.venue),
1134            command_id: UUID4::new(),
1135            ts_init: self.generate_ts_init(),
1136            params,
1137        });
1138
1139        self.send_data_cmd(DataCommand::Subscribe(command));
1140    }
1141
1142    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
1143    ///
1144    /// Once subscribed, any matching order book deltas published on the message bus are forwarded
1145    /// to the `on_book_deltas` handler.
1146    pub fn subscribe_book_deltas<A: DataActor>(
1147        &mut self,
1148        instrument_id: InstrumentId,
1149        book_type: BookType,
1150        depth: Option<NonZeroUsize>,
1151        client_id: Option<ClientId>,
1152        managed: bool,
1153        params: Option<IndexMap<String, String>>,
1154    ) {
1155        self.check_registered();
1156
1157        let topic = get_book_deltas_topic(instrument_id);
1158        let actor_id = self.actor_id.inner();
1159        let handler = self.get_or_create_handler_for_topic(topic, || {
1160            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1161                move |deltas: &OrderBookDeltas| {
1162                    get_actor_unchecked::<A>(&actor_id).handle_book_deltas(deltas);
1163                },
1164            )))
1165        });
1166
1167        msgbus::subscribe_topic(topic, handler, None);
1168
1169        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
1170            instrument_id,
1171            book_type,
1172            client_id,
1173            venue: Some(instrument_id.venue),
1174            command_id: UUID4::new(),
1175            ts_init: self.generate_ts_init(),
1176            depth,
1177            managed,
1178            params,
1179        });
1180
1181        self.send_data_cmd(DataCommand::Subscribe(command));
1182    }
1183
1184    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1185    ///
1186    /// Once subscribed, any matching order book snapshots published on the message bus are forwarded
1187    /// to the `on_book` handler.
1188    ///
1189    /// # Warnings
1190    ///
1191    /// Consider subscribing to order book deltas if you need intervals less than 100 milliseconds.
1192    pub fn subscribe_book_at_interval<A: DataActor>(
1193        &mut self,
1194        instrument_id: InstrumentId,
1195        book_type: BookType,
1196        depth: Option<NonZeroUsize>,
1197        interval_ms: NonZeroUsize,
1198        client_id: Option<ClientId>,
1199        params: Option<IndexMap<String, String>>,
1200    ) {
1201        self.check_registered();
1202
1203        if book_type == BookType::L1_MBP && depth.is_some_and(|d| d.get() > 1) {
1204            log::error!(
1205                "Cannot subscribe to order book snapshots: L1 MBP book subscription depth > 1, was {:?}",
1206                depth,
1207            );
1208            return;
1209        }
1210
1211        let topic = get_book_snapshots_topic(instrument_id);
1212        let actor_id = self.actor_id.inner();
1213        let handler = self.get_or_create_handler_for_topic(topic, || {
1214            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1215                move |book: &OrderBook| {
1216                    get_actor_unchecked::<A>(&actor_id).handle_book(book);
1217                },
1218            )))
1219        });
1220
1221        msgbus::subscribe_topic(topic, handler, None);
1222
1223        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
1224            instrument_id,
1225            book_type,
1226            client_id,
1227            venue: Some(instrument_id.venue),
1228            command_id: UUID4::new(),
1229            ts_init: self.generate_ts_init(),
1230            depth,
1231            interval_ms,
1232            params,
1233        });
1234
1235        self.send_data_cmd(DataCommand::Subscribe(command));
1236    }
1237
1238    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
1239    pub fn subscribe_quotes<A: DataActor>(
1240        &mut self,
1241        instrument_id: InstrumentId,
1242        client_id: Option<ClientId>,
1243        params: Option<IndexMap<String, String>>,
1244    ) {
1245        self.check_registered();
1246
1247        let topic = get_quotes_topic(instrument_id);
1248        let actor_id = self.actor_id.inner();
1249        let handler = self.get_or_create_handler_for_topic(topic, || {
1250            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1251                move |quote: &QuoteTick| {
1252                    get_actor_unchecked::<A>(&actor_id).handle_quote(quote);
1253                },
1254            )))
1255        });
1256
1257        msgbus::subscribe_topic(topic, handler, None);
1258
1259        let command = SubscribeCommand::Quotes(SubscribeQuotes {
1260            instrument_id,
1261            client_id,
1262            venue: Some(instrument_id.venue),
1263            command_id: UUID4::new(),
1264            ts_init: self.generate_ts_init(),
1265            params,
1266        });
1267
1268        self.send_data_cmd(DataCommand::Subscribe(command));
1269    }
1270
1271    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
1272    pub fn subscribe_trades<A: DataActor>(
1273        &mut self,
1274        instrument_id: InstrumentId,
1275        client_id: Option<ClientId>,
1276        params: Option<IndexMap<String, String>>,
1277    ) {
1278        self.check_registered();
1279
1280        let topic = get_trades_topic(instrument_id);
1281        let actor_id = self.actor_id.inner();
1282        let handler = self.get_or_create_handler_for_topic(topic, || {
1283            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1284                move |trade: &TradeTick| {
1285                    get_actor_unchecked::<A>(&actor_id).handle_trade(trade);
1286                },
1287            )))
1288        });
1289
1290        msgbus::subscribe_topic(topic, handler, None);
1291
1292        let command = SubscribeCommand::Trades(SubscribeTrades {
1293            instrument_id,
1294            client_id,
1295            venue: Some(instrument_id.venue),
1296            command_id: UUID4::new(),
1297            ts_init: self.generate_ts_init(),
1298            params,
1299        });
1300
1301        self.send_data_cmd(DataCommand::Subscribe(command));
1302    }
1303
1304    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
1305    ///
1306    /// Once subscribed, any matching bar data published on the message bus is forwarded
1307    /// to the `on_bar` handler.
1308    pub fn subscribe_bars<A: DataActor>(
1309        &mut self,
1310        bar_type: BarType,
1311        client_id: Option<ClientId>,
1312        await_partial: bool,
1313        params: Option<IndexMap<String, String>>,
1314    ) {
1315        self.check_registered();
1316
1317        let topic = get_bars_topic(bar_type);
1318        let actor_id = self.actor_id.inner();
1319        let handler = self.get_or_create_handler_for_topic(topic, || {
1320            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
1321                get_actor_unchecked::<A>(&actor_id).handle_bar(bar);
1322            })))
1323        });
1324
1325        msgbus::subscribe_topic(topic, handler, None);
1326
1327        let command = SubscribeCommand::Bars(SubscribeBars {
1328            bar_type,
1329            client_id,
1330            venue: Some(bar_type.instrument_id().venue),
1331            command_id: UUID4::new(),
1332            ts_init: self.generate_ts_init(),
1333            await_partial,
1334            params,
1335        });
1336
1337        self.send_data_cmd(DataCommand::Subscribe(command));
1338    }
1339
1340    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1341    ///
1342    /// Once subscribed, any matching mark price updates published on the message bus are forwarded
1343    /// to the `on_mark_price` handler.
1344    pub fn subscribe_mark_prices<A: DataActor>(
1345        &mut self,
1346        instrument_id: InstrumentId,
1347        client_id: Option<ClientId>,
1348        params: Option<IndexMap<String, String>>,
1349    ) {
1350        self.check_registered();
1351
1352        let topic = get_mark_price_topic(instrument_id);
1353        let actor_id = self.actor_id.inner();
1354        let handler = self.get_or_create_handler_for_topic(topic, || {
1355            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1356                move |mark_price: &MarkPriceUpdate| {
1357                    get_actor_unchecked::<A>(&actor_id).handle_mark_price(mark_price);
1358                },
1359            )))
1360        });
1361
1362        msgbus::subscribe_topic(topic, handler, None);
1363
1364        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
1365            instrument_id,
1366            client_id,
1367            venue: Some(instrument_id.venue),
1368            command_id: UUID4::new(),
1369            ts_init: self.generate_ts_init(),
1370            params,
1371        });
1372
1373        self.send_data_cmd(DataCommand::Subscribe(command));
1374    }
1375
1376    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1377    ///
1378    /// Once subscribed, any matching index price updates published on the message bus are forwarded
1379    /// to the `on_index_price` handler.
1380    pub fn subscribe_index_prices<A: DataActor>(
1381        &mut self,
1382        instrument_id: InstrumentId,
1383        client_id: Option<ClientId>,
1384        params: Option<IndexMap<String, String>>,
1385    ) {
1386        self.check_registered();
1387
1388        let topic = get_index_price_topic(instrument_id);
1389        let actor_id = self.actor_id.inner();
1390        let handler = self.get_or_create_handler_for_topic(topic, || {
1391            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1392                move |index_price: &IndexPriceUpdate| {
1393                    get_actor_unchecked::<A>(&actor_id).handle_index_price(index_price);
1394                },
1395            )))
1396        });
1397
1398        msgbus::subscribe_topic(topic, handler, None);
1399
1400        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
1401            instrument_id,
1402            client_id,
1403            venue: Some(instrument_id.venue),
1404            command_id: UUID4::new(),
1405            ts_init: self.generate_ts_init(),
1406            params,
1407        });
1408
1409        self.send_data_cmd(DataCommand::Subscribe(command));
1410    }
1411
1412    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1413    ///
1414    /// Once subscribed, any matching bar data published on the message bus is forwarded
1415    /// to the `on_bar` handler.
1416    pub fn subscribe_instrument_status<A: DataActor>(
1417        &mut self,
1418        instrument_id: InstrumentId,
1419        client_id: Option<ClientId>,
1420        params: Option<IndexMap<String, String>>,
1421    ) {
1422        self.check_registered();
1423
1424        let topic = get_instrument_status_topic(instrument_id);
1425        let actor_id = self.actor_id.inner();
1426        let handler = self.get_or_create_handler_for_topic(topic, || {
1427            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1428                move |status: &InstrumentStatus| {
1429                    get_actor_unchecked::<A>(&actor_id).handle_instrument_status(status);
1430                },
1431            )))
1432        });
1433
1434        msgbus::subscribe_topic(topic, handler, None);
1435
1436        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
1437            instrument_id,
1438            client_id,
1439            venue: Some(instrument_id.venue),
1440            command_id: UUID4::new(),
1441            ts_init: self.generate_ts_init(),
1442            params,
1443        });
1444
1445        self.send_data_cmd(DataCommand::Subscribe(command));
1446    }
1447
1448    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1449    ///
1450    /// Once subscribed, any matching instrument close data published on the message bus is forwarded
1451    /// to the `on_instrument_close` handler.
1452    pub fn subscribe_instrument_close<A: DataActor>(
1453        &mut self,
1454        instrument_id: InstrumentId,
1455        client_id: Option<ClientId>,
1456        params: Option<IndexMap<String, String>>,
1457    ) {
1458        self.check_registered();
1459
1460        let topic = get_instrument_close_topic(instrument_id);
1461        let actor_id = self.actor_id.inner();
1462        let handler = self.get_or_create_handler_for_topic(topic, || {
1463            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1464                move |close: &InstrumentClose| {
1465                    get_actor_unchecked::<A>(&actor_id).handle_instrument_close(close);
1466                },
1467            )))
1468        });
1469
1470        msgbus::subscribe_topic(topic, handler, None);
1471
1472        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
1473            instrument_id,
1474            client_id,
1475            venue: Some(instrument_id.venue),
1476            command_id: UUID4::new(),
1477            ts_init: self.generate_ts_init(),
1478            params,
1479        });
1480
1481        self.send_data_cmd(DataCommand::Subscribe(command));
1482    }
1483
1484    /// Unsubscribe from streaming `data_type` data.
1485    pub fn unsubscribe_data<A: DataActor>(
1486        &self,
1487        data_type: DataType,
1488        client_id: Option<ClientId>,
1489        params: Option<IndexMap<String, String>>,
1490    ) {
1491        self.check_registered();
1492
1493        let topic = get_custom_topic(&data_type);
1494        if let Some(handler) = self.topic_handlers.get(&topic) {
1495            msgbus::unsubscribe_topic(topic, handler.clone());
1496        };
1497
1498        if client_id.is_none() {
1499            return;
1500        }
1501
1502        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
1503            data_type,
1504            client_id,
1505            venue: None,
1506            command_id: UUID4::new(),
1507            ts_init: self.generate_ts_init(),
1508            params,
1509        });
1510
1511        self.send_data_cmd(DataCommand::Unsubscribe(command));
1512    }
1513
1514    /// Unsubscribe from streaming [`Instrument`] data for the `venue`.
1515    pub fn unsubscribe_instruments<A: DataActor>(
1516        &self,
1517        venue: Venue,
1518        client_id: Option<ClientId>,
1519        params: Option<IndexMap<String, String>>,
1520    ) {
1521        self.check_registered();
1522
1523        let topic = get_instruments_topic(venue);
1524        if let Some(handler) = self.topic_handlers.get(&topic) {
1525            msgbus::unsubscribe_topic(topic, handler.clone());
1526        };
1527
1528        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
1529            client_id,
1530            venue,
1531            command_id: UUID4::new(),
1532            ts_init: self.generate_ts_init(),
1533            params,
1534        });
1535
1536        self.send_data_cmd(DataCommand::Unsubscribe(command));
1537    }
1538
1539    /// Unsubscribe from streaming [`Instrument`] definitions for the `instrument_id`.
1540    pub fn unsubscribe_instrument<A: DataActor>(
1541        &self,
1542        instrument_id: InstrumentId,
1543        client_id: Option<ClientId>,
1544        params: Option<IndexMap<String, String>>,
1545    ) {
1546        self.check_registered();
1547
1548        let topic = get_instrument_topic(instrument_id);
1549        if let Some(handler) = self.topic_handlers.get(&topic) {
1550            msgbus::unsubscribe_topic(topic, handler.clone());
1551        };
1552
1553        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
1554            instrument_id,
1555            client_id,
1556            venue: Some(instrument_id.venue),
1557            command_id: UUID4::new(),
1558            ts_init: self.generate_ts_init(),
1559            params,
1560        });
1561
1562        self.send_data_cmd(DataCommand::Unsubscribe(command));
1563    }
1564
1565    /// Unsubscribe from streaming [`OrderBookDeltas`] for the `instrument_id`.
1566    pub fn unsubscribe_book_deltas<A: DataActor>(
1567        &self,
1568        instrument_id: InstrumentId,
1569        client_id: Option<ClientId>,
1570        params: Option<IndexMap<String, String>>,
1571    ) {
1572        self.check_registered();
1573
1574        let topic = get_book_deltas_topic(instrument_id);
1575        if let Some(handler) = self.topic_handlers.get(&topic) {
1576            msgbus::unsubscribe_topic(topic, handler.clone());
1577        };
1578
1579        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
1580            instrument_id,
1581            client_id,
1582            venue: Some(instrument_id.venue),
1583            command_id: UUID4::new(),
1584            ts_init: self.generate_ts_init(),
1585            params,
1586        });
1587
1588        self.send_data_cmd(DataCommand::Unsubscribe(command));
1589    }
1590
1591    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1592    ///
1593    /// The `interval_ms` must match a previously subscribed interval.
1594    pub fn unsubscribe_book_at_interval<A: DataActor>(
1595        &mut self,
1596        instrument_id: InstrumentId,
1597        interval_ms: NonZeroUsize,
1598        client_id: Option<ClientId>,
1599        params: Option<IndexMap<String, String>>,
1600    ) {
1601        self.check_registered();
1602
1603        let topic = get_book_snapshots_topic(instrument_id);
1604        if let Some(handler) = self.topic_handlers.get(&topic) {
1605            msgbus::unsubscribe_topic(topic, handler.clone());
1606        };
1607
1608        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
1609            instrument_id,
1610            client_id,
1611            venue: Some(instrument_id.venue),
1612            command_id: UUID4::new(),
1613            ts_init: self.generate_ts_init(),
1614            params,
1615        });
1616
1617        self.send_data_cmd(DataCommand::Unsubscribe(command));
1618    }
1619
1620    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
1621    pub fn unsubscribe_quotes<A: DataActor>(
1622        &self,
1623        instrument_id: InstrumentId,
1624        client_id: Option<ClientId>,
1625        params: Option<IndexMap<String, String>>,
1626    ) {
1627        self.check_registered();
1628
1629        let topic = get_quotes_topic(instrument_id);
1630        if let Some(handler) = self.topic_handlers.get(&topic) {
1631            msgbus::unsubscribe_topic(topic, handler.clone());
1632        };
1633
1634        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
1635            instrument_id,
1636            client_id,
1637            venue: Some(instrument_id.venue),
1638            command_id: UUID4::new(),
1639            ts_init: self.generate_ts_init(),
1640            params,
1641        });
1642
1643        self.send_data_cmd(DataCommand::Unsubscribe(command));
1644    }
1645
1646    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
1647    pub fn unsubscribe_trades<A: DataActor>(
1648        &self,
1649        instrument_id: InstrumentId,
1650        client_id: Option<ClientId>,
1651        params: Option<IndexMap<String, String>>,
1652    ) {
1653        self.check_registered();
1654
1655        let topic = get_trades_topic(instrument_id);
1656        if let Some(handler) = self.topic_handlers.get(&topic) {
1657            msgbus::unsubscribe_topic(topic, handler.clone());
1658        };
1659
1660        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
1661            instrument_id,
1662            client_id,
1663            venue: Some(instrument_id.venue),
1664            command_id: UUID4::new(),
1665            ts_init: self.generate_ts_init(),
1666            params,
1667        });
1668
1669        self.send_data_cmd(DataCommand::Unsubscribe(command));
1670    }
1671
1672    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
1673    pub fn unsubscribe_bars<A: DataActor>(
1674        &mut self,
1675        bar_type: BarType,
1676        client_id: Option<ClientId>,
1677        params: Option<IndexMap<String, String>>,
1678    ) {
1679        self.check_registered();
1680
1681        let topic = get_bars_topic(bar_type);
1682        if let Some(handler) = self.topic_handlers.get(&topic) {
1683            msgbus::unsubscribe_topic(topic, handler.clone());
1684        };
1685
1686        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
1687            bar_type,
1688            client_id,
1689            venue: Some(bar_type.instrument_id().venue),
1690            command_id: UUID4::new(),
1691            ts_init: self.generate_ts_init(),
1692            params,
1693        });
1694
1695        self.send_data_cmd(DataCommand::Unsubscribe(command));
1696    }
1697
1698    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1699    pub fn unsubscribe_mark_prices<A: DataActor>(
1700        &self,
1701        instrument_id: InstrumentId,
1702        client_id: Option<ClientId>,
1703        params: Option<IndexMap<String, String>>,
1704    ) {
1705        self.check_registered();
1706
1707        let topic = get_mark_price_topic(instrument_id);
1708        if let Some(handler) = self.topic_handlers.get(&topic) {
1709            msgbus::unsubscribe_topic(topic, handler.clone());
1710        };
1711
1712        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
1713            instrument_id,
1714            client_id,
1715            venue: Some(instrument_id.venue),
1716            command_id: UUID4::new(),
1717            ts_init: self.generate_ts_init(),
1718            params,
1719        });
1720
1721        self.send_data_cmd(DataCommand::Unsubscribe(command));
1722    }
1723
1724    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1725    pub fn unsubscribe_index_prices<A: DataActor>(
1726        &self,
1727        instrument_id: InstrumentId,
1728        client_id: Option<ClientId>,
1729        params: Option<IndexMap<String, String>>,
1730    ) {
1731        self.check_registered();
1732
1733        let topic = get_index_price_topic(instrument_id);
1734        if let Some(handler) = self.topic_handlers.get(&topic) {
1735            msgbus::unsubscribe_topic(topic, handler.clone());
1736        };
1737
1738        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
1739            instrument_id,
1740            client_id,
1741            venue: Some(instrument_id.venue),
1742            command_id: UUID4::new(),
1743            ts_init: self.generate_ts_init(),
1744            params,
1745        });
1746
1747        self.send_data_cmd(DataCommand::Unsubscribe(command));
1748    }
1749
1750    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
1751    pub fn unsubscribe_instrument_status<A: DataActor>(
1752        &self,
1753        instrument_id: InstrumentId,
1754        client_id: Option<ClientId>,
1755        params: Option<IndexMap<String, String>>,
1756    ) {
1757        self.check_registered();
1758
1759        let topic = get_instrument_status_topic(instrument_id);
1760        if let Some(handler) = self.topic_handlers.get(&topic) {
1761            msgbus::unsubscribe_topic(topic, handler.clone());
1762        };
1763
1764        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
1765            instrument_id,
1766            client_id,
1767            venue: Some(instrument_id.venue),
1768            command_id: UUID4::new(),
1769            ts_init: self.generate_ts_init(),
1770            params,
1771        });
1772
1773        self.send_data_cmd(DataCommand::Unsubscribe(command));
1774    }
1775
1776    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
1777    pub fn unsubscribe_instrument_close<A: DataActor>(
1778        &self,
1779        instrument_id: InstrumentId,
1780        client_id: Option<ClientId>,
1781        params: Option<IndexMap<String, String>>,
1782    ) {
1783        self.check_registered();
1784
1785        let topic = get_instrument_close_topic(instrument_id);
1786        if let Some(handler) = self.topic_handlers.get(&topic) {
1787            msgbus::unsubscribe_topic(topic, handler.clone());
1788        };
1789
1790        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
1791            instrument_id,
1792            client_id,
1793            venue: Some(instrument_id.venue),
1794            command_id: UUID4::new(),
1795            ts_init: self.generate_ts_init(),
1796            params,
1797        });
1798
1799        self.send_data_cmd(DataCommand::Unsubscribe(command));
1800    }
1801
1802    // -- REQUESTS --------------------------------------------------------------------------------
1803
1804    /// Request historical custom data of the given `data_type`.
1805    ///
1806    /// Returns a unique request ID to correlate subsequent [`CustomDataResponse`].
1807    ///
1808    /// # Errors
1809    ///
1810    /// Returns an error if the provided time range is invalid.
1811    pub fn request_data<A: DataActor>(
1812        &self,
1813        data_type: DataType,
1814        client_id: ClientId,
1815        start: Option<DateTime<Utc>>,
1816        end: Option<DateTime<Utc>>,
1817        limit: Option<NonZeroUsize>,
1818        params: Option<IndexMap<String, String>>,
1819    ) -> anyhow::Result<UUID4> {
1820        self.check_registered();
1821
1822        let now = self.clock.borrow().utc_now();
1823        check_timestamps(now, start, end)?;
1824
1825        let request_id = UUID4::new();
1826        let command = RequestCommand::Data(RequestCustomData {
1827            client_id,
1828            data_type,
1829            request_id,
1830            ts_init: self.generate_ts_init(),
1831            params,
1832        });
1833
1834        let actor_id = self.actor_id.inner();
1835        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1836            move |response: &CustomDataResponse| {
1837                get_actor_unchecked::<A>(&actor_id).handle_data_response(response);
1838            },
1839        )));
1840
1841        let msgbus = get_message_bus()
1842            .borrow_mut()
1843            .register_response_handler(command.request_id(), handler);
1844
1845        self.send_data_cmd(DataCommand::Request(command));
1846
1847        Ok(request_id)
1848    }
1849
1850    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
1851    ///
1852    /// Returns a unique request ID to correlate subsequent [`InstrumentResponse`].
1853    ///
1854    /// # Errors
1855    ///
1856    /// Returns an error if the provided time range is invalid.
1857    pub fn request_instrument<A: DataActor>(
1858        &self,
1859        instrument_id: InstrumentId,
1860        start: Option<DateTime<Utc>>,
1861        end: Option<DateTime<Utc>>,
1862        client_id: Option<ClientId>,
1863        params: Option<IndexMap<String, String>>,
1864    ) -> anyhow::Result<UUID4> {
1865        self.check_registered();
1866
1867        let now = self.clock.borrow().utc_now();
1868        check_timestamps(now, start, end)?;
1869
1870        let request_id = UUID4::new();
1871        let command = RequestCommand::Instrument(RequestInstrument {
1872            instrument_id,
1873            start,
1874            end,
1875            client_id,
1876            request_id,
1877            ts_init: now.into(),
1878            params,
1879        });
1880
1881        let actor_id = self.actor_id.inner();
1882        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1883            move |response: &InstrumentResponse| {
1884                get_actor_unchecked::<A>(&actor_id).handle_instrument_response(response);
1885            },
1886        )));
1887
1888        let msgbus = get_message_bus()
1889            .borrow_mut()
1890            .register_response_handler(command.request_id(), handler);
1891
1892        self.send_data_cmd(DataCommand::Request(command));
1893
1894        Ok(request_id)
1895    }
1896
1897    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
1898    ///
1899    /// Returns a unique request ID to correlate subsequent [`InstrumentsResponse`].
1900    ///
1901    /// # Errors
1902    ///
1903    /// Returns an error if the provided time range is invalid.
1904    pub fn request_instruments<A: DataActor>(
1905        &self,
1906        venue: Option<Venue>,
1907        start: Option<DateTime<Utc>>,
1908        end: Option<DateTime<Utc>>,
1909        client_id: Option<ClientId>,
1910        params: Option<IndexMap<String, String>>,
1911    ) -> anyhow::Result<UUID4> {
1912        self.check_registered();
1913
1914        let now = self.clock.borrow().utc_now();
1915        check_timestamps(now, start, end)?;
1916
1917        let request_id = UUID4::new();
1918        let command = RequestCommand::Instruments(RequestInstruments {
1919            venue,
1920            start,
1921            end,
1922            client_id,
1923            request_id,
1924            ts_init: now.into(),
1925            params,
1926        });
1927
1928        let actor_id = self.actor_id.inner();
1929        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1930            move |response: &InstrumentsResponse| {
1931                get_actor_unchecked::<A>(&actor_id).handle_instruments_response(response);
1932            },
1933        )));
1934
1935        let msgbus = get_message_bus()
1936            .borrow_mut()
1937            .register_response_handler(command.request_id(), handler);
1938
1939        self.send_data_cmd(DataCommand::Request(command));
1940
1941        Ok(request_id)
1942    }
1943
1944    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
1945    ///
1946    /// Returns a unique request ID to correlate subsequent [`BookResponse`].
1947    ///
1948    /// # Errors
1949    ///
1950    /// This function never returns an error.
1951    pub fn request_book_snapshot<A: DataActor>(
1952        &self,
1953        instrument_id: InstrumentId,
1954        depth: Option<NonZeroUsize>,
1955        client_id: Option<ClientId>,
1956        params: Option<IndexMap<String, String>>,
1957    ) -> anyhow::Result<UUID4> {
1958        self.check_registered();
1959
1960        let request_id = UUID4::new();
1961        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
1962            instrument_id,
1963            depth,
1964            client_id,
1965            request_id,
1966            ts_init: self.generate_ts_init(),
1967            params,
1968        });
1969
1970        let actor_id = self.actor_id.inner();
1971        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1972            move |response: &BookResponse| {
1973                get_actor_unchecked::<A>(&actor_id).handle_book_response(response);
1974            },
1975        )));
1976
1977        let msgbus = get_message_bus()
1978            .borrow_mut()
1979            .register_response_handler(command.request_id(), handler);
1980
1981        self.send_data_cmd(DataCommand::Request(command));
1982
1983        Ok(request_id)
1984    }
1985
1986    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
1987    ///
1988    /// Returns a unique request ID to correlate subsequent [`QuotesResponse`].
1989    ///
1990    /// # Errors
1991    ///
1992    /// Returns an error if the provided time range is invalid.
1993    pub fn request_quotes<A: DataActor>(
1994        &self,
1995        instrument_id: InstrumentId,
1996        start: Option<DateTime<Utc>>,
1997        end: Option<DateTime<Utc>>,
1998        limit: Option<NonZeroUsize>,
1999        client_id: Option<ClientId>,
2000        params: Option<IndexMap<String, String>>,
2001    ) -> anyhow::Result<UUID4> {
2002        self.check_registered();
2003
2004        let now = self.clock.borrow().utc_now();
2005        check_timestamps(now, start, end)?;
2006
2007        let request_id = UUID4::new();
2008        let command = RequestCommand::Quotes(RequestQuotes {
2009            instrument_id,
2010            start,
2011            end,
2012            limit,
2013            client_id,
2014            request_id,
2015            ts_init: now.into(),
2016            params,
2017        });
2018
2019        let actor_id = self.actor_id.inner();
2020        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
2021            move |response: &QuotesResponse| {
2022                get_actor_unchecked::<A>(&actor_id).handle_quotes_response(response);
2023            },
2024        )));
2025
2026        let msgbus = get_message_bus()
2027            .borrow_mut()
2028            .register_response_handler(command.request_id(), handler);
2029
2030        self.send_data_cmd(DataCommand::Request(command));
2031
2032        Ok(request_id)
2033    }
2034
2035    /// Request historical [`TradeTick`] data for the given `instrument_id`.
2036    ///
2037    /// Returns a unique request ID to correlate subsequent [`TradesResponse`].
2038    ///
2039    /// # Errors
2040    ///
2041    /// Returns an error if the provided time range is invalid.
2042    pub fn request_trades<A: DataActor>(
2043        &self,
2044        instrument_id: InstrumentId,
2045        start: Option<DateTime<Utc>>,
2046        end: Option<DateTime<Utc>>,
2047        limit: Option<NonZeroUsize>,
2048        client_id: Option<ClientId>,
2049        params: Option<IndexMap<String, String>>,
2050    ) -> anyhow::Result<UUID4> {
2051        self.check_registered();
2052
2053        let now = self.clock.borrow().utc_now();
2054        check_timestamps(now, start, end)?;
2055
2056        let request_id = UUID4::new();
2057        let command = RequestCommand::Trades(RequestTrades {
2058            instrument_id,
2059            start,
2060            end,
2061            limit,
2062            client_id,
2063            request_id,
2064            ts_init: now.into(),
2065            params,
2066        });
2067
2068        let actor_id = self.actor_id.inner();
2069        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
2070            move |response: &TradesResponse| {
2071                get_actor_unchecked::<A>(&actor_id).handle_trades_response(response);
2072            },
2073        )));
2074
2075        let msgbus = get_message_bus()
2076            .borrow_mut()
2077            .register_response_handler(command.request_id(), handler);
2078
2079        self.send_data_cmd(DataCommand::Request(command));
2080
2081        Ok(request_id)
2082    }
2083
2084    /// Request historical [`Bar`] data for the given `bar_type`.
2085    ///
2086    /// Returns a unique request ID to correlate subsequent [`BarsResponse`].
2087    ///
2088    /// # Errors
2089    ///
2090    /// Returns an error if the provided time range is invalid.
2091    pub fn request_bars<A: DataActor>(
2092        &self,
2093        bar_type: BarType,
2094        start: Option<DateTime<Utc>>,
2095        end: Option<DateTime<Utc>>,
2096        limit: Option<NonZeroUsize>,
2097        client_id: Option<ClientId>,
2098        params: Option<IndexMap<String, String>>,
2099    ) -> anyhow::Result<UUID4> {
2100        self.check_registered();
2101
2102        let now = self.clock.borrow().utc_now();
2103        check_timestamps(now, start, end)?;
2104
2105        let request_id = UUID4::new();
2106        let command = RequestCommand::Bars(RequestBars {
2107            bar_type,
2108            start,
2109            end,
2110            limit,
2111            client_id,
2112            request_id,
2113            ts_init: now.into(),
2114            params,
2115        });
2116
2117        let actor_id = self.actor_id.inner();
2118        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
2119            move |response: &BarsResponse| {
2120                get_actor_unchecked::<A>(&actor_id).handle_bars_response(response);
2121            },
2122        )));
2123
2124        let msgbus = get_message_bus()
2125            .borrow_mut()
2126            .register_response_handler(command.request_id(), handler);
2127
2128        self.send_data_cmd(DataCommand::Request(command));
2129
2130        Ok(request_id)
2131    }
2132}
2133
2134fn check_timestamps(
2135    now: DateTime<Utc>,
2136    start: Option<DateTime<Utc>>,
2137    end: Option<DateTime<Utc>>,
2138) -> anyhow::Result<()> {
2139    if let Some(start) = start {
2140        check_predicate_true(start <= now, "start was > now")?
2141    }
2142    if let Some(end) = end {
2143        check_predicate_true(end <= now, "end was > now")?
2144    }
2145
2146    if let (Some(start), Some(end)) = (start, end) {
2147        check_predicate_true(start < end, "start was >= end")?
2148    }
2149
2150    Ok(())
2151}
2152
2153fn log_error(e: &anyhow::Error) {
2154    log::error!("{e}");
2155}
2156
2157fn log_not_running<T>(msg: &T)
2158where
2159    T: Debug,
2160{
2161    // TODO: Potentially temporary for development? drop level at some stage
2162    log::warn!("Received message when not running - skipping {msg:?}");
2163}
2164
2165fn log_received<T>(msg: &T)
2166where
2167    T: Debug,
2168{
2169    log::debug!("{RECV} {msg:?}");
2170}