nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell, RefMut},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24};
25
26use ahash::{AHashMap, AHashSet};
27use chrono::{DateTime, Utc};
28use indexmap::IndexMap;
29use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
30#[cfg(feature = "defi")]
31use nautilus_model::defi::{Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap};
32use nautilus_model::{
33    data::{
34        Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
35        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
36    },
37    enums::BookType,
38    identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
39    instruments::InstrumentAny,
40    orderbook::OrderBook,
41};
42use ustr::Ustr;
43
44#[cfg(feature = "indicators")]
45use super::indicators::Indicators;
46use super::{
47    Actor,
48    registry::{get_actor_unchecked, try_get_actor_unchecked},
49};
50#[cfg(feature = "defi")]
51use crate::msgbus::switchboard::{
52    get_defi_blocks_topic, get_defi_liquidity_topic, get_defi_pool_swaps_topic, get_defi_pool_topic,
53};
54use crate::{
55    cache::Cache,
56    clock::Clock,
57    component::Component,
58    enums::{ComponentState, ComponentTrigger},
59    logging::{CMD, RECV, REQ, SEND},
60    messages::{
61        data::{
62            BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
63            InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
64            RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
65            SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
66            SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
67            SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
68            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
69            UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
70            UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
71            UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
72            UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
73        },
74        system::ShutdownSystem,
75    },
76    msgbus::{
77        self, MStr, Topic, get_message_bus,
78        handler::{ShareableMessageHandler, TypedMessageHandler},
79        switchboard::{
80            MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
81            get_custom_topic, get_funding_rate_topic, get_index_price_topic,
82            get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
83            get_instruments_topic, get_mark_price_topic, get_quotes_topic, get_trades_topic,
84        },
85    },
86    signal::Signal,
87    timer::{TimeEvent, TimeEventCallback},
88};
89
90/// Common configuration for [`DataActor`] based components.
91#[derive(Debug, Clone)]
92#[cfg_attr(
93    feature = "python",
94    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", subclass)
95)]
96pub struct DataActorConfig {
97    /// The custom identifier for the Actor.
98    pub actor_id: Option<ActorId>,
99    /// If events should be logged.
100    pub log_events: bool,
101    /// If commands should be logged.
102    pub log_commands: bool,
103}
104
105impl Default for DataActorConfig {
106    fn default() -> Self {
107        Self {
108            actor_id: None,
109            log_events: true,
110            log_commands: true,
111        }
112    }
113}
114
115/// Configuration for creating actors from importable paths.
116#[derive(Debug, Clone)]
117#[cfg_attr(
118    feature = "python",
119    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
120)]
121pub struct ImportableActorConfig {
122    /// The fully qualified name of the Actor class.
123    pub actor_path: String,
124    /// The fully qualified name of the Actor config class.
125    pub config_path: String,
126    /// The actor configuration as a dictionary.
127    pub config: HashMap<String, serde_json::Value>,
128}
129
130type RequestCallback = Box<dyn Fn(UUID4) + Send + Sync>; // TODO: TBD
131
132pub trait DataActor:
133    Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
134{
135    /// Actions to be performed when the actor state is saved.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if saving the actor state fails.
140    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
141        Ok(IndexMap::new())
142    }
143
144    /// Actions to be performed when the actor state is loaded.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if loading the actor state fails.
149    #[allow(unused_variables)]
150    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
151        Ok(())
152    }
153
154    /// Actions to be performed on start.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if starting the actor fails.
159    fn on_start(&mut self) -> anyhow::Result<()> {
160        log::warn!(
161            "The `on_start` handler was called when not overridden, \
162            it's expected that any actions required when starting the actor \
163            occur here, such as subscribing/requesting data"
164        );
165        Ok(())
166    }
167
168    /// Actions to be performed on stop.
169    ///
170    /// # Errors
171    ///
172    /// Returns an error if stopping the actor fails.
173    fn on_stop(&mut self) -> anyhow::Result<()> {
174        log::warn!(
175            "The `on_stop` handler was called when not overridden, \
176            it's expected that any actions required when stopping the actor \
177            occur here, such as unsubscribing from data",
178        );
179        Ok(())
180    }
181
182    /// Actions to be performed on resume.
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if resuming the actor fails.
187    fn on_resume(&mut self) -> anyhow::Result<()> {
188        log::warn!(
189            "The `on_resume` handler was called when not overridden, \
190            it's expected that any actions required when resuming the actor \
191            following a stop occur here"
192        );
193        Ok(())
194    }
195
196    /// Actions to be performed on reset.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if resetting the actor fails.
201    fn on_reset(&mut self) -> anyhow::Result<()> {
202        log::warn!(
203            "The `on_reset` handler was called when not overridden, \
204            it's expected that any actions required when resetting the actor \
205            occur here, such as resetting indicators and other state"
206        );
207        Ok(())
208    }
209
210    /// Actions to be performed on dispose.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if disposing the actor fails.
215    fn on_dispose(&mut self) -> anyhow::Result<()> {
216        Ok(())
217    }
218
219    /// Actions to be performed on degrade.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if degrading the actor fails.
224    fn on_degrade(&mut self) -> anyhow::Result<()> {
225        Ok(())
226    }
227
228    /// Actions to be performed on fault.
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if faulting the actor fails.
233    fn on_fault(&mut self) -> anyhow::Result<()> {
234        Ok(())
235    }
236
237    /// Actions to be performed when receiving a time event.
238    ///
239    /// # Errors
240    ///
241    /// Returns an error if handling the time event fails.
242    #[allow(unused_variables)]
243    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
244        Ok(())
245    }
246
247    /// Actions to be performed when receiving custom data.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if handling the data fails.
252    #[allow(unused_variables)]
253    fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
254        Ok(())
255    }
256
257    /// Actions to be performed when receiving a signal.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if handling the signal fails.
262    #[allow(unused_variables)]
263    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
264        Ok(())
265    }
266
267    /// Actions to be performed when receiving an instrument.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if handling the instrument fails.
272    #[allow(unused_variables)]
273    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
274        Ok(())
275    }
276
277    /// Actions to be performed when receiving order book deltas.
278    ///
279    /// # Errors
280    ///
281    /// Returns an error if handling the book deltas fails.
282    #[allow(unused_variables)]
283    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
284        Ok(())
285    }
286
287    /// Actions to be performed when receiving an order book.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if handling the book fails.
292    #[allow(unused_variables)]
293    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
294        Ok(())
295    }
296
297    /// Actions to be performed when receiving a quote.
298    ///
299    /// # Errors
300    ///
301    /// Returns an error if handling the quote fails.
302    #[allow(unused_variables)]
303    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
304        Ok(())
305    }
306
307    /// Actions to be performed when receiving a trade.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if handling the trade fails.
312    #[allow(unused_variables)]
313    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
314        Ok(())
315    }
316
317    /// Actions to be performed when receiving a bar.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if handling the bar fails.
322    #[allow(unused_variables)]
323    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
324        Ok(())
325    }
326
327    /// Actions to be performed when receiving a mark price update.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if handling the mark price update fails.
332    #[allow(unused_variables)]
333    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
334        Ok(())
335    }
336
337    /// Actions to be performed when receiving an index price update.
338    ///
339    /// # Errors
340    ///
341    /// Returns an error if handling the index price update fails.
342    #[allow(unused_variables)]
343    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
344        Ok(())
345    }
346
347    /// Actions to be performed when receiving a funding rate update.
348    ///
349    /// # Errors
350    ///
351    /// Returns an error if handling the funding rate update fails.
352    #[allow(unused_variables)]
353    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
354        Ok(())
355    }
356
357    /// Actions to be performed when receiving an instrument status update.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if handling the instrument status update fails.
362    #[allow(unused_variables)]
363    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
364        Ok(())
365    }
366
367    /// Actions to be performed when receiving an instrument close update.
368    ///
369    /// # Errors
370    ///
371    /// Returns an error if handling the instrument close update fails.
372    #[allow(unused_variables)]
373    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
374        Ok(())
375    }
376
377    #[cfg(feature = "defi")]
378    /// Actions to be performed when receiving a block.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if handling the block fails.
383    #[allow(unused_variables)]
384    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
385        Ok(())
386    }
387
388    #[cfg(feature = "defi")]
389    /// Actions to be performed when receiving a pool.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if handling the pool fails.
394    #[allow(unused_variables)]
395    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
396        Ok(())
397    }
398
399    #[cfg(feature = "defi")]
400    /// Actions to be performed when receiving a pool swap.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if handling the pool swap fails.
405    #[allow(unused_variables)]
406    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
407        Ok(())
408    }
409
410    #[cfg(feature = "defi")]
411    /// Actions to be performed when receiving a pool liquidity update.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if handling the pool liquidity update fails.
416    #[allow(unused_variables)]
417    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
418        Ok(())
419    }
420
421    /// Actions to be performed when receiving historical data.
422    ///
423    /// # Errors
424    ///
425    /// Returns an error if handling the historical data fails.
426    #[allow(unused_variables)]
427    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
428        Ok(())
429    }
430
431    /// Actions to be performed when receiving historical quotes.
432    ///
433    /// # Errors
434    ///
435    /// Returns an error if handling the historical quotes fails.
436    #[allow(unused_variables)]
437    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
438        Ok(())
439    }
440
441    /// Actions to be performed when receiving historical trades.
442    ///
443    /// # Errors
444    ///
445    /// Returns an error if handling the historical trades fails.
446    #[allow(unused_variables)]
447    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
448        Ok(())
449    }
450
451    /// Actions to be performed when receiving historical bars.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if handling the historical bars fails.
456    #[allow(unused_variables)]
457    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
458        Ok(())
459    }
460
461    /// Actions to be performed when receiving historical mark prices.
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if handling the historical mark prices fails.
466    #[allow(unused_variables)]
467    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
468        Ok(())
469    }
470
471    /// Actions to be performed when receiving historical index prices.
472    ///
473    /// # Errors
474    ///
475    /// Returns an error if handling the historical index prices fails.
476    #[allow(unused_variables)]
477    fn on_historical_index_prices(
478        &mut self,
479        index_prices: &[IndexPriceUpdate],
480    ) -> anyhow::Result<()> {
481        Ok(())
482    }
483
484    /// Handles a received time event.
485    fn handle_time_event(&mut self, event: &TimeEvent) {
486        log_received(&event);
487
488        if let Err(e) = DataActor::on_time_event(self, event) {
489            log_error(&e);
490        }
491    }
492
493    /// Handles a received custom data point.
494    fn handle_data(&mut self, data: &dyn Any) {
495        log_received(&data);
496
497        if self.not_running() {
498            log_not_running(&data);
499            return;
500        }
501
502        if let Err(e) = self.on_data(data) {
503            log_error(&e);
504        }
505    }
506
507    /// Handles a received signal.
508    fn handle_signal(&mut self, signal: &Signal) {
509        log_received(&signal);
510
511        if self.not_running() {
512            log_not_running(&signal);
513            return;
514        }
515
516        if let Err(e) = self.on_signal(signal) {
517            log_error(&e);
518        }
519    }
520
521    /// Handles a received instrument.
522    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
523        log_received(&instrument);
524
525        if self.not_running() {
526            log_not_running(&instrument);
527            return;
528        }
529
530        if let Err(e) = self.on_instrument(instrument) {
531            log_error(&e);
532        }
533    }
534
535    /// Handles received order book deltas.
536    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
537        log_received(&deltas);
538
539        if self.not_running() {
540            log_not_running(&deltas);
541            return;
542        }
543
544        if let Err(e) = self.on_book_deltas(deltas) {
545            log_error(&e);
546        }
547    }
548
549    /// Handles a received order book reference.
550    fn handle_book(&mut self, book: &OrderBook) {
551        log_received(&book);
552
553        if self.not_running() {
554            log_not_running(&book);
555            return;
556        }
557
558        if let Err(e) = self.on_book(book) {
559            log_error(&e);
560        };
561    }
562
563    /// Handles a received quote.
564    fn handle_quote(&mut self, quote: &QuoteTick) {
565        log_received(&quote);
566
567        if self.not_running() {
568            log_not_running(&quote);
569            return;
570        }
571
572        if let Err(e) = self.on_quote(quote) {
573            log_error(&e);
574        }
575    }
576
577    /// Handles a received trade.
578    fn handle_trade(&mut self, trade: &TradeTick) {
579        log_received(&trade);
580
581        if self.not_running() {
582            log_not_running(&trade);
583            return;
584        }
585
586        if let Err(e) = self.on_trade(trade) {
587            log_error(&e);
588        }
589    }
590
591    /// Handles a receiving bar.
592    fn handle_bar(&mut self, bar: &Bar) {
593        log_received(&bar);
594
595        if self.not_running() {
596            log_not_running(&bar);
597            return;
598        }
599
600        if let Err(e) = self.on_bar(bar) {
601            log_error(&e);
602        }
603    }
604
605    /// Handles a received mark price update.
606    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
607        log_received(&mark_price);
608
609        if self.not_running() {
610            log_not_running(&mark_price);
611            return;
612        }
613
614        if let Err(e) = self.on_mark_price(mark_price) {
615            log_error(&e);
616        }
617    }
618
619    /// Handles a received index price update.
620    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
621        log_received(&index_price);
622
623        if self.not_running() {
624            log_not_running(&index_price);
625            return;
626        }
627
628        if let Err(e) = self.on_index_price(index_price) {
629            log_error(&e);
630        }
631    }
632
633    /// Handles a received funding rate update.
634    fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
635        log_received(&funding_rate);
636
637        if self.not_running() {
638            log_not_running(&funding_rate);
639            return;
640        }
641
642        if let Err(e) = self.on_funding_rate(funding_rate) {
643            log_error(&e);
644        }
645    }
646
647    /// Handles a received instrument status.
648    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
649        log_received(&status);
650
651        if self.not_running() {
652            log_not_running(&status);
653            return;
654        }
655
656        if let Err(e) = self.on_instrument_status(status) {
657            log_error(&e);
658        }
659    }
660
661    /// Handles a received instrument close.
662    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
663        log_received(&close);
664
665        if self.not_running() {
666            log_not_running(&close);
667            return;
668        }
669
670        if let Err(e) = self.on_instrument_close(close) {
671            log_error(&e);
672        }
673    }
674
675    #[cfg(feature = "defi")]
676    /// Handles a received block.
677    fn handle_block(&mut self, block: &Block) {
678        log_received(&block);
679
680        if self.not_running() {
681            log_not_running(&block);
682            return;
683        }
684
685        if let Err(e) = self.on_block(block) {
686            log_error(&e);
687        }
688    }
689
690    #[cfg(feature = "defi")]
691    /// Handles a received pool definition update.
692    fn handle_pool(&mut self, pool: &Pool) {
693        log_received(&pool);
694
695        if self.not_running() {
696            log_not_running(&pool);
697            return;
698        }
699
700        if let Err(e) = self.on_pool(pool) {
701            log_error(&e);
702        }
703    }
704
705    #[cfg(feature = "defi")]
706    /// Handles a received pool swap.
707    fn handle_pool_swap(&mut self, swap: &PoolSwap) {
708        log_received(&swap);
709
710        if self.not_running() {
711            log_not_running(&swap);
712            return;
713        }
714
715        if let Err(e) = self.on_pool_swap(swap) {
716            log_error(&e);
717        }
718    }
719
720    #[cfg(feature = "defi")]
721    /// Handles a received pool liquidity update.
722    fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
723        log_received(&update);
724
725        if self.not_running() {
726            log_not_running(&update);
727            return;
728        }
729
730        if let Err(e) = self.on_pool_liquidity_update(update) {
731            log_error(&e);
732        }
733    }
734
735    /// Handles received historical data.
736    fn handle_historical_data(&mut self, data: &dyn Any) {
737        log_received(&data);
738
739        if let Err(e) = self.on_historical_data(data) {
740            log_error(&e);
741        }
742    }
743
744    /// Handles a data response.
745    fn handle_data_response(&mut self, resp: &CustomDataResponse) {
746        log_received(&resp);
747
748        if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
749            log_error(&e);
750        }
751    }
752
753    /// Handles an instrument response.
754    fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
755        log_received(&resp);
756
757        if let Err(e) = self.on_instrument(&resp.data) {
758            log_error(&e);
759        }
760    }
761
762    /// Handles an instruments response.
763    fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
764        log_received(&resp);
765
766        for inst in &resp.data {
767            if let Err(e) = self.on_instrument(inst) {
768                log_error(&e);
769            }
770        }
771    }
772
773    /// Handles a book response.
774    fn handle_book_response(&mut self, resp: &BookResponse) {
775        log_received(&resp);
776
777        if let Err(e) = self.on_book(&resp.data) {
778            log_error(&e);
779        }
780    }
781
782    /// Handles a quotes response.
783    fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
784        log_received(&resp);
785
786        if let Err(e) = self.on_historical_quotes(&resp.data) {
787            log_error(&e);
788        }
789    }
790
791    /// Handles a trades response.
792    fn handle_trades_response(&mut self, resp: &TradesResponse) {
793        log_received(&resp);
794
795        if let Err(e) = self.on_historical_trades(&resp.data) {
796            log_error(&e);
797        }
798    }
799
800    /// Handles a bars response.
801    fn handle_bars_response(&mut self, resp: &BarsResponse) {
802        log_received(&resp);
803
804        if let Err(e) = self.on_historical_bars(&resp.data) {
805            log_error(&e);
806        }
807    }
808
809    /// Subscribe to streaming `data_type` data.
810    fn subscribe_data(
811        &mut self,
812        data_type: DataType,
813        client_id: Option<ClientId>,
814        params: Option<IndexMap<String, String>>,
815    ) where
816        Self: 'static + Debug + Sized,
817    {
818        let actor_id = self.actor_id().inner();
819        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
820            move |data: &dyn Any| {
821                get_actor_unchecked::<Self>(&actor_id).handle_data(data);
822            },
823        )));
824
825        DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
826    }
827
828    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
829    fn subscribe_quotes(
830        &mut self,
831        instrument_id: InstrumentId,
832        client_id: Option<ClientId>,
833        params: Option<IndexMap<String, String>>,
834    ) where
835        Self: 'static + Debug + Sized,
836    {
837        let actor_id = self.actor_id().inner();
838        let topic = get_quotes_topic(instrument_id);
839
840        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
841            move |quote: &QuoteTick| {
842                if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
843                    actor.handle_quote(quote);
844                } else {
845                    log::error!("Actor {actor_id} not found for quote handling");
846                }
847            },
848        )));
849
850        DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
851    }
852
853    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
854    fn subscribe_instruments(
855        &mut self,
856        venue: Venue,
857        client_id: Option<ClientId>,
858        params: Option<IndexMap<String, String>>,
859    ) where
860        Self: 'static + Debug + Sized,
861    {
862        let actor_id = self.actor_id().inner();
863        let topic = get_instruments_topic(venue);
864
865        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
866            move |instrument: &InstrumentAny| {
867                if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
868                    actor.handle_instrument(instrument);
869                } else {
870                    log::error!("Actor {actor_id} not found for instruments handling");
871                }
872            },
873        )));
874
875        DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
876    }
877
878    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
879    fn subscribe_instrument(
880        &mut self,
881        instrument_id: InstrumentId,
882        client_id: Option<ClientId>,
883        params: Option<IndexMap<String, String>>,
884    ) where
885        Self: 'static + Debug + Sized,
886    {
887        let actor_id = self.actor_id().inner();
888        let topic = get_instrument_topic(instrument_id);
889
890        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
891            move |instrument: &InstrumentAny| {
892                if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
893                    actor.handle_instrument(instrument);
894                } else {
895                    log::error!("Actor {actor_id} not found for instrument handling");
896                }
897            },
898        )));
899
900        DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
901    }
902
903    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
904    fn subscribe_book_deltas(
905        &mut self,
906        instrument_id: InstrumentId,
907        book_type: BookType,
908        depth: Option<NonZeroUsize>,
909        client_id: Option<ClientId>,
910        managed: bool,
911        params: Option<IndexMap<String, String>>,
912    ) where
913        Self: 'static + Debug + Sized,
914    {
915        let actor_id = self.actor_id().inner();
916        let topic = get_book_deltas_topic(instrument_id);
917
918        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
919            move |deltas: &OrderBookDeltas| {
920                get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
921            },
922        )));
923
924        DataActorCore::subscribe_book_deltas(
925            self,
926            topic,
927            handler,
928            instrument_id,
929            book_type,
930            depth,
931            client_id,
932            managed,
933            params,
934        );
935    }
936
937    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
938    fn subscribe_book_at_interval(
939        &mut self,
940        instrument_id: InstrumentId,
941        book_type: BookType,
942        depth: Option<NonZeroUsize>,
943        interval_ms: NonZeroUsize,
944        client_id: Option<ClientId>,
945        params: Option<IndexMap<String, String>>,
946    ) where
947        Self: 'static + Debug + Sized,
948    {
949        let actor_id = self.actor_id().inner();
950        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
951
952        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
953            move |book: &OrderBook| {
954                get_actor_unchecked::<Self>(&actor_id).handle_book(book);
955            },
956        )));
957
958        DataActorCore::subscribe_book_at_interval(
959            self,
960            topic,
961            handler,
962            instrument_id,
963            book_type,
964            depth,
965            interval_ms,
966            client_id,
967            params,
968        );
969    }
970
971    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
972    fn subscribe_trades(
973        &mut self,
974        instrument_id: InstrumentId,
975        client_id: Option<ClientId>,
976        params: Option<IndexMap<String, String>>,
977    ) where
978        Self: 'static + Debug + Sized,
979    {
980        let actor_id = self.actor_id().inner();
981        let topic = get_trades_topic(instrument_id);
982
983        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
984            move |trade: &TradeTick| {
985                get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
986            },
987        )));
988
989        DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
990    }
991
992    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
993    fn subscribe_bars(
994        &mut self,
995        bar_type: BarType,
996        client_id: Option<ClientId>,
997        await_partial: bool,
998        params: Option<IndexMap<String, String>>,
999    ) where
1000        Self: 'static + Debug + Sized,
1001    {
1002        let actor_id = self.actor_id().inner();
1003        let topic = get_bars_topic(bar_type);
1004
1005        let handler =
1006            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
1007                get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1008            })));
1009
1010        DataActorCore::subscribe_bars(
1011            self,
1012            topic,
1013            handler,
1014            bar_type,
1015            client_id,
1016            await_partial,
1017            params,
1018        );
1019    }
1020
1021    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1022    fn subscribe_mark_prices(
1023        &mut self,
1024        instrument_id: InstrumentId,
1025        client_id: Option<ClientId>,
1026        params: Option<IndexMap<String, String>>,
1027    ) where
1028        Self: 'static + Debug + Sized,
1029    {
1030        let actor_id = self.actor_id().inner();
1031        let topic = get_mark_price_topic(instrument_id);
1032
1033        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1034            move |mark_price: &MarkPriceUpdate| {
1035                get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1036            },
1037        )));
1038
1039        DataActorCore::subscribe_mark_prices(
1040            self,
1041            topic,
1042            handler,
1043            instrument_id,
1044            client_id,
1045            params,
1046        );
1047    }
1048
1049    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1050    fn subscribe_index_prices(
1051        &mut self,
1052        instrument_id: InstrumentId,
1053        client_id: Option<ClientId>,
1054        params: Option<IndexMap<String, String>>,
1055    ) where
1056        Self: 'static + Debug + Sized,
1057    {
1058        let actor_id = self.actor_id().inner();
1059        let topic = get_index_price_topic(instrument_id);
1060
1061        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1062            move |index_price: &IndexPriceUpdate| {
1063                get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1064            },
1065        )));
1066
1067        DataActorCore::subscribe_index_prices(
1068            self,
1069            topic,
1070            handler,
1071            instrument_id,
1072            client_id,
1073            params,
1074        );
1075    }
1076
1077    /// Subscribe to streaming [`FundingRateUpdate`] data for the `instrument_id`.
1078    fn subscribe_funding_rates(
1079        &mut self,
1080        instrument_id: InstrumentId,
1081        client_id: Option<ClientId>,
1082        params: Option<IndexMap<String, String>>,
1083    ) where
1084        Self: 'static + Debug + Sized,
1085    {
1086        let actor_id = self.actor_id().inner();
1087        let topic = get_funding_rate_topic(instrument_id);
1088
1089        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1090            move |funding_rate: &FundingRateUpdate| {
1091                get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1092            },
1093        )));
1094
1095        DataActorCore::subscribe_funding_rates(
1096            self,
1097            topic,
1098            handler,
1099            instrument_id,
1100            client_id,
1101            params,
1102        );
1103    }
1104
1105    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1106    fn subscribe_instrument_status(
1107        &mut self,
1108        instrument_id: InstrumentId,
1109        client_id: Option<ClientId>,
1110        params: Option<IndexMap<String, String>>,
1111    ) where
1112        Self: 'static + Debug + Sized,
1113    {
1114        let actor_id = self.actor_id().inner();
1115        let topic = get_instrument_status_topic(instrument_id);
1116
1117        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1118            move |status: &InstrumentStatus| {
1119                get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1120            },
1121        )));
1122
1123        DataActorCore::subscribe_instrument_status(
1124            self,
1125            topic,
1126            handler,
1127            instrument_id,
1128            client_id,
1129            params,
1130        );
1131    }
1132
1133    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1134    fn subscribe_instrument_close(
1135        &mut self,
1136        instrument_id: InstrumentId,
1137        client_id: Option<ClientId>,
1138        params: Option<IndexMap<String, String>>,
1139    ) where
1140        Self: 'static + Debug + Sized,
1141    {
1142        let actor_id = self.actor_id().inner();
1143        let topic = get_instrument_close_topic(instrument_id);
1144
1145        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1146            move |close: &InstrumentClose| {
1147                get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1148            },
1149        )));
1150
1151        DataActorCore::subscribe_instrument_close(
1152            self,
1153            topic,
1154            handler,
1155            instrument_id,
1156            client_id,
1157            params,
1158        );
1159    }
1160
1161    #[cfg(feature = "defi")]
1162    /// Subscribe to streaming [`Block`] data for the `chain`.
1163    fn subscribe_blocks(
1164        &mut self,
1165        chain: Blockchain,
1166        client_id: Option<ClientId>,
1167        params: Option<IndexMap<String, String>>,
1168    ) where
1169        Self: 'static + Debug + Sized,
1170    {
1171        let actor_id = self.actor_id().inner();
1172        let topic = get_defi_blocks_topic(chain);
1173
1174        let handler =
1175            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |block: &Block| {
1176                get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1177            })));
1178
1179        DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1180    }
1181
1182    #[cfg(feature = "defi")]
1183    /// Subscribe to streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1184    fn subscribe_pool(
1185        &mut self,
1186        instrument_id: InstrumentId,
1187        client_id: Option<ClientId>,
1188        params: Option<IndexMap<String, String>>,
1189    ) where
1190        Self: 'static + Debug + Sized,
1191    {
1192        let actor_id = self.actor_id().inner();
1193        let topic = get_defi_pool_topic(instrument_id);
1194
1195        let handler =
1196            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |pool: &Pool| {
1197                get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1198            })));
1199
1200        DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1201    }
1202
1203    #[cfg(feature = "defi")]
1204    /// Subscribe to streaming [`PoolSwap`] data for the `instrument_id`.
1205    fn subscribe_pool_swaps(
1206        &mut self,
1207        instrument_id: InstrumentId,
1208        client_id: Option<ClientId>,
1209        params: Option<IndexMap<String, String>>,
1210    ) where
1211        Self: 'static + Debug + Sized,
1212    {
1213        let actor_id = self.actor_id().inner();
1214        let topic = get_defi_pool_swaps_topic(instrument_id);
1215
1216        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1217            move |swap: &PoolSwap| {
1218                get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1219            },
1220        )));
1221
1222        DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1223    }
1224
1225    #[cfg(feature = "defi")]
1226    /// Subscribe to streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1227    fn subscribe_pool_liquidity_updates(
1228        &mut self,
1229        instrument_id: InstrumentId,
1230        client_id: Option<ClientId>,
1231        params: Option<IndexMap<String, String>>,
1232    ) where
1233        Self: 'static + Debug + Sized,
1234    {
1235        let actor_id = self.actor_id().inner();
1236        let topic = get_defi_liquidity_topic(instrument_id);
1237
1238        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1239            move |update: &PoolLiquidityUpdate| {
1240                get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1241            },
1242        )));
1243
1244        DataActorCore::subscribe_pool_liquidity_updates(
1245            self,
1246            topic,
1247            handler,
1248            instrument_id,
1249            client_id,
1250            params,
1251        );
1252    }
1253
1254    /// Unsubscribe from streaming `data_type` data.
1255    fn unsubscribe_data(
1256        &mut self,
1257        data_type: DataType,
1258        client_id: Option<ClientId>,
1259        params: Option<IndexMap<String, String>>,
1260    ) where
1261        Self: 'static + Debug + Sized,
1262    {
1263        DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1264    }
1265
1266    /// Unsubscribe from streaming [`InstrumentAny`] data for the `venue`.
1267    fn unsubscribe_instruments(
1268        &mut self,
1269        venue: Venue,
1270        client_id: Option<ClientId>,
1271        params: Option<IndexMap<String, String>>,
1272    ) where
1273        Self: 'static + Debug + Sized,
1274    {
1275        DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1276    }
1277
1278    /// Unsubscribe from streaming [`InstrumentAny`] data for the `instrument_id`.
1279    fn unsubscribe_instrument(
1280        &mut self,
1281        instrument_id: InstrumentId,
1282        client_id: Option<ClientId>,
1283        params: Option<IndexMap<String, String>>,
1284    ) where
1285        Self: 'static + Debug + Sized,
1286    {
1287        DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1288    }
1289
1290    /// Unsubscribe from streaming [`OrderBookDeltas`] data for the `instrument_id`.
1291    fn unsubscribe_book_deltas(
1292        &mut self,
1293        instrument_id: InstrumentId,
1294        client_id: Option<ClientId>,
1295        params: Option<IndexMap<String, String>>,
1296    ) where
1297        Self: 'static + Debug + Sized,
1298    {
1299        DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1300    }
1301
1302    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1303    fn unsubscribe_book_at_interval(
1304        &mut self,
1305        instrument_id: InstrumentId,
1306        interval_ms: NonZeroUsize,
1307        client_id: Option<ClientId>,
1308        params: Option<IndexMap<String, String>>,
1309    ) where
1310        Self: 'static + Debug + Sized,
1311    {
1312        DataActorCore::unsubscribe_book_at_interval(
1313            self,
1314            instrument_id,
1315            interval_ms,
1316            client_id,
1317            params,
1318        );
1319    }
1320
1321    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
1322    fn unsubscribe_quotes(
1323        &mut self,
1324        instrument_id: InstrumentId,
1325        client_id: Option<ClientId>,
1326        params: Option<IndexMap<String, String>>,
1327    ) where
1328        Self: 'static + Debug + Sized,
1329    {
1330        DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1331    }
1332
1333    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
1334    fn unsubscribe_trades(
1335        &mut self,
1336        instrument_id: InstrumentId,
1337        client_id: Option<ClientId>,
1338        params: Option<IndexMap<String, String>>,
1339    ) where
1340        Self: 'static + Debug + Sized,
1341    {
1342        DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1343    }
1344
1345    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
1346    fn unsubscribe_bars(
1347        &mut self,
1348        bar_type: BarType,
1349        client_id: Option<ClientId>,
1350        params: Option<IndexMap<String, String>>,
1351    ) where
1352        Self: 'static + Debug + Sized,
1353    {
1354        DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1355    }
1356
1357    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1358    fn unsubscribe_mark_prices(
1359        &mut self,
1360        instrument_id: InstrumentId,
1361        client_id: Option<ClientId>,
1362        params: Option<IndexMap<String, String>>,
1363    ) where
1364        Self: 'static + Debug + Sized,
1365    {
1366        DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1367    }
1368
1369    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1370    fn unsubscribe_index_prices(
1371        &mut self,
1372        instrument_id: InstrumentId,
1373        client_id: Option<ClientId>,
1374        params: Option<IndexMap<String, String>>,
1375    ) where
1376        Self: 'static + Debug + Sized,
1377    {
1378        DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1379    }
1380
1381    /// Unsubscribe from streaming [`FundingRateUpdate`] data for the `instrument_id`.
1382    fn unsubscribe_funding_rates(
1383        &mut self,
1384        instrument_id: InstrumentId,
1385        client_id: Option<ClientId>,
1386        params: Option<IndexMap<String, String>>,
1387    ) where
1388        Self: 'static + Debug + Sized,
1389    {
1390        DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1391    }
1392
1393    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
1394    fn unsubscribe_instrument_status(
1395        &mut self,
1396        instrument_id: InstrumentId,
1397        client_id: Option<ClientId>,
1398        params: Option<IndexMap<String, String>>,
1399    ) where
1400        Self: 'static + Debug + Sized,
1401    {
1402        DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1403    }
1404
1405    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
1406    fn unsubscribe_instrument_close(
1407        &mut self,
1408        instrument_id: InstrumentId,
1409        client_id: Option<ClientId>,
1410        params: Option<IndexMap<String, String>>,
1411    ) where
1412        Self: 'static + Debug + Sized,
1413    {
1414        DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1415    }
1416
1417    #[cfg(feature = "defi")]
1418    /// Unsubscribe from streaming [`Block`] data for the `chain`.
1419    fn unsubscribe_blocks(
1420        &mut self,
1421        chain: Blockchain,
1422        client_id: Option<ClientId>,
1423        params: Option<IndexMap<String, String>>,
1424    ) where
1425        Self: 'static + Debug + Sized,
1426    {
1427        DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1428    }
1429
1430    #[cfg(feature = "defi")]
1431    /// Unsubscribe from streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1432    fn unsubscribe_pool(
1433        &mut self,
1434        instrument_id: InstrumentId,
1435        client_id: Option<ClientId>,
1436        params: Option<IndexMap<String, String>>,
1437    ) where
1438        Self: 'static + Debug + Sized,
1439    {
1440        DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1441    }
1442
1443    #[cfg(feature = "defi")]
1444    /// Unsubscribe from streaming [`PoolSwap`] data for the `instrument_id`.
1445    fn unsubscribe_pool_swaps(
1446        &mut self,
1447        instrument_id: InstrumentId,
1448        client_id: Option<ClientId>,
1449        params: Option<IndexMap<String, String>>,
1450    ) where
1451        Self: 'static + Debug + Sized,
1452    {
1453        DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1454    }
1455
1456    #[cfg(feature = "defi")]
1457    /// Unsubscribe from streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1458    fn unsubscribe_pool_liquidity_updates(
1459        &mut self,
1460        instrument_id: InstrumentId,
1461        client_id: Option<ClientId>,
1462        params: Option<IndexMap<String, String>>,
1463    ) where
1464        Self: 'static + Debug + Sized,
1465    {
1466        DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1467    }
1468
1469    /// Request historical custom data of the given `data_type`.
1470    ///
1471    /// # Errors
1472    ///
1473    /// Returns an error if input parameters are invalid.
1474    fn request_data(
1475        &mut self,
1476        data_type: DataType,
1477        client_id: ClientId,
1478        start: Option<DateTime<Utc>>,
1479        end: Option<DateTime<Utc>>,
1480        limit: Option<NonZeroUsize>,
1481        params: Option<IndexMap<String, String>>,
1482    ) -> anyhow::Result<UUID4>
1483    where
1484        Self: 'static + Debug + Sized,
1485    {
1486        let actor_id = self.actor_id().inner();
1487        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1488            move |resp: &CustomDataResponse| {
1489                get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1490            },
1491        )));
1492
1493        DataActorCore::request_data(
1494            self, data_type, client_id, start, end, limit, params, handler,
1495        )
1496    }
1497
1498    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
1499    ///
1500    /// # Errors
1501    ///
1502    /// Returns an error if input parameters are invalid.
1503    fn request_instrument(
1504        &mut self,
1505        instrument_id: InstrumentId,
1506        start: Option<DateTime<Utc>>,
1507        end: Option<DateTime<Utc>>,
1508        client_id: Option<ClientId>,
1509        params: Option<IndexMap<String, String>>,
1510    ) -> anyhow::Result<UUID4>
1511    where
1512        Self: 'static + Debug + Sized,
1513    {
1514        let actor_id = self.actor_id().inner();
1515        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1516            move |resp: &InstrumentResponse| {
1517                get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1518            },
1519        )));
1520
1521        DataActorCore::request_instrument(
1522            self,
1523            instrument_id,
1524            start,
1525            end,
1526            client_id,
1527            params,
1528            handler,
1529        )
1530    }
1531
1532    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
1533    ///
1534    /// # Errors
1535    ///
1536    /// Returns an error if input parameters are invalid.
1537    fn request_instruments(
1538        &mut self,
1539        venue: Option<Venue>,
1540        start: Option<DateTime<Utc>>,
1541        end: Option<DateTime<Utc>>,
1542        client_id: Option<ClientId>,
1543        params: Option<IndexMap<String, String>>,
1544    ) -> anyhow::Result<UUID4>
1545    where
1546        Self: 'static + Debug + Sized,
1547    {
1548        let actor_id = self.actor_id().inner();
1549        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1550            move |resp: &InstrumentsResponse| {
1551                get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1552            },
1553        )));
1554
1555        DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1556    }
1557
1558    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
1559    ///
1560    /// # Errors
1561    ///
1562    /// Returns an error if input parameters are invalid.
1563    fn request_book_snapshot(
1564        &mut self,
1565        instrument_id: InstrumentId,
1566        depth: Option<NonZeroUsize>,
1567        client_id: Option<ClientId>,
1568        params: Option<IndexMap<String, String>>,
1569    ) -> anyhow::Result<UUID4>
1570    where
1571        Self: 'static + Debug + Sized,
1572    {
1573        let actor_id = self.actor_id().inner();
1574        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1575            move |resp: &BookResponse| {
1576                get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1577            },
1578        )));
1579
1580        DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1581    }
1582
1583    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
1584    ///
1585    /// # Errors
1586    ///
1587    /// Returns an error if input parameters are invalid.
1588    fn request_quotes(
1589        &mut self,
1590        instrument_id: InstrumentId,
1591        start: Option<DateTime<Utc>>,
1592        end: Option<DateTime<Utc>>,
1593        limit: Option<NonZeroUsize>,
1594        client_id: Option<ClientId>,
1595        params: Option<IndexMap<String, String>>,
1596    ) -> anyhow::Result<UUID4>
1597    where
1598        Self: 'static + Debug + Sized,
1599    {
1600        let actor_id = self.actor_id().inner();
1601        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1602            move |resp: &QuotesResponse| {
1603                get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1604            },
1605        )));
1606
1607        DataActorCore::request_quotes(
1608            self,
1609            instrument_id,
1610            start,
1611            end,
1612            limit,
1613            client_id,
1614            params,
1615            handler,
1616        )
1617    }
1618
1619    /// Request historical [`TradeTick`] data for the given `instrument_id`.
1620    ///
1621    /// # Errors
1622    ///
1623    /// Returns an error if input parameters are invalid.
1624    fn request_trades(
1625        &mut self,
1626        instrument_id: InstrumentId,
1627        start: Option<DateTime<Utc>>,
1628        end: Option<DateTime<Utc>>,
1629        limit: Option<NonZeroUsize>,
1630        client_id: Option<ClientId>,
1631        params: Option<IndexMap<String, String>>,
1632    ) -> anyhow::Result<UUID4>
1633    where
1634        Self: 'static + Debug + Sized,
1635    {
1636        let actor_id = self.actor_id().inner();
1637        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1638            move |resp: &TradesResponse| {
1639                get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1640            },
1641        )));
1642
1643        DataActorCore::request_trades(
1644            self,
1645            instrument_id,
1646            start,
1647            end,
1648            limit,
1649            client_id,
1650            params,
1651            handler,
1652        )
1653    }
1654
1655    /// Request historical [`Bar`] data for the given `bar_type`.
1656    ///
1657    /// # Errors
1658    ///
1659    /// Returns an error if input parameters are invalid.
1660    fn request_bars(
1661        &mut self,
1662        bar_type: BarType,
1663        start: Option<DateTime<Utc>>,
1664        end: Option<DateTime<Utc>>,
1665        limit: Option<NonZeroUsize>,
1666        client_id: Option<ClientId>,
1667        params: Option<IndexMap<String, String>>,
1668    ) -> anyhow::Result<UUID4>
1669    where
1670        Self: 'static + Debug + Sized,
1671    {
1672        let actor_id = self.actor_id().inner();
1673        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1674            move |resp: &BarsResponse| {
1675                get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1676            },
1677        )));
1678
1679        DataActorCore::request_bars(
1680            self, bar_type, start, end, limit, client_id, params, handler,
1681        )
1682    }
1683}
1684
1685// Blanket implementation: any DataActor automatically implements Actor
1686impl<T> Actor for T
1687where
1688    T: DataActor + Debug + 'static,
1689{
1690    fn id(&self) -> Ustr {
1691        self.actor_id.inner()
1692    }
1693
1694    #[allow(unused_variables)]
1695    fn handle(&mut self, msg: &dyn Any) {
1696        // Default empty implementation - concrete actors can override if needed
1697    }
1698
1699    fn as_any(&self) -> &dyn Any {
1700        self
1701    }
1702}
1703
1704// Blanket implementation: any DataActor automatically implements Component
1705impl<T> Component for T
1706where
1707    T: DataActor + Debug + 'static,
1708{
1709    fn component_id(&self) -> ComponentId {
1710        ComponentId::new(self.actor_id.inner().as_str())
1711    }
1712
1713    fn state(&self) -> ComponentState {
1714        self.state
1715    }
1716
1717    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1718        self.state = self.state.transition(&trigger)?;
1719        log::info!("{}", self.state.variant_name());
1720        Ok(())
1721    }
1722
1723    fn register(
1724        &mut self,
1725        trader_id: TraderId,
1726        clock: Rc<RefCell<dyn Clock>>,
1727        cache: Rc<RefCell<Cache>>,
1728    ) -> anyhow::Result<()> {
1729        DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1730
1731        // Register default time event handler for this actor
1732        let actor_id = self.actor_id().inner();
1733        let callback = TimeEventCallback::Rust(Rc::new(move |event: TimeEvent| {
1734            if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1735                actor.handle_time_event(&event);
1736            } else {
1737                log::error!("Actor {actor_id} not found for time event handling");
1738            }
1739        }));
1740
1741        clock.borrow_mut().register_default_handler(callback);
1742
1743        self.initialize()
1744    }
1745
1746    fn on_start(&mut self) -> anyhow::Result<()> {
1747        DataActor::on_start(self)
1748    }
1749
1750    fn on_stop(&mut self) -> anyhow::Result<()> {
1751        DataActor::on_stop(self)
1752    }
1753
1754    fn on_resume(&mut self) -> anyhow::Result<()> {
1755        DataActor::on_resume(self)
1756    }
1757
1758    fn on_degrade(&mut self) -> anyhow::Result<()> {
1759        DataActor::on_degrade(self)
1760    }
1761
1762    fn on_fault(&mut self) -> anyhow::Result<()> {
1763        DataActor::on_fault(self)
1764    }
1765
1766    fn on_reset(&mut self) -> anyhow::Result<()> {
1767        DataActor::on_reset(self)
1768    }
1769
1770    fn on_dispose(&mut self) -> anyhow::Result<()> {
1771        DataActor::on_dispose(self)
1772    }
1773}
1774
1775/// Core functionality for all actors.
1776#[allow(dead_code)] // TODO: Under development (pending_requests, signal_classes)
1777pub struct DataActorCore {
1778    /// The actor identifier.
1779    pub actor_id: ActorId,
1780    /// The actors configuration.
1781    pub config: DataActorConfig,
1782    trader_id: Option<TraderId>,
1783    clock: Option<Rc<RefCell<dyn Clock>>>, // Wired up on registration
1784    cache: Option<Rc<RefCell<Cache>>>,     // Wired up on registration
1785    state: ComponentState,
1786    warning_events: AHashSet<String>, // TODO: TBD
1787    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
1788    signal_classes: AHashMap<String, String>,
1789    #[cfg(feature = "indicators")]
1790    indicators: Indicators,
1791    topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
1792}
1793
1794impl Debug for DataActorCore {
1795    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1796        f.debug_struct(stringify!(DataActorCore))
1797            .field("actor_id", &self.actor_id)
1798            .field("config", &self.config)
1799            .field("state", &self.state)
1800            .field("trader_id", &self.trader_id)
1801            .finish()
1802    }
1803}
1804
1805impl DataActorCore {
1806    /// Adds a subscription handler for the `topic`.
1807    ///
1808    //// Logs a warning if the actor is already subscribed to the topic.
1809    fn add_subscription(&mut self, topic: MStr<Topic>, handler: ShareableMessageHandler) {
1810        if self.topic_handlers.contains_key(&topic) {
1811            log::warn!(
1812                "Actor {} attempted duplicate subscription to topic '{topic}'",
1813                self.actor_id,
1814            );
1815            return;
1816        }
1817
1818        self.topic_handlers.insert(topic, handler.clone());
1819        msgbus::subscribe_topic(topic, handler, None);
1820    }
1821
1822    /// Removes a subscription handler for the `topic` if present.
1823    ///
1824    /// Logs a warning if the actor is not currently subscribed to the topic.
1825    fn remove_subscription(&mut self, topic: MStr<Topic>) {
1826        if let Some(handler) = self.topic_handlers.remove(&topic) {
1827            msgbus::unsubscribe_topic(topic, handler.clone());
1828        } else {
1829            log::warn!(
1830                "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
1831                self.actor_id,
1832            );
1833        }
1834    }
1835
1836    /// Creates a new [`DataActorCore`] instance.
1837    pub fn new(config: DataActorConfig) -> Self {
1838        let actor_id = config
1839            .actor_id
1840            .unwrap_or_else(|| Self::default_actor_id(&config));
1841
1842        Self {
1843            actor_id,
1844            config,
1845            trader_id: None, // None until registered
1846            clock: None,     // None until registered
1847            cache: None,     // None until registered
1848            state: ComponentState::default(),
1849            warning_events: AHashSet::new(),
1850            pending_requests: AHashMap::new(),
1851            signal_classes: AHashMap::new(),
1852            #[cfg(feature = "indicators")]
1853            indicators: Indicators::default(),
1854            topic_handlers: AHashMap::new(),
1855        }
1856    }
1857
1858    /// Returns the memory address of this instance as a hexadecimal string.
1859    #[must_use]
1860    pub fn mem_address(&self) -> String {
1861        format!("{self:p}")
1862    }
1863
1864    /// Returns the actors state.
1865    pub fn state(&self) -> ComponentState {
1866        self.state
1867    }
1868
1869    /// Returns the trader ID this actor is registered to.
1870    pub fn trader_id(&self) -> Option<TraderId> {
1871        self.trader_id
1872    }
1873
1874    /// Returns the actors ID.
1875    pub fn actor_id(&self) -> ActorId {
1876        self.actor_id
1877    }
1878
1879    fn default_actor_id(config: &DataActorConfig) -> ActorId {
1880        let memory_address = std::ptr::from_ref(config) as *const _ as usize;
1881        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
1882    }
1883
1884    /// Returns a UNIX nanoseconds timestamp from the actors internal clock.
1885    pub fn timestamp_ns(&self) -> UnixNanos {
1886        self.clock_ref().timestamp_ns()
1887    }
1888
1889    /// Returns the clock for the actor (if registered).
1890    ///
1891    /// # Panics
1892    ///
1893    /// Panics if the actor has not been registered with a trader.
1894    pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
1895        self.clock
1896            .as_ref()
1897            .unwrap_or_else(|| {
1898                panic!(
1899                    "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
1900                    self.actor_id, self.trader_id
1901                )
1902            })
1903            .borrow_mut()
1904    }
1905
1906    /// Returns a clone of the reference-counted clock.
1907    ///
1908    /// # Panics
1909    ///
1910    /// Panics if the actor has not yet been registered (clock is `None`).
1911    pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
1912        self.clock
1913            .as_ref()
1914            .expect("DataActor must be registered before accessing clock")
1915            .clone()
1916    }
1917
1918    fn clock_ref(&self) -> Ref<'_, dyn Clock> {
1919        self.clock
1920            .as_ref()
1921            .unwrap_or_else(|| {
1922                panic!(
1923                    "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
1924                    self.actor_id, self.trader_id
1925                )
1926            })
1927            .borrow()
1928    }
1929
1930    // -- REGISTRATION ----------------------------------------------------------------------------
1931
1932    /// Register the data actor with a trader.
1933    ///
1934    /// # Errors
1935    ///
1936    /// Returns an error if the actor has already been registered with a trader
1937    /// or if the provided dependencies are invalid.
1938    pub fn register(
1939        &mut self,
1940        trader_id: TraderId,
1941        clock: Rc<RefCell<dyn Clock>>,
1942        cache: Rc<RefCell<Cache>>,
1943    ) -> anyhow::Result<()> {
1944        if let Some(existing_trader_id) = self.trader_id {
1945            anyhow::bail!(
1946                "DataActor {} already registered with trader {existing_trader_id}",
1947                self.actor_id
1948            );
1949        }
1950
1951        // Validate clock by attempting to access it
1952        {
1953            let _timestamp = clock.borrow().timestamp_ns();
1954        }
1955
1956        // Validate cache by attempting to access it
1957        {
1958            let _cache_borrow = cache.borrow();
1959        }
1960
1961        self.trader_id = Some(trader_id);
1962        self.clock = Some(clock);
1963        self.cache = Some(cache);
1964
1965        // Verify complete registration
1966        if !self.is_properly_registered() {
1967            anyhow::bail!(
1968                "DataActor {} registration incomplete - validation failed",
1969                self.actor_id
1970            );
1971        }
1972
1973        log::info!("Registered {} with trader {trader_id}", self.actor_id);
1974        Ok(())
1975    }
1976
1977    /// Register an event type for warning log levels.
1978    pub fn register_warning_event(&mut self, event_type: &str) {
1979        self.warning_events.insert(event_type.to_string());
1980        log::debug!("Registered event type '{event_type}' for warning logs")
1981    }
1982
1983    /// Deregister an event type from warning log levels.
1984    pub fn deregister_warning_event(&mut self, event_type: &str) {
1985        self.warning_events.remove(event_type);
1986        log::debug!("Deregistered event type '{event_type}' from warning logs")
1987    }
1988
1989    pub fn is_registered(&self) -> bool {
1990        self.trader_id.is_some()
1991    }
1992
1993    fn check_registered(&self) {
1994        assert!(
1995            self.is_registered(),
1996            "Actor has not been registered with a Trader"
1997        );
1998    }
1999
2000    /// Validates registration state without panicking.
2001    fn is_properly_registered(&self) -> bool {
2002        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2003    }
2004
2005    fn send_data_cmd(&self, command: DataCommand) {
2006        if self.config.log_commands {
2007            log::info!("{CMD}{SEND} {command:?}");
2008        }
2009
2010        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2011        msgbus::send_any(endpoint, command.as_any())
2012    }
2013
2014    #[allow(dead_code)] // TODO: Under development
2015    fn send_data_req(&self, request: RequestCommand) {
2016        if self.config.log_commands {
2017            log::info!("{REQ}{SEND} {request:?}");
2018        }
2019
2020        // For now, simplified approach - data requests without dynamic handlers
2021        // TODO: Implement proper dynamic dispatch for response handlers
2022        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2023        msgbus::send_any(endpoint, request.as_any())
2024    }
2025
2026    /// Sends a shutdown command to the system with an optional reason.
2027    ///
2028    /// # Panics
2029    ///
2030    /// Panics if the actor is not registered or has no trader ID.
2031    pub fn shutdown_system(&self, reason: Option<String>) {
2032        self.check_registered();
2033
2034        // SAFETY: Checked registered before unwrapping trader ID
2035        let command = ShutdownSystem::new(
2036            self.trader_id().unwrap(),
2037            self.actor_id.inner(),
2038            reason,
2039            UUID4::new(),
2040            self.timestamp_ns(),
2041        );
2042
2043        let endpoint = "command.system.shutdown".into();
2044        msgbus::send_any(endpoint, command.as_any());
2045    }
2046
2047    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
2048
2049    /// Helper method for registering data subscriptions from the trait.
2050    ///
2051    /// # Panics
2052    ///
2053    /// Panics if the actor is not properly registered.
2054    pub fn subscribe_data(
2055        &mut self,
2056        handler: ShareableMessageHandler,
2057        data_type: DataType,
2058        client_id: Option<ClientId>,
2059        params: Option<IndexMap<String, String>>,
2060    ) {
2061        if !self.is_properly_registered() {
2062            panic!(
2063                "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
2064                self.actor_id,
2065                self.trader_id,
2066                self.clock.is_some(),
2067                self.cache.is_some()
2068            );
2069        }
2070
2071        let topic = get_custom_topic(&data_type);
2072        self.add_subscription(topic, handler);
2073
2074        // If no client ID specified, just subscribe to the topic
2075        if client_id.is_none() {
2076            return;
2077        }
2078
2079        let command = SubscribeCommand::Data(SubscribeCustomData {
2080            data_type,
2081            client_id,
2082            venue: None,
2083            command_id: UUID4::new(),
2084            ts_init: self.timestamp_ns(),
2085            params,
2086        });
2087
2088        self.send_data_cmd(DataCommand::Subscribe(command));
2089    }
2090
2091    /// Helper method for registering quotes subscriptions from the trait.
2092    pub fn subscribe_quotes(
2093        &mut self,
2094        topic: MStr<Topic>,
2095        handler: ShareableMessageHandler,
2096        instrument_id: InstrumentId,
2097        client_id: Option<ClientId>,
2098        params: Option<IndexMap<String, String>>,
2099    ) {
2100        self.check_registered();
2101
2102        self.add_subscription(topic, handler);
2103
2104        let command = SubscribeCommand::Quotes(SubscribeQuotes {
2105            instrument_id,
2106            client_id,
2107            venue: Some(instrument_id.venue),
2108            command_id: UUID4::new(),
2109            ts_init: self.timestamp_ns(),
2110            params,
2111        });
2112
2113        self.send_data_cmd(DataCommand::Subscribe(command));
2114    }
2115
2116    /// Helper method for registering instruments subscriptions from the trait.
2117    pub fn subscribe_instruments(
2118        &mut self,
2119        topic: MStr<Topic>,
2120        handler: ShareableMessageHandler,
2121        venue: Venue,
2122        client_id: Option<ClientId>,
2123        params: Option<IndexMap<String, String>>,
2124    ) {
2125        self.check_registered();
2126
2127        self.add_subscription(topic, handler);
2128
2129        let command = SubscribeCommand::Instruments(SubscribeInstruments {
2130            client_id,
2131            venue,
2132            command_id: UUID4::new(),
2133            ts_init: self.timestamp_ns(),
2134            params,
2135        });
2136
2137        self.send_data_cmd(DataCommand::Subscribe(command));
2138    }
2139
2140    /// Helper method for registering instrument subscriptions from the trait.
2141    pub fn subscribe_instrument(
2142        &mut self,
2143        topic: MStr<Topic>,
2144        handler: ShareableMessageHandler,
2145        instrument_id: InstrumentId,
2146        client_id: Option<ClientId>,
2147        params: Option<IndexMap<String, String>>,
2148    ) {
2149        self.check_registered();
2150
2151        self.add_subscription(topic, handler);
2152
2153        let command = SubscribeCommand::Instrument(SubscribeInstrument {
2154            instrument_id,
2155            client_id,
2156            venue: Some(instrument_id.venue),
2157            command_id: UUID4::new(),
2158            ts_init: self.timestamp_ns(),
2159            params,
2160        });
2161
2162        self.send_data_cmd(DataCommand::Subscribe(command));
2163    }
2164
2165    /// Helper method for registering book deltas subscriptions from the trait.
2166    #[allow(clippy::too_many_arguments)]
2167    pub fn subscribe_book_deltas(
2168        &mut self,
2169        topic: MStr<Topic>,
2170        handler: ShareableMessageHandler,
2171        instrument_id: InstrumentId,
2172        book_type: BookType,
2173        depth: Option<NonZeroUsize>,
2174        client_id: Option<ClientId>,
2175        managed: bool,
2176        params: Option<IndexMap<String, String>>,
2177    ) {
2178        self.check_registered();
2179
2180        self.add_subscription(topic, handler);
2181
2182        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2183            instrument_id,
2184            book_type,
2185            client_id,
2186            venue: Some(instrument_id.venue),
2187            command_id: UUID4::new(),
2188            ts_init: self.timestamp_ns(),
2189            depth,
2190            managed,
2191            params,
2192        });
2193
2194        self.send_data_cmd(DataCommand::Subscribe(command));
2195    }
2196
2197    /// Helper method for registering book snapshots subscriptions from the trait.
2198    #[allow(clippy::too_many_arguments)]
2199    pub fn subscribe_book_at_interval(
2200        &mut self,
2201        topic: MStr<Topic>,
2202        handler: ShareableMessageHandler,
2203        instrument_id: InstrumentId,
2204        book_type: BookType,
2205        depth: Option<NonZeroUsize>,
2206        interval_ms: NonZeroUsize,
2207        client_id: Option<ClientId>,
2208        params: Option<IndexMap<String, String>>,
2209    ) {
2210        self.check_registered();
2211
2212        self.add_subscription(topic, handler);
2213
2214        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2215            instrument_id,
2216            book_type,
2217            client_id,
2218            venue: Some(instrument_id.venue),
2219            command_id: UUID4::new(),
2220            ts_init: self.timestamp_ns(),
2221            depth,
2222            interval_ms,
2223            params,
2224        });
2225
2226        self.send_data_cmd(DataCommand::Subscribe(command));
2227    }
2228
2229    /// Helper method for registering trades subscriptions from the trait.
2230    pub fn subscribe_trades(
2231        &mut self,
2232        topic: MStr<Topic>,
2233        handler: ShareableMessageHandler,
2234        instrument_id: InstrumentId,
2235        client_id: Option<ClientId>,
2236        params: Option<IndexMap<String, String>>,
2237    ) {
2238        self.check_registered();
2239
2240        self.add_subscription(topic, handler);
2241
2242        let command = SubscribeCommand::Trades(SubscribeTrades {
2243            instrument_id,
2244            client_id,
2245            venue: Some(instrument_id.venue),
2246            command_id: UUID4::new(),
2247            ts_init: self.timestamp_ns(),
2248            params,
2249        });
2250
2251        self.send_data_cmd(DataCommand::Subscribe(command));
2252    }
2253
2254    /// Helper method for registering bars subscriptions from the trait.
2255    pub fn subscribe_bars(
2256        &mut self,
2257        topic: MStr<Topic>,
2258        handler: ShareableMessageHandler,
2259        bar_type: BarType,
2260        client_id: Option<ClientId>,
2261        await_partial: bool,
2262        params: Option<IndexMap<String, String>>,
2263    ) {
2264        self.check_registered();
2265
2266        self.add_subscription(topic, handler);
2267
2268        let command = SubscribeCommand::Bars(SubscribeBars {
2269            bar_type,
2270            client_id,
2271            venue: Some(bar_type.instrument_id().venue),
2272            command_id: UUID4::new(),
2273            ts_init: self.timestamp_ns(),
2274            await_partial,
2275            params,
2276        });
2277
2278        self.send_data_cmd(DataCommand::Subscribe(command));
2279    }
2280
2281    /// Helper method for registering mark prices subscriptions from the trait.
2282    pub fn subscribe_mark_prices(
2283        &mut self,
2284        topic: MStr<Topic>,
2285        handler: ShareableMessageHandler,
2286        instrument_id: InstrumentId,
2287        client_id: Option<ClientId>,
2288        params: Option<IndexMap<String, String>>,
2289    ) {
2290        self.check_registered();
2291
2292        self.add_subscription(topic, handler);
2293
2294        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
2295            instrument_id,
2296            client_id,
2297            venue: Some(instrument_id.venue),
2298            command_id: UUID4::new(),
2299            ts_init: self.timestamp_ns(),
2300            params,
2301        });
2302
2303        self.send_data_cmd(DataCommand::Subscribe(command));
2304    }
2305
2306    /// Helper method for registering index prices subscriptions from the trait.
2307    pub fn subscribe_index_prices(
2308        &mut self,
2309        topic: MStr<Topic>,
2310        handler: ShareableMessageHandler,
2311        instrument_id: InstrumentId,
2312        client_id: Option<ClientId>,
2313        params: Option<IndexMap<String, String>>,
2314    ) {
2315        self.check_registered();
2316
2317        self.add_subscription(topic, handler);
2318
2319        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
2320            instrument_id,
2321            client_id,
2322            venue: Some(instrument_id.venue),
2323            command_id: UUID4::new(),
2324            ts_init: self.timestamp_ns(),
2325            params,
2326        });
2327
2328        self.send_data_cmd(DataCommand::Subscribe(command));
2329    }
2330
2331    /// Helper method for registering funding rates subscriptions from the trait.
2332    pub fn subscribe_funding_rates(
2333        &mut self,
2334        topic: MStr<Topic>,
2335        handler: ShareableMessageHandler,
2336        instrument_id: InstrumentId,
2337        client_id: Option<ClientId>,
2338        params: Option<IndexMap<String, String>>,
2339    ) {
2340        self.check_registered();
2341
2342        self.add_subscription(topic, handler);
2343
2344        let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
2345            instrument_id,
2346            client_id,
2347            venue: Some(instrument_id.venue),
2348            command_id: UUID4::new(),
2349            ts_init: self.timestamp_ns(),
2350            params,
2351        });
2352
2353        self.send_data_cmd(DataCommand::Subscribe(command));
2354    }
2355
2356    /// Helper method for registering instrument status subscriptions from the trait.
2357    pub fn subscribe_instrument_status(
2358        &mut self,
2359        topic: MStr<Topic>,
2360        handler: ShareableMessageHandler,
2361        instrument_id: InstrumentId,
2362        client_id: Option<ClientId>,
2363        params: Option<IndexMap<String, String>>,
2364    ) {
2365        self.check_registered();
2366
2367        self.add_subscription(topic, handler);
2368
2369        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
2370            instrument_id,
2371            client_id,
2372            venue: Some(instrument_id.venue),
2373            command_id: UUID4::new(),
2374            ts_init: self.timestamp_ns(),
2375            params,
2376        });
2377
2378        self.send_data_cmd(DataCommand::Subscribe(command));
2379    }
2380
2381    /// Helper method for registering instrument close subscriptions from the trait.
2382    pub fn subscribe_instrument_close(
2383        &mut self,
2384        topic: MStr<Topic>,
2385        handler: ShareableMessageHandler,
2386        instrument_id: InstrumentId,
2387        client_id: Option<ClientId>,
2388        params: Option<IndexMap<String, String>>,
2389    ) {
2390        self.check_registered();
2391
2392        self.add_subscription(topic, handler);
2393
2394        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
2395            instrument_id,
2396            client_id,
2397            venue: Some(instrument_id.venue),
2398            command_id: UUID4::new(),
2399            ts_init: self.timestamp_ns(),
2400            params,
2401        });
2402
2403        self.send_data_cmd(DataCommand::Subscribe(command));
2404    }
2405
2406    #[cfg(feature = "defi")]
2407    /// Helper method for registering block subscriptions from the trait.
2408    pub fn subscribe_blocks(
2409        &mut self,
2410        topic: MStr<Topic>,
2411        handler: ShareableMessageHandler,
2412        chain: Blockchain,
2413        client_id: Option<ClientId>,
2414        params: Option<IndexMap<String, String>>,
2415    ) {
2416        use crate::messages::defi::{DefiSubscribeCommand, SubscribeBlocks};
2417
2418        self.check_registered();
2419
2420        self.add_subscription(topic, handler);
2421
2422        let command = DefiSubscribeCommand::Blocks(SubscribeBlocks {
2423            chain,
2424            client_id,
2425            command_id: UUID4::new(),
2426            ts_init: self.timestamp_ns(),
2427            params,
2428        });
2429
2430        self.send_data_cmd(DataCommand::DefiSubscribe(command));
2431    }
2432
2433    #[cfg(feature = "defi")]
2434    /// Helper method for registering pool subscriptions from the trait.
2435    pub fn subscribe_pool(
2436        &mut self,
2437        topic: MStr<Topic>,
2438        handler: ShareableMessageHandler,
2439        instrument_id: InstrumentId,
2440        client_id: Option<ClientId>,
2441        params: Option<IndexMap<String, String>>,
2442    ) {
2443        use crate::messages::defi::{DefiSubscribeCommand, SubscribePool};
2444
2445        self.check_registered();
2446
2447        self.add_subscription(topic, handler);
2448
2449        let command = DefiSubscribeCommand::Pool(SubscribePool {
2450            instrument_id,
2451            client_id,
2452            command_id: UUID4::new(),
2453            ts_init: self.timestamp_ns(),
2454            params,
2455        });
2456
2457        self.send_data_cmd(DataCommand::DefiSubscribe(command));
2458    }
2459
2460    #[cfg(feature = "defi")]
2461    /// Helper method for registering pool swap subscriptions from the trait.
2462    pub fn subscribe_pool_swaps(
2463        &mut self,
2464        topic: MStr<Topic>,
2465        handler: ShareableMessageHandler,
2466        instrument_id: InstrumentId,
2467        client_id: Option<ClientId>,
2468        params: Option<IndexMap<String, String>>,
2469    ) {
2470        use crate::messages::defi::{DefiSubscribeCommand, SubscribePoolSwaps};
2471
2472        self.check_registered();
2473
2474        self.add_subscription(topic, handler);
2475
2476        let command = DefiSubscribeCommand::PoolSwaps(SubscribePoolSwaps {
2477            instrument_id,
2478            client_id,
2479            command_id: UUID4::new(),
2480            ts_init: self.timestamp_ns(),
2481            params,
2482        });
2483
2484        self.send_data_cmd(DataCommand::DefiSubscribe(command));
2485    }
2486
2487    #[cfg(feature = "defi")]
2488    /// Helper method for registering pool liquidity update subscriptions from the trait.
2489    pub fn subscribe_pool_liquidity_updates(
2490        &mut self,
2491        topic: MStr<Topic>,
2492        handler: ShareableMessageHandler,
2493        instrument_id: InstrumentId,
2494        client_id: Option<ClientId>,
2495        params: Option<IndexMap<String, String>>,
2496    ) {
2497        use crate::messages::defi::{DefiSubscribeCommand, SubscribePoolLiquidityUpdates};
2498
2499        self.check_registered();
2500
2501        self.add_subscription(topic, handler);
2502
2503        let command = DefiSubscribeCommand::PoolLiquidityUpdates(SubscribePoolLiquidityUpdates {
2504            instrument_id,
2505            client_id,
2506            command_id: UUID4::new(),
2507            ts_init: self.timestamp_ns(),
2508            params,
2509        });
2510
2511        self.send_data_cmd(DataCommand::DefiSubscribe(command));
2512    }
2513
2514    /// Helper method for unsubscribing from data.
2515    pub fn unsubscribe_data(
2516        &mut self,
2517        data_type: DataType,
2518        client_id: Option<ClientId>,
2519        params: Option<IndexMap<String, String>>,
2520    ) {
2521        self.check_registered();
2522
2523        let topic = get_custom_topic(&data_type);
2524        self.remove_subscription(topic);
2525
2526        if client_id.is_none() {
2527            return;
2528        }
2529
2530        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
2531            data_type,
2532            client_id,
2533            venue: None,
2534            command_id: UUID4::new(),
2535            ts_init: self.timestamp_ns(),
2536            params,
2537        });
2538
2539        self.send_data_cmd(DataCommand::Unsubscribe(command));
2540    }
2541
2542    /// Helper method for unsubscribing from instruments.
2543    pub fn unsubscribe_instruments(
2544        &mut self,
2545        venue: Venue,
2546        client_id: Option<ClientId>,
2547        params: Option<IndexMap<String, String>>,
2548    ) {
2549        self.check_registered();
2550
2551        let topic = get_instruments_topic(venue);
2552        self.remove_subscription(topic);
2553
2554        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
2555            client_id,
2556            venue,
2557            command_id: UUID4::new(),
2558            ts_init: self.timestamp_ns(),
2559            params,
2560        });
2561
2562        self.send_data_cmd(DataCommand::Unsubscribe(command));
2563    }
2564
2565    /// Helper method for unsubscribing from instrument.
2566    pub fn unsubscribe_instrument(
2567        &mut self,
2568        instrument_id: InstrumentId,
2569        client_id: Option<ClientId>,
2570        params: Option<IndexMap<String, String>>,
2571    ) {
2572        self.check_registered();
2573
2574        let topic = get_instrument_topic(instrument_id);
2575        self.remove_subscription(topic);
2576
2577        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
2578            instrument_id,
2579            client_id,
2580            venue: Some(instrument_id.venue),
2581            command_id: UUID4::new(),
2582            ts_init: self.timestamp_ns(),
2583            params,
2584        });
2585
2586        self.send_data_cmd(DataCommand::Unsubscribe(command));
2587    }
2588
2589    /// Helper method for unsubscribing from book deltas.
2590    pub fn unsubscribe_book_deltas(
2591        &mut self,
2592        instrument_id: InstrumentId,
2593        client_id: Option<ClientId>,
2594        params: Option<IndexMap<String, String>>,
2595    ) {
2596        self.check_registered();
2597
2598        let topic = get_book_deltas_topic(instrument_id);
2599        self.remove_subscription(topic);
2600
2601        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
2602            instrument_id,
2603            client_id,
2604            venue: Some(instrument_id.venue),
2605            command_id: UUID4::new(),
2606            ts_init: self.timestamp_ns(),
2607            params,
2608        });
2609
2610        self.send_data_cmd(DataCommand::Unsubscribe(command));
2611    }
2612
2613    /// Helper method for unsubscribing from book snapshots at interval.
2614    pub fn unsubscribe_book_at_interval(
2615        &mut self,
2616        instrument_id: InstrumentId,
2617        interval_ms: NonZeroUsize,
2618        client_id: Option<ClientId>,
2619        params: Option<IndexMap<String, String>>,
2620    ) {
2621        self.check_registered();
2622
2623        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
2624        self.remove_subscription(topic);
2625
2626        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
2627            instrument_id,
2628            client_id,
2629            venue: Some(instrument_id.venue),
2630            command_id: UUID4::new(),
2631            ts_init: self.timestamp_ns(),
2632            params,
2633        });
2634
2635        self.send_data_cmd(DataCommand::Unsubscribe(command));
2636    }
2637
2638    /// Helper method for unsubscribing from quotes.
2639    pub fn unsubscribe_quotes(
2640        &mut self,
2641        instrument_id: InstrumentId,
2642        client_id: Option<ClientId>,
2643        params: Option<IndexMap<String, String>>,
2644    ) {
2645        self.check_registered();
2646
2647        let topic = get_quotes_topic(instrument_id);
2648        self.remove_subscription(topic);
2649
2650        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
2651            instrument_id,
2652            client_id,
2653            venue: Some(instrument_id.venue),
2654            command_id: UUID4::new(),
2655            ts_init: self.timestamp_ns(),
2656            params,
2657        });
2658
2659        self.send_data_cmd(DataCommand::Unsubscribe(command));
2660    }
2661
2662    /// Helper method for unsubscribing from trades.
2663    pub fn unsubscribe_trades(
2664        &mut self,
2665        instrument_id: InstrumentId,
2666        client_id: Option<ClientId>,
2667        params: Option<IndexMap<String, String>>,
2668    ) {
2669        self.check_registered();
2670
2671        let topic = get_trades_topic(instrument_id);
2672        self.remove_subscription(topic);
2673
2674        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
2675            instrument_id,
2676            client_id,
2677            venue: Some(instrument_id.venue),
2678            command_id: UUID4::new(),
2679            ts_init: self.timestamp_ns(),
2680            params,
2681        });
2682
2683        self.send_data_cmd(DataCommand::Unsubscribe(command));
2684    }
2685
2686    /// Helper method for unsubscribing from bars.
2687    pub fn unsubscribe_bars(
2688        &mut self,
2689        bar_type: BarType,
2690        client_id: Option<ClientId>,
2691        params: Option<IndexMap<String, String>>,
2692    ) {
2693        self.check_registered();
2694
2695        let topic = get_bars_topic(bar_type);
2696        self.remove_subscription(topic);
2697
2698        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
2699            bar_type,
2700            client_id,
2701            venue: Some(bar_type.instrument_id().venue),
2702            command_id: UUID4::new(),
2703            ts_init: self.timestamp_ns(),
2704            params,
2705        });
2706
2707        self.send_data_cmd(DataCommand::Unsubscribe(command));
2708    }
2709
2710    /// Helper method for unsubscribing from mark prices.
2711    pub fn unsubscribe_mark_prices(
2712        &mut self,
2713        instrument_id: InstrumentId,
2714        client_id: Option<ClientId>,
2715        params: Option<IndexMap<String, String>>,
2716    ) {
2717        self.check_registered();
2718
2719        let topic = get_mark_price_topic(instrument_id);
2720        self.remove_subscription(topic);
2721
2722        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
2723            instrument_id,
2724            client_id,
2725            venue: Some(instrument_id.venue),
2726            command_id: UUID4::new(),
2727            ts_init: self.timestamp_ns(),
2728            params,
2729        });
2730
2731        self.send_data_cmd(DataCommand::Unsubscribe(command));
2732    }
2733
2734    /// Helper method for unsubscribing from index prices.
2735    pub fn unsubscribe_index_prices(
2736        &mut self,
2737        instrument_id: InstrumentId,
2738        client_id: Option<ClientId>,
2739        params: Option<IndexMap<String, String>>,
2740    ) {
2741        self.check_registered();
2742
2743        let topic = get_index_price_topic(instrument_id);
2744        self.remove_subscription(topic);
2745
2746        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
2747            instrument_id,
2748            client_id,
2749            venue: Some(instrument_id.venue),
2750            command_id: UUID4::new(),
2751            ts_init: self.timestamp_ns(),
2752            params,
2753        });
2754
2755        self.send_data_cmd(DataCommand::Unsubscribe(command));
2756    }
2757
2758    /// Helper method for unsubscribing from funding rates.
2759    pub fn unsubscribe_funding_rates(
2760        &mut self,
2761        instrument_id: InstrumentId,
2762        client_id: Option<ClientId>,
2763        params: Option<IndexMap<String, String>>,
2764    ) {
2765        self.check_registered();
2766
2767        let topic = get_funding_rate_topic(instrument_id);
2768        self.remove_subscription(topic);
2769
2770        let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
2771            instrument_id,
2772            client_id,
2773            venue: Some(instrument_id.venue),
2774            command_id: UUID4::new(),
2775            ts_init: self.timestamp_ns(),
2776            params,
2777        });
2778
2779        self.send_data_cmd(DataCommand::Unsubscribe(command));
2780    }
2781
2782    /// Helper method for unsubscribing from instrument status.
2783    pub fn unsubscribe_instrument_status(
2784        &mut self,
2785        instrument_id: InstrumentId,
2786        client_id: Option<ClientId>,
2787        params: Option<IndexMap<String, String>>,
2788    ) {
2789        self.check_registered();
2790
2791        let topic = get_instrument_status_topic(instrument_id);
2792        self.remove_subscription(topic);
2793
2794        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
2795            instrument_id,
2796            client_id,
2797            venue: Some(instrument_id.venue),
2798            command_id: UUID4::new(),
2799            ts_init: self.timestamp_ns(),
2800            params,
2801        });
2802
2803        self.send_data_cmd(DataCommand::Unsubscribe(command));
2804    }
2805
2806    /// Helper method for unsubscribing from instrument close.
2807    pub fn unsubscribe_instrument_close(
2808        &mut self,
2809        instrument_id: InstrumentId,
2810        client_id: Option<ClientId>,
2811        params: Option<IndexMap<String, String>>,
2812    ) {
2813        self.check_registered();
2814
2815        let topic = get_instrument_close_topic(instrument_id);
2816        self.remove_subscription(topic);
2817
2818        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
2819            instrument_id,
2820            client_id,
2821            venue: Some(instrument_id.venue),
2822            command_id: UUID4::new(),
2823            ts_init: self.timestamp_ns(),
2824            params,
2825        });
2826
2827        self.send_data_cmd(DataCommand::Unsubscribe(command));
2828    }
2829
2830    #[cfg(feature = "defi")]
2831    /// Helper method for unsubscribing from blocks.
2832    pub fn unsubscribe_blocks(
2833        &mut self,
2834        chain: Blockchain,
2835        client_id: Option<ClientId>,
2836        params: Option<IndexMap<String, String>>,
2837    ) {
2838        use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribeBlocks};
2839
2840        self.check_registered();
2841
2842        let topic = get_defi_blocks_topic(chain);
2843        self.remove_subscription(topic);
2844
2845        let command = DefiUnsubscribeCommand::Blocks(UnsubscribeBlocks {
2846            chain,
2847            client_id,
2848            command_id: UUID4::new(),
2849            ts_init: self.timestamp_ns(),
2850            params,
2851        });
2852
2853        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2854    }
2855
2856    #[cfg(feature = "defi")]
2857    /// Helper method for unsubscribing from pool definition updates.
2858    pub fn unsubscribe_pool(
2859        &mut self,
2860        instrument_id: InstrumentId,
2861        client_id: Option<ClientId>,
2862        params: Option<IndexMap<String, String>>,
2863    ) {
2864        use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePool};
2865
2866        self.check_registered();
2867
2868        let topic = get_defi_pool_topic(instrument_id);
2869        self.remove_subscription(topic);
2870
2871        let command = DefiUnsubscribeCommand::Pool(UnsubscribePool {
2872            instrument_id,
2873            client_id,
2874            command_id: UUID4::new(),
2875            ts_init: self.timestamp_ns(),
2876            params,
2877        });
2878
2879        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2880    }
2881
2882    #[cfg(feature = "defi")]
2883    /// Helper method for unsubscribing from pool swaps.
2884    pub fn unsubscribe_pool_swaps(
2885        &mut self,
2886        instrument_id: InstrumentId,
2887        client_id: Option<ClientId>,
2888        params: Option<IndexMap<String, String>>,
2889    ) {
2890        use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePoolSwaps};
2891
2892        self.check_registered();
2893
2894        let topic = get_defi_pool_swaps_topic(instrument_id);
2895        self.remove_subscription(topic);
2896
2897        let command = DefiUnsubscribeCommand::PoolSwaps(UnsubscribePoolSwaps {
2898            instrument_id,
2899            client_id,
2900            command_id: UUID4::new(),
2901            ts_init: self.timestamp_ns(),
2902            params,
2903        });
2904
2905        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2906    }
2907
2908    #[cfg(feature = "defi")]
2909    /// Helper method for unsubscribing from pool liquidity updates.
2910    pub fn unsubscribe_pool_liquidity_updates(
2911        &mut self,
2912        instrument_id: InstrumentId,
2913        client_id: Option<ClientId>,
2914        params: Option<IndexMap<String, String>>,
2915    ) {
2916        use crate::messages::defi::{DefiUnsubscribeCommand, UnsubscribePoolLiquidityUpdates};
2917
2918        self.check_registered();
2919
2920        let topic = get_defi_liquidity_topic(instrument_id);
2921        self.remove_subscription(topic);
2922
2923        let command =
2924            DefiUnsubscribeCommand::PoolLiquidityUpdates(UnsubscribePoolLiquidityUpdates {
2925                instrument_id,
2926                client_id,
2927                command_id: UUID4::new(),
2928                ts_init: self.timestamp_ns(),
2929                params,
2930            });
2931
2932        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
2933    }
2934
2935    /// Helper method for requesting data.
2936    ///
2937    /// # Errors
2938    ///
2939    /// Returns an error if input parameters are invalid.
2940    #[allow(clippy::too_many_arguments)]
2941    pub fn request_data(
2942        &self,
2943        data_type: DataType,
2944        client_id: ClientId,
2945        start: Option<DateTime<Utc>>,
2946        end: Option<DateTime<Utc>>,
2947        limit: Option<NonZeroUsize>,
2948        params: Option<IndexMap<String, String>>,
2949        handler: ShareableMessageHandler,
2950    ) -> anyhow::Result<UUID4> {
2951        self.check_registered();
2952
2953        let now = self.clock_ref().utc_now();
2954        check_timestamps(now, start, end)?;
2955
2956        let request_id = UUID4::new();
2957        let command = RequestCommand::Data(RequestCustomData {
2958            client_id,
2959            data_type,
2960            start,
2961            end,
2962            limit,
2963            request_id,
2964            ts_init: self.timestamp_ns(),
2965            params,
2966        });
2967
2968        get_message_bus()
2969            .borrow_mut()
2970            .register_response_handler(command.request_id(), handler)?;
2971
2972        self.send_data_cmd(DataCommand::Request(command));
2973
2974        Ok(request_id)
2975    }
2976
2977    /// Helper method for requesting instrument.
2978    ///
2979    /// # Errors
2980    ///
2981    /// Returns an error if input parameters are invalid.
2982    pub fn request_instrument(
2983        &self,
2984        instrument_id: InstrumentId,
2985        start: Option<DateTime<Utc>>,
2986        end: Option<DateTime<Utc>>,
2987        client_id: Option<ClientId>,
2988        params: Option<IndexMap<String, String>>,
2989        handler: ShareableMessageHandler,
2990    ) -> anyhow::Result<UUID4> {
2991        self.check_registered();
2992
2993        let now = self.clock_ref().utc_now();
2994        check_timestamps(now, start, end)?;
2995
2996        let request_id = UUID4::new();
2997        let command = RequestCommand::Instrument(RequestInstrument {
2998            instrument_id,
2999            start,
3000            end,
3001            client_id,
3002            request_id,
3003            ts_init: now.into(),
3004            params,
3005        });
3006
3007        get_message_bus()
3008            .borrow_mut()
3009            .register_response_handler(command.request_id(), handler)?;
3010
3011        self.send_data_cmd(DataCommand::Request(command));
3012
3013        Ok(request_id)
3014    }
3015
3016    /// Helper method for requesting instruments.
3017    ///
3018    /// # Errors
3019    ///
3020    /// Returns an error if input parameters are invalid.
3021    pub fn request_instruments(
3022        &self,
3023        venue: Option<Venue>,
3024        start: Option<DateTime<Utc>>,
3025        end: Option<DateTime<Utc>>,
3026        client_id: Option<ClientId>,
3027        params: Option<IndexMap<String, String>>,
3028        handler: ShareableMessageHandler,
3029    ) -> anyhow::Result<UUID4> {
3030        self.check_registered();
3031
3032        let now = self.clock_ref().utc_now();
3033        check_timestamps(now, start, end)?;
3034
3035        let request_id = UUID4::new();
3036        let command = RequestCommand::Instruments(RequestInstruments {
3037            venue,
3038            start,
3039            end,
3040            client_id,
3041            request_id,
3042            ts_init: now.into(),
3043            params,
3044        });
3045
3046        get_message_bus()
3047            .borrow_mut()
3048            .register_response_handler(command.request_id(), handler)?;
3049
3050        self.send_data_cmd(DataCommand::Request(command));
3051
3052        Ok(request_id)
3053    }
3054
3055    /// Helper method for requesting book snapshot.
3056    ///
3057    /// # Errors
3058    ///
3059    /// Returns an error if input parameters are invalid.
3060    pub fn request_book_snapshot(
3061        &self,
3062        instrument_id: InstrumentId,
3063        depth: Option<NonZeroUsize>,
3064        client_id: Option<ClientId>,
3065        params: Option<IndexMap<String, String>>,
3066        handler: ShareableMessageHandler,
3067    ) -> anyhow::Result<UUID4> {
3068        self.check_registered();
3069
3070        let request_id = UUID4::new();
3071        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3072            instrument_id,
3073            depth,
3074            client_id,
3075            request_id,
3076            ts_init: self.timestamp_ns(),
3077            params,
3078        });
3079
3080        get_message_bus()
3081            .borrow_mut()
3082            .register_response_handler(command.request_id(), handler)?;
3083
3084        self.send_data_cmd(DataCommand::Request(command));
3085
3086        Ok(request_id)
3087    }
3088
3089    /// Helper method for requesting quotes.
3090    ///
3091    /// # Errors
3092    ///
3093    /// Returns an error if input parameters are invalid.
3094    #[allow(clippy::too_many_arguments)]
3095    pub fn request_quotes(
3096        &self,
3097        instrument_id: InstrumentId,
3098        start: Option<DateTime<Utc>>,
3099        end: Option<DateTime<Utc>>,
3100        limit: Option<NonZeroUsize>,
3101        client_id: Option<ClientId>,
3102        params: Option<IndexMap<String, String>>,
3103        handler: ShareableMessageHandler,
3104    ) -> anyhow::Result<UUID4> {
3105        self.check_registered();
3106
3107        let now = self.clock_ref().utc_now();
3108        check_timestamps(now, start, end)?;
3109
3110        let request_id = UUID4::new();
3111        let command = RequestCommand::Quotes(RequestQuotes {
3112            instrument_id,
3113            start,
3114            end,
3115            limit,
3116            client_id,
3117            request_id,
3118            ts_init: now.into(),
3119            params,
3120        });
3121
3122        get_message_bus()
3123            .borrow_mut()
3124            .register_response_handler(command.request_id(), handler)?;
3125
3126        self.send_data_cmd(DataCommand::Request(command));
3127
3128        Ok(request_id)
3129    }
3130
3131    /// Helper method for requesting trades.
3132    ///
3133    /// # Errors
3134    ///
3135    /// Returns an error if input parameters are invalid.
3136    #[allow(clippy::too_many_arguments)]
3137    pub fn request_trades(
3138        &self,
3139        instrument_id: InstrumentId,
3140        start: Option<DateTime<Utc>>,
3141        end: Option<DateTime<Utc>>,
3142        limit: Option<NonZeroUsize>,
3143        client_id: Option<ClientId>,
3144        params: Option<IndexMap<String, String>>,
3145        handler: ShareableMessageHandler,
3146    ) -> anyhow::Result<UUID4> {
3147        self.check_registered();
3148
3149        let now = self.clock_ref().utc_now();
3150        check_timestamps(now, start, end)?;
3151
3152        let request_id = UUID4::new();
3153        let command = RequestCommand::Trades(RequestTrades {
3154            instrument_id,
3155            start,
3156            end,
3157            limit,
3158            client_id,
3159            request_id,
3160            ts_init: now.into(),
3161            params,
3162        });
3163
3164        get_message_bus()
3165            .borrow_mut()
3166            .register_response_handler(command.request_id(), handler)?;
3167
3168        self.send_data_cmd(DataCommand::Request(command));
3169
3170        Ok(request_id)
3171    }
3172
3173    /// Helper method for requesting bars.
3174    ///
3175    /// # Errors
3176    ///
3177    /// Returns an error if input parameters are invalid.
3178    #[allow(clippy::too_many_arguments)]
3179    pub fn request_bars(
3180        &self,
3181        bar_type: BarType,
3182        start: Option<DateTime<Utc>>,
3183        end: Option<DateTime<Utc>>,
3184        limit: Option<NonZeroUsize>,
3185        client_id: Option<ClientId>,
3186        params: Option<IndexMap<String, String>>,
3187        handler: ShareableMessageHandler,
3188    ) -> anyhow::Result<UUID4> {
3189        self.check_registered();
3190
3191        let now = self.clock_ref().utc_now();
3192        check_timestamps(now, start, end)?;
3193
3194        let request_id = UUID4::new();
3195        let command = RequestCommand::Bars(RequestBars {
3196            bar_type,
3197            start,
3198            end,
3199            limit,
3200            client_id,
3201            request_id,
3202            ts_init: now.into(),
3203            params,
3204        });
3205
3206        get_message_bus()
3207            .borrow_mut()
3208            .register_response_handler(command.request_id(), handler)?;
3209
3210        self.send_data_cmd(DataCommand::Request(command));
3211
3212        Ok(request_id)
3213    }
3214}
3215
3216fn check_timestamps(
3217    now: DateTime<Utc>,
3218    start: Option<DateTime<Utc>>,
3219    end: Option<DateTime<Utc>>,
3220) -> anyhow::Result<()> {
3221    if let Some(start) = start {
3222        check_predicate_true(start <= now, "start was > now")?
3223    }
3224    if let Some(end) = end {
3225        check_predicate_true(end <= now, "end was > now")?
3226    }
3227
3228    if let (Some(start), Some(end)) = (start, end) {
3229        check_predicate_true(start < end, "start was >= end")?
3230    }
3231
3232    Ok(())
3233}
3234
3235fn log_error(e: &anyhow::Error) {
3236    log::error!("{e}");
3237}
3238
3239fn log_not_running<T>(msg: &T)
3240where
3241    T: Debug,
3242{
3243    // TODO: Potentially temporary for development? drop level at some stage
3244    log::warn!("Received message when not running - skipping {msg:?}");
3245}
3246
3247fn log_received<T>(msg: &T)
3248where
3249    T: Debug,
3250{
3251    log::debug!("{RECV} {msg:?}");
3252}