Skip to main content

nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell, RefMut},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24    sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33    Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36    data::{
37        Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38        MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
39        close::InstrumentClose,
40    },
41    enums::BookType,
42    events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
43    identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
44    instruments::InstrumentAny,
45    orderbook::OrderBook,
46};
47use ustr::Ustr;
48
49#[cfg(feature = "indicators")]
50use super::indicators::Indicators;
51use super::{
52    Actor,
53    registry::{get_actor_unchecked, try_get_actor_unchecked},
54};
55#[cfg(feature = "defi")]
56use crate::defi;
57#[cfg(feature = "defi")]
58#[allow(unused_imports)]
59use crate::defi::data_actor as _; // Brings DeFi impl blocks into scope
60use crate::{
61    cache::Cache,
62    clock::Clock,
63    component::Component,
64    enums::{ComponentState, ComponentTrigger},
65    logging::{CMD, RECV, REQ, SEND},
66    messages::{
67        data::{
68            BarsResponse, BookResponse, CustomDataResponse, DataCommand, FundingRatesResponse,
69            InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
70            RequestBookSnapshot, RequestCommand, RequestCustomData, RequestFundingRates,
71            RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
72            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
73            SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
74            SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
75            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
76            UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
77            UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
78            UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
79            UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
80        },
81        system::ShutdownSystem,
82    },
83    msgbus::{
84        self, MStr, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
85        switchboard::{
86            MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
87            get_custom_topic, get_funding_rate_topic, get_index_price_topic,
88            get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
89            get_instruments_topic, get_mark_price_topic, get_order_cancels_topic,
90            get_order_fills_topic, get_quotes_topic, get_trades_topic,
91        },
92    },
93    signal::Signal,
94    timer::{TimeEvent, TimeEventCallback},
95};
96
97/// Common configuration for [`DataActor`] based components.
98#[derive(Debug, Clone)]
99#[cfg_attr(
100    feature = "python",
101    pyo3::pyclass(
102        module = "nautilus_trader.core.nautilus_pyo3.common",
103        subclass,
104        from_py_object
105    )
106)]
107pub struct DataActorConfig {
108    /// The custom identifier for the Actor.
109    pub actor_id: Option<ActorId>,
110    /// If events should be logged.
111    pub log_events: bool,
112    /// If commands should be logged.
113    pub log_commands: bool,
114}
115
116impl Default for DataActorConfig {
117    fn default() -> Self {
118        Self {
119            actor_id: None,
120            log_events: true,
121            log_commands: true,
122        }
123    }
124}
125
126/// Configuration for creating actors from importable paths.
127#[derive(Debug, Clone)]
128#[cfg_attr(
129    feature = "python",
130    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
131)]
132pub struct ImportableActorConfig {
133    /// The fully qualified name of the Actor class.
134    pub actor_path: String,
135    /// The fully qualified name of the Actor config class.
136    pub config_path: String,
137    /// The actor configuration as a dictionary.
138    pub config: HashMap<String, serde_json::Value>,
139}
140
141type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
142
143pub trait DataActor:
144    Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
145{
146    /// Actions to be performed when the actor state is saved.
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if saving the actor state fails.
151    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
152        Ok(IndexMap::new())
153    }
154
155    /// Actions to be performed when the actor state is loaded.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if loading the actor state fails.
160    #[allow(unused_variables)]
161    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
162        Ok(())
163    }
164
165    /// Actions to be performed on start.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if starting the actor fails.
170    fn on_start(&mut self) -> anyhow::Result<()> {
171        log::warn!(
172            "The `on_start` handler was called when not overridden, \
173            it's expected that any actions required when starting the actor \
174            occur here, such as subscribing/requesting data"
175        );
176        Ok(())
177    }
178
179    /// Actions to be performed on stop.
180    ///
181    /// # Errors
182    ///
183    /// Returns an error if stopping the actor fails.
184    fn on_stop(&mut self) -> anyhow::Result<()> {
185        log::warn!(
186            "The `on_stop` handler was called when not overridden, \
187            it's expected that any actions required when stopping the actor \
188            occur here, such as unsubscribing from data",
189        );
190        Ok(())
191    }
192
193    /// Actions to be performed on resume.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if resuming the actor fails.
198    fn on_resume(&mut self) -> anyhow::Result<()> {
199        log::warn!(
200            "The `on_resume` handler was called when not overridden, \
201            it's expected that any actions required when resuming the actor \
202            following a stop occur here"
203        );
204        Ok(())
205    }
206
207    /// Actions to be performed on reset.
208    ///
209    /// # Errors
210    ///
211    /// Returns an error if resetting the actor fails.
212    fn on_reset(&mut self) -> anyhow::Result<()> {
213        log::warn!(
214            "The `on_reset` handler was called when not overridden, \
215            it's expected that any actions required when resetting the actor \
216            occur here, such as resetting indicators and other state"
217        );
218        Ok(())
219    }
220
221    /// Actions to be performed on dispose.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if disposing the actor fails.
226    fn on_dispose(&mut self) -> anyhow::Result<()> {
227        Ok(())
228    }
229
230    /// Actions to be performed on degrade.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if degrading the actor fails.
235    fn on_degrade(&mut self) -> anyhow::Result<()> {
236        Ok(())
237    }
238
239    /// Actions to be performed on fault.
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if faulting the actor fails.
244    fn on_fault(&mut self) -> anyhow::Result<()> {
245        Ok(())
246    }
247
248    /// Actions to be performed when receiving a time event.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if handling the time event fails.
253    #[allow(unused_variables)]
254    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
255        Ok(())
256    }
257
258    /// Actions to be performed when receiving custom data.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if handling the data fails.
263    #[allow(unused_variables)]
264    fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
265        Ok(())
266    }
267
268    /// Actions to be performed when receiving a signal.
269    ///
270    /// # Errors
271    ///
272    /// Returns an error if handling the signal fails.
273    #[allow(unused_variables)]
274    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
275        Ok(())
276    }
277
278    /// Actions to be performed when receiving an instrument.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if handling the instrument fails.
283    #[allow(unused_variables)]
284    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
285        Ok(())
286    }
287
288    /// Actions to be performed when receiving order book deltas.
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if handling the book deltas fails.
293    #[allow(unused_variables)]
294    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
295        Ok(())
296    }
297
298    /// Actions to be performed when receiving an order book.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if handling the book fails.
303    #[allow(unused_variables)]
304    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
305        Ok(())
306    }
307
308    /// Actions to be performed when receiving a quote.
309    ///
310    /// # Errors
311    ///
312    /// Returns an error if handling the quote fails.
313    #[allow(unused_variables)]
314    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
315        Ok(())
316    }
317
318    /// Actions to be performed when receiving a trade.
319    ///
320    /// # Errors
321    ///
322    /// Returns an error if handling the trade fails.
323    #[allow(unused_variables)]
324    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
325        Ok(())
326    }
327
328    /// Actions to be performed when receiving a bar.
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if handling the bar fails.
333    #[allow(unused_variables)]
334    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
335        Ok(())
336    }
337
338    /// Actions to be performed when receiving a mark price update.
339    ///
340    /// # Errors
341    ///
342    /// Returns an error if handling the mark price update fails.
343    #[allow(unused_variables)]
344    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
345        Ok(())
346    }
347
348    /// Actions to be performed when receiving an index price update.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if handling the index price update fails.
353    #[allow(unused_variables)]
354    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
355        Ok(())
356    }
357
358    /// Actions to be performed when receiving a funding rate update.
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if handling the funding rate update fails.
363    #[allow(unused_variables)]
364    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
365        Ok(())
366    }
367
368    /// Actions to be performed when receiving an instrument status update.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if handling the instrument status update fails.
373    #[allow(unused_variables)]
374    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
375        Ok(())
376    }
377
378    /// Actions to be performed when receiving an instrument close update.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if handling the instrument close update fails.
383    #[allow(unused_variables)]
384    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
385        Ok(())
386    }
387
388    /// Actions to be performed when receiving an order filled event.
389    ///
390    /// # Errors
391    ///
392    /// Returns an error if handling the order filled event fails.
393    #[allow(unused_variables)]
394    fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
395        Ok(())
396    }
397
398    /// Actions to be performed when receiving an order canceled event.
399    ///
400    /// # Errors
401    ///
402    /// Returns an error if handling the order canceled event fails.
403    #[allow(unused_variables)]
404    fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
405        Ok(())
406    }
407
408    #[cfg(feature = "defi")]
409    /// Actions to be performed when receiving a block.
410    ///
411    /// # Errors
412    ///
413    /// Returns an error if handling the block fails.
414    #[allow(unused_variables)]
415    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
416        Ok(())
417    }
418
419    #[cfg(feature = "defi")]
420    /// Actions to be performed when receiving a pool.
421    ///
422    /// # Errors
423    ///
424    /// Returns an error if handling the pool fails.
425    #[allow(unused_variables)]
426    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
427        Ok(())
428    }
429
430    #[cfg(feature = "defi")]
431    /// Actions to be performed when receiving a pool swap.
432    ///
433    /// # Errors
434    ///
435    /// Returns an error if handling the pool swap fails.
436    #[allow(unused_variables)]
437    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
438        Ok(())
439    }
440
441    #[cfg(feature = "defi")]
442    /// Actions to be performed when receiving a pool liquidity update.
443    ///
444    /// # Errors
445    ///
446    /// Returns an error if handling the pool liquidity update fails.
447    #[allow(unused_variables)]
448    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
449        Ok(())
450    }
451
452    #[cfg(feature = "defi")]
453    /// Actions to be performed when receiving a pool fee collect event.
454    ///
455    /// # Errors
456    ///
457    /// Returns an error if handling the pool fee collect fails.
458    #[allow(unused_variables)]
459    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
460        Ok(())
461    }
462
463    #[cfg(feature = "defi")]
464    /// Actions to be performed when receiving a pool flash event.
465    ///
466    /// # Errors
467    ///
468    /// Returns an error if handling the pool flash fails.
469    #[allow(unused_variables)]
470    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
471        Ok(())
472    }
473
474    /// Actions to be performed when receiving historical data.
475    ///
476    /// # Errors
477    ///
478    /// Returns an error if handling the historical data fails.
479    #[allow(unused_variables)]
480    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
481        Ok(())
482    }
483
484    /// Actions to be performed when receiving historical quotes.
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if handling the historical quotes fails.
489    #[allow(unused_variables)]
490    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
491        Ok(())
492    }
493
494    /// Actions to be performed when receiving historical trades.
495    ///
496    /// # Errors
497    ///
498    /// Returns an error if handling the historical trades fails.
499    #[allow(unused_variables)]
500    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
501        Ok(())
502    }
503
504    /// Actions to be performed when receiving historical funding rates.
505    ///
506    /// # Errors
507    ///
508    /// Returns an error if handling the historical funding rates fails.
509    #[allow(unused_variables)]
510    fn on_historical_funding_rates(
511        &mut self,
512        funding_rates: &[FundingRateUpdate],
513    ) -> anyhow::Result<()> {
514        Ok(())
515    }
516
517    /// Actions to be performed when receiving historical bars.
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if handling the historical bars fails.
522    #[allow(unused_variables)]
523    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
524        Ok(())
525    }
526
527    /// Actions to be performed when receiving historical mark prices.
528    ///
529    /// # Errors
530    ///
531    /// Returns an error if handling the historical mark prices fails.
532    #[allow(unused_variables)]
533    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
534        Ok(())
535    }
536
537    /// Actions to be performed when receiving historical index prices.
538    ///
539    /// # Errors
540    ///
541    /// Returns an error if handling the historical index prices fails.
542    #[allow(unused_variables)]
543    fn on_historical_index_prices(
544        &mut self,
545        index_prices: &[IndexPriceUpdate],
546    ) -> anyhow::Result<()> {
547        Ok(())
548    }
549
550    /// Handles a received time event.
551    fn handle_time_event(&mut self, event: &TimeEvent) {
552        log_received(&event);
553
554        if let Err(e) = DataActor::on_time_event(self, event) {
555            log_error(&e);
556        }
557    }
558
559    /// Handles a received custom data point.
560    fn handle_data(&mut self, data: &dyn Any) {
561        log_received(&data);
562
563        if self.not_running() {
564            log_not_running(&data);
565            return;
566        }
567
568        if let Err(e) = self.on_data(data) {
569            log_error(&e);
570        }
571    }
572
573    /// Handles a received signal.
574    fn handle_signal(&mut self, signal: &Signal) {
575        log_received(&signal);
576
577        if self.not_running() {
578            log_not_running(&signal);
579            return;
580        }
581
582        if let Err(e) = self.on_signal(signal) {
583            log_error(&e);
584        }
585    }
586
587    /// Handles a received instrument.
588    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
589        log_received(&instrument);
590
591        if self.not_running() {
592            log_not_running(&instrument);
593            return;
594        }
595
596        if let Err(e) = self.on_instrument(instrument) {
597            log_error(&e);
598        }
599    }
600
601    /// Handles received order book deltas.
602    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
603        log_received(&deltas);
604
605        if self.not_running() {
606            log_not_running(&deltas);
607            return;
608        }
609
610        if let Err(e) = self.on_book_deltas(deltas) {
611            log_error(&e);
612        }
613    }
614
615    /// Handles a received order book reference.
616    fn handle_book(&mut self, book: &OrderBook) {
617        log_received(&book);
618
619        if self.not_running() {
620            log_not_running(&book);
621            return;
622        }
623
624        if let Err(e) = self.on_book(book) {
625            log_error(&e);
626        };
627    }
628
629    /// Handles a received quote.
630    fn handle_quote(&mut self, quote: &QuoteTick) {
631        log_received(&quote);
632
633        if self.not_running() {
634            log_not_running(&quote);
635            return;
636        }
637
638        if let Err(e) = self.on_quote(quote) {
639            log_error(&e);
640        }
641    }
642
643    /// Handles a received trade.
644    fn handle_trade(&mut self, trade: &TradeTick) {
645        log_received(&trade);
646
647        if self.not_running() {
648            log_not_running(&trade);
649            return;
650        }
651
652        if let Err(e) = self.on_trade(trade) {
653            log_error(&e);
654        }
655    }
656
657    /// Handles a receiving bar.
658    fn handle_bar(&mut self, bar: &Bar) {
659        log_received(&bar);
660
661        if self.not_running() {
662            log_not_running(&bar);
663            return;
664        }
665
666        if let Err(e) = self.on_bar(bar) {
667            log_error(&e);
668        }
669    }
670
671    /// Handles a received mark price update.
672    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
673        log_received(&mark_price);
674
675        if self.not_running() {
676            log_not_running(&mark_price);
677            return;
678        }
679
680        if let Err(e) = self.on_mark_price(mark_price) {
681            log_error(&e);
682        }
683    }
684
685    /// Handles a received index price update.
686    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
687        log_received(&index_price);
688
689        if self.not_running() {
690            log_not_running(&index_price);
691            return;
692        }
693
694        if let Err(e) = self.on_index_price(index_price) {
695            log_error(&e);
696        }
697    }
698
699    /// Handles a received funding rate update.
700    fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
701        log_received(&funding_rate);
702
703        if self.not_running() {
704            log_not_running(&funding_rate);
705            return;
706        }
707
708        if let Err(e) = self.on_funding_rate(funding_rate) {
709            log_error(&e);
710        }
711    }
712
713    /// Handles a received instrument status.
714    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
715        log_received(&status);
716
717        if self.not_running() {
718            log_not_running(&status);
719            return;
720        }
721
722        if let Err(e) = self.on_instrument_status(status) {
723            log_error(&e);
724        }
725    }
726
727    /// Handles a received instrument close.
728    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
729        log_received(&close);
730
731        if self.not_running() {
732            log_not_running(&close);
733            return;
734        }
735
736        if let Err(e) = self.on_instrument_close(close) {
737            log_error(&e);
738        }
739    }
740
741    /// Handles a received order filled event.
742    fn handle_order_filled(&mut self, event: &OrderFilled) {
743        log_received(&event);
744
745        // Check for double-handling: if the event's strategy_id matches this actor's id,
746        // it means a Strategy is receiving its own fill event through both automatic
747        // subscription and manual subscribe_order_fills, so skip the manual handler.
748        if event.strategy_id.inner() == self.actor_id().inner() {
749            return;
750        }
751
752        if self.not_running() {
753            log_not_running(&event);
754            return;
755        }
756
757        if let Err(e) = self.on_order_filled(event) {
758            log_error(&e);
759        }
760    }
761
762    /// Handles a received order canceled event.
763    fn handle_order_canceled(&mut self, event: &OrderCanceled) {
764        log_received(&event);
765
766        // Check for double-handling: if the event's strategy_id matches this actor's id,
767        // it means a Strategy is receiving its own cancel event through both automatic
768        // subscription and manual subscribe_order_cancels, so skip the manual handler.
769        if event.strategy_id.inner() == self.actor_id().inner() {
770            return;
771        }
772
773        if self.not_running() {
774            log_not_running(&event);
775            return;
776        }
777
778        if let Err(e) = self.on_order_canceled(event) {
779            log_error(&e);
780        }
781    }
782
783    #[cfg(feature = "defi")]
784    /// Handles a received block.
785    fn handle_block(&mut self, block: &Block) {
786        log_received(&block);
787
788        if self.not_running() {
789            log_not_running(&block);
790            return;
791        }
792
793        if let Err(e) = self.on_block(block) {
794            log_error(&e);
795        }
796    }
797
798    #[cfg(feature = "defi")]
799    /// Handles a received pool definition update.
800    fn handle_pool(&mut self, pool: &Pool) {
801        log_received(&pool);
802
803        if self.not_running() {
804            log_not_running(&pool);
805            return;
806        }
807
808        if let Err(e) = self.on_pool(pool) {
809            log_error(&e);
810        }
811    }
812
813    #[cfg(feature = "defi")]
814    /// Handles a received pool swap.
815    fn handle_pool_swap(&mut self, swap: &PoolSwap) {
816        log_received(&swap);
817
818        if self.not_running() {
819            log_not_running(&swap);
820            return;
821        }
822
823        if let Err(e) = self.on_pool_swap(swap) {
824            log_error(&e);
825        }
826    }
827
828    #[cfg(feature = "defi")]
829    /// Handles a received pool liquidity update.
830    fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
831        log_received(&update);
832
833        if self.not_running() {
834            log_not_running(&update);
835            return;
836        }
837
838        if let Err(e) = self.on_pool_liquidity_update(update) {
839            log_error(&e);
840        }
841    }
842
843    #[cfg(feature = "defi")]
844    /// Handles a received pool fee collect.
845    fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
846        log_received(&collect);
847
848        if self.not_running() {
849            log_not_running(&collect);
850            return;
851        }
852
853        if let Err(e) = self.on_pool_fee_collect(collect) {
854            log_error(&e);
855        }
856    }
857
858    #[cfg(feature = "defi")]
859    /// Handles a received pool flash event.
860    fn handle_pool_flash(&mut self, flash: &PoolFlash) {
861        log_received(&flash);
862
863        if self.not_running() {
864            log_not_running(&flash);
865            return;
866        }
867
868        if let Err(e) = self.on_pool_flash(flash) {
869            log_error(&e);
870        }
871    }
872
873    /// Handles received historical data.
874    fn handle_historical_data(&mut self, data: &dyn Any) {
875        log_received(&data);
876
877        if let Err(e) = self.on_historical_data(data) {
878            log_error(&e);
879        }
880    }
881
882    /// Handles a data response.
883    fn handle_data_response(&mut self, resp: &CustomDataResponse) {
884        log_received(&resp);
885
886        if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
887            log_error(&e);
888        }
889    }
890
891    /// Handles an instrument response.
892    fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
893        log_received(&resp);
894
895        if let Err(e) = self.on_instrument(&resp.data) {
896            log_error(&e);
897        }
898    }
899
900    /// Handles an instruments response.
901    fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
902        log_received(&resp);
903
904        for inst in &resp.data {
905            if let Err(e) = self.on_instrument(inst) {
906                log_error(&e);
907            }
908        }
909    }
910
911    /// Handles a book response.
912    fn handle_book_response(&mut self, resp: &BookResponse) {
913        log_received(&resp);
914
915        if let Err(e) = self.on_book(&resp.data) {
916            log_error(&e);
917        }
918    }
919
920    /// Handles a quotes response.
921    fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
922        log_received(&resp);
923
924        if let Err(e) = self.on_historical_quotes(&resp.data) {
925            log_error(&e);
926        }
927    }
928
929    /// Handles a trades response.
930    fn handle_trades_response(&mut self, resp: &TradesResponse) {
931        log_received(&resp);
932
933        if let Err(e) = self.on_historical_trades(&resp.data) {
934            log_error(&e);
935        }
936    }
937
938    /// Handles a funding rates response.
939    fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
940        log_received(&resp);
941
942        if let Err(e) = self.on_historical_funding_rates(&resp.data) {
943            log_error(&e);
944        }
945    }
946
947    /// Handles a bars response.
948    fn handle_bars_response(&mut self, resp: &BarsResponse) {
949        log_received(&resp);
950
951        if let Err(e) = self.on_historical_bars(&resp.data) {
952            log_error(&e);
953        }
954    }
955
956    /// Subscribe to streaming `data_type` data.
957    fn subscribe_data(
958        &mut self,
959        data_type: DataType,
960        client_id: Option<ClientId>,
961        params: Option<IndexMap<String, String>>,
962    ) where
963        Self: 'static + Debug + Sized,
964    {
965        let actor_id = self.actor_id().inner();
966        let handler = ShareableMessageHandler::from_any(move |data: &dyn Any| {
967            get_actor_unchecked::<Self>(&actor_id).handle_data(data);
968        });
969
970        DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
971    }
972
973    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
974    fn subscribe_quotes(
975        &mut self,
976        instrument_id: InstrumentId,
977        client_id: Option<ClientId>,
978        params: Option<IndexMap<String, String>>,
979    ) where
980        Self: 'static + Debug + Sized,
981    {
982        let actor_id = self.actor_id().inner();
983        let topic = get_quotes_topic(instrument_id);
984
985        let handler = TypedHandler::from(move |quote: &QuoteTick| {
986            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
987                actor.handle_quote(quote);
988            } else {
989                log::error!("Actor {actor_id} not found for quote handling");
990            }
991        });
992
993        DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
994    }
995
996    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
997    fn subscribe_instruments(
998        &mut self,
999        venue: Venue,
1000        client_id: Option<ClientId>,
1001        params: Option<IndexMap<String, String>>,
1002    ) where
1003        Self: 'static + Debug + Sized,
1004    {
1005        let actor_id = self.actor_id().inner();
1006        let topic = get_instruments_topic(venue);
1007
1008        let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1009            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1010                actor.handle_instrument(instrument);
1011            } else {
1012                log::error!("Actor {actor_id} not found for instruments handling");
1013            }
1014        });
1015
1016        DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
1017    }
1018
1019    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
1020    fn subscribe_instrument(
1021        &mut self,
1022        instrument_id: InstrumentId,
1023        client_id: Option<ClientId>,
1024        params: Option<IndexMap<String, String>>,
1025    ) where
1026        Self: 'static + Debug + Sized,
1027    {
1028        let actor_id = self.actor_id().inner();
1029        let topic = get_instrument_topic(instrument_id);
1030
1031        let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1032            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1033                actor.handle_instrument(instrument);
1034            } else {
1035                log::error!("Actor {actor_id} not found for instrument handling");
1036            }
1037        });
1038
1039        DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1040    }
1041
1042    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
1043    fn subscribe_book_deltas(
1044        &mut self,
1045        instrument_id: InstrumentId,
1046        book_type: BookType,
1047        depth: Option<NonZeroUsize>,
1048        client_id: Option<ClientId>,
1049        managed: bool,
1050        params: Option<IndexMap<String, String>>,
1051    ) where
1052        Self: 'static + Debug + Sized,
1053    {
1054        let actor_id = self.actor_id().inner();
1055        let topic = get_book_deltas_topic(instrument_id);
1056
1057        let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1058            get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1059        });
1060
1061        DataActorCore::subscribe_book_deltas(
1062            self,
1063            topic,
1064            handler,
1065            instrument_id,
1066            book_type,
1067            depth,
1068            client_id,
1069            managed,
1070            params,
1071        );
1072    }
1073
1074    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1075    fn subscribe_book_at_interval(
1076        &mut self,
1077        instrument_id: InstrumentId,
1078        book_type: BookType,
1079        depth: Option<NonZeroUsize>,
1080        interval_ms: NonZeroUsize,
1081        client_id: Option<ClientId>,
1082        params: Option<IndexMap<String, String>>,
1083    ) where
1084        Self: 'static + Debug + Sized,
1085    {
1086        let actor_id = self.actor_id().inner();
1087        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1088
1089        let handler = TypedHandler::from(move |book: &OrderBook| {
1090            get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1091        });
1092
1093        DataActorCore::subscribe_book_at_interval(
1094            self,
1095            topic,
1096            handler,
1097            instrument_id,
1098            book_type,
1099            depth,
1100            interval_ms,
1101            client_id,
1102            params,
1103        );
1104    }
1105
1106    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
1107    fn subscribe_trades(
1108        &mut self,
1109        instrument_id: InstrumentId,
1110        client_id: Option<ClientId>,
1111        params: Option<IndexMap<String, String>>,
1112    ) where
1113        Self: 'static + Debug + Sized,
1114    {
1115        let actor_id = self.actor_id().inner();
1116        let topic = get_trades_topic(instrument_id);
1117
1118        let handler = TypedHandler::from(move |trade: &TradeTick| {
1119            get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1120        });
1121
1122        DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1123    }
1124
1125    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
1126    fn subscribe_bars(
1127        &mut self,
1128        bar_type: BarType,
1129        client_id: Option<ClientId>,
1130        params: Option<IndexMap<String, String>>,
1131    ) where
1132        Self: 'static + Debug + Sized,
1133    {
1134        let actor_id = self.actor_id().inner();
1135        let topic = get_bars_topic(bar_type);
1136
1137        let handler = TypedHandler::from(move |bar: &Bar| {
1138            get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1139        });
1140
1141        DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1142    }
1143
1144    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1145    fn subscribe_mark_prices(
1146        &mut self,
1147        instrument_id: InstrumentId,
1148        client_id: Option<ClientId>,
1149        params: Option<IndexMap<String, String>>,
1150    ) where
1151        Self: 'static + Debug + Sized,
1152    {
1153        let actor_id = self.actor_id().inner();
1154        let topic = get_mark_price_topic(instrument_id);
1155
1156        let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1157            get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1158        });
1159
1160        DataActorCore::subscribe_mark_prices(
1161            self,
1162            topic,
1163            handler,
1164            instrument_id,
1165            client_id,
1166            params,
1167        );
1168    }
1169
1170    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1171    fn subscribe_index_prices(
1172        &mut self,
1173        instrument_id: InstrumentId,
1174        client_id: Option<ClientId>,
1175        params: Option<IndexMap<String, String>>,
1176    ) where
1177        Self: 'static + Debug + Sized,
1178    {
1179        let actor_id = self.actor_id().inner();
1180        let topic = get_index_price_topic(instrument_id);
1181
1182        let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1183            get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1184        });
1185
1186        DataActorCore::subscribe_index_prices(
1187            self,
1188            topic,
1189            handler,
1190            instrument_id,
1191            client_id,
1192            params,
1193        );
1194    }
1195
1196    /// Subscribe to streaming [`FundingRateUpdate`] data for the `instrument_id`.
1197    fn subscribe_funding_rates(
1198        &mut self,
1199        instrument_id: InstrumentId,
1200        client_id: Option<ClientId>,
1201        params: Option<IndexMap<String, String>>,
1202    ) where
1203        Self: 'static + Debug + Sized,
1204    {
1205        let actor_id = self.actor_id().inner();
1206        let topic = get_funding_rate_topic(instrument_id);
1207
1208        let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1209            get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1210        });
1211
1212        DataActorCore::subscribe_funding_rates(
1213            self,
1214            topic,
1215            handler,
1216            instrument_id,
1217            client_id,
1218            params,
1219        );
1220    }
1221
1222    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1223    fn subscribe_instrument_status(
1224        &mut self,
1225        instrument_id: InstrumentId,
1226        client_id: Option<ClientId>,
1227        params: Option<IndexMap<String, String>>,
1228    ) where
1229        Self: 'static + Debug + Sized,
1230    {
1231        let actor_id = self.actor_id().inner();
1232        let topic = get_instrument_status_topic(instrument_id);
1233
1234        let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1235            get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1236        });
1237
1238        DataActorCore::subscribe_instrument_status(
1239            self,
1240            topic,
1241            handler,
1242            instrument_id,
1243            client_id,
1244            params,
1245        );
1246    }
1247
1248    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1249    fn subscribe_instrument_close(
1250        &mut self,
1251        instrument_id: InstrumentId,
1252        client_id: Option<ClientId>,
1253        params: Option<IndexMap<String, String>>,
1254    ) where
1255        Self: 'static + Debug + Sized,
1256    {
1257        let actor_id = self.actor_id().inner();
1258        let topic = get_instrument_close_topic(instrument_id);
1259
1260        let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1261            get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1262        });
1263
1264        DataActorCore::subscribe_instrument_close(
1265            self,
1266            topic,
1267            handler,
1268            instrument_id,
1269            client_id,
1270            params,
1271        );
1272    }
1273
1274    /// Subscribe to [`OrderFilled`] events for the `instrument_id`.
1275    fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1276    where
1277        Self: 'static + Debug + Sized,
1278    {
1279        let actor_id = self.actor_id().inner();
1280        let topic = get_order_fills_topic(instrument_id);
1281
1282        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1283            if let OrderEventAny::Filled(filled) = event {
1284                get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1285            }
1286        });
1287
1288        DataActorCore::subscribe_order_fills(self, topic, handler);
1289    }
1290
1291    /// Subscribe to [`OrderCanceled`] events for the `instrument_id`.
1292    fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1293    where
1294        Self: 'static + Debug + Sized,
1295    {
1296        let actor_id = self.actor_id().inner();
1297        let topic = get_order_cancels_topic(instrument_id);
1298
1299        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1300            if let OrderEventAny::Canceled(canceled) = event {
1301                get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1302            }
1303        });
1304
1305        DataActorCore::subscribe_order_cancels(self, topic, handler);
1306    }
1307
1308    #[cfg(feature = "defi")]
1309    /// Subscribe to streaming [`Block`] data for the `chain`.
1310    fn subscribe_blocks(
1311        &mut self,
1312        chain: Blockchain,
1313        client_id: Option<ClientId>,
1314        params: Option<IndexMap<String, String>>,
1315    ) where
1316        Self: 'static + Debug + Sized,
1317    {
1318        let actor_id = self.actor_id().inner();
1319        let topic = defi::switchboard::get_defi_blocks_topic(chain);
1320
1321        let handler = TypedHandler::from(move |block: &Block| {
1322            get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1323        });
1324
1325        DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1326    }
1327
1328    #[cfg(feature = "defi")]
1329    /// Subscribe to streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1330    fn subscribe_pool(
1331        &mut self,
1332        instrument_id: InstrumentId,
1333        client_id: Option<ClientId>,
1334        params: Option<IndexMap<String, String>>,
1335    ) where
1336        Self: 'static + Debug + Sized,
1337    {
1338        let actor_id = self.actor_id().inner();
1339        let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1340
1341        let handler = TypedHandler::from(move |pool: &Pool| {
1342            get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1343        });
1344
1345        DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1346    }
1347
1348    #[cfg(feature = "defi")]
1349    /// Subscribe to streaming [`PoolSwap`] data for the `instrument_id`.
1350    fn subscribe_pool_swaps(
1351        &mut self,
1352        instrument_id: InstrumentId,
1353        client_id: Option<ClientId>,
1354        params: Option<IndexMap<String, String>>,
1355    ) where
1356        Self: 'static + Debug + Sized,
1357    {
1358        let actor_id = self.actor_id().inner();
1359        let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1360
1361        let handler = TypedHandler::from(move |swap: &PoolSwap| {
1362            get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1363        });
1364
1365        DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1366    }
1367
1368    #[cfg(feature = "defi")]
1369    /// Subscribe to streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1370    fn subscribe_pool_liquidity_updates(
1371        &mut self,
1372        instrument_id: InstrumentId,
1373        client_id: Option<ClientId>,
1374        params: Option<IndexMap<String, String>>,
1375    ) where
1376        Self: 'static + Debug + Sized,
1377    {
1378        let actor_id = self.actor_id().inner();
1379        let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1380
1381        let handler = TypedHandler::from(move |update: &PoolLiquidityUpdate| {
1382            get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1383        });
1384
1385        DataActorCore::subscribe_pool_liquidity_updates(
1386            self,
1387            topic,
1388            handler,
1389            instrument_id,
1390            client_id,
1391            params,
1392        );
1393    }
1394
1395    #[cfg(feature = "defi")]
1396    /// Subscribe to streaming [`PoolFeeCollect`] data for the `instrument_id`.
1397    fn subscribe_pool_fee_collects(
1398        &mut self,
1399        instrument_id: InstrumentId,
1400        client_id: Option<ClientId>,
1401        params: Option<IndexMap<String, String>>,
1402    ) where
1403        Self: 'static + Debug + Sized,
1404    {
1405        let actor_id = self.actor_id().inner();
1406        let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1407
1408        let handler = TypedHandler::from(move |collect: &PoolFeeCollect| {
1409            get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1410        });
1411
1412        DataActorCore::subscribe_pool_fee_collects(
1413            self,
1414            topic,
1415            handler,
1416            instrument_id,
1417            client_id,
1418            params,
1419        );
1420    }
1421
1422    #[cfg(feature = "defi")]
1423    /// Subscribe to streaming [`PoolFlash`] events for the given `instrument_id`.
1424    fn subscribe_pool_flash_events(
1425        &mut self,
1426        instrument_id: InstrumentId,
1427        client_id: Option<ClientId>,
1428        params: Option<IndexMap<String, String>>,
1429    ) where
1430        Self: 'static + Debug + Sized,
1431    {
1432        let actor_id = self.actor_id().inner();
1433        let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1434
1435        let handler = TypedHandler::from(move |flash: &PoolFlash| {
1436            get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1437        });
1438
1439        DataActorCore::subscribe_pool_flash_events(
1440            self,
1441            topic,
1442            handler,
1443            instrument_id,
1444            client_id,
1445            params,
1446        );
1447    }
1448
1449    /// Unsubscribe from streaming `data_type` data.
1450    fn unsubscribe_data(
1451        &mut self,
1452        data_type: DataType,
1453        client_id: Option<ClientId>,
1454        params: Option<IndexMap<String, String>>,
1455    ) where
1456        Self: 'static + Debug + Sized,
1457    {
1458        DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1459    }
1460
1461    /// Unsubscribe from streaming [`InstrumentAny`] data for the `venue`.
1462    fn unsubscribe_instruments(
1463        &mut self,
1464        venue: Venue,
1465        client_id: Option<ClientId>,
1466        params: Option<IndexMap<String, String>>,
1467    ) where
1468        Self: 'static + Debug + Sized,
1469    {
1470        DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1471    }
1472
1473    /// Unsubscribe from streaming [`InstrumentAny`] data for the `instrument_id`.
1474    fn unsubscribe_instrument(
1475        &mut self,
1476        instrument_id: InstrumentId,
1477        client_id: Option<ClientId>,
1478        params: Option<IndexMap<String, String>>,
1479    ) where
1480        Self: 'static + Debug + Sized,
1481    {
1482        DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1483    }
1484
1485    /// Unsubscribe from streaming [`OrderBookDeltas`] data for the `instrument_id`.
1486    fn unsubscribe_book_deltas(
1487        &mut self,
1488        instrument_id: InstrumentId,
1489        client_id: Option<ClientId>,
1490        params: Option<IndexMap<String, String>>,
1491    ) where
1492        Self: 'static + Debug + Sized,
1493    {
1494        DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1495    }
1496
1497    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1498    fn unsubscribe_book_at_interval(
1499        &mut self,
1500        instrument_id: InstrumentId,
1501        interval_ms: NonZeroUsize,
1502        client_id: Option<ClientId>,
1503        params: Option<IndexMap<String, String>>,
1504    ) where
1505        Self: 'static + Debug + Sized,
1506    {
1507        DataActorCore::unsubscribe_book_at_interval(
1508            self,
1509            instrument_id,
1510            interval_ms,
1511            client_id,
1512            params,
1513        );
1514    }
1515
1516    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
1517    fn unsubscribe_quotes(
1518        &mut self,
1519        instrument_id: InstrumentId,
1520        client_id: Option<ClientId>,
1521        params: Option<IndexMap<String, String>>,
1522    ) where
1523        Self: 'static + Debug + Sized,
1524    {
1525        DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1526    }
1527
1528    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
1529    fn unsubscribe_trades(
1530        &mut self,
1531        instrument_id: InstrumentId,
1532        client_id: Option<ClientId>,
1533        params: Option<IndexMap<String, String>>,
1534    ) where
1535        Self: 'static + Debug + Sized,
1536    {
1537        DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1538    }
1539
1540    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
1541    fn unsubscribe_bars(
1542        &mut self,
1543        bar_type: BarType,
1544        client_id: Option<ClientId>,
1545        params: Option<IndexMap<String, String>>,
1546    ) where
1547        Self: 'static + Debug + Sized,
1548    {
1549        DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1550    }
1551
1552    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1553    fn unsubscribe_mark_prices(
1554        &mut self,
1555        instrument_id: InstrumentId,
1556        client_id: Option<ClientId>,
1557        params: Option<IndexMap<String, String>>,
1558    ) where
1559        Self: 'static + Debug + Sized,
1560    {
1561        DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1562    }
1563
1564    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1565    fn unsubscribe_index_prices(
1566        &mut self,
1567        instrument_id: InstrumentId,
1568        client_id: Option<ClientId>,
1569        params: Option<IndexMap<String, String>>,
1570    ) where
1571        Self: 'static + Debug + Sized,
1572    {
1573        DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1574    }
1575
1576    /// Unsubscribe from streaming [`FundingRateUpdate`] data for the `instrument_id`.
1577    fn unsubscribe_funding_rates(
1578        &mut self,
1579        instrument_id: InstrumentId,
1580        client_id: Option<ClientId>,
1581        params: Option<IndexMap<String, String>>,
1582    ) where
1583        Self: 'static + Debug + Sized,
1584    {
1585        DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1586    }
1587
1588    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
1589    fn unsubscribe_instrument_status(
1590        &mut self,
1591        instrument_id: InstrumentId,
1592        client_id: Option<ClientId>,
1593        params: Option<IndexMap<String, String>>,
1594    ) where
1595        Self: 'static + Debug + Sized,
1596    {
1597        DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1598    }
1599
1600    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
1601    fn unsubscribe_instrument_close(
1602        &mut self,
1603        instrument_id: InstrumentId,
1604        client_id: Option<ClientId>,
1605        params: Option<IndexMap<String, String>>,
1606    ) where
1607        Self: 'static + Debug + Sized,
1608    {
1609        DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1610    }
1611
1612    /// Unsubscribe from [`OrderFilled`] events for the `instrument_id`.
1613    fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1614    where
1615        Self: 'static + Debug + Sized,
1616    {
1617        DataActorCore::unsubscribe_order_fills(self, instrument_id);
1618    }
1619
1620    /// Unsubscribe from [`OrderCanceled`] events for the `instrument_id`.
1621    fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1622    where
1623        Self: 'static + Debug + Sized,
1624    {
1625        DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1626    }
1627
1628    #[cfg(feature = "defi")]
1629    /// Unsubscribe from streaming [`Block`] data for the `chain`.
1630    fn unsubscribe_blocks(
1631        &mut self,
1632        chain: Blockchain,
1633        client_id: Option<ClientId>,
1634        params: Option<IndexMap<String, String>>,
1635    ) where
1636        Self: 'static + Debug + Sized,
1637    {
1638        DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1639    }
1640
1641    #[cfg(feature = "defi")]
1642    /// Unsubscribe from streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1643    fn unsubscribe_pool(
1644        &mut self,
1645        instrument_id: InstrumentId,
1646        client_id: Option<ClientId>,
1647        params: Option<IndexMap<String, String>>,
1648    ) where
1649        Self: 'static + Debug + Sized,
1650    {
1651        DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1652    }
1653
1654    #[cfg(feature = "defi")]
1655    /// Unsubscribe from streaming [`PoolSwap`] data for the `instrument_id`.
1656    fn unsubscribe_pool_swaps(
1657        &mut self,
1658        instrument_id: InstrumentId,
1659        client_id: Option<ClientId>,
1660        params: Option<IndexMap<String, String>>,
1661    ) where
1662        Self: 'static + Debug + Sized,
1663    {
1664        DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1665    }
1666
1667    #[cfg(feature = "defi")]
1668    /// Unsubscribe from streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1669    fn unsubscribe_pool_liquidity_updates(
1670        &mut self,
1671        instrument_id: InstrumentId,
1672        client_id: Option<ClientId>,
1673        params: Option<IndexMap<String, String>>,
1674    ) where
1675        Self: 'static + Debug + Sized,
1676    {
1677        DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1678    }
1679
1680    #[cfg(feature = "defi")]
1681    /// Unsubscribe from streaming [`PoolFeeCollect`] data for the `instrument_id`.
1682    fn unsubscribe_pool_fee_collects(
1683        &mut self,
1684        instrument_id: InstrumentId,
1685        client_id: Option<ClientId>,
1686        params: Option<IndexMap<String, String>>,
1687    ) where
1688        Self: 'static + Debug + Sized,
1689    {
1690        DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1691    }
1692
1693    #[cfg(feature = "defi")]
1694    /// Unsubscribe from streaming [`PoolFlash`] events for the given `instrument_id`.
1695    fn unsubscribe_pool_flash_events(
1696        &mut self,
1697        instrument_id: InstrumentId,
1698        client_id: Option<ClientId>,
1699        params: Option<IndexMap<String, String>>,
1700    ) where
1701        Self: 'static + Debug + Sized,
1702    {
1703        DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1704    }
1705
1706    /// Request historical custom data of the given `data_type`.
1707    ///
1708    /// # Errors
1709    ///
1710    /// Returns an error if input parameters are invalid.
1711    fn request_data(
1712        &mut self,
1713        data_type: DataType,
1714        client_id: ClientId,
1715        start: Option<DateTime<Utc>>,
1716        end: Option<DateTime<Utc>>,
1717        limit: Option<NonZeroUsize>,
1718        params: Option<IndexMap<String, String>>,
1719    ) -> anyhow::Result<UUID4>
1720    where
1721        Self: 'static + Debug + Sized,
1722    {
1723        let actor_id = self.actor_id().inner();
1724        let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
1725            get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1726        });
1727
1728        DataActorCore::request_data(
1729            self, data_type, client_id, start, end, limit, params, handler,
1730        )
1731    }
1732
1733    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
1734    ///
1735    /// # Errors
1736    ///
1737    /// Returns an error if input parameters are invalid.
1738    fn request_instrument(
1739        &mut self,
1740        instrument_id: InstrumentId,
1741        start: Option<DateTime<Utc>>,
1742        end: Option<DateTime<Utc>>,
1743        client_id: Option<ClientId>,
1744        params: Option<IndexMap<String, String>>,
1745    ) -> anyhow::Result<UUID4>
1746    where
1747        Self: 'static + Debug + Sized,
1748    {
1749        let actor_id = self.actor_id().inner();
1750        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
1751            get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1752        });
1753
1754        DataActorCore::request_instrument(
1755            self,
1756            instrument_id,
1757            start,
1758            end,
1759            client_id,
1760            params,
1761            handler,
1762        )
1763    }
1764
1765    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
1766    ///
1767    /// # Errors
1768    ///
1769    /// Returns an error if input parameters are invalid.
1770    fn request_instruments(
1771        &mut self,
1772        venue: Option<Venue>,
1773        start: Option<DateTime<Utc>>,
1774        end: Option<DateTime<Utc>>,
1775        client_id: Option<ClientId>,
1776        params: Option<IndexMap<String, String>>,
1777    ) -> anyhow::Result<UUID4>
1778    where
1779        Self: 'static + Debug + Sized,
1780    {
1781        let actor_id = self.actor_id().inner();
1782        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
1783            get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1784        });
1785
1786        DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1787    }
1788
1789    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
1790    ///
1791    /// # Errors
1792    ///
1793    /// Returns an error if input parameters are invalid.
1794    fn request_book_snapshot(
1795        &mut self,
1796        instrument_id: InstrumentId,
1797        depth: Option<NonZeroUsize>,
1798        client_id: Option<ClientId>,
1799        params: Option<IndexMap<String, String>>,
1800    ) -> anyhow::Result<UUID4>
1801    where
1802        Self: 'static + Debug + Sized,
1803    {
1804        let actor_id = self.actor_id().inner();
1805        let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
1806            get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1807        });
1808
1809        DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1810    }
1811
1812    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
1813    ///
1814    /// # Errors
1815    ///
1816    /// Returns an error if input parameters are invalid.
1817    fn request_quotes(
1818        &mut self,
1819        instrument_id: InstrumentId,
1820        start: Option<DateTime<Utc>>,
1821        end: Option<DateTime<Utc>>,
1822        limit: Option<NonZeroUsize>,
1823        client_id: Option<ClientId>,
1824        params: Option<IndexMap<String, String>>,
1825    ) -> anyhow::Result<UUID4>
1826    where
1827        Self: 'static + Debug + Sized,
1828    {
1829        let actor_id = self.actor_id().inner();
1830        let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
1831            get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1832        });
1833
1834        DataActorCore::request_quotes(
1835            self,
1836            instrument_id,
1837            start,
1838            end,
1839            limit,
1840            client_id,
1841            params,
1842            handler,
1843        )
1844    }
1845
1846    /// Request historical [`TradeTick`] data for the given `instrument_id`.
1847    ///
1848    /// # Errors
1849    ///
1850    /// Returns an error if input parameters are invalid.
1851    fn request_trades(
1852        &mut self,
1853        instrument_id: InstrumentId,
1854        start: Option<DateTime<Utc>>,
1855        end: Option<DateTime<Utc>>,
1856        limit: Option<NonZeroUsize>,
1857        client_id: Option<ClientId>,
1858        params: Option<IndexMap<String, String>>,
1859    ) -> anyhow::Result<UUID4>
1860    where
1861        Self: 'static + Debug + Sized,
1862    {
1863        let actor_id = self.actor_id().inner();
1864        let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
1865            get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1866        });
1867
1868        DataActorCore::request_trades(
1869            self,
1870            instrument_id,
1871            start,
1872            end,
1873            limit,
1874            client_id,
1875            params,
1876            handler,
1877        )
1878    }
1879
1880    /// Request historical [`FundingRateUpdate`] data for the given `instrument_id`.
1881    ///
1882    /// # Errors
1883    ///
1884    /// Returns an error if input parameters are invalid.
1885    fn request_funding_rates(
1886        &mut self,
1887        instrument_id: InstrumentId,
1888        start: Option<DateTime<Utc>>,
1889        end: Option<DateTime<Utc>>,
1890        limit: Option<NonZeroUsize>,
1891        client_id: Option<ClientId>,
1892        params: Option<IndexMap<String, String>>,
1893    ) -> anyhow::Result<UUID4>
1894    where
1895        Self: 'static + Debug + Sized,
1896    {
1897        let actor_id = self.actor_id().inner();
1898        let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
1899            get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
1900        });
1901
1902        DataActorCore::request_funding_rates(
1903            self,
1904            instrument_id,
1905            start,
1906            end,
1907            limit,
1908            client_id,
1909            params,
1910            handler,
1911        )
1912    }
1913
1914    /// Request historical [`Bar`] data for the given `bar_type`.
1915    ///
1916    /// # Errors
1917    ///
1918    /// Returns an error if input parameters are invalid.
1919    fn request_bars(
1920        &mut self,
1921        bar_type: BarType,
1922        start: Option<DateTime<Utc>>,
1923        end: Option<DateTime<Utc>>,
1924        limit: Option<NonZeroUsize>,
1925        client_id: Option<ClientId>,
1926        params: Option<IndexMap<String, String>>,
1927    ) -> anyhow::Result<UUID4>
1928    where
1929        Self: 'static + Debug + Sized,
1930    {
1931        let actor_id = self.actor_id().inner();
1932        let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
1933            get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1934        });
1935
1936        DataActorCore::request_bars(
1937            self, bar_type, start, end, limit, client_id, params, handler,
1938        )
1939    }
1940}
1941
1942// Blanket implementation: any DataActor automatically implements Actor
1943impl<T> Actor for T
1944where
1945    T: DataActor + Debug + 'static,
1946{
1947    fn id(&self) -> Ustr {
1948        self.actor_id.inner()
1949    }
1950
1951    #[allow(unused_variables)]
1952    fn handle(&mut self, msg: &dyn Any) {
1953        // Default empty implementation - concrete actors can override if needed
1954    }
1955
1956    fn as_any(&self) -> &dyn Any {
1957        self
1958    }
1959}
1960
1961// Blanket implementation: any DataActor automatically implements Component
1962impl<T> Component for T
1963where
1964    T: DataActor + Debug + 'static,
1965{
1966    fn component_id(&self) -> ComponentId {
1967        ComponentId::new(self.actor_id.inner().as_str())
1968    }
1969
1970    fn state(&self) -> ComponentState {
1971        self.state
1972    }
1973
1974    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1975        self.state = self.state.transition(&trigger)?;
1976        log::info!("{}", self.state.variant_name());
1977        Ok(())
1978    }
1979
1980    fn register(
1981        &mut self,
1982        trader_id: TraderId,
1983        clock: Rc<RefCell<dyn Clock>>,
1984        cache: Rc<RefCell<Cache>>,
1985    ) -> anyhow::Result<()> {
1986        DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1987
1988        // Register default time event handler for this actor
1989        let actor_id = self.actor_id().inner();
1990        let callback = TimeEventCallback::from(move |event: TimeEvent| {
1991            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1992                actor.handle_time_event(&event);
1993            } else {
1994                log::error!("Actor {actor_id} not found for time event handling");
1995            }
1996        });
1997
1998        clock.borrow_mut().register_default_handler(callback);
1999
2000        self.initialize()
2001    }
2002
2003    fn on_start(&mut self) -> anyhow::Result<()> {
2004        DataActor::on_start(self)
2005    }
2006
2007    fn on_stop(&mut self) -> anyhow::Result<()> {
2008        DataActor::on_stop(self)
2009    }
2010
2011    fn on_resume(&mut self) -> anyhow::Result<()> {
2012        DataActor::on_resume(self)
2013    }
2014
2015    fn on_degrade(&mut self) -> anyhow::Result<()> {
2016        DataActor::on_degrade(self)
2017    }
2018
2019    fn on_fault(&mut self) -> anyhow::Result<()> {
2020        DataActor::on_fault(self)
2021    }
2022
2023    fn on_reset(&mut self) -> anyhow::Result<()> {
2024        DataActor::on_reset(self)
2025    }
2026
2027    fn on_dispose(&mut self) -> anyhow::Result<()> {
2028        DataActor::on_dispose(self)
2029    }
2030}
2031
2032/// Core functionality for all actors.
2033#[derive(Clone)]
2034#[allow(
2035    dead_code,
2036    reason = "TODO: Under development (pending_requests, signal_classes)"
2037)]
2038pub struct DataActorCore {
2039    /// The actor identifier.
2040    pub actor_id: ActorId,
2041    /// The actors configuration.
2042    pub config: DataActorConfig,
2043    trader_id: Option<TraderId>,
2044    clock: Option<Rc<RefCell<dyn Clock>>>, // Wired up on registration
2045    cache: Option<Rc<RefCell<Cache>>>,     // Wired up on registration
2046    state: ComponentState,
2047    topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
2048    deltas_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDeltas>>,
2049    depth10_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDepth10>>,
2050    book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
2051    quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
2052    trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
2053    bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
2054    mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
2055    index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
2056    funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
2057    order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
2058    #[cfg(feature = "defi")]
2059    block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
2060    #[cfg(feature = "defi")]
2061    pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
2062    #[cfg(feature = "defi")]
2063    pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
2064    #[cfg(feature = "defi")]
2065    pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
2066    #[cfg(feature = "defi")]
2067    pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
2068    #[cfg(feature = "defi")]
2069    pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
2070    warning_events: AHashSet<String>, // TODO: TBD
2071    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2072    signal_classes: AHashMap<String, String>,
2073    #[cfg(feature = "indicators")]
2074    indicators: Indicators,
2075}
2076
2077impl Debug for DataActorCore {
2078    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2079        f.debug_struct(stringify!(DataActorCore))
2080            .field("actor_id", &self.actor_id)
2081            .field("config", &self.config)
2082            .field("state", &self.state)
2083            .field("trader_id", &self.trader_id)
2084            .finish()
2085    }
2086}
2087
2088impl DataActorCore {
2089    /// Adds a subscription handler for the `topic`.
2090    ///
2091    //// Logs a warning if the actor is already subscribed to the topic.
2092    pub(crate) fn add_subscription_any(
2093        &mut self,
2094        topic: MStr<Topic>,
2095        handler: ShareableMessageHandler,
2096    ) {
2097        if self.topic_handlers.contains_key(&topic) {
2098            log::warn!(
2099                "Actor {} attempted duplicate subscription to topic '{topic}'",
2100                self.actor_id,
2101            );
2102            return;
2103        }
2104
2105        self.topic_handlers.insert(topic, handler.clone());
2106        msgbus::subscribe_any(topic.into(), handler, None);
2107    }
2108
2109    /// Removes a subscription handler for the `topic` if present.
2110    ///
2111    /// Logs a warning if the actor is not currently subscribed to the topic.
2112    pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2113        if let Some(handler) = self.topic_handlers.remove(&topic) {
2114            msgbus::unsubscribe_any(topic.into(), handler);
2115        } else {
2116            log::warn!(
2117                "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2118                self.actor_id,
2119            );
2120        }
2121    }
2122
2123    pub(crate) fn add_quote_subscription(
2124        &mut self,
2125        topic: MStr<Topic>,
2126        handler: TypedHandler<QuoteTick>,
2127    ) {
2128        if self.quote_handlers.contains_key(&topic) {
2129            log::warn!(
2130                "Actor {} attempted duplicate quote subscription to '{topic}'",
2131                self.actor_id
2132            );
2133            return;
2134        }
2135        self.quote_handlers.insert(topic, handler.clone());
2136        msgbus::subscribe_quotes(topic.into(), handler, None);
2137    }
2138
2139    #[allow(dead_code)]
2140    pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2141        if let Some(handler) = self.quote_handlers.remove(&topic) {
2142            msgbus::unsubscribe_quotes(topic.into(), &handler);
2143        }
2144    }
2145
2146    pub(crate) fn add_trade_subscription(
2147        &mut self,
2148        topic: MStr<Topic>,
2149        handler: TypedHandler<TradeTick>,
2150    ) {
2151        if self.trade_handlers.contains_key(&topic) {
2152            log::warn!(
2153                "Actor {} attempted duplicate trade subscription to '{topic}'",
2154                self.actor_id
2155            );
2156            return;
2157        }
2158        self.trade_handlers.insert(topic, handler.clone());
2159        msgbus::subscribe_trades(topic.into(), handler, None);
2160    }
2161
2162    #[allow(dead_code)]
2163    pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2164        if let Some(handler) = self.trade_handlers.remove(&topic) {
2165            msgbus::unsubscribe_trades(topic.into(), &handler);
2166        }
2167    }
2168
2169    pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2170        if self.bar_handlers.contains_key(&topic) {
2171            log::warn!(
2172                "Actor {} attempted duplicate bar subscription to '{topic}'",
2173                self.actor_id
2174            );
2175            return;
2176        }
2177        self.bar_handlers.insert(topic, handler.clone());
2178        msgbus::subscribe_bars(topic.into(), handler, None);
2179    }
2180
2181    #[allow(dead_code)]
2182    pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2183        if let Some(handler) = self.bar_handlers.remove(&topic) {
2184            msgbus::unsubscribe_bars(topic.into(), &handler);
2185        }
2186    }
2187
2188    pub(crate) fn add_order_event_subscription(
2189        &mut self,
2190        topic: MStr<Topic>,
2191        handler: TypedHandler<OrderEventAny>,
2192    ) {
2193        if self.order_event_handlers.contains_key(&topic) {
2194            log::warn!(
2195                "Actor {} attempted duplicate order event subscription to '{topic}'",
2196                self.actor_id
2197            );
2198            return;
2199        }
2200        self.order_event_handlers.insert(topic, handler.clone());
2201        msgbus::subscribe_order_events(topic.into(), handler, None);
2202    }
2203
2204    #[allow(dead_code)]
2205    pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2206        if let Some(handler) = self.order_event_handlers.remove(&topic) {
2207            msgbus::unsubscribe_order_events(topic.into(), &handler);
2208        }
2209    }
2210
2211    pub(crate) fn add_deltas_subscription(
2212        &mut self,
2213        topic: MStr<Topic>,
2214        handler: TypedHandler<OrderBookDeltas>,
2215    ) {
2216        if self.deltas_handlers.contains_key(&topic) {
2217            log::warn!(
2218                "Actor {} attempted duplicate deltas subscription to '{topic}'",
2219                self.actor_id
2220            );
2221            return;
2222        }
2223        self.deltas_handlers.insert(topic, handler.clone());
2224        msgbus::subscribe_book_deltas(topic.into(), handler, None);
2225    }
2226
2227    #[allow(dead_code)]
2228    pub(crate) fn remove_deltas_subscription(&mut self, topic: MStr<Topic>) {
2229        if let Some(handler) = self.deltas_handlers.remove(&topic) {
2230            msgbus::unsubscribe_book_deltas(topic.into(), &handler);
2231        }
2232    }
2233
2234    #[allow(dead_code)]
2235    pub(crate) fn add_depth10_subscription(
2236        &mut self,
2237        topic: MStr<Topic>,
2238        handler: TypedHandler<OrderBookDepth10>,
2239    ) {
2240        if self.depth10_handlers.contains_key(&topic) {
2241            log::warn!(
2242                "Actor {} attempted duplicate depth10 subscription to '{topic}'",
2243                self.actor_id
2244            );
2245            return;
2246        }
2247        self.depth10_handlers.insert(topic, handler.clone());
2248        msgbus::subscribe_book_depth10(topic.into(), handler, None);
2249    }
2250
2251    #[allow(dead_code)]
2252    pub(crate) fn remove_depth10_subscription(&mut self, topic: MStr<Topic>) {
2253        if let Some(handler) = self.depth10_handlers.remove(&topic) {
2254            msgbus::unsubscribe_book_depth10(topic.into(), &handler);
2255        }
2256    }
2257
2258    pub(crate) fn add_instrument_subscription(
2259        &mut self,
2260        topic: MStr<Topic>,
2261        handler: ShareableMessageHandler,
2262    ) {
2263        if self.topic_handlers.contains_key(&topic) {
2264            log::warn!(
2265                "Actor {} attempted duplicate instrument subscription to '{topic}'",
2266                self.actor_id
2267            );
2268            return;
2269        }
2270        self.topic_handlers.insert(topic, handler.clone());
2271        msgbus::subscribe_any(topic.into(), handler, None);
2272    }
2273
2274    #[allow(dead_code)]
2275    pub(crate) fn remove_instrument_subscription(&mut self, topic: MStr<Topic>) {
2276        if let Some(handler) = self.topic_handlers.remove(&topic) {
2277            msgbus::unsubscribe_any(topic.into(), handler);
2278        }
2279    }
2280
2281    pub(crate) fn add_instrument_close_subscription(
2282        &mut self,
2283        topic: MStr<Topic>,
2284        handler: ShareableMessageHandler,
2285    ) {
2286        if self.topic_handlers.contains_key(&topic) {
2287            log::warn!(
2288                "Actor {} attempted duplicate instrument close subscription to '{topic}'",
2289                self.actor_id
2290            );
2291            return;
2292        }
2293        self.topic_handlers.insert(topic, handler.clone());
2294        msgbus::subscribe_any(topic.into(), handler, None);
2295    }
2296
2297    #[allow(dead_code)]
2298    pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
2299        if let Some(handler) = self.topic_handlers.remove(&topic) {
2300            msgbus::unsubscribe_any(topic.into(), handler);
2301        }
2302    }
2303
2304    pub(crate) fn add_book_snapshot_subscription(
2305        &mut self,
2306        topic: MStr<Topic>,
2307        handler: TypedHandler<OrderBook>,
2308    ) {
2309        if self.book_handlers.contains_key(&topic) {
2310            log::warn!(
2311                "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
2312                self.actor_id
2313            );
2314            return;
2315        }
2316        self.book_handlers.insert(topic, handler.clone());
2317        msgbus::subscribe_book_snapshots(topic.into(), handler, None);
2318    }
2319
2320    #[allow(dead_code)]
2321    pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
2322        if let Some(handler) = self.book_handlers.remove(&topic) {
2323            msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
2324        }
2325    }
2326
2327    pub(crate) fn add_mark_price_subscription(
2328        &mut self,
2329        topic: MStr<Topic>,
2330        handler: TypedHandler<MarkPriceUpdate>,
2331    ) {
2332        if self.mark_price_handlers.contains_key(&topic) {
2333            log::warn!(
2334                "Actor {} attempted duplicate mark price subscription to '{topic}'",
2335                self.actor_id
2336            );
2337            return;
2338        }
2339        self.mark_price_handlers.insert(topic, handler.clone());
2340        msgbus::subscribe_mark_prices(topic.into(), handler, None);
2341    }
2342
2343    #[allow(dead_code)]
2344    pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
2345        if let Some(handler) = self.mark_price_handlers.remove(&topic) {
2346            msgbus::unsubscribe_mark_prices(topic.into(), &handler);
2347        }
2348    }
2349
2350    pub(crate) fn add_index_price_subscription(
2351        &mut self,
2352        topic: MStr<Topic>,
2353        handler: TypedHandler<IndexPriceUpdate>,
2354    ) {
2355        if self.index_price_handlers.contains_key(&topic) {
2356            log::warn!(
2357                "Actor {} attempted duplicate index price subscription to '{topic}'",
2358                self.actor_id
2359            );
2360            return;
2361        }
2362        self.index_price_handlers.insert(topic, handler.clone());
2363        msgbus::subscribe_index_prices(topic.into(), handler, None);
2364    }
2365
2366    #[allow(dead_code)]
2367    pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
2368        if let Some(handler) = self.index_price_handlers.remove(&topic) {
2369            msgbus::unsubscribe_index_prices(topic.into(), &handler);
2370        }
2371    }
2372
2373    pub(crate) fn add_funding_rate_subscription(
2374        &mut self,
2375        topic: MStr<Topic>,
2376        handler: TypedHandler<FundingRateUpdate>,
2377    ) {
2378        if self.funding_rate_handlers.contains_key(&topic) {
2379            log::warn!(
2380                "Actor {} attempted duplicate funding rate subscription to '{topic}'",
2381                self.actor_id
2382            );
2383            return;
2384        }
2385        self.funding_rate_handlers.insert(topic, handler.clone());
2386        msgbus::subscribe_funding_rates(topic.into(), handler, None);
2387    }
2388
2389    #[allow(dead_code)]
2390    pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
2391        if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
2392            msgbus::unsubscribe_funding_rates(topic.into(), &handler);
2393        }
2394    }
2395
2396    #[cfg(feature = "defi")]
2397    pub(crate) fn add_block_subscription(
2398        &mut self,
2399        topic: MStr<Topic>,
2400        handler: TypedHandler<Block>,
2401    ) {
2402        if self.block_handlers.contains_key(&topic) {
2403            log::warn!(
2404                "Actor {} attempted duplicate block subscription to '{topic}'",
2405                self.actor_id
2406            );
2407            return;
2408        }
2409        self.block_handlers.insert(topic, handler.clone());
2410        msgbus::subscribe_defi_blocks(topic.into(), handler, None);
2411    }
2412
2413    #[cfg(feature = "defi")]
2414    #[allow(dead_code)]
2415    pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
2416        if let Some(handler) = self.block_handlers.remove(&topic) {
2417            msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
2418        }
2419    }
2420
2421    #[cfg(feature = "defi")]
2422    pub(crate) fn add_pool_subscription(
2423        &mut self,
2424        topic: MStr<Topic>,
2425        handler: TypedHandler<Pool>,
2426    ) {
2427        if self.pool_handlers.contains_key(&topic) {
2428            log::warn!(
2429                "Actor {} attempted duplicate pool subscription to '{topic}'",
2430                self.actor_id
2431            );
2432            return;
2433        }
2434        self.pool_handlers.insert(topic, handler.clone());
2435        msgbus::subscribe_defi_pools(topic.into(), handler, None);
2436    }
2437
2438    #[cfg(feature = "defi")]
2439    #[allow(dead_code)]
2440    pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
2441        if let Some(handler) = self.pool_handlers.remove(&topic) {
2442            msgbus::unsubscribe_defi_pools(topic.into(), &handler);
2443        }
2444    }
2445
2446    #[cfg(feature = "defi")]
2447    pub(crate) fn add_pool_swap_subscription(
2448        &mut self,
2449        topic: MStr<Topic>,
2450        handler: TypedHandler<PoolSwap>,
2451    ) {
2452        if self.pool_swap_handlers.contains_key(&topic) {
2453            log::warn!(
2454                "Actor {} attempted duplicate pool swap subscription to '{topic}'",
2455                self.actor_id
2456            );
2457            return;
2458        }
2459        self.pool_swap_handlers.insert(topic, handler.clone());
2460        msgbus::subscribe_defi_swaps(topic.into(), handler, None);
2461    }
2462
2463    #[cfg(feature = "defi")]
2464    #[allow(dead_code)]
2465    pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
2466        if let Some(handler) = self.pool_swap_handlers.remove(&topic) {
2467            msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
2468        }
2469    }
2470
2471    #[cfg(feature = "defi")]
2472    pub(crate) fn add_pool_liquidity_subscription(
2473        &mut self,
2474        topic: MStr<Topic>,
2475        handler: TypedHandler<PoolLiquidityUpdate>,
2476    ) {
2477        if self.pool_liquidity_handlers.contains_key(&topic) {
2478            log::warn!(
2479                "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
2480                self.actor_id
2481            );
2482            return;
2483        }
2484        self.pool_liquidity_handlers.insert(topic, handler.clone());
2485        msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
2486    }
2487
2488    #[cfg(feature = "defi")]
2489    #[allow(dead_code)]
2490    pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
2491        if let Some(handler) = self.pool_liquidity_handlers.remove(&topic) {
2492            msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
2493        }
2494    }
2495
2496    #[cfg(feature = "defi")]
2497    pub(crate) fn add_pool_collect_subscription(
2498        &mut self,
2499        topic: MStr<Topic>,
2500        handler: TypedHandler<PoolFeeCollect>,
2501    ) {
2502        if self.pool_collect_handlers.contains_key(&topic) {
2503            log::warn!(
2504                "Actor {} attempted duplicate pool collect subscription to '{topic}'",
2505                self.actor_id
2506            );
2507            return;
2508        }
2509        self.pool_collect_handlers.insert(topic, handler.clone());
2510        msgbus::subscribe_defi_collects(topic.into(), handler, None);
2511    }
2512
2513    #[cfg(feature = "defi")]
2514    #[allow(dead_code)]
2515    pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
2516        if let Some(handler) = self.pool_collect_handlers.remove(&topic) {
2517            msgbus::unsubscribe_defi_collects(topic.into(), &handler);
2518        }
2519    }
2520
2521    #[cfg(feature = "defi")]
2522    pub(crate) fn add_pool_flash_subscription(
2523        &mut self,
2524        topic: MStr<Topic>,
2525        handler: TypedHandler<PoolFlash>,
2526    ) {
2527        if self.pool_flash_handlers.contains_key(&topic) {
2528            log::warn!(
2529                "Actor {} attempted duplicate pool flash subscription to '{topic}'",
2530                self.actor_id
2531            );
2532            return;
2533        }
2534        self.pool_flash_handlers.insert(topic, handler.clone());
2535        msgbus::subscribe_defi_flash(topic.into(), handler, None);
2536    }
2537
2538    #[cfg(feature = "defi")]
2539    #[allow(dead_code)]
2540    pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
2541        if let Some(handler) = self.pool_flash_handlers.remove(&topic) {
2542            msgbus::unsubscribe_defi_flash(topic.into(), &handler);
2543        }
2544    }
2545
2546    /// Creates a new [`DataActorCore`] instance.
2547    pub fn new(config: DataActorConfig) -> Self {
2548        let actor_id = config
2549            .actor_id
2550            .unwrap_or_else(|| Self::default_actor_id(&config));
2551
2552        Self {
2553            actor_id,
2554            config,
2555            trader_id: None, // None until registered
2556            clock: None,     // None until registered
2557            cache: None,     // None until registered
2558            state: ComponentState::default(),
2559            topic_handlers: AHashMap::new(),
2560            deltas_handlers: AHashMap::new(),
2561            depth10_handlers: AHashMap::new(),
2562            book_handlers: AHashMap::new(),
2563            quote_handlers: AHashMap::new(),
2564            trade_handlers: AHashMap::new(),
2565            bar_handlers: AHashMap::new(),
2566            mark_price_handlers: AHashMap::new(),
2567            index_price_handlers: AHashMap::new(),
2568            funding_rate_handlers: AHashMap::new(),
2569            order_event_handlers: AHashMap::new(),
2570            #[cfg(feature = "defi")]
2571            block_handlers: AHashMap::new(),
2572            #[cfg(feature = "defi")]
2573            pool_handlers: AHashMap::new(),
2574            #[cfg(feature = "defi")]
2575            pool_swap_handlers: AHashMap::new(),
2576            #[cfg(feature = "defi")]
2577            pool_liquidity_handlers: AHashMap::new(),
2578            #[cfg(feature = "defi")]
2579            pool_collect_handlers: AHashMap::new(),
2580            #[cfg(feature = "defi")]
2581            pool_flash_handlers: AHashMap::new(),
2582            warning_events: AHashSet::new(),
2583            pending_requests: AHashMap::new(),
2584            signal_classes: AHashMap::new(),
2585            #[cfg(feature = "indicators")]
2586            indicators: Indicators::default(),
2587        }
2588    }
2589
2590    /// Returns the memory address of this instance as a hexadecimal string.
2591    #[must_use]
2592    pub fn mem_address(&self) -> String {
2593        format!("{self:p}")
2594    }
2595
2596    /// Returns the actors state.
2597    pub fn state(&self) -> ComponentState {
2598        self.state
2599    }
2600
2601    /// Returns the trader ID this actor is registered to.
2602    pub fn trader_id(&self) -> Option<TraderId> {
2603        self.trader_id
2604    }
2605
2606    /// Returns the actors ID.
2607    pub fn actor_id(&self) -> ActorId {
2608        self.actor_id
2609    }
2610
2611    fn default_actor_id(config: &DataActorConfig) -> ActorId {
2612        let memory_address = std::ptr::from_ref(config) as usize;
2613        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2614    }
2615
2616    /// Returns a UNIX nanoseconds timestamp from the actor's internal clock.
2617    pub fn timestamp_ns(&self) -> UnixNanos {
2618        self.clock_ref().timestamp_ns()
2619    }
2620
2621    /// Returns the clock for the actor (if registered).
2622    ///
2623    /// # Panics
2624    ///
2625    /// Panics if the actor has not been registered with a trader.
2626    pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2627        self.clock
2628            .as_ref()
2629            .unwrap_or_else(|| {
2630                panic!(
2631                    "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2632                    self.actor_id, self.trader_id
2633                )
2634            })
2635            .borrow_mut()
2636    }
2637
2638    /// Returns a clone of the reference-counted clock.
2639    ///
2640    /// # Panics
2641    ///
2642    /// Panics if the actor has not yet been registered (clock is `None`).
2643    pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2644        self.clock
2645            .as_ref()
2646            .expect("DataActor must be registered before accessing clock")
2647            .clone()
2648    }
2649
2650    fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2651        self.clock
2652            .as_ref()
2653            .unwrap_or_else(|| {
2654                panic!(
2655                    "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2656                    self.actor_id, self.trader_id
2657                )
2658            })
2659            .borrow()
2660    }
2661
2662    /// Returns a read-only reference to the cache.
2663    ///
2664    /// # Panics
2665    ///
2666    /// Panics if the actor has not yet been registered (cache is `None`).
2667    pub fn cache(&self) -> Ref<'_, Cache> {
2668        self.cache
2669            .as_ref()
2670            .expect("DataActor must be registered before accessing cache")
2671            .borrow()
2672    }
2673
2674    /// Returns a clone of the reference-counted cache.
2675    ///
2676    /// # Panics
2677    ///
2678    /// Panics if the actor has not yet been registered (cache is `None`).
2679    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2680        self.cache
2681            .as_ref()
2682            .expect("DataActor must be registered before accessing cache")
2683            .clone()
2684    }
2685
2686    // -- REGISTRATION ----------------------------------------------------------------------------
2687
2688    /// Register the data actor with a trader.
2689    ///
2690    /// # Errors
2691    ///
2692    /// Returns an error if the actor has already been registered with a trader
2693    /// or if the provided dependencies are invalid.
2694    pub fn register(
2695        &mut self,
2696        trader_id: TraderId,
2697        clock: Rc<RefCell<dyn Clock>>,
2698        cache: Rc<RefCell<Cache>>,
2699    ) -> anyhow::Result<()> {
2700        if let Some(existing_trader_id) = self.trader_id {
2701            anyhow::bail!(
2702                "DataActor {} already registered with trader {existing_trader_id}",
2703                self.actor_id
2704            );
2705        }
2706
2707        // Validate clock by attempting to access it
2708        {
2709            let _timestamp = clock.borrow().timestamp_ns();
2710        }
2711
2712        // Validate cache by attempting to access it
2713        {
2714            let _cache_borrow = cache.borrow();
2715        }
2716
2717        self.trader_id = Some(trader_id);
2718        self.clock = Some(clock);
2719        self.cache = Some(cache);
2720
2721        // Verify complete registration
2722        if !self.is_properly_registered() {
2723            anyhow::bail!(
2724                "DataActor {} registration incomplete - validation failed",
2725                self.actor_id
2726            );
2727        }
2728
2729        log::debug!("Registered {} with trader {trader_id}", self.actor_id);
2730        Ok(())
2731    }
2732
2733    /// Register an event type for warning log levels.
2734    pub fn register_warning_event(&mut self, event_type: &str) {
2735        self.warning_events.insert(event_type.to_string());
2736        log::debug!("Registered event type '{event_type}' for warning logs");
2737    }
2738
2739    /// Deregister an event type from warning log levels.
2740    pub fn deregister_warning_event(&mut self, event_type: &str) {
2741        self.warning_events.remove(event_type);
2742        log::debug!("Deregistered event type '{event_type}' from warning logs");
2743    }
2744
2745    pub fn is_registered(&self) -> bool {
2746        self.trader_id.is_some()
2747    }
2748
2749    pub(crate) fn check_registered(&self) {
2750        assert!(
2751            self.is_registered(),
2752            "Actor has not been registered with a Trader"
2753        );
2754    }
2755
2756    /// Validates registration state without panicking.
2757    fn is_properly_registered(&self) -> bool {
2758        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2759    }
2760
2761    pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2762        if self.config.log_commands {
2763            log::info!("{CMD}{SEND} {command:?}");
2764        }
2765
2766        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2767        msgbus::send_data_command(endpoint, command);
2768    }
2769
2770    #[allow(dead_code)]
2771    fn send_data_req(&self, request: RequestCommand) {
2772        if self.config.log_commands {
2773            log::info!("{REQ}{SEND} {request:?}");
2774        }
2775
2776        // For now, simplified approach - data requests without dynamic handlers
2777        // TODO: Implement proper dynamic dispatch for response handlers
2778        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2779        msgbus::send_any(endpoint, request.as_any());
2780    }
2781
2782    /// Sends a shutdown command to the system with an optional reason.
2783    ///
2784    /// # Panics
2785    ///
2786    /// Panics if the actor is not registered or has no trader ID.
2787    pub fn shutdown_system(&self, reason: Option<String>) {
2788        self.check_registered();
2789
2790        // SAFETY: Checked registered before unwrapping trader ID
2791        let command = ShutdownSystem::new(
2792            self.trader_id().unwrap(),
2793            self.actor_id.inner(),
2794            reason,
2795            UUID4::new(),
2796            self.timestamp_ns(),
2797        );
2798
2799        let endpoint = "command.system.shutdown".into();
2800        msgbus::send_any(endpoint, command.as_any());
2801    }
2802
2803    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
2804
2805    /// Helper method for registering data subscriptions from the trait.
2806    ///
2807    /// # Panics
2808    ///
2809    /// Panics if the actor is not properly registered.
2810    pub fn subscribe_data(
2811        &mut self,
2812        handler: ShareableMessageHandler,
2813        data_type: DataType,
2814        client_id: Option<ClientId>,
2815        params: Option<IndexMap<String, String>>,
2816    ) {
2817        assert!(
2818            self.is_properly_registered(),
2819            "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
2820            self.actor_id,
2821            self.trader_id,
2822            self.clock.is_some(),
2823            self.cache.is_some()
2824        );
2825
2826        let topic = get_custom_topic(&data_type);
2827        self.add_subscription_any(topic, handler);
2828
2829        // If no client ID specified, just subscribe to the topic
2830        if client_id.is_none() {
2831            return;
2832        }
2833
2834        let command = SubscribeCommand::Data(SubscribeCustomData {
2835            data_type,
2836            client_id,
2837            venue: None,
2838            command_id: UUID4::new(),
2839            ts_init: self.timestamp_ns(),
2840            correlation_id: None,
2841            params,
2842        });
2843
2844        self.send_data_cmd(DataCommand::Subscribe(command));
2845    }
2846
2847    /// Helper method for registering quotes subscriptions from the trait.
2848    pub fn subscribe_quotes(
2849        &mut self,
2850        topic: MStr<Topic>,
2851        handler: TypedHandler<QuoteTick>,
2852        instrument_id: InstrumentId,
2853        client_id: Option<ClientId>,
2854        params: Option<IndexMap<String, String>>,
2855    ) {
2856        self.check_registered();
2857
2858        self.add_quote_subscription(topic, handler);
2859
2860        let command = SubscribeCommand::Quotes(SubscribeQuotes {
2861            instrument_id,
2862            client_id,
2863            venue: Some(instrument_id.venue),
2864            command_id: UUID4::new(),
2865            ts_init: self.timestamp_ns(),
2866            correlation_id: None,
2867            params,
2868        });
2869
2870        self.send_data_cmd(DataCommand::Subscribe(command));
2871    }
2872
2873    /// Helper method for registering instruments subscriptions from the trait.
2874    pub fn subscribe_instruments(
2875        &mut self,
2876        topic: MStr<Topic>,
2877        handler: ShareableMessageHandler,
2878        venue: Venue,
2879        client_id: Option<ClientId>,
2880        params: Option<IndexMap<String, String>>,
2881    ) {
2882        self.check_registered();
2883
2884        self.add_instrument_subscription(topic, handler);
2885
2886        let command = SubscribeCommand::Instruments(SubscribeInstruments {
2887            client_id,
2888            venue,
2889            command_id: UUID4::new(),
2890            ts_init: self.timestamp_ns(),
2891            correlation_id: None,
2892            params,
2893        });
2894
2895        self.send_data_cmd(DataCommand::Subscribe(command));
2896    }
2897
2898    /// Helper method for registering instrument subscriptions from the trait.
2899    pub fn subscribe_instrument(
2900        &mut self,
2901        topic: MStr<Topic>,
2902        handler: ShareableMessageHandler,
2903        instrument_id: InstrumentId,
2904        client_id: Option<ClientId>,
2905        params: Option<IndexMap<String, String>>,
2906    ) {
2907        self.check_registered();
2908
2909        self.add_instrument_subscription(topic, handler);
2910
2911        let command = SubscribeCommand::Instrument(SubscribeInstrument {
2912            instrument_id,
2913            client_id,
2914            venue: Some(instrument_id.venue),
2915            command_id: UUID4::new(),
2916            ts_init: self.timestamp_ns(),
2917            correlation_id: None,
2918            params,
2919        });
2920
2921        self.send_data_cmd(DataCommand::Subscribe(command));
2922    }
2923
2924    /// Helper method for registering book deltas subscriptions from the trait.
2925    #[allow(clippy::too_many_arguments)]
2926    pub fn subscribe_book_deltas(
2927        &mut self,
2928        topic: MStr<Topic>,
2929        handler: TypedHandler<OrderBookDeltas>,
2930        instrument_id: InstrumentId,
2931        book_type: BookType,
2932        depth: Option<NonZeroUsize>,
2933        client_id: Option<ClientId>,
2934        managed: bool,
2935        params: Option<IndexMap<String, String>>,
2936    ) {
2937        self.check_registered();
2938
2939        self.add_deltas_subscription(topic, handler);
2940
2941        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2942            instrument_id,
2943            book_type,
2944            client_id,
2945            venue: Some(instrument_id.venue),
2946            command_id: UUID4::new(),
2947            ts_init: self.timestamp_ns(),
2948            depth,
2949            managed,
2950            correlation_id: None,
2951            params,
2952        });
2953
2954        self.send_data_cmd(DataCommand::Subscribe(command));
2955    }
2956
2957    /// Helper method for registering book snapshots subscriptions from the trait.
2958    #[allow(clippy::too_many_arguments)]
2959    pub fn subscribe_book_at_interval(
2960        &mut self,
2961        topic: MStr<Topic>,
2962        handler: TypedHandler<OrderBook>,
2963        instrument_id: InstrumentId,
2964        book_type: BookType,
2965        depth: Option<NonZeroUsize>,
2966        interval_ms: NonZeroUsize,
2967        client_id: Option<ClientId>,
2968        params: Option<IndexMap<String, String>>,
2969    ) {
2970        self.check_registered();
2971
2972        self.add_book_snapshot_subscription(topic, handler);
2973
2974        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2975            instrument_id,
2976            book_type,
2977            client_id,
2978            venue: Some(instrument_id.venue),
2979            command_id: UUID4::new(),
2980            ts_init: self.timestamp_ns(),
2981            depth,
2982            interval_ms,
2983            correlation_id: None,
2984            params,
2985        });
2986
2987        self.send_data_cmd(DataCommand::Subscribe(command));
2988    }
2989
2990    /// Helper method for registering trades subscriptions from the trait.
2991    pub fn subscribe_trades(
2992        &mut self,
2993        topic: MStr<Topic>,
2994        handler: TypedHandler<TradeTick>,
2995        instrument_id: InstrumentId,
2996        client_id: Option<ClientId>,
2997        params: Option<IndexMap<String, String>>,
2998    ) {
2999        self.check_registered();
3000
3001        self.add_trade_subscription(topic, handler);
3002
3003        let command = SubscribeCommand::Trades(SubscribeTrades {
3004            instrument_id,
3005            client_id,
3006            venue: Some(instrument_id.venue),
3007            command_id: UUID4::new(),
3008            ts_init: self.timestamp_ns(),
3009            correlation_id: None,
3010            params,
3011        });
3012
3013        self.send_data_cmd(DataCommand::Subscribe(command));
3014    }
3015
3016    /// Helper method for registering bars subscriptions from the trait.
3017    pub fn subscribe_bars(
3018        &mut self,
3019        topic: MStr<Topic>,
3020        handler: TypedHandler<Bar>,
3021        bar_type: BarType,
3022        client_id: Option<ClientId>,
3023        params: Option<IndexMap<String, String>>,
3024    ) {
3025        self.check_registered();
3026
3027        self.add_bar_subscription(topic, handler);
3028
3029        let command = SubscribeCommand::Bars(SubscribeBars {
3030            bar_type,
3031            client_id,
3032            venue: Some(bar_type.instrument_id().venue),
3033            command_id: UUID4::new(),
3034            ts_init: self.timestamp_ns(),
3035            correlation_id: None,
3036            params,
3037        });
3038
3039        self.send_data_cmd(DataCommand::Subscribe(command));
3040    }
3041
3042    /// Helper method for registering mark prices subscriptions from the trait.
3043    pub fn subscribe_mark_prices(
3044        &mut self,
3045        topic: MStr<Topic>,
3046        handler: TypedHandler<MarkPriceUpdate>,
3047        instrument_id: InstrumentId,
3048        client_id: Option<ClientId>,
3049        params: Option<IndexMap<String, String>>,
3050    ) {
3051        self.check_registered();
3052
3053        self.add_mark_price_subscription(topic, handler);
3054
3055        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3056            instrument_id,
3057            client_id,
3058            venue: Some(instrument_id.venue),
3059            command_id: UUID4::new(),
3060            ts_init: self.timestamp_ns(),
3061            correlation_id: None,
3062            params,
3063        });
3064
3065        self.send_data_cmd(DataCommand::Subscribe(command));
3066    }
3067
3068    /// Helper method for registering index prices subscriptions from the trait.
3069    pub fn subscribe_index_prices(
3070        &mut self,
3071        topic: MStr<Topic>,
3072        handler: TypedHandler<IndexPriceUpdate>,
3073        instrument_id: InstrumentId,
3074        client_id: Option<ClientId>,
3075        params: Option<IndexMap<String, String>>,
3076    ) {
3077        self.check_registered();
3078
3079        self.add_index_price_subscription(topic, handler);
3080
3081        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
3082            instrument_id,
3083            client_id,
3084            venue: Some(instrument_id.venue),
3085            command_id: UUID4::new(),
3086            ts_init: self.timestamp_ns(),
3087            correlation_id: None,
3088            params,
3089        });
3090
3091        self.send_data_cmd(DataCommand::Subscribe(command));
3092    }
3093
3094    /// Helper method for registering funding rates subscriptions from the trait.
3095    pub fn subscribe_funding_rates(
3096        &mut self,
3097        topic: MStr<Topic>,
3098        handler: TypedHandler<FundingRateUpdate>,
3099        instrument_id: InstrumentId,
3100        client_id: Option<ClientId>,
3101        params: Option<IndexMap<String, String>>,
3102    ) {
3103        self.check_registered();
3104
3105        self.add_funding_rate_subscription(topic, handler);
3106
3107        let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
3108            instrument_id,
3109            client_id,
3110            venue: Some(instrument_id.venue),
3111            command_id: UUID4::new(),
3112            ts_init: self.timestamp_ns(),
3113            correlation_id: None,
3114            params,
3115        });
3116
3117        self.send_data_cmd(DataCommand::Subscribe(command));
3118    }
3119
3120    /// Helper method for registering instrument status subscriptions from the trait.
3121    pub fn subscribe_instrument_status(
3122        &mut self,
3123        topic: MStr<Topic>,
3124        handler: ShareableMessageHandler,
3125        instrument_id: InstrumentId,
3126        client_id: Option<ClientId>,
3127        params: Option<IndexMap<String, String>>,
3128    ) {
3129        self.check_registered();
3130
3131        self.add_subscription_any(topic, handler);
3132
3133        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
3134            instrument_id,
3135            client_id,
3136            venue: Some(instrument_id.venue),
3137            command_id: UUID4::new(),
3138            ts_init: self.timestamp_ns(),
3139            correlation_id: None,
3140            params,
3141        });
3142
3143        self.send_data_cmd(DataCommand::Subscribe(command));
3144    }
3145
3146    /// Helper method for registering instrument close subscriptions from the trait.
3147    pub fn subscribe_instrument_close(
3148        &mut self,
3149        topic: MStr<Topic>,
3150        handler: ShareableMessageHandler,
3151        instrument_id: InstrumentId,
3152        client_id: Option<ClientId>,
3153        params: Option<IndexMap<String, String>>,
3154    ) {
3155        self.check_registered();
3156
3157        self.add_instrument_close_subscription(topic, handler);
3158
3159        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
3160            instrument_id,
3161            client_id,
3162            venue: Some(instrument_id.venue),
3163            command_id: UUID4::new(),
3164            ts_init: self.timestamp_ns(),
3165            correlation_id: None,
3166            params,
3167        });
3168
3169        self.send_data_cmd(DataCommand::Subscribe(command));
3170    }
3171
3172    /// Helper method for registering order fills subscriptions from the trait.
3173    pub fn subscribe_order_fills(
3174        &mut self,
3175        topic: MStr<Topic>,
3176        handler: TypedHandler<OrderEventAny>,
3177    ) {
3178        self.check_registered();
3179        self.add_order_event_subscription(topic, handler);
3180    }
3181
3182    /// Helper method for registering order cancels subscriptions from the trait.
3183    pub fn subscribe_order_cancels(
3184        &mut self,
3185        topic: MStr<Topic>,
3186        handler: TypedHandler<OrderEventAny>,
3187    ) {
3188        self.check_registered();
3189        self.add_order_event_subscription(topic, handler);
3190    }
3191
3192    /// Helper method for unsubscribing from data.
3193    pub fn unsubscribe_data(
3194        &mut self,
3195        data_type: DataType,
3196        client_id: Option<ClientId>,
3197        params: Option<IndexMap<String, String>>,
3198    ) {
3199        self.check_registered();
3200
3201        let topic = get_custom_topic(&data_type);
3202        self.remove_subscription_any(topic);
3203
3204        if client_id.is_none() {
3205            return;
3206        }
3207
3208        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
3209            data_type,
3210            client_id,
3211            venue: None,
3212            command_id: UUID4::new(),
3213            ts_init: self.timestamp_ns(),
3214            correlation_id: None,
3215            params,
3216        });
3217
3218        self.send_data_cmd(DataCommand::Unsubscribe(command));
3219    }
3220
3221    /// Helper method for unsubscribing from instruments.
3222    pub fn unsubscribe_instruments(
3223        &mut self,
3224        venue: Venue,
3225        client_id: Option<ClientId>,
3226        params: Option<IndexMap<String, String>>,
3227    ) {
3228        self.check_registered();
3229
3230        let topic = get_instruments_topic(venue);
3231        self.remove_instrument_subscription(topic);
3232
3233        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
3234            client_id,
3235            venue,
3236            command_id: UUID4::new(),
3237            ts_init: self.timestamp_ns(),
3238            correlation_id: None,
3239            params,
3240        });
3241
3242        self.send_data_cmd(DataCommand::Unsubscribe(command));
3243    }
3244
3245    /// Helper method for unsubscribing from instrument.
3246    pub fn unsubscribe_instrument(
3247        &mut self,
3248        instrument_id: InstrumentId,
3249        client_id: Option<ClientId>,
3250        params: Option<IndexMap<String, String>>,
3251    ) {
3252        self.check_registered();
3253
3254        let topic = get_instrument_topic(instrument_id);
3255        self.remove_instrument_subscription(topic);
3256
3257        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
3258            instrument_id,
3259            client_id,
3260            venue: Some(instrument_id.venue),
3261            command_id: UUID4::new(),
3262            ts_init: self.timestamp_ns(),
3263            correlation_id: None,
3264            params,
3265        });
3266
3267        self.send_data_cmd(DataCommand::Unsubscribe(command));
3268    }
3269
3270    /// Helper method for unsubscribing from book deltas.
3271    pub fn unsubscribe_book_deltas(
3272        &mut self,
3273        instrument_id: InstrumentId,
3274        client_id: Option<ClientId>,
3275        params: Option<IndexMap<String, String>>,
3276    ) {
3277        self.check_registered();
3278
3279        let topic = get_book_deltas_topic(instrument_id);
3280        self.remove_deltas_subscription(topic);
3281
3282        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
3283            instrument_id,
3284            client_id,
3285            venue: Some(instrument_id.venue),
3286            command_id: UUID4::new(),
3287            ts_init: self.timestamp_ns(),
3288            correlation_id: None,
3289            params,
3290        });
3291
3292        self.send_data_cmd(DataCommand::Unsubscribe(command));
3293    }
3294
3295    /// Helper method for unsubscribing from book snapshots at interval.
3296    pub fn unsubscribe_book_at_interval(
3297        &mut self,
3298        instrument_id: InstrumentId,
3299        interval_ms: NonZeroUsize,
3300        client_id: Option<ClientId>,
3301        params: Option<IndexMap<String, String>>,
3302    ) {
3303        self.check_registered();
3304
3305        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
3306        self.remove_book_snapshot_subscription(topic);
3307
3308        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
3309            instrument_id,
3310            client_id,
3311            venue: Some(instrument_id.venue),
3312            command_id: UUID4::new(),
3313            ts_init: self.timestamp_ns(),
3314            correlation_id: None,
3315            params,
3316        });
3317
3318        self.send_data_cmd(DataCommand::Unsubscribe(command));
3319    }
3320
3321    /// Helper method for unsubscribing from quotes.
3322    pub fn unsubscribe_quotes(
3323        &mut self,
3324        instrument_id: InstrumentId,
3325        client_id: Option<ClientId>,
3326        params: Option<IndexMap<String, String>>,
3327    ) {
3328        self.check_registered();
3329
3330        let topic = get_quotes_topic(instrument_id);
3331        self.remove_quote_subscription(topic);
3332
3333        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
3334            instrument_id,
3335            client_id,
3336            venue: Some(instrument_id.venue),
3337            command_id: UUID4::new(),
3338            ts_init: self.timestamp_ns(),
3339            correlation_id: None,
3340            params,
3341        });
3342
3343        self.send_data_cmd(DataCommand::Unsubscribe(command));
3344    }
3345
3346    /// Helper method for unsubscribing from trades.
3347    pub fn unsubscribe_trades(
3348        &mut self,
3349        instrument_id: InstrumentId,
3350        client_id: Option<ClientId>,
3351        params: Option<IndexMap<String, String>>,
3352    ) {
3353        self.check_registered();
3354
3355        let topic = get_trades_topic(instrument_id);
3356        self.remove_trade_subscription(topic);
3357
3358        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
3359            instrument_id,
3360            client_id,
3361            venue: Some(instrument_id.venue),
3362            command_id: UUID4::new(),
3363            ts_init: self.timestamp_ns(),
3364            correlation_id: None,
3365            params,
3366        });
3367
3368        self.send_data_cmd(DataCommand::Unsubscribe(command));
3369    }
3370
3371    /// Helper method for unsubscribing from bars.
3372    pub fn unsubscribe_bars(
3373        &mut self,
3374        bar_type: BarType,
3375        client_id: Option<ClientId>,
3376        params: Option<IndexMap<String, String>>,
3377    ) {
3378        self.check_registered();
3379
3380        let topic = get_bars_topic(bar_type);
3381        self.remove_bar_subscription(topic);
3382
3383        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
3384            bar_type,
3385            client_id,
3386            venue: Some(bar_type.instrument_id().venue),
3387            command_id: UUID4::new(),
3388            ts_init: self.timestamp_ns(),
3389            correlation_id: None,
3390            params,
3391        });
3392
3393        self.send_data_cmd(DataCommand::Unsubscribe(command));
3394    }
3395
3396    /// Helper method for unsubscribing from mark prices.
3397    pub fn unsubscribe_mark_prices(
3398        &mut self,
3399        instrument_id: InstrumentId,
3400        client_id: Option<ClientId>,
3401        params: Option<IndexMap<String, String>>,
3402    ) {
3403        self.check_registered();
3404
3405        let topic = get_mark_price_topic(instrument_id);
3406        self.remove_mark_price_subscription(topic);
3407
3408        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
3409            instrument_id,
3410            client_id,
3411            venue: Some(instrument_id.venue),
3412            command_id: UUID4::new(),
3413            ts_init: self.timestamp_ns(),
3414            correlation_id: None,
3415            params,
3416        });
3417
3418        self.send_data_cmd(DataCommand::Unsubscribe(command));
3419    }
3420
3421    /// Helper method for unsubscribing from index prices.
3422    pub fn unsubscribe_index_prices(
3423        &mut self,
3424        instrument_id: InstrumentId,
3425        client_id: Option<ClientId>,
3426        params: Option<IndexMap<String, String>>,
3427    ) {
3428        self.check_registered();
3429
3430        let topic = get_index_price_topic(instrument_id);
3431        self.remove_index_price_subscription(topic);
3432
3433        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
3434            instrument_id,
3435            client_id,
3436            venue: Some(instrument_id.venue),
3437            command_id: UUID4::new(),
3438            ts_init: self.timestamp_ns(),
3439            correlation_id: None,
3440            params,
3441        });
3442
3443        self.send_data_cmd(DataCommand::Unsubscribe(command));
3444    }
3445
3446    /// Helper method for unsubscribing from funding rates.
3447    pub fn unsubscribe_funding_rates(
3448        &mut self,
3449        instrument_id: InstrumentId,
3450        client_id: Option<ClientId>,
3451        params: Option<IndexMap<String, String>>,
3452    ) {
3453        self.check_registered();
3454
3455        let topic = get_funding_rate_topic(instrument_id);
3456        self.remove_funding_rate_subscription(topic);
3457
3458        let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
3459            instrument_id,
3460            client_id,
3461            venue: Some(instrument_id.venue),
3462            command_id: UUID4::new(),
3463            ts_init: self.timestamp_ns(),
3464            correlation_id: None,
3465            params,
3466        });
3467
3468        self.send_data_cmd(DataCommand::Unsubscribe(command));
3469    }
3470
3471    /// Helper method for unsubscribing from instrument status.
3472    pub fn unsubscribe_instrument_status(
3473        &mut self,
3474        instrument_id: InstrumentId,
3475        client_id: Option<ClientId>,
3476        params: Option<IndexMap<String, String>>,
3477    ) {
3478        self.check_registered();
3479
3480        let topic = get_instrument_status_topic(instrument_id);
3481        self.remove_subscription_any(topic);
3482
3483        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
3484            instrument_id,
3485            client_id,
3486            venue: Some(instrument_id.venue),
3487            command_id: UUID4::new(),
3488            ts_init: self.timestamp_ns(),
3489            correlation_id: None,
3490            params,
3491        });
3492
3493        self.send_data_cmd(DataCommand::Unsubscribe(command));
3494    }
3495
3496    /// Helper method for unsubscribing from instrument close.
3497    pub fn unsubscribe_instrument_close(
3498        &mut self,
3499        instrument_id: InstrumentId,
3500        client_id: Option<ClientId>,
3501        params: Option<IndexMap<String, String>>,
3502    ) {
3503        self.check_registered();
3504
3505        let topic = get_instrument_close_topic(instrument_id);
3506        self.remove_instrument_close_subscription(topic);
3507
3508        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3509            instrument_id,
3510            client_id,
3511            venue: Some(instrument_id.venue),
3512            command_id: UUID4::new(),
3513            ts_init: self.timestamp_ns(),
3514            correlation_id: None,
3515            params,
3516        });
3517
3518        self.send_data_cmd(DataCommand::Unsubscribe(command));
3519    }
3520
3521    /// Helper method for unsubscribing from order fills.
3522    pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
3523        self.check_registered();
3524
3525        let topic = get_order_fills_topic(instrument_id);
3526        self.remove_order_event_subscription(topic);
3527    }
3528
3529    /// Helper method for unsubscribing from order cancels.
3530    pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
3531        self.check_registered();
3532
3533        let topic = get_order_cancels_topic(instrument_id);
3534        self.remove_order_event_subscription(topic);
3535    }
3536
3537    /// Helper method for requesting data.
3538    ///
3539    /// # Errors
3540    ///
3541    /// Returns an error if input parameters are invalid.
3542    #[allow(clippy::too_many_arguments)]
3543    pub fn request_data(
3544        &self,
3545        data_type: DataType,
3546        client_id: ClientId,
3547        start: Option<DateTime<Utc>>,
3548        end: Option<DateTime<Utc>>,
3549        limit: Option<NonZeroUsize>,
3550        params: Option<IndexMap<String, String>>,
3551        handler: ShareableMessageHandler,
3552    ) -> anyhow::Result<UUID4> {
3553        self.check_registered();
3554
3555        let now = self.clock_ref().utc_now();
3556        check_timestamps(now, start, end)?;
3557
3558        let request_id = UUID4::new();
3559        let command = RequestCommand::Data(RequestCustomData {
3560            client_id,
3561            data_type,
3562            start,
3563            end,
3564            limit,
3565            request_id,
3566            ts_init: self.timestamp_ns(),
3567            params,
3568        });
3569
3570        get_message_bus()
3571            .borrow_mut()
3572            .register_response_handler(command.request_id(), handler)?;
3573
3574        self.send_data_cmd(DataCommand::Request(command));
3575
3576        Ok(request_id)
3577    }
3578
3579    /// Helper method for requesting instrument.
3580    ///
3581    /// # Errors
3582    ///
3583    /// Returns an error if input parameters are invalid.
3584    pub fn request_instrument(
3585        &self,
3586        instrument_id: InstrumentId,
3587        start: Option<DateTime<Utc>>,
3588        end: Option<DateTime<Utc>>,
3589        client_id: Option<ClientId>,
3590        params: Option<IndexMap<String, String>>,
3591        handler: ShareableMessageHandler,
3592    ) -> anyhow::Result<UUID4> {
3593        self.check_registered();
3594
3595        let now = self.clock_ref().utc_now();
3596        check_timestamps(now, start, end)?;
3597
3598        let request_id = UUID4::new();
3599        let command = RequestCommand::Instrument(RequestInstrument {
3600            instrument_id,
3601            start,
3602            end,
3603            client_id,
3604            request_id,
3605            ts_init: now.into(),
3606            params,
3607        });
3608
3609        get_message_bus()
3610            .borrow_mut()
3611            .register_response_handler(command.request_id(), handler)?;
3612
3613        self.send_data_cmd(DataCommand::Request(command));
3614
3615        Ok(request_id)
3616    }
3617
3618    /// Helper method for requesting instruments.
3619    ///
3620    /// # Errors
3621    ///
3622    /// Returns an error if input parameters are invalid.
3623    pub fn request_instruments(
3624        &self,
3625        venue: Option<Venue>,
3626        start: Option<DateTime<Utc>>,
3627        end: Option<DateTime<Utc>>,
3628        client_id: Option<ClientId>,
3629        params: Option<IndexMap<String, String>>,
3630        handler: ShareableMessageHandler,
3631    ) -> anyhow::Result<UUID4> {
3632        self.check_registered();
3633
3634        let now = self.clock_ref().utc_now();
3635        check_timestamps(now, start, end)?;
3636
3637        let request_id = UUID4::new();
3638        let command = RequestCommand::Instruments(RequestInstruments {
3639            venue,
3640            start,
3641            end,
3642            client_id,
3643            request_id,
3644            ts_init: now.into(),
3645            params,
3646        });
3647
3648        get_message_bus()
3649            .borrow_mut()
3650            .register_response_handler(command.request_id(), handler)?;
3651
3652        self.send_data_cmd(DataCommand::Request(command));
3653
3654        Ok(request_id)
3655    }
3656
3657    /// Helper method for requesting book snapshot.
3658    ///
3659    /// # Errors
3660    ///
3661    /// Returns an error if input parameters are invalid.
3662    pub fn request_book_snapshot(
3663        &self,
3664        instrument_id: InstrumentId,
3665        depth: Option<NonZeroUsize>,
3666        client_id: Option<ClientId>,
3667        params: Option<IndexMap<String, String>>,
3668        handler: ShareableMessageHandler,
3669    ) -> anyhow::Result<UUID4> {
3670        self.check_registered();
3671
3672        let request_id = UUID4::new();
3673        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3674            instrument_id,
3675            depth,
3676            client_id,
3677            request_id,
3678            ts_init: self.timestamp_ns(),
3679            params,
3680        });
3681
3682        get_message_bus()
3683            .borrow_mut()
3684            .register_response_handler(command.request_id(), handler)?;
3685
3686        self.send_data_cmd(DataCommand::Request(command));
3687
3688        Ok(request_id)
3689    }
3690
3691    /// Helper method for requesting quotes.
3692    ///
3693    /// # Errors
3694    ///
3695    /// Returns an error if input parameters are invalid.
3696    #[allow(clippy::too_many_arguments)]
3697    pub fn request_quotes(
3698        &self,
3699        instrument_id: InstrumentId,
3700        start: Option<DateTime<Utc>>,
3701        end: Option<DateTime<Utc>>,
3702        limit: Option<NonZeroUsize>,
3703        client_id: Option<ClientId>,
3704        params: Option<IndexMap<String, String>>,
3705        handler: ShareableMessageHandler,
3706    ) -> anyhow::Result<UUID4> {
3707        self.check_registered();
3708
3709        let now = self.clock_ref().utc_now();
3710        check_timestamps(now, start, end)?;
3711
3712        let request_id = UUID4::new();
3713        let command = RequestCommand::Quotes(RequestQuotes {
3714            instrument_id,
3715            start,
3716            end,
3717            limit,
3718            client_id,
3719            request_id,
3720            ts_init: now.into(),
3721            params,
3722        });
3723
3724        get_message_bus()
3725            .borrow_mut()
3726            .register_response_handler(command.request_id(), handler)?;
3727
3728        self.send_data_cmd(DataCommand::Request(command));
3729
3730        Ok(request_id)
3731    }
3732
3733    /// Helper method for requesting trades.
3734    ///
3735    /// # Errors
3736    ///
3737    /// Returns an error if input parameters are invalid.
3738    #[allow(clippy::too_many_arguments)]
3739    pub fn request_trades(
3740        &self,
3741        instrument_id: InstrumentId,
3742        start: Option<DateTime<Utc>>,
3743        end: Option<DateTime<Utc>>,
3744        limit: Option<NonZeroUsize>,
3745        client_id: Option<ClientId>,
3746        params: Option<IndexMap<String, String>>,
3747        handler: ShareableMessageHandler,
3748    ) -> anyhow::Result<UUID4> {
3749        self.check_registered();
3750
3751        let now = self.clock_ref().utc_now();
3752        check_timestamps(now, start, end)?;
3753
3754        let request_id = UUID4::new();
3755        let command = RequestCommand::Trades(RequestTrades {
3756            instrument_id,
3757            start,
3758            end,
3759            limit,
3760            client_id,
3761            request_id,
3762            ts_init: now.into(),
3763            params,
3764        });
3765
3766        get_message_bus()
3767            .borrow_mut()
3768            .register_response_handler(command.request_id(), handler)?;
3769
3770        self.send_data_cmd(DataCommand::Request(command));
3771
3772        Ok(request_id)
3773    }
3774
3775    /// Helper method for requesting funding rates.
3776    ///
3777    /// # Errors
3778    ///
3779    /// Returns an error if input parameters are invalid.
3780    #[allow(clippy::too_many_arguments)]
3781    pub fn request_funding_rates(
3782        &self,
3783        instrument_id: InstrumentId,
3784        start: Option<DateTime<Utc>>,
3785        end: Option<DateTime<Utc>>,
3786        limit: Option<NonZeroUsize>,
3787        client_id: Option<ClientId>,
3788        params: Option<IndexMap<String, String>>,
3789        handler: ShareableMessageHandler,
3790    ) -> anyhow::Result<UUID4> {
3791        self.check_registered();
3792
3793        let now = self.clock_ref().utc_now();
3794        check_timestamps(now, start, end)?;
3795
3796        let request_id = UUID4::new();
3797        let command = RequestCommand::FundingRates(RequestFundingRates {
3798            instrument_id,
3799            start,
3800            end,
3801            limit,
3802            client_id,
3803            request_id,
3804            ts_init: now.into(),
3805            params,
3806        });
3807
3808        get_message_bus()
3809            .borrow_mut()
3810            .register_response_handler(command.request_id(), handler)?;
3811
3812        self.send_data_cmd(DataCommand::Request(command));
3813
3814        Ok(request_id)
3815    }
3816
3817    /// Helper method for requesting bars.
3818    ///
3819    /// # Errors
3820    ///
3821    /// Returns an error if input parameters are invalid.
3822    #[allow(clippy::too_many_arguments)]
3823    pub fn request_bars(
3824        &self,
3825        bar_type: BarType,
3826        start: Option<DateTime<Utc>>,
3827        end: Option<DateTime<Utc>>,
3828        limit: Option<NonZeroUsize>,
3829        client_id: Option<ClientId>,
3830        params: Option<IndexMap<String, String>>,
3831        handler: ShareableMessageHandler,
3832    ) -> anyhow::Result<UUID4> {
3833        self.check_registered();
3834
3835        let now = self.clock_ref().utc_now();
3836        check_timestamps(now, start, end)?;
3837
3838        let request_id = UUID4::new();
3839        let command = RequestCommand::Bars(RequestBars {
3840            bar_type,
3841            start,
3842            end,
3843            limit,
3844            client_id,
3845            request_id,
3846            ts_init: now.into(),
3847            params,
3848        });
3849
3850        get_message_bus()
3851            .borrow_mut()
3852            .register_response_handler(command.request_id(), handler)?;
3853
3854        self.send_data_cmd(DataCommand::Request(command));
3855
3856        Ok(request_id)
3857    }
3858
3859    #[cfg(test)]
3860    pub fn quote_handler_count(&self) -> usize {
3861        self.quote_handlers.len()
3862    }
3863
3864    #[cfg(test)]
3865    pub fn trade_handler_count(&self) -> usize {
3866        self.trade_handlers.len()
3867    }
3868
3869    #[cfg(test)]
3870    pub fn bar_handler_count(&self) -> usize {
3871        self.bar_handlers.len()
3872    }
3873
3874    #[cfg(test)]
3875    pub fn deltas_handler_count(&self) -> usize {
3876        self.deltas_handlers.len()
3877    }
3878
3879    #[cfg(test)]
3880    pub fn has_quote_handler(&self, topic: &str) -> bool {
3881        self.quote_handlers
3882            .contains_key(&MStr::<Topic>::from(topic))
3883    }
3884
3885    #[cfg(test)]
3886    pub fn has_trade_handler(&self, topic: &str) -> bool {
3887        self.trade_handlers
3888            .contains_key(&MStr::<Topic>::from(topic))
3889    }
3890
3891    #[cfg(test)]
3892    pub fn has_bar_handler(&self, topic: &str) -> bool {
3893        self.bar_handlers.contains_key(&MStr::<Topic>::from(topic))
3894    }
3895
3896    #[cfg(test)]
3897    pub fn has_deltas_handler(&self, topic: &str) -> bool {
3898        self.deltas_handlers
3899            .contains_key(&MStr::<Topic>::from(topic))
3900    }
3901}
3902
3903fn check_timestamps(
3904    now: DateTime<Utc>,
3905    start: Option<DateTime<Utc>>,
3906    end: Option<DateTime<Utc>>,
3907) -> anyhow::Result<()> {
3908    if let Some(start) = start {
3909        check_predicate_true(start <= now, "start was > now")?;
3910    }
3911    if let Some(end) = end {
3912        check_predicate_true(end <= now, "end was > now")?;
3913    }
3914
3915    if let (Some(start), Some(end)) = (start, end) {
3916        check_predicate_true(start < end, "start was >= end")?;
3917    }
3918
3919    Ok(())
3920}
3921
3922fn log_error(e: &anyhow::Error) {
3923    log::error!("{e}");
3924}
3925
3926fn log_not_running<T>(msg: &T)
3927where
3928    T: Debug,
3929{
3930    log::trace!("Received message when not running - skipping {msg:?}");
3931}
3932
3933fn log_received<T>(msg: &T)
3934where
3935    T: Debug,
3936{
3937    log::debug!("{RECV} {msg:?}");
3938}