nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell, RefMut},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24};
25
26use ahash::{AHashMap, AHashSet};
27use chrono::{DateTime, Utc};
28use indexmap::IndexMap;
29use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
30#[cfg(feature = "defi")]
31use nautilus_model::defi::{
32    Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
33};
34use nautilus_model::{
35    data::{
36        Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
37        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
38    },
39    enums::BookType,
40    events::order::filled::OrderFilled,
41    identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
42    instruments::InstrumentAny,
43    orderbook::OrderBook,
44};
45use ustr::Ustr;
46
47#[cfg(feature = "indicators")]
48use super::indicators::Indicators;
49use super::{
50    Actor,
51    registry::{get_actor_unchecked, try_get_actor_unchecked},
52};
53#[cfg(feature = "defi")]
54use crate::defi;
55#[cfg(feature = "defi")]
56#[allow(unused_imports)]
57use crate::defi::data_actor as _; // Brings DeFi impl blocks into scope
58use crate::{
59    cache::Cache,
60    clock::Clock,
61    component::Component,
62    enums::{ComponentState, ComponentTrigger},
63    logging::{CMD, RECV, REQ, SEND},
64    messages::{
65        data::{
66            BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
67            InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
68            RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
69            SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
70            SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
71            SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
72            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
73            UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
74            UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
75            UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
76            UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
77        },
78        system::ShutdownSystem,
79    },
80    msgbus::{
81        self, MStr, Topic, get_message_bus,
82        handler::{ShareableMessageHandler, TypedMessageHandler},
83        switchboard::{
84            MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
85            get_custom_topic, get_funding_rate_topic, get_index_price_topic,
86            get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
87            get_instruments_topic, get_mark_price_topic, get_order_fills_topic, get_quotes_topic,
88            get_trades_topic,
89        },
90    },
91    signal::Signal,
92    timer::{TimeEvent, TimeEventCallback},
93};
94
95/// Common configuration for [`DataActor`] based components.
96#[derive(Debug, Clone)]
97#[cfg_attr(
98    feature = "python",
99    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", subclass)
100)]
101pub struct DataActorConfig {
102    /// The custom identifier for the Actor.
103    pub actor_id: Option<ActorId>,
104    /// If events should be logged.
105    pub log_events: bool,
106    /// If commands should be logged.
107    pub log_commands: bool,
108}
109
110impl Default for DataActorConfig {
111    fn default() -> Self {
112        Self {
113            actor_id: None,
114            log_events: true,
115            log_commands: true,
116        }
117    }
118}
119
120/// Configuration for creating actors from importable paths.
121#[derive(Debug, Clone)]
122#[cfg_attr(
123    feature = "python",
124    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
125)]
126pub struct ImportableActorConfig {
127    /// The fully qualified name of the Actor class.
128    pub actor_path: String,
129    /// The fully qualified name of the Actor config class.
130    pub config_path: String,
131    /// The actor configuration as a dictionary.
132    pub config: HashMap<String, serde_json::Value>,
133}
134
135type RequestCallback = Box<dyn Fn(UUID4) + Send + Sync>; // TODO: TBD
136
137pub trait DataActor:
138    Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
139{
140    /// Actions to be performed when the actor state is saved.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if saving the actor state fails.
145    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
146        Ok(IndexMap::new())
147    }
148
149    /// Actions to be performed when the actor state is loaded.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if loading the actor state fails.
154    #[allow(unused_variables)]
155    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
156        Ok(())
157    }
158
159    /// Actions to be performed on start.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if starting the actor fails.
164    fn on_start(&mut self) -> anyhow::Result<()> {
165        log::warn!(
166            "The `on_start` handler was called when not overridden, \
167            it's expected that any actions required when starting the actor \
168            occur here, such as subscribing/requesting data"
169        );
170        Ok(())
171    }
172
173    /// Actions to be performed on stop.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if stopping the actor fails.
178    fn on_stop(&mut self) -> anyhow::Result<()> {
179        log::warn!(
180            "The `on_stop` handler was called when not overridden, \
181            it's expected that any actions required when stopping the actor \
182            occur here, such as unsubscribing from data",
183        );
184        Ok(())
185    }
186
187    /// Actions to be performed on resume.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if resuming the actor fails.
192    fn on_resume(&mut self) -> anyhow::Result<()> {
193        log::warn!(
194            "The `on_resume` handler was called when not overridden, \
195            it's expected that any actions required when resuming the actor \
196            following a stop occur here"
197        );
198        Ok(())
199    }
200
201    /// Actions to be performed on reset.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if resetting the actor fails.
206    fn on_reset(&mut self) -> anyhow::Result<()> {
207        log::warn!(
208            "The `on_reset` handler was called when not overridden, \
209            it's expected that any actions required when resetting the actor \
210            occur here, such as resetting indicators and other state"
211        );
212        Ok(())
213    }
214
215    /// Actions to be performed on dispose.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if disposing the actor fails.
220    fn on_dispose(&mut self) -> anyhow::Result<()> {
221        Ok(())
222    }
223
224    /// Actions to be performed on degrade.
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if degrading the actor fails.
229    fn on_degrade(&mut self) -> anyhow::Result<()> {
230        Ok(())
231    }
232
233    /// Actions to be performed on fault.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if faulting the actor fails.
238    fn on_fault(&mut self) -> anyhow::Result<()> {
239        Ok(())
240    }
241
242    /// Actions to be performed when receiving a time event.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if handling the time event fails.
247    #[allow(unused_variables)]
248    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
249        Ok(())
250    }
251
252    /// Actions to be performed when receiving custom data.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if handling the data fails.
257    #[allow(unused_variables)]
258    fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
259        Ok(())
260    }
261
262    /// Actions to be performed when receiving a signal.
263    ///
264    /// # Errors
265    ///
266    /// Returns an error if handling the signal fails.
267    #[allow(unused_variables)]
268    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
269        Ok(())
270    }
271
272    /// Actions to be performed when receiving an instrument.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if handling the instrument fails.
277    #[allow(unused_variables)]
278    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
279        Ok(())
280    }
281
282    /// Actions to be performed when receiving order book deltas.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if handling the book deltas fails.
287    #[allow(unused_variables)]
288    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
289        Ok(())
290    }
291
292    /// Actions to be performed when receiving an order book.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if handling the book fails.
297    #[allow(unused_variables)]
298    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
299        Ok(())
300    }
301
302    /// Actions to be performed when receiving a quote.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if handling the quote fails.
307    #[allow(unused_variables)]
308    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
309        Ok(())
310    }
311
312    /// Actions to be performed when receiving a trade.
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if handling the trade fails.
317    #[allow(unused_variables)]
318    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
319        Ok(())
320    }
321
322    /// Actions to be performed when receiving a bar.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if handling the bar fails.
327    #[allow(unused_variables)]
328    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
329        Ok(())
330    }
331
332    /// Actions to be performed when receiving a mark price update.
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if handling the mark price update fails.
337    #[allow(unused_variables)]
338    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
339        Ok(())
340    }
341
342    /// Actions to be performed when receiving an index price update.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if handling the index price update fails.
347    #[allow(unused_variables)]
348    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
349        Ok(())
350    }
351
352    /// Actions to be performed when receiving a funding rate update.
353    ///
354    /// # Errors
355    ///
356    /// Returns an error if handling the funding rate update fails.
357    #[allow(unused_variables)]
358    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
359        Ok(())
360    }
361
362    /// Actions to be performed when receiving an instrument status update.
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if handling the instrument status update fails.
367    #[allow(unused_variables)]
368    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
369        Ok(())
370    }
371
372    /// Actions to be performed when receiving an instrument close update.
373    ///
374    /// # Errors
375    ///
376    /// Returns an error if handling the instrument close update fails.
377    #[allow(unused_variables)]
378    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
379        Ok(())
380    }
381
382    /// Actions to be performed when receiving an order filled event.
383    ///
384    /// # Errors
385    ///
386    /// Returns an error if handling the order filled event fails.
387    #[allow(unused_variables)]
388    fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
389        Ok(())
390    }
391
392    #[cfg(feature = "defi")]
393    /// Actions to be performed when receiving a block.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if handling the block fails.
398    #[allow(unused_variables)]
399    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
400        Ok(())
401    }
402
403    #[cfg(feature = "defi")]
404    /// Actions to be performed when receiving a pool.
405    ///
406    /// # Errors
407    ///
408    /// Returns an error if handling the pool fails.
409    #[allow(unused_variables)]
410    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
411        Ok(())
412    }
413
414    #[cfg(feature = "defi")]
415    /// Actions to be performed when receiving a pool swap.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if handling the pool swap fails.
420    #[allow(unused_variables)]
421    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
422        Ok(())
423    }
424
425    #[cfg(feature = "defi")]
426    /// Actions to be performed when receiving a pool liquidity update.
427    ///
428    /// # Errors
429    ///
430    /// Returns an error if handling the pool liquidity update fails.
431    #[allow(unused_variables)]
432    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
433        Ok(())
434    }
435
436    #[cfg(feature = "defi")]
437    /// Actions to be performed when receiving a pool fee collect event.
438    ///
439    /// # Errors
440    ///
441    /// Returns an error if handling the pool fee collect fails.
442    #[allow(unused_variables)]
443    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
444        Ok(())
445    }
446
447    #[cfg(feature = "defi")]
448    /// Actions to be performed when receiving a pool flash event.
449    ///
450    /// # Errors
451    ///
452    /// Returns an error if handling the pool flash fails.
453    #[allow(unused_variables)]
454    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
455        Ok(())
456    }
457
458    /// Actions to be performed when receiving historical data.
459    ///
460    /// # Errors
461    ///
462    /// Returns an error if handling the historical data fails.
463    #[allow(unused_variables)]
464    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
465        Ok(())
466    }
467
468    /// Actions to be performed when receiving historical quotes.
469    ///
470    /// # Errors
471    ///
472    /// Returns an error if handling the historical quotes fails.
473    #[allow(unused_variables)]
474    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
475        Ok(())
476    }
477
478    /// Actions to be performed when receiving historical trades.
479    ///
480    /// # Errors
481    ///
482    /// Returns an error if handling the historical trades fails.
483    #[allow(unused_variables)]
484    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
485        Ok(())
486    }
487
488    /// Actions to be performed when receiving historical bars.
489    ///
490    /// # Errors
491    ///
492    /// Returns an error if handling the historical bars fails.
493    #[allow(unused_variables)]
494    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
495        Ok(())
496    }
497
498    /// Actions to be performed when receiving historical mark prices.
499    ///
500    /// # Errors
501    ///
502    /// Returns an error if handling the historical mark prices fails.
503    #[allow(unused_variables)]
504    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
505        Ok(())
506    }
507
508    /// Actions to be performed when receiving historical index prices.
509    ///
510    /// # Errors
511    ///
512    /// Returns an error if handling the historical index prices fails.
513    #[allow(unused_variables)]
514    fn on_historical_index_prices(
515        &mut self,
516        index_prices: &[IndexPriceUpdate],
517    ) -> anyhow::Result<()> {
518        Ok(())
519    }
520
521    /// Handles a received time event.
522    fn handle_time_event(&mut self, event: &TimeEvent) {
523        log_received(&event);
524
525        if let Err(e) = DataActor::on_time_event(self, event) {
526            log_error(&e);
527        }
528    }
529
530    /// Handles a received custom data point.
531    fn handle_data(&mut self, data: &dyn Any) {
532        log_received(&data);
533
534        if self.not_running() {
535            log_not_running(&data);
536            return;
537        }
538
539        if let Err(e) = self.on_data(data) {
540            log_error(&e);
541        }
542    }
543
544    /// Handles a received signal.
545    fn handle_signal(&mut self, signal: &Signal) {
546        log_received(&signal);
547
548        if self.not_running() {
549            log_not_running(&signal);
550            return;
551        }
552
553        if let Err(e) = self.on_signal(signal) {
554            log_error(&e);
555        }
556    }
557
558    /// Handles a received instrument.
559    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
560        log_received(&instrument);
561
562        if self.not_running() {
563            log_not_running(&instrument);
564            return;
565        }
566
567        if let Err(e) = self.on_instrument(instrument) {
568            log_error(&e);
569        }
570    }
571
572    /// Handles received order book deltas.
573    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
574        log_received(&deltas);
575
576        if self.not_running() {
577            log_not_running(&deltas);
578            return;
579        }
580
581        if let Err(e) = self.on_book_deltas(deltas) {
582            log_error(&e);
583        }
584    }
585
586    /// Handles a received order book reference.
587    fn handle_book(&mut self, book: &OrderBook) {
588        log_received(&book);
589
590        if self.not_running() {
591            log_not_running(&book);
592            return;
593        }
594
595        if let Err(e) = self.on_book(book) {
596            log_error(&e);
597        };
598    }
599
600    /// Handles a received quote.
601    fn handle_quote(&mut self, quote: &QuoteTick) {
602        log_received(&quote);
603
604        if self.not_running() {
605            log_not_running(&quote);
606            return;
607        }
608
609        if let Err(e) = self.on_quote(quote) {
610            log_error(&e);
611        }
612    }
613
614    /// Handles a received trade.
615    fn handle_trade(&mut self, trade: &TradeTick) {
616        log_received(&trade);
617
618        if self.not_running() {
619            log_not_running(&trade);
620            return;
621        }
622
623        if let Err(e) = self.on_trade(trade) {
624            log_error(&e);
625        }
626    }
627
628    /// Handles a receiving bar.
629    fn handle_bar(&mut self, bar: &Bar) {
630        log_received(&bar);
631
632        if self.not_running() {
633            log_not_running(&bar);
634            return;
635        }
636
637        if let Err(e) = self.on_bar(bar) {
638            log_error(&e);
639        }
640    }
641
642    /// Handles a received mark price update.
643    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
644        log_received(&mark_price);
645
646        if self.not_running() {
647            log_not_running(&mark_price);
648            return;
649        }
650
651        if let Err(e) = self.on_mark_price(mark_price) {
652            log_error(&e);
653        }
654    }
655
656    /// Handles a received index price update.
657    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
658        log_received(&index_price);
659
660        if self.not_running() {
661            log_not_running(&index_price);
662            return;
663        }
664
665        if let Err(e) = self.on_index_price(index_price) {
666            log_error(&e);
667        }
668    }
669
670    /// Handles a received funding rate update.
671    fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
672        log_received(&funding_rate);
673
674        if self.not_running() {
675            log_not_running(&funding_rate);
676            return;
677        }
678
679        if let Err(e) = self.on_funding_rate(funding_rate) {
680            log_error(&e);
681        }
682    }
683
684    /// Handles a received instrument status.
685    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
686        log_received(&status);
687
688        if self.not_running() {
689            log_not_running(&status);
690            return;
691        }
692
693        if let Err(e) = self.on_instrument_status(status) {
694            log_error(&e);
695        }
696    }
697
698    /// Handles a received instrument close.
699    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
700        log_received(&close);
701
702        if self.not_running() {
703            log_not_running(&close);
704            return;
705        }
706
707        if let Err(e) = self.on_instrument_close(close) {
708            log_error(&e);
709        }
710    }
711
712    /// Handles a received order filled event.
713    fn handle_order_filled(&mut self, event: &OrderFilled) {
714        log_received(&event);
715
716        // Check for double-handling: if the event's strategy_id matches this actor's id,
717        // it means a Strategy is receiving its own fill event through both automatic
718        // subscription and manual subscribe_order_fills, so skip the manual handler.
719        if event.strategy_id.inner() == self.actor_id().inner() {
720            return;
721        }
722
723        if self.not_running() {
724            log_not_running(&event);
725            return;
726        }
727
728        if let Err(e) = self.on_order_filled(event) {
729            log_error(&e);
730        }
731    }
732
733    #[cfg(feature = "defi")]
734    /// Handles a received block.
735    fn handle_block(&mut self, block: &Block) {
736        log_received(&block);
737
738        if self.not_running() {
739            log_not_running(&block);
740            return;
741        }
742
743        if let Err(e) = self.on_block(block) {
744            log_error(&e);
745        }
746    }
747
748    #[cfg(feature = "defi")]
749    /// Handles a received pool definition update.
750    fn handle_pool(&mut self, pool: &Pool) {
751        log_received(&pool);
752
753        if self.not_running() {
754            log_not_running(&pool);
755            return;
756        }
757
758        if let Err(e) = self.on_pool(pool) {
759            log_error(&e);
760        }
761    }
762
763    #[cfg(feature = "defi")]
764    /// Handles a received pool swap.
765    fn handle_pool_swap(&mut self, swap: &PoolSwap) {
766        log_received(&swap);
767
768        if self.not_running() {
769            log_not_running(&swap);
770            return;
771        }
772
773        if let Err(e) = self.on_pool_swap(swap) {
774            log_error(&e);
775        }
776    }
777
778    #[cfg(feature = "defi")]
779    /// Handles a received pool liquidity update.
780    fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
781        log_received(&update);
782
783        if self.not_running() {
784            log_not_running(&update);
785            return;
786        }
787
788        if let Err(e) = self.on_pool_liquidity_update(update) {
789            log_error(&e);
790        }
791    }
792
793    #[cfg(feature = "defi")]
794    /// Handles a received pool fee collect.
795    fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
796        log_received(&collect);
797
798        if self.not_running() {
799            log_not_running(&collect);
800            return;
801        }
802
803        if let Err(e) = self.on_pool_fee_collect(collect) {
804            log_error(&e);
805        }
806    }
807
808    #[cfg(feature = "defi")]
809    /// Handles a received pool flash event.
810    fn handle_pool_flash(&mut self, flash: &PoolFlash) {
811        log_received(&flash);
812
813        if self.not_running() {
814            log_not_running(&flash);
815            return;
816        }
817
818        if let Err(e) = self.on_pool_flash(flash) {
819            log_error(&e);
820        }
821    }
822
823    /// Handles received historical data.
824    fn handle_historical_data(&mut self, data: &dyn Any) {
825        log_received(&data);
826
827        if let Err(e) = self.on_historical_data(data) {
828            log_error(&e);
829        }
830    }
831
832    /// Handles a data response.
833    fn handle_data_response(&mut self, resp: &CustomDataResponse) {
834        log_received(&resp);
835
836        if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
837            log_error(&e);
838        }
839    }
840
841    /// Handles an instrument response.
842    fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
843        log_received(&resp);
844
845        if let Err(e) = self.on_instrument(&resp.data) {
846            log_error(&e);
847        }
848    }
849
850    /// Handles an instruments response.
851    fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
852        log_received(&resp);
853
854        for inst in &resp.data {
855            if let Err(e) = self.on_instrument(inst) {
856                log_error(&e);
857            }
858        }
859    }
860
861    /// Handles a book response.
862    fn handle_book_response(&mut self, resp: &BookResponse) {
863        log_received(&resp);
864
865        if let Err(e) = self.on_book(&resp.data) {
866            log_error(&e);
867        }
868    }
869
870    /// Handles a quotes response.
871    fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
872        log_received(&resp);
873
874        if let Err(e) = self.on_historical_quotes(&resp.data) {
875            log_error(&e);
876        }
877    }
878
879    /// Handles a trades response.
880    fn handle_trades_response(&mut self, resp: &TradesResponse) {
881        log_received(&resp);
882
883        if let Err(e) = self.on_historical_trades(&resp.data) {
884            log_error(&e);
885        }
886    }
887
888    /// Handles a bars response.
889    fn handle_bars_response(&mut self, resp: &BarsResponse) {
890        log_received(&resp);
891
892        if let Err(e) = self.on_historical_bars(&resp.data) {
893            log_error(&e);
894        }
895    }
896
897    /// Subscribe to streaming `data_type` data.
898    fn subscribe_data(
899        &mut self,
900        data_type: DataType,
901        client_id: Option<ClientId>,
902        params: Option<IndexMap<String, String>>,
903    ) where
904        Self: 'static + Debug + Sized,
905    {
906        let actor_id = self.actor_id().inner();
907        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
908            move |data: &dyn Any| {
909                get_actor_unchecked::<Self>(&actor_id).handle_data(data);
910            },
911        )));
912
913        DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
914    }
915
916    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
917    fn subscribe_quotes(
918        &mut self,
919        instrument_id: InstrumentId,
920        client_id: Option<ClientId>,
921        params: Option<IndexMap<String, String>>,
922    ) where
923        Self: 'static + Debug + Sized,
924    {
925        let actor_id = self.actor_id().inner();
926        let topic = get_quotes_topic(instrument_id);
927
928        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
929            move |quote: &QuoteTick| {
930                if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
931                    actor.handle_quote(quote);
932                } else {
933                    log::error!("Actor {actor_id} not found for quote handling");
934                }
935            },
936        )));
937
938        DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
939    }
940
941    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
942    fn subscribe_instruments(
943        &mut self,
944        venue: Venue,
945        client_id: Option<ClientId>,
946        params: Option<IndexMap<String, String>>,
947    ) where
948        Self: 'static + Debug + Sized,
949    {
950        let actor_id = self.actor_id().inner();
951        let topic = get_instruments_topic(venue);
952
953        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
954            move |instrument: &InstrumentAny| {
955                if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
956                    actor.handle_instrument(instrument);
957                } else {
958                    log::error!("Actor {actor_id} not found for instruments handling");
959                }
960            },
961        )));
962
963        DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
964    }
965
966    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
967    fn subscribe_instrument(
968        &mut self,
969        instrument_id: InstrumentId,
970        client_id: Option<ClientId>,
971        params: Option<IndexMap<String, String>>,
972    ) where
973        Self: 'static + Debug + Sized,
974    {
975        let actor_id = self.actor_id().inner();
976        let topic = get_instrument_topic(instrument_id);
977
978        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
979            move |instrument: &InstrumentAny| {
980                if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
981                    actor.handle_instrument(instrument);
982                } else {
983                    log::error!("Actor {actor_id} not found for instrument handling");
984                }
985            },
986        )));
987
988        DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
989    }
990
991    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
992    fn subscribe_book_deltas(
993        &mut self,
994        instrument_id: InstrumentId,
995        book_type: BookType,
996        depth: Option<NonZeroUsize>,
997        client_id: Option<ClientId>,
998        managed: bool,
999        params: Option<IndexMap<String, String>>,
1000    ) where
1001        Self: 'static + Debug + Sized,
1002    {
1003        let actor_id = self.actor_id().inner();
1004        let topic = get_book_deltas_topic(instrument_id);
1005
1006        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1007            move |deltas: &OrderBookDeltas| {
1008                get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1009            },
1010        )));
1011
1012        DataActorCore::subscribe_book_deltas(
1013            self,
1014            topic,
1015            handler,
1016            instrument_id,
1017            book_type,
1018            depth,
1019            client_id,
1020            managed,
1021            params,
1022        );
1023    }
1024
1025    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1026    fn subscribe_book_at_interval(
1027        &mut self,
1028        instrument_id: InstrumentId,
1029        book_type: BookType,
1030        depth: Option<NonZeroUsize>,
1031        interval_ms: NonZeroUsize,
1032        client_id: Option<ClientId>,
1033        params: Option<IndexMap<String, String>>,
1034    ) where
1035        Self: 'static + Debug + Sized,
1036    {
1037        let actor_id = self.actor_id().inner();
1038        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1039
1040        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1041            move |book: &OrderBook| {
1042                get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1043            },
1044        )));
1045
1046        DataActorCore::subscribe_book_at_interval(
1047            self,
1048            topic,
1049            handler,
1050            instrument_id,
1051            book_type,
1052            depth,
1053            interval_ms,
1054            client_id,
1055            params,
1056        );
1057    }
1058
1059    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
1060    fn subscribe_trades(
1061        &mut self,
1062        instrument_id: InstrumentId,
1063        client_id: Option<ClientId>,
1064        params: Option<IndexMap<String, String>>,
1065    ) where
1066        Self: 'static + Debug + Sized,
1067    {
1068        let actor_id = self.actor_id().inner();
1069        let topic = get_trades_topic(instrument_id);
1070
1071        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1072            move |trade: &TradeTick| {
1073                get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1074            },
1075        )));
1076
1077        DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1078    }
1079
1080    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
1081    fn subscribe_bars(
1082        &mut self,
1083        bar_type: BarType,
1084        client_id: Option<ClientId>,
1085        params: Option<IndexMap<String, String>>,
1086    ) where
1087        Self: 'static + Debug + Sized,
1088    {
1089        let actor_id = self.actor_id().inner();
1090        let topic = get_bars_topic(bar_type);
1091
1092        let handler =
1093            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
1094                get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1095            })));
1096
1097        DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1098    }
1099
1100    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1101    fn subscribe_mark_prices(
1102        &mut self,
1103        instrument_id: InstrumentId,
1104        client_id: Option<ClientId>,
1105        params: Option<IndexMap<String, String>>,
1106    ) where
1107        Self: 'static + Debug + Sized,
1108    {
1109        let actor_id = self.actor_id().inner();
1110        let topic = get_mark_price_topic(instrument_id);
1111
1112        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1113            move |mark_price: &MarkPriceUpdate| {
1114                get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1115            },
1116        )));
1117
1118        DataActorCore::subscribe_mark_prices(
1119            self,
1120            topic,
1121            handler,
1122            instrument_id,
1123            client_id,
1124            params,
1125        );
1126    }
1127
1128    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1129    fn subscribe_index_prices(
1130        &mut self,
1131        instrument_id: InstrumentId,
1132        client_id: Option<ClientId>,
1133        params: Option<IndexMap<String, String>>,
1134    ) where
1135        Self: 'static + Debug + Sized,
1136    {
1137        let actor_id = self.actor_id().inner();
1138        let topic = get_index_price_topic(instrument_id);
1139
1140        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1141            move |index_price: &IndexPriceUpdate| {
1142                get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1143            },
1144        )));
1145
1146        DataActorCore::subscribe_index_prices(
1147            self,
1148            topic,
1149            handler,
1150            instrument_id,
1151            client_id,
1152            params,
1153        );
1154    }
1155
1156    /// Subscribe to streaming [`FundingRateUpdate`] data for the `instrument_id`.
1157    fn subscribe_funding_rates(
1158        &mut self,
1159        instrument_id: InstrumentId,
1160        client_id: Option<ClientId>,
1161        params: Option<IndexMap<String, String>>,
1162    ) where
1163        Self: 'static + Debug + Sized,
1164    {
1165        let actor_id = self.actor_id().inner();
1166        let topic = get_funding_rate_topic(instrument_id);
1167
1168        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1169            move |funding_rate: &FundingRateUpdate| {
1170                get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1171            },
1172        )));
1173
1174        DataActorCore::subscribe_funding_rates(
1175            self,
1176            topic,
1177            handler,
1178            instrument_id,
1179            client_id,
1180            params,
1181        );
1182    }
1183
1184    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1185    fn subscribe_instrument_status(
1186        &mut self,
1187        instrument_id: InstrumentId,
1188        client_id: Option<ClientId>,
1189        params: Option<IndexMap<String, String>>,
1190    ) where
1191        Self: 'static + Debug + Sized,
1192    {
1193        let actor_id = self.actor_id().inner();
1194        let topic = get_instrument_status_topic(instrument_id);
1195
1196        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1197            move |status: &InstrumentStatus| {
1198                get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1199            },
1200        )));
1201
1202        DataActorCore::subscribe_instrument_status(
1203            self,
1204            topic,
1205            handler,
1206            instrument_id,
1207            client_id,
1208            params,
1209        );
1210    }
1211
1212    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1213    fn subscribe_instrument_close(
1214        &mut self,
1215        instrument_id: InstrumentId,
1216        client_id: Option<ClientId>,
1217        params: Option<IndexMap<String, String>>,
1218    ) where
1219        Self: 'static + Debug + Sized,
1220    {
1221        let actor_id = self.actor_id().inner();
1222        let topic = get_instrument_close_topic(instrument_id);
1223
1224        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1225            move |close: &InstrumentClose| {
1226                get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1227            },
1228        )));
1229
1230        DataActorCore::subscribe_instrument_close(
1231            self,
1232            topic,
1233            handler,
1234            instrument_id,
1235            client_id,
1236            params,
1237        );
1238    }
1239
1240    /// Subscribe to [`OrderFilled`] events for the `instrument_id`.
1241    fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1242    where
1243        Self: 'static + Debug + Sized,
1244    {
1245        let actor_id = self.actor_id().inner();
1246        let topic = get_order_fills_topic(instrument_id);
1247
1248        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1249            move |event: &OrderFilled| {
1250                get_actor_unchecked::<Self>(&actor_id).handle_order_filled(event);
1251            },
1252        )));
1253
1254        DataActorCore::subscribe_order_fills(self, topic, handler);
1255    }
1256
1257    #[cfg(feature = "defi")]
1258    /// Subscribe to streaming [`Block`] data for the `chain`.
1259    fn subscribe_blocks(
1260        &mut self,
1261        chain: Blockchain,
1262        client_id: Option<ClientId>,
1263        params: Option<IndexMap<String, String>>,
1264    ) where
1265        Self: 'static + Debug + Sized,
1266    {
1267        let actor_id = self.actor_id().inner();
1268        let topic = defi::switchboard::get_defi_blocks_topic(chain);
1269
1270        let handler =
1271            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |block: &Block| {
1272                get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1273            })));
1274
1275        DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1276    }
1277
1278    #[cfg(feature = "defi")]
1279    /// Subscribe to streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1280    fn subscribe_pool(
1281        &mut self,
1282        instrument_id: InstrumentId,
1283        client_id: Option<ClientId>,
1284        params: Option<IndexMap<String, String>>,
1285    ) where
1286        Self: 'static + Debug + Sized,
1287    {
1288        let actor_id = self.actor_id().inner();
1289        let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1290
1291        let handler =
1292            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |pool: &Pool| {
1293                get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1294            })));
1295
1296        DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1297    }
1298
1299    #[cfg(feature = "defi")]
1300    /// Subscribe to streaming [`PoolSwap`] data for the `instrument_id`.
1301    fn subscribe_pool_swaps(
1302        &mut self,
1303        instrument_id: InstrumentId,
1304        client_id: Option<ClientId>,
1305        params: Option<IndexMap<String, String>>,
1306    ) where
1307        Self: 'static + Debug + Sized,
1308    {
1309        let actor_id = self.actor_id().inner();
1310        let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1311
1312        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1313            move |swap: &PoolSwap| {
1314                get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1315            },
1316        )));
1317
1318        DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1319    }
1320
1321    #[cfg(feature = "defi")]
1322    /// Subscribe to streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1323    fn subscribe_pool_liquidity_updates(
1324        &mut self,
1325        instrument_id: InstrumentId,
1326        client_id: Option<ClientId>,
1327        params: Option<IndexMap<String, String>>,
1328    ) where
1329        Self: 'static + Debug + Sized,
1330    {
1331        let actor_id = self.actor_id().inner();
1332        let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1333
1334        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1335            move |update: &PoolLiquidityUpdate| {
1336                get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1337            },
1338        )));
1339
1340        DataActorCore::subscribe_pool_liquidity_updates(
1341            self,
1342            topic,
1343            handler,
1344            instrument_id,
1345            client_id,
1346            params,
1347        );
1348    }
1349
1350    #[cfg(feature = "defi")]
1351    /// Subscribe to streaming [`PoolFeeCollect`] data for the `instrument_id`.
1352    fn subscribe_pool_fee_collects(
1353        &mut self,
1354        instrument_id: InstrumentId,
1355        client_id: Option<ClientId>,
1356        params: Option<IndexMap<String, String>>,
1357    ) where
1358        Self: 'static + Debug + Sized,
1359    {
1360        let actor_id = self.actor_id().inner();
1361        let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1362
1363        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1364            move |collect: &PoolFeeCollect| {
1365                get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1366            },
1367        )));
1368
1369        DataActorCore::subscribe_pool_fee_collects(
1370            self,
1371            topic,
1372            handler,
1373            instrument_id,
1374            client_id,
1375            params,
1376        );
1377    }
1378
1379    #[cfg(feature = "defi")]
1380    /// Subscribe to streaming [`PoolFlash`] events for the given `instrument_id`.
1381    fn subscribe_pool_flash_events(
1382        &mut self,
1383        instrument_id: InstrumentId,
1384        client_id: Option<ClientId>,
1385        params: Option<IndexMap<String, String>>,
1386    ) where
1387        Self: 'static + Debug + Sized,
1388    {
1389        let actor_id = self.actor_id().inner();
1390        let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1391
1392        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1393            move |flash: &PoolFlash| {
1394                get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1395            },
1396        )));
1397
1398        DataActorCore::subscribe_pool_flash_events(
1399            self,
1400            topic,
1401            handler,
1402            instrument_id,
1403            client_id,
1404            params,
1405        );
1406    }
1407
1408    /// Unsubscribe from streaming `data_type` data.
1409    fn unsubscribe_data(
1410        &mut self,
1411        data_type: DataType,
1412        client_id: Option<ClientId>,
1413        params: Option<IndexMap<String, String>>,
1414    ) where
1415        Self: 'static + Debug + Sized,
1416    {
1417        DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1418    }
1419
1420    /// Unsubscribe from streaming [`InstrumentAny`] data for the `venue`.
1421    fn unsubscribe_instruments(
1422        &mut self,
1423        venue: Venue,
1424        client_id: Option<ClientId>,
1425        params: Option<IndexMap<String, String>>,
1426    ) where
1427        Self: 'static + Debug + Sized,
1428    {
1429        DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1430    }
1431
1432    /// Unsubscribe from streaming [`InstrumentAny`] data for the `instrument_id`.
1433    fn unsubscribe_instrument(
1434        &mut self,
1435        instrument_id: InstrumentId,
1436        client_id: Option<ClientId>,
1437        params: Option<IndexMap<String, String>>,
1438    ) where
1439        Self: 'static + Debug + Sized,
1440    {
1441        DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1442    }
1443
1444    /// Unsubscribe from streaming [`OrderBookDeltas`] data for the `instrument_id`.
1445    fn unsubscribe_book_deltas(
1446        &mut self,
1447        instrument_id: InstrumentId,
1448        client_id: Option<ClientId>,
1449        params: Option<IndexMap<String, String>>,
1450    ) where
1451        Self: 'static + Debug + Sized,
1452    {
1453        DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1454    }
1455
1456    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1457    fn unsubscribe_book_at_interval(
1458        &mut self,
1459        instrument_id: InstrumentId,
1460        interval_ms: NonZeroUsize,
1461        client_id: Option<ClientId>,
1462        params: Option<IndexMap<String, String>>,
1463    ) where
1464        Self: 'static + Debug + Sized,
1465    {
1466        DataActorCore::unsubscribe_book_at_interval(
1467            self,
1468            instrument_id,
1469            interval_ms,
1470            client_id,
1471            params,
1472        );
1473    }
1474
1475    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
1476    fn unsubscribe_quotes(
1477        &mut self,
1478        instrument_id: InstrumentId,
1479        client_id: Option<ClientId>,
1480        params: Option<IndexMap<String, String>>,
1481    ) where
1482        Self: 'static + Debug + Sized,
1483    {
1484        DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1485    }
1486
1487    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
1488    fn unsubscribe_trades(
1489        &mut self,
1490        instrument_id: InstrumentId,
1491        client_id: Option<ClientId>,
1492        params: Option<IndexMap<String, String>>,
1493    ) where
1494        Self: 'static + Debug + Sized,
1495    {
1496        DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1497    }
1498
1499    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
1500    fn unsubscribe_bars(
1501        &mut self,
1502        bar_type: BarType,
1503        client_id: Option<ClientId>,
1504        params: Option<IndexMap<String, String>>,
1505    ) where
1506        Self: 'static + Debug + Sized,
1507    {
1508        DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1509    }
1510
1511    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1512    fn unsubscribe_mark_prices(
1513        &mut self,
1514        instrument_id: InstrumentId,
1515        client_id: Option<ClientId>,
1516        params: Option<IndexMap<String, String>>,
1517    ) where
1518        Self: 'static + Debug + Sized,
1519    {
1520        DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1521    }
1522
1523    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1524    fn unsubscribe_index_prices(
1525        &mut self,
1526        instrument_id: InstrumentId,
1527        client_id: Option<ClientId>,
1528        params: Option<IndexMap<String, String>>,
1529    ) where
1530        Self: 'static + Debug + Sized,
1531    {
1532        DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1533    }
1534
1535    /// Unsubscribe from streaming [`FundingRateUpdate`] data for the `instrument_id`.
1536    fn unsubscribe_funding_rates(
1537        &mut self,
1538        instrument_id: InstrumentId,
1539        client_id: Option<ClientId>,
1540        params: Option<IndexMap<String, String>>,
1541    ) where
1542        Self: 'static + Debug + Sized,
1543    {
1544        DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1545    }
1546
1547    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
1548    fn unsubscribe_instrument_status(
1549        &mut self,
1550        instrument_id: InstrumentId,
1551        client_id: Option<ClientId>,
1552        params: Option<IndexMap<String, String>>,
1553    ) where
1554        Self: 'static + Debug + Sized,
1555    {
1556        DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1557    }
1558
1559    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
1560    fn unsubscribe_instrument_close(
1561        &mut self,
1562        instrument_id: InstrumentId,
1563        client_id: Option<ClientId>,
1564        params: Option<IndexMap<String, String>>,
1565    ) where
1566        Self: 'static + Debug + Sized,
1567    {
1568        DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1569    }
1570
1571    /// Unsubscribe from [`OrderFilled`] events for the `instrument_id`.
1572    fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1573    where
1574        Self: 'static + Debug + Sized,
1575    {
1576        DataActorCore::unsubscribe_order_fills(self, instrument_id);
1577    }
1578
1579    #[cfg(feature = "defi")]
1580    /// Unsubscribe from streaming [`Block`] data for the `chain`.
1581    fn unsubscribe_blocks(
1582        &mut self,
1583        chain: Blockchain,
1584        client_id: Option<ClientId>,
1585        params: Option<IndexMap<String, String>>,
1586    ) where
1587        Self: 'static + Debug + Sized,
1588    {
1589        DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1590    }
1591
1592    #[cfg(feature = "defi")]
1593    /// Unsubscribe from streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1594    fn unsubscribe_pool(
1595        &mut self,
1596        instrument_id: InstrumentId,
1597        client_id: Option<ClientId>,
1598        params: Option<IndexMap<String, String>>,
1599    ) where
1600        Self: 'static + Debug + Sized,
1601    {
1602        DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1603    }
1604
1605    #[cfg(feature = "defi")]
1606    /// Unsubscribe from streaming [`PoolSwap`] data for the `instrument_id`.
1607    fn unsubscribe_pool_swaps(
1608        &mut self,
1609        instrument_id: InstrumentId,
1610        client_id: Option<ClientId>,
1611        params: Option<IndexMap<String, String>>,
1612    ) where
1613        Self: 'static + Debug + Sized,
1614    {
1615        DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1616    }
1617
1618    #[cfg(feature = "defi")]
1619    /// Unsubscribe from streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1620    fn unsubscribe_pool_liquidity_updates(
1621        &mut self,
1622        instrument_id: InstrumentId,
1623        client_id: Option<ClientId>,
1624        params: Option<IndexMap<String, String>>,
1625    ) where
1626        Self: 'static + Debug + Sized,
1627    {
1628        DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1629    }
1630
1631    #[cfg(feature = "defi")]
1632    /// Unsubscribe from streaming [`PoolFeeCollect`] data for the `instrument_id`.
1633    fn unsubscribe_pool_fee_collects(
1634        &mut self,
1635        instrument_id: InstrumentId,
1636        client_id: Option<ClientId>,
1637        params: Option<IndexMap<String, String>>,
1638    ) where
1639        Self: 'static + Debug + Sized,
1640    {
1641        DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1642    }
1643
1644    #[cfg(feature = "defi")]
1645    /// Unsubscribe from streaming [`PoolFlash`] events for the given `instrument_id`.
1646    fn unsubscribe_pool_flash_events(
1647        &mut self,
1648        instrument_id: InstrumentId,
1649        client_id: Option<ClientId>,
1650        params: Option<IndexMap<String, String>>,
1651    ) where
1652        Self: 'static + Debug + Sized,
1653    {
1654        DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1655    }
1656
1657    /// Request historical custom data of the given `data_type`.
1658    ///
1659    /// # Errors
1660    ///
1661    /// Returns an error if input parameters are invalid.
1662    fn request_data(
1663        &mut self,
1664        data_type: DataType,
1665        client_id: ClientId,
1666        start: Option<DateTime<Utc>>,
1667        end: Option<DateTime<Utc>>,
1668        limit: Option<NonZeroUsize>,
1669        params: Option<IndexMap<String, String>>,
1670    ) -> anyhow::Result<UUID4>
1671    where
1672        Self: 'static + Debug + Sized,
1673    {
1674        let actor_id = self.actor_id().inner();
1675        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1676            move |resp: &CustomDataResponse| {
1677                get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1678            },
1679        )));
1680
1681        DataActorCore::request_data(
1682            self, data_type, client_id, start, end, limit, params, handler,
1683        )
1684    }
1685
1686    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
1687    ///
1688    /// # Errors
1689    ///
1690    /// Returns an error if input parameters are invalid.
1691    fn request_instrument(
1692        &mut self,
1693        instrument_id: InstrumentId,
1694        start: Option<DateTime<Utc>>,
1695        end: Option<DateTime<Utc>>,
1696        client_id: Option<ClientId>,
1697        params: Option<IndexMap<String, String>>,
1698    ) -> anyhow::Result<UUID4>
1699    where
1700        Self: 'static + Debug + Sized,
1701    {
1702        let actor_id = self.actor_id().inner();
1703        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1704            move |resp: &InstrumentResponse| {
1705                get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1706            },
1707        )));
1708
1709        DataActorCore::request_instrument(
1710            self,
1711            instrument_id,
1712            start,
1713            end,
1714            client_id,
1715            params,
1716            handler,
1717        )
1718    }
1719
1720    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
1721    ///
1722    /// # Errors
1723    ///
1724    /// Returns an error if input parameters are invalid.
1725    fn request_instruments(
1726        &mut self,
1727        venue: Option<Venue>,
1728        start: Option<DateTime<Utc>>,
1729        end: Option<DateTime<Utc>>,
1730        client_id: Option<ClientId>,
1731        params: Option<IndexMap<String, String>>,
1732    ) -> anyhow::Result<UUID4>
1733    where
1734        Self: 'static + Debug + Sized,
1735    {
1736        let actor_id = self.actor_id().inner();
1737        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1738            move |resp: &InstrumentsResponse| {
1739                get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1740            },
1741        )));
1742
1743        DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1744    }
1745
1746    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
1747    ///
1748    /// # Errors
1749    ///
1750    /// Returns an error if input parameters are invalid.
1751    fn request_book_snapshot(
1752        &mut self,
1753        instrument_id: InstrumentId,
1754        depth: Option<NonZeroUsize>,
1755        client_id: Option<ClientId>,
1756        params: Option<IndexMap<String, String>>,
1757    ) -> anyhow::Result<UUID4>
1758    where
1759        Self: 'static + Debug + Sized,
1760    {
1761        let actor_id = self.actor_id().inner();
1762        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1763            move |resp: &BookResponse| {
1764                get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1765            },
1766        )));
1767
1768        DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1769    }
1770
1771    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
1772    ///
1773    /// # Errors
1774    ///
1775    /// Returns an error if input parameters are invalid.
1776    fn request_quotes(
1777        &mut self,
1778        instrument_id: InstrumentId,
1779        start: Option<DateTime<Utc>>,
1780        end: Option<DateTime<Utc>>,
1781        limit: Option<NonZeroUsize>,
1782        client_id: Option<ClientId>,
1783        params: Option<IndexMap<String, String>>,
1784    ) -> anyhow::Result<UUID4>
1785    where
1786        Self: 'static + Debug + Sized,
1787    {
1788        let actor_id = self.actor_id().inner();
1789        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1790            move |resp: &QuotesResponse| {
1791                get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1792            },
1793        )));
1794
1795        DataActorCore::request_quotes(
1796            self,
1797            instrument_id,
1798            start,
1799            end,
1800            limit,
1801            client_id,
1802            params,
1803            handler,
1804        )
1805    }
1806
1807    /// Request historical [`TradeTick`] data for the given `instrument_id`.
1808    ///
1809    /// # Errors
1810    ///
1811    /// Returns an error if input parameters are invalid.
1812    fn request_trades(
1813        &mut self,
1814        instrument_id: InstrumentId,
1815        start: Option<DateTime<Utc>>,
1816        end: Option<DateTime<Utc>>,
1817        limit: Option<NonZeroUsize>,
1818        client_id: Option<ClientId>,
1819        params: Option<IndexMap<String, String>>,
1820    ) -> anyhow::Result<UUID4>
1821    where
1822        Self: 'static + Debug + Sized,
1823    {
1824        let actor_id = self.actor_id().inner();
1825        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1826            move |resp: &TradesResponse| {
1827                get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1828            },
1829        )));
1830
1831        DataActorCore::request_trades(
1832            self,
1833            instrument_id,
1834            start,
1835            end,
1836            limit,
1837            client_id,
1838            params,
1839            handler,
1840        )
1841    }
1842
1843    /// Request historical [`Bar`] data for the given `bar_type`.
1844    ///
1845    /// # Errors
1846    ///
1847    /// Returns an error if input parameters are invalid.
1848    fn request_bars(
1849        &mut self,
1850        bar_type: BarType,
1851        start: Option<DateTime<Utc>>,
1852        end: Option<DateTime<Utc>>,
1853        limit: Option<NonZeroUsize>,
1854        client_id: Option<ClientId>,
1855        params: Option<IndexMap<String, String>>,
1856    ) -> anyhow::Result<UUID4>
1857    where
1858        Self: 'static + Debug + Sized,
1859    {
1860        let actor_id = self.actor_id().inner();
1861        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1862            move |resp: &BarsResponse| {
1863                get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1864            },
1865        )));
1866
1867        DataActorCore::request_bars(
1868            self, bar_type, start, end, limit, client_id, params, handler,
1869        )
1870    }
1871}
1872
1873// Blanket implementation: any DataActor automatically implements Actor
1874impl<T> Actor for T
1875where
1876    T: DataActor + Debug + 'static,
1877{
1878    fn id(&self) -> Ustr {
1879        self.actor_id.inner()
1880    }
1881
1882    #[allow(unused_variables)]
1883    fn handle(&mut self, msg: &dyn Any) {
1884        // Default empty implementation - concrete actors can override if needed
1885    }
1886
1887    fn as_any(&self) -> &dyn Any {
1888        self
1889    }
1890}
1891
1892// Blanket implementation: any DataActor automatically implements Component
1893impl<T> Component for T
1894where
1895    T: DataActor + Debug + 'static,
1896{
1897    fn component_id(&self) -> ComponentId {
1898        ComponentId::new(self.actor_id.inner().as_str())
1899    }
1900
1901    fn state(&self) -> ComponentState {
1902        self.state
1903    }
1904
1905    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1906        self.state = self.state.transition(&trigger)?;
1907        log::info!("{}", self.state.variant_name());
1908        Ok(())
1909    }
1910
1911    fn register(
1912        &mut self,
1913        trader_id: TraderId,
1914        clock: Rc<RefCell<dyn Clock>>,
1915        cache: Rc<RefCell<Cache>>,
1916    ) -> anyhow::Result<()> {
1917        DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1918
1919        // Register default time event handler for this actor
1920        let actor_id = self.actor_id().inner();
1921        let callback = TimeEventCallback::from(move |event: TimeEvent| {
1922            if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1923                actor.handle_time_event(&event);
1924            } else {
1925                log::error!("Actor {actor_id} not found for time event handling");
1926            }
1927        });
1928
1929        clock.borrow_mut().register_default_handler(callback);
1930
1931        self.initialize()
1932    }
1933
1934    fn on_start(&mut self) -> anyhow::Result<()> {
1935        DataActor::on_start(self)
1936    }
1937
1938    fn on_stop(&mut self) -> anyhow::Result<()> {
1939        DataActor::on_stop(self)
1940    }
1941
1942    fn on_resume(&mut self) -> anyhow::Result<()> {
1943        DataActor::on_resume(self)
1944    }
1945
1946    fn on_degrade(&mut self) -> anyhow::Result<()> {
1947        DataActor::on_degrade(self)
1948    }
1949
1950    fn on_fault(&mut self) -> anyhow::Result<()> {
1951        DataActor::on_fault(self)
1952    }
1953
1954    fn on_reset(&mut self) -> anyhow::Result<()> {
1955        DataActor::on_reset(self)
1956    }
1957
1958    fn on_dispose(&mut self) -> anyhow::Result<()> {
1959        DataActor::on_dispose(self)
1960    }
1961}
1962
1963/// Core functionality for all actors.
1964#[allow(
1965    dead_code,
1966    reason = "TODO: Under development (pending_requests, signal_classes)"
1967)]
1968pub struct DataActorCore {
1969    /// The actor identifier.
1970    pub actor_id: ActorId,
1971    /// The actors configuration.
1972    pub config: DataActorConfig,
1973    trader_id: Option<TraderId>,
1974    clock: Option<Rc<RefCell<dyn Clock>>>, // Wired up on registration
1975    cache: Option<Rc<RefCell<Cache>>>,     // Wired up on registration
1976    state: ComponentState,
1977    topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
1978    warning_events: AHashSet<String>, // TODO: TBD
1979    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
1980    signal_classes: AHashMap<String, String>,
1981    #[cfg(feature = "indicators")]
1982    indicators: Indicators,
1983}
1984
1985impl Debug for DataActorCore {
1986    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1987        f.debug_struct(stringify!(DataActorCore))
1988            .field("actor_id", &self.actor_id)
1989            .field("config", &self.config)
1990            .field("state", &self.state)
1991            .field("trader_id", &self.trader_id)
1992            .finish()
1993    }
1994}
1995
1996impl DataActorCore {
1997    /// Adds a subscription handler for the `topic`.
1998    ///
1999    //// Logs a warning if the actor is already subscribed to the topic.
2000    pub(crate) fn add_subscription(
2001        &mut self,
2002        topic: MStr<Topic>,
2003        handler: ShareableMessageHandler,
2004    ) {
2005        if self.topic_handlers.contains_key(&topic) {
2006            log::warn!(
2007                "Actor {} attempted duplicate subscription to topic '{topic}'",
2008                self.actor_id,
2009            );
2010            return;
2011        }
2012
2013        self.topic_handlers.insert(topic, handler.clone());
2014        msgbus::subscribe_topic(topic, handler, None);
2015    }
2016
2017    /// Removes a subscription handler for the `topic` if present.
2018    ///
2019    /// Logs a warning if the actor is not currently subscribed to the topic.
2020    pub(crate) fn remove_subscription(&mut self, topic: MStr<Topic>) {
2021        if let Some(handler) = self.topic_handlers.remove(&topic) {
2022            msgbus::unsubscribe_topic(topic, handler.clone());
2023        } else {
2024            log::warn!(
2025                "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2026                self.actor_id,
2027            );
2028        }
2029    }
2030
2031    /// Creates a new [`DataActorCore`] instance.
2032    pub fn new(config: DataActorConfig) -> Self {
2033        let actor_id = config
2034            .actor_id
2035            .unwrap_or_else(|| Self::default_actor_id(&config));
2036
2037        Self {
2038            actor_id,
2039            config,
2040            trader_id: None, // None until registered
2041            clock: None,     // None until registered
2042            cache: None,     // None until registered
2043            state: ComponentState::default(),
2044            topic_handlers: AHashMap::new(),
2045            warning_events: AHashSet::new(),
2046            pending_requests: AHashMap::new(),
2047            signal_classes: AHashMap::new(),
2048            #[cfg(feature = "indicators")]
2049            indicators: Indicators::default(),
2050        }
2051    }
2052
2053    /// Returns the memory address of this instance as a hexadecimal string.
2054    #[must_use]
2055    pub fn mem_address(&self) -> String {
2056        format!("{self:p}")
2057    }
2058
2059    /// Returns the actors state.
2060    pub fn state(&self) -> ComponentState {
2061        self.state
2062    }
2063
2064    /// Returns the trader ID this actor is registered to.
2065    pub fn trader_id(&self) -> Option<TraderId> {
2066        self.trader_id
2067    }
2068
2069    /// Returns the actors ID.
2070    pub fn actor_id(&self) -> ActorId {
2071        self.actor_id
2072    }
2073
2074    fn default_actor_id(config: &DataActorConfig) -> ActorId {
2075        let memory_address = std::ptr::from_ref(config) as *const _ as usize;
2076        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2077    }
2078
2079    /// Returns a UNIX nanoseconds timestamp from the actors internal clock.
2080    pub fn timestamp_ns(&self) -> UnixNanos {
2081        self.clock_ref().timestamp_ns()
2082    }
2083
2084    /// Returns the clock for the actor (if registered).
2085    ///
2086    /// # Panics
2087    ///
2088    /// Panics if the actor has not been registered with a trader.
2089    pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2090        self.clock
2091            .as_ref()
2092            .unwrap_or_else(|| {
2093                panic!(
2094                    "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2095                    self.actor_id, self.trader_id
2096                )
2097            })
2098            .borrow_mut()
2099    }
2100
2101    /// Returns a clone of the reference-counted clock.
2102    ///
2103    /// # Panics
2104    ///
2105    /// Panics if the actor has not yet been registered (clock is `None`).
2106    pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2107        self.clock
2108            .as_ref()
2109            .expect("DataActor must be registered before accessing clock")
2110            .clone()
2111    }
2112
2113    fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2114        self.clock
2115            .as_ref()
2116            .unwrap_or_else(|| {
2117                panic!(
2118                    "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2119                    self.actor_id, self.trader_id
2120                )
2121            })
2122            .borrow()
2123    }
2124
2125    /// Returns a read-only reference to the cache.
2126    ///
2127    /// # Panics
2128    ///
2129    /// Panics if the actor has not yet been registered (cache is `None`).
2130    pub fn cache(&self) -> Ref<'_, Cache> {
2131        self.cache
2132            .as_ref()
2133            .expect("DataActor must be registered before accessing cache")
2134            .borrow()
2135    }
2136
2137    /// Returns a clone of the reference-counted cache.
2138    ///
2139    /// # Panics
2140    ///
2141    /// Panics if the actor has not yet been registered (cache is `None`).
2142    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2143        self.cache
2144            .as_ref()
2145            .expect("DataActor must be registered before accessing cache")
2146            .clone()
2147    }
2148
2149    // -- REGISTRATION ----------------------------------------------------------------------------
2150
2151    /// Register the data actor with a trader.
2152    ///
2153    /// # Errors
2154    ///
2155    /// Returns an error if the actor has already been registered with a trader
2156    /// or if the provided dependencies are invalid.
2157    pub fn register(
2158        &mut self,
2159        trader_id: TraderId,
2160        clock: Rc<RefCell<dyn Clock>>,
2161        cache: Rc<RefCell<Cache>>,
2162    ) -> anyhow::Result<()> {
2163        if let Some(existing_trader_id) = self.trader_id {
2164            anyhow::bail!(
2165                "DataActor {} already registered with trader {existing_trader_id}",
2166                self.actor_id
2167            );
2168        }
2169
2170        // Validate clock by attempting to access it
2171        {
2172            let _timestamp = clock.borrow().timestamp_ns();
2173        }
2174
2175        // Validate cache by attempting to access it
2176        {
2177            let _cache_borrow = cache.borrow();
2178        }
2179
2180        self.trader_id = Some(trader_id);
2181        self.clock = Some(clock);
2182        self.cache = Some(cache);
2183
2184        // Verify complete registration
2185        if !self.is_properly_registered() {
2186            anyhow::bail!(
2187                "DataActor {} registration incomplete - validation failed",
2188                self.actor_id
2189            );
2190        }
2191
2192        log::info!("Registered {} with trader {trader_id}", self.actor_id);
2193        Ok(())
2194    }
2195
2196    /// Register an event type for warning log levels.
2197    pub fn register_warning_event(&mut self, event_type: &str) {
2198        self.warning_events.insert(event_type.to_string());
2199        log::debug!("Registered event type '{event_type}' for warning logs")
2200    }
2201
2202    /// Deregister an event type from warning log levels.
2203    pub fn deregister_warning_event(&mut self, event_type: &str) {
2204        self.warning_events.remove(event_type);
2205        log::debug!("Deregistered event type '{event_type}' from warning logs")
2206    }
2207
2208    pub fn is_registered(&self) -> bool {
2209        self.trader_id.is_some()
2210    }
2211
2212    pub(crate) fn check_registered(&self) {
2213        assert!(
2214            self.is_registered(),
2215            "Actor has not been registered with a Trader"
2216        );
2217    }
2218
2219    /// Validates registration state without panicking.
2220    fn is_properly_registered(&self) -> bool {
2221        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2222    }
2223
2224    pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2225        if self.config.log_commands {
2226            log::info!("{CMD}{SEND} {command:?}");
2227        }
2228
2229        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2230        msgbus::send_any(endpoint, command.as_any())
2231    }
2232
2233    #[allow(dead_code, reason = "TODO: Under development")]
2234    fn send_data_req(&self, request: RequestCommand) {
2235        if self.config.log_commands {
2236            log::info!("{REQ}{SEND} {request:?}");
2237        }
2238
2239        // For now, simplified approach - data requests without dynamic handlers
2240        // TODO: Implement proper dynamic dispatch for response handlers
2241        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2242        msgbus::send_any(endpoint, request.as_any())
2243    }
2244
2245    /// Sends a shutdown command to the system with an optional reason.
2246    ///
2247    /// # Panics
2248    ///
2249    /// Panics if the actor is not registered or has no trader ID.
2250    pub fn shutdown_system(&self, reason: Option<String>) {
2251        self.check_registered();
2252
2253        // SAFETY: Checked registered before unwrapping trader ID
2254        let command = ShutdownSystem::new(
2255            self.trader_id().unwrap(),
2256            self.actor_id.inner(),
2257            reason,
2258            UUID4::new(),
2259            self.timestamp_ns(),
2260        );
2261
2262        let endpoint = "command.system.shutdown".into();
2263        msgbus::send_any(endpoint, command.as_any());
2264    }
2265
2266    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
2267
2268    /// Helper method for registering data subscriptions from the trait.
2269    ///
2270    /// # Panics
2271    ///
2272    /// Panics if the actor is not properly registered.
2273    pub fn subscribe_data(
2274        &mut self,
2275        handler: ShareableMessageHandler,
2276        data_type: DataType,
2277        client_id: Option<ClientId>,
2278        params: Option<IndexMap<String, String>>,
2279    ) {
2280        if !self.is_properly_registered() {
2281            panic!(
2282                "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
2283                self.actor_id,
2284                self.trader_id,
2285                self.clock.is_some(),
2286                self.cache.is_some()
2287            );
2288        }
2289
2290        let topic = get_custom_topic(&data_type);
2291        self.add_subscription(topic, handler);
2292
2293        // If no client ID specified, just subscribe to the topic
2294        if client_id.is_none() {
2295            return;
2296        }
2297
2298        let command = SubscribeCommand::Data(SubscribeCustomData {
2299            data_type,
2300            client_id,
2301            venue: None,
2302            command_id: UUID4::new(),
2303            ts_init: self.timestamp_ns(),
2304            params,
2305        });
2306
2307        self.send_data_cmd(DataCommand::Subscribe(command));
2308    }
2309
2310    /// Helper method for registering quotes subscriptions from the trait.
2311    pub fn subscribe_quotes(
2312        &mut self,
2313        topic: MStr<Topic>,
2314        handler: ShareableMessageHandler,
2315        instrument_id: InstrumentId,
2316        client_id: Option<ClientId>,
2317        params: Option<IndexMap<String, String>>,
2318    ) {
2319        self.check_registered();
2320
2321        self.add_subscription(topic, handler);
2322
2323        let command = SubscribeCommand::Quotes(SubscribeQuotes {
2324            instrument_id,
2325            client_id,
2326            venue: Some(instrument_id.venue),
2327            command_id: UUID4::new(),
2328            ts_init: self.timestamp_ns(),
2329            params,
2330        });
2331
2332        self.send_data_cmd(DataCommand::Subscribe(command));
2333    }
2334
2335    /// Helper method for registering instruments subscriptions from the trait.
2336    pub fn subscribe_instruments(
2337        &mut self,
2338        topic: MStr<Topic>,
2339        handler: ShareableMessageHandler,
2340        venue: Venue,
2341        client_id: Option<ClientId>,
2342        params: Option<IndexMap<String, String>>,
2343    ) {
2344        self.check_registered();
2345
2346        self.add_subscription(topic, handler);
2347
2348        let command = SubscribeCommand::Instruments(SubscribeInstruments {
2349            client_id,
2350            venue,
2351            command_id: UUID4::new(),
2352            ts_init: self.timestamp_ns(),
2353            params,
2354        });
2355
2356        self.send_data_cmd(DataCommand::Subscribe(command));
2357    }
2358
2359    /// Helper method for registering instrument subscriptions from the trait.
2360    pub fn subscribe_instrument(
2361        &mut self,
2362        topic: MStr<Topic>,
2363        handler: ShareableMessageHandler,
2364        instrument_id: InstrumentId,
2365        client_id: Option<ClientId>,
2366        params: Option<IndexMap<String, String>>,
2367    ) {
2368        self.check_registered();
2369
2370        self.add_subscription(topic, handler);
2371
2372        let command = SubscribeCommand::Instrument(SubscribeInstrument {
2373            instrument_id,
2374            client_id,
2375            venue: Some(instrument_id.venue),
2376            command_id: UUID4::new(),
2377            ts_init: self.timestamp_ns(),
2378            params,
2379        });
2380
2381        self.send_data_cmd(DataCommand::Subscribe(command));
2382    }
2383
2384    /// Helper method for registering book deltas subscriptions from the trait.
2385    #[allow(clippy::too_many_arguments)]
2386    pub fn subscribe_book_deltas(
2387        &mut self,
2388        topic: MStr<Topic>,
2389        handler: ShareableMessageHandler,
2390        instrument_id: InstrumentId,
2391        book_type: BookType,
2392        depth: Option<NonZeroUsize>,
2393        client_id: Option<ClientId>,
2394        managed: bool,
2395        params: Option<IndexMap<String, String>>,
2396    ) {
2397        self.check_registered();
2398
2399        self.add_subscription(topic, handler);
2400
2401        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2402            instrument_id,
2403            book_type,
2404            client_id,
2405            venue: Some(instrument_id.venue),
2406            command_id: UUID4::new(),
2407            ts_init: self.timestamp_ns(),
2408            depth,
2409            managed,
2410            params,
2411        });
2412
2413        self.send_data_cmd(DataCommand::Subscribe(command));
2414    }
2415
2416    /// Helper method for registering book snapshots subscriptions from the trait.
2417    #[allow(clippy::too_many_arguments)]
2418    pub fn subscribe_book_at_interval(
2419        &mut self,
2420        topic: MStr<Topic>,
2421        handler: ShareableMessageHandler,
2422        instrument_id: InstrumentId,
2423        book_type: BookType,
2424        depth: Option<NonZeroUsize>,
2425        interval_ms: NonZeroUsize,
2426        client_id: Option<ClientId>,
2427        params: Option<IndexMap<String, String>>,
2428    ) {
2429        self.check_registered();
2430
2431        self.add_subscription(topic, handler);
2432
2433        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2434            instrument_id,
2435            book_type,
2436            client_id,
2437            venue: Some(instrument_id.venue),
2438            command_id: UUID4::new(),
2439            ts_init: self.timestamp_ns(),
2440            depth,
2441            interval_ms,
2442            params,
2443        });
2444
2445        self.send_data_cmd(DataCommand::Subscribe(command));
2446    }
2447
2448    /// Helper method for registering trades subscriptions from the trait.
2449    pub fn subscribe_trades(
2450        &mut self,
2451        topic: MStr<Topic>,
2452        handler: ShareableMessageHandler,
2453        instrument_id: InstrumentId,
2454        client_id: Option<ClientId>,
2455        params: Option<IndexMap<String, String>>,
2456    ) {
2457        self.check_registered();
2458
2459        self.add_subscription(topic, handler);
2460
2461        let command = SubscribeCommand::Trades(SubscribeTrades {
2462            instrument_id,
2463            client_id,
2464            venue: Some(instrument_id.venue),
2465            command_id: UUID4::new(),
2466            ts_init: self.timestamp_ns(),
2467            params,
2468        });
2469
2470        self.send_data_cmd(DataCommand::Subscribe(command));
2471    }
2472
2473    /// Helper method for registering bars subscriptions from the trait.
2474    pub fn subscribe_bars(
2475        &mut self,
2476        topic: MStr<Topic>,
2477        handler: ShareableMessageHandler,
2478        bar_type: BarType,
2479        client_id: Option<ClientId>,
2480        params: Option<IndexMap<String, String>>,
2481    ) {
2482        self.check_registered();
2483
2484        self.add_subscription(topic, handler);
2485
2486        let command = SubscribeCommand::Bars(SubscribeBars {
2487            bar_type,
2488            client_id,
2489            venue: Some(bar_type.instrument_id().venue),
2490            command_id: UUID4::new(),
2491            ts_init: self.timestamp_ns(),
2492            params,
2493        });
2494
2495        self.send_data_cmd(DataCommand::Subscribe(command));
2496    }
2497
2498    /// Helper method for registering mark prices subscriptions from the trait.
2499    pub fn subscribe_mark_prices(
2500        &mut self,
2501        topic: MStr<Topic>,
2502        handler: ShareableMessageHandler,
2503        instrument_id: InstrumentId,
2504        client_id: Option<ClientId>,
2505        params: Option<IndexMap<String, String>>,
2506    ) {
2507        self.check_registered();
2508
2509        self.add_subscription(topic, handler);
2510
2511        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
2512            instrument_id,
2513            client_id,
2514            venue: Some(instrument_id.venue),
2515            command_id: UUID4::new(),
2516            ts_init: self.timestamp_ns(),
2517            params,
2518        });
2519
2520        self.send_data_cmd(DataCommand::Subscribe(command));
2521    }
2522
2523    /// Helper method for registering index prices subscriptions from the trait.
2524    pub fn subscribe_index_prices(
2525        &mut self,
2526        topic: MStr<Topic>,
2527        handler: ShareableMessageHandler,
2528        instrument_id: InstrumentId,
2529        client_id: Option<ClientId>,
2530        params: Option<IndexMap<String, String>>,
2531    ) {
2532        self.check_registered();
2533
2534        self.add_subscription(topic, handler);
2535
2536        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
2537            instrument_id,
2538            client_id,
2539            venue: Some(instrument_id.venue),
2540            command_id: UUID4::new(),
2541            ts_init: self.timestamp_ns(),
2542            params,
2543        });
2544
2545        self.send_data_cmd(DataCommand::Subscribe(command));
2546    }
2547
2548    /// Helper method for registering funding rates subscriptions from the trait.
2549    pub fn subscribe_funding_rates(
2550        &mut self,
2551        topic: MStr<Topic>,
2552        handler: ShareableMessageHandler,
2553        instrument_id: InstrumentId,
2554        client_id: Option<ClientId>,
2555        params: Option<IndexMap<String, String>>,
2556    ) {
2557        self.check_registered();
2558
2559        self.add_subscription(topic, handler);
2560
2561        let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
2562            instrument_id,
2563            client_id,
2564            venue: Some(instrument_id.venue),
2565            command_id: UUID4::new(),
2566            ts_init: self.timestamp_ns(),
2567            params,
2568        });
2569
2570        self.send_data_cmd(DataCommand::Subscribe(command));
2571    }
2572
2573    /// Helper method for registering instrument status subscriptions from the trait.
2574    pub fn subscribe_instrument_status(
2575        &mut self,
2576        topic: MStr<Topic>,
2577        handler: ShareableMessageHandler,
2578        instrument_id: InstrumentId,
2579        client_id: Option<ClientId>,
2580        params: Option<IndexMap<String, String>>,
2581    ) {
2582        self.check_registered();
2583
2584        self.add_subscription(topic, handler);
2585
2586        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
2587            instrument_id,
2588            client_id,
2589            venue: Some(instrument_id.venue),
2590            command_id: UUID4::new(),
2591            ts_init: self.timestamp_ns(),
2592            params,
2593        });
2594
2595        self.send_data_cmd(DataCommand::Subscribe(command));
2596    }
2597
2598    /// Helper method for registering instrument close subscriptions from the trait.
2599    pub fn subscribe_instrument_close(
2600        &mut self,
2601        topic: MStr<Topic>,
2602        handler: ShareableMessageHandler,
2603        instrument_id: InstrumentId,
2604        client_id: Option<ClientId>,
2605        params: Option<IndexMap<String, String>>,
2606    ) {
2607        self.check_registered();
2608
2609        self.add_subscription(topic, handler);
2610
2611        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
2612            instrument_id,
2613            client_id,
2614            venue: Some(instrument_id.venue),
2615            command_id: UUID4::new(),
2616            ts_init: self.timestamp_ns(),
2617            params,
2618        });
2619
2620        self.send_data_cmd(DataCommand::Subscribe(command));
2621    }
2622
2623    /// Helper method for registering order fills subscriptions from the trait.
2624    pub fn subscribe_order_fills(&mut self, topic: MStr<Topic>, handler: ShareableMessageHandler) {
2625        self.check_registered();
2626        self.add_subscription(topic, handler);
2627    }
2628
2629    /// Helper method for unsubscribing from data.
2630    pub fn unsubscribe_data(
2631        &mut self,
2632        data_type: DataType,
2633        client_id: Option<ClientId>,
2634        params: Option<IndexMap<String, String>>,
2635    ) {
2636        self.check_registered();
2637
2638        let topic = get_custom_topic(&data_type);
2639        self.remove_subscription(topic);
2640
2641        if client_id.is_none() {
2642            return;
2643        }
2644
2645        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
2646            data_type,
2647            client_id,
2648            venue: None,
2649            command_id: UUID4::new(),
2650            ts_init: self.timestamp_ns(),
2651            params,
2652        });
2653
2654        self.send_data_cmd(DataCommand::Unsubscribe(command));
2655    }
2656
2657    /// Helper method for unsubscribing from instruments.
2658    pub fn unsubscribe_instruments(
2659        &mut self,
2660        venue: Venue,
2661        client_id: Option<ClientId>,
2662        params: Option<IndexMap<String, String>>,
2663    ) {
2664        self.check_registered();
2665
2666        let topic = get_instruments_topic(venue);
2667        self.remove_subscription(topic);
2668
2669        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
2670            client_id,
2671            venue,
2672            command_id: UUID4::new(),
2673            ts_init: self.timestamp_ns(),
2674            params,
2675        });
2676
2677        self.send_data_cmd(DataCommand::Unsubscribe(command));
2678    }
2679
2680    /// Helper method for unsubscribing from instrument.
2681    pub fn unsubscribe_instrument(
2682        &mut self,
2683        instrument_id: InstrumentId,
2684        client_id: Option<ClientId>,
2685        params: Option<IndexMap<String, String>>,
2686    ) {
2687        self.check_registered();
2688
2689        let topic = get_instrument_topic(instrument_id);
2690        self.remove_subscription(topic);
2691
2692        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
2693            instrument_id,
2694            client_id,
2695            venue: Some(instrument_id.venue),
2696            command_id: UUID4::new(),
2697            ts_init: self.timestamp_ns(),
2698            params,
2699        });
2700
2701        self.send_data_cmd(DataCommand::Unsubscribe(command));
2702    }
2703
2704    /// Helper method for unsubscribing from book deltas.
2705    pub fn unsubscribe_book_deltas(
2706        &mut self,
2707        instrument_id: InstrumentId,
2708        client_id: Option<ClientId>,
2709        params: Option<IndexMap<String, String>>,
2710    ) {
2711        self.check_registered();
2712
2713        let topic = get_book_deltas_topic(instrument_id);
2714        self.remove_subscription(topic);
2715
2716        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
2717            instrument_id,
2718            client_id,
2719            venue: Some(instrument_id.venue),
2720            command_id: UUID4::new(),
2721            ts_init: self.timestamp_ns(),
2722            params,
2723        });
2724
2725        self.send_data_cmd(DataCommand::Unsubscribe(command));
2726    }
2727
2728    /// Helper method for unsubscribing from book snapshots at interval.
2729    pub fn unsubscribe_book_at_interval(
2730        &mut self,
2731        instrument_id: InstrumentId,
2732        interval_ms: NonZeroUsize,
2733        client_id: Option<ClientId>,
2734        params: Option<IndexMap<String, String>>,
2735    ) {
2736        self.check_registered();
2737
2738        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
2739        self.remove_subscription(topic);
2740
2741        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
2742            instrument_id,
2743            client_id,
2744            venue: Some(instrument_id.venue),
2745            command_id: UUID4::new(),
2746            ts_init: self.timestamp_ns(),
2747            params,
2748        });
2749
2750        self.send_data_cmd(DataCommand::Unsubscribe(command));
2751    }
2752
2753    /// Helper method for unsubscribing from quotes.
2754    pub fn unsubscribe_quotes(
2755        &mut self,
2756        instrument_id: InstrumentId,
2757        client_id: Option<ClientId>,
2758        params: Option<IndexMap<String, String>>,
2759    ) {
2760        self.check_registered();
2761
2762        let topic = get_quotes_topic(instrument_id);
2763        self.remove_subscription(topic);
2764
2765        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
2766            instrument_id,
2767            client_id,
2768            venue: Some(instrument_id.venue),
2769            command_id: UUID4::new(),
2770            ts_init: self.timestamp_ns(),
2771            params,
2772        });
2773
2774        self.send_data_cmd(DataCommand::Unsubscribe(command));
2775    }
2776
2777    /// Helper method for unsubscribing from trades.
2778    pub fn unsubscribe_trades(
2779        &mut self,
2780        instrument_id: InstrumentId,
2781        client_id: Option<ClientId>,
2782        params: Option<IndexMap<String, String>>,
2783    ) {
2784        self.check_registered();
2785
2786        let topic = get_trades_topic(instrument_id);
2787        self.remove_subscription(topic);
2788
2789        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
2790            instrument_id,
2791            client_id,
2792            venue: Some(instrument_id.venue),
2793            command_id: UUID4::new(),
2794            ts_init: self.timestamp_ns(),
2795            params,
2796        });
2797
2798        self.send_data_cmd(DataCommand::Unsubscribe(command));
2799    }
2800
2801    /// Helper method for unsubscribing from bars.
2802    pub fn unsubscribe_bars(
2803        &mut self,
2804        bar_type: BarType,
2805        client_id: Option<ClientId>,
2806        params: Option<IndexMap<String, String>>,
2807    ) {
2808        self.check_registered();
2809
2810        let topic = get_bars_topic(bar_type);
2811        self.remove_subscription(topic);
2812
2813        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
2814            bar_type,
2815            client_id,
2816            venue: Some(bar_type.instrument_id().venue),
2817            command_id: UUID4::new(),
2818            ts_init: self.timestamp_ns(),
2819            params,
2820        });
2821
2822        self.send_data_cmd(DataCommand::Unsubscribe(command));
2823    }
2824
2825    /// Helper method for unsubscribing from mark prices.
2826    pub fn unsubscribe_mark_prices(
2827        &mut self,
2828        instrument_id: InstrumentId,
2829        client_id: Option<ClientId>,
2830        params: Option<IndexMap<String, String>>,
2831    ) {
2832        self.check_registered();
2833
2834        let topic = get_mark_price_topic(instrument_id);
2835        self.remove_subscription(topic);
2836
2837        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
2838            instrument_id,
2839            client_id,
2840            venue: Some(instrument_id.venue),
2841            command_id: UUID4::new(),
2842            ts_init: self.timestamp_ns(),
2843            params,
2844        });
2845
2846        self.send_data_cmd(DataCommand::Unsubscribe(command));
2847    }
2848
2849    /// Helper method for unsubscribing from index prices.
2850    pub fn unsubscribe_index_prices(
2851        &mut self,
2852        instrument_id: InstrumentId,
2853        client_id: Option<ClientId>,
2854        params: Option<IndexMap<String, String>>,
2855    ) {
2856        self.check_registered();
2857
2858        let topic = get_index_price_topic(instrument_id);
2859        self.remove_subscription(topic);
2860
2861        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
2862            instrument_id,
2863            client_id,
2864            venue: Some(instrument_id.venue),
2865            command_id: UUID4::new(),
2866            ts_init: self.timestamp_ns(),
2867            params,
2868        });
2869
2870        self.send_data_cmd(DataCommand::Unsubscribe(command));
2871    }
2872
2873    /// Helper method for unsubscribing from funding rates.
2874    pub fn unsubscribe_funding_rates(
2875        &mut self,
2876        instrument_id: InstrumentId,
2877        client_id: Option<ClientId>,
2878        params: Option<IndexMap<String, String>>,
2879    ) {
2880        self.check_registered();
2881
2882        let topic = get_funding_rate_topic(instrument_id);
2883        self.remove_subscription(topic);
2884
2885        let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
2886            instrument_id,
2887            client_id,
2888            venue: Some(instrument_id.venue),
2889            command_id: UUID4::new(),
2890            ts_init: self.timestamp_ns(),
2891            params,
2892        });
2893
2894        self.send_data_cmd(DataCommand::Unsubscribe(command));
2895    }
2896
2897    /// Helper method for unsubscribing from instrument status.
2898    pub fn unsubscribe_instrument_status(
2899        &mut self,
2900        instrument_id: InstrumentId,
2901        client_id: Option<ClientId>,
2902        params: Option<IndexMap<String, String>>,
2903    ) {
2904        self.check_registered();
2905
2906        let topic = get_instrument_status_topic(instrument_id);
2907        self.remove_subscription(topic);
2908
2909        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
2910            instrument_id,
2911            client_id,
2912            venue: Some(instrument_id.venue),
2913            command_id: UUID4::new(),
2914            ts_init: self.timestamp_ns(),
2915            params,
2916        });
2917
2918        self.send_data_cmd(DataCommand::Unsubscribe(command));
2919    }
2920
2921    /// Helper method for unsubscribing from instrument close.
2922    pub fn unsubscribe_instrument_close(
2923        &mut self,
2924        instrument_id: InstrumentId,
2925        client_id: Option<ClientId>,
2926        params: Option<IndexMap<String, String>>,
2927    ) {
2928        self.check_registered();
2929
2930        let topic = get_instrument_close_topic(instrument_id);
2931        self.remove_subscription(topic);
2932
2933        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
2934            instrument_id,
2935            client_id,
2936            venue: Some(instrument_id.venue),
2937            command_id: UUID4::new(),
2938            ts_init: self.timestamp_ns(),
2939            params,
2940        });
2941
2942        self.send_data_cmd(DataCommand::Unsubscribe(command));
2943    }
2944
2945    /// Helper method for unsubscribing from order fills.
2946    pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
2947        self.check_registered();
2948
2949        let topic = get_order_fills_topic(instrument_id);
2950        self.remove_subscription(topic);
2951    }
2952
2953    /// Helper method for requesting data.
2954    ///
2955    /// # Errors
2956    ///
2957    /// Returns an error if input parameters are invalid.
2958    #[allow(clippy::too_many_arguments)]
2959    pub fn request_data(
2960        &self,
2961        data_type: DataType,
2962        client_id: ClientId,
2963        start: Option<DateTime<Utc>>,
2964        end: Option<DateTime<Utc>>,
2965        limit: Option<NonZeroUsize>,
2966        params: Option<IndexMap<String, String>>,
2967        handler: ShareableMessageHandler,
2968    ) -> anyhow::Result<UUID4> {
2969        self.check_registered();
2970
2971        let now = self.clock_ref().utc_now();
2972        check_timestamps(now, start, end)?;
2973
2974        let request_id = UUID4::new();
2975        let command = RequestCommand::Data(RequestCustomData {
2976            client_id,
2977            data_type,
2978            start,
2979            end,
2980            limit,
2981            request_id,
2982            ts_init: self.timestamp_ns(),
2983            params,
2984        });
2985
2986        get_message_bus()
2987            .borrow_mut()
2988            .register_response_handler(command.request_id(), handler)?;
2989
2990        self.send_data_cmd(DataCommand::Request(command));
2991
2992        Ok(request_id)
2993    }
2994
2995    /// Helper method for requesting instrument.
2996    ///
2997    /// # Errors
2998    ///
2999    /// Returns an error if input parameters are invalid.
3000    pub fn request_instrument(
3001        &self,
3002        instrument_id: InstrumentId,
3003        start: Option<DateTime<Utc>>,
3004        end: Option<DateTime<Utc>>,
3005        client_id: Option<ClientId>,
3006        params: Option<IndexMap<String, String>>,
3007        handler: ShareableMessageHandler,
3008    ) -> anyhow::Result<UUID4> {
3009        self.check_registered();
3010
3011        let now = self.clock_ref().utc_now();
3012        check_timestamps(now, start, end)?;
3013
3014        let request_id = UUID4::new();
3015        let command = RequestCommand::Instrument(RequestInstrument {
3016            instrument_id,
3017            start,
3018            end,
3019            client_id,
3020            request_id,
3021            ts_init: now.into(),
3022            params,
3023        });
3024
3025        get_message_bus()
3026            .borrow_mut()
3027            .register_response_handler(command.request_id(), handler)?;
3028
3029        self.send_data_cmd(DataCommand::Request(command));
3030
3031        Ok(request_id)
3032    }
3033
3034    /// Helper method for requesting instruments.
3035    ///
3036    /// # Errors
3037    ///
3038    /// Returns an error if input parameters are invalid.
3039    pub fn request_instruments(
3040        &self,
3041        venue: Option<Venue>,
3042        start: Option<DateTime<Utc>>,
3043        end: Option<DateTime<Utc>>,
3044        client_id: Option<ClientId>,
3045        params: Option<IndexMap<String, String>>,
3046        handler: ShareableMessageHandler,
3047    ) -> anyhow::Result<UUID4> {
3048        self.check_registered();
3049
3050        let now = self.clock_ref().utc_now();
3051        check_timestamps(now, start, end)?;
3052
3053        let request_id = UUID4::new();
3054        let command = RequestCommand::Instruments(RequestInstruments {
3055            venue,
3056            start,
3057            end,
3058            client_id,
3059            request_id,
3060            ts_init: now.into(),
3061            params,
3062        });
3063
3064        get_message_bus()
3065            .borrow_mut()
3066            .register_response_handler(command.request_id(), handler)?;
3067
3068        self.send_data_cmd(DataCommand::Request(command));
3069
3070        Ok(request_id)
3071    }
3072
3073    /// Helper method for requesting book snapshot.
3074    ///
3075    /// # Errors
3076    ///
3077    /// Returns an error if input parameters are invalid.
3078    pub fn request_book_snapshot(
3079        &self,
3080        instrument_id: InstrumentId,
3081        depth: Option<NonZeroUsize>,
3082        client_id: Option<ClientId>,
3083        params: Option<IndexMap<String, String>>,
3084        handler: ShareableMessageHandler,
3085    ) -> anyhow::Result<UUID4> {
3086        self.check_registered();
3087
3088        let request_id = UUID4::new();
3089        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3090            instrument_id,
3091            depth,
3092            client_id,
3093            request_id,
3094            ts_init: self.timestamp_ns(),
3095            params,
3096        });
3097
3098        get_message_bus()
3099            .borrow_mut()
3100            .register_response_handler(command.request_id(), handler)?;
3101
3102        self.send_data_cmd(DataCommand::Request(command));
3103
3104        Ok(request_id)
3105    }
3106
3107    /// Helper method for requesting quotes.
3108    ///
3109    /// # Errors
3110    ///
3111    /// Returns an error if input parameters are invalid.
3112    #[allow(clippy::too_many_arguments)]
3113    pub fn request_quotes(
3114        &self,
3115        instrument_id: InstrumentId,
3116        start: Option<DateTime<Utc>>,
3117        end: Option<DateTime<Utc>>,
3118        limit: Option<NonZeroUsize>,
3119        client_id: Option<ClientId>,
3120        params: Option<IndexMap<String, String>>,
3121        handler: ShareableMessageHandler,
3122    ) -> anyhow::Result<UUID4> {
3123        self.check_registered();
3124
3125        let now = self.clock_ref().utc_now();
3126        check_timestamps(now, start, end)?;
3127
3128        let request_id = UUID4::new();
3129        let command = RequestCommand::Quotes(RequestQuotes {
3130            instrument_id,
3131            start,
3132            end,
3133            limit,
3134            client_id,
3135            request_id,
3136            ts_init: now.into(),
3137            params,
3138        });
3139
3140        get_message_bus()
3141            .borrow_mut()
3142            .register_response_handler(command.request_id(), handler)?;
3143
3144        self.send_data_cmd(DataCommand::Request(command));
3145
3146        Ok(request_id)
3147    }
3148
3149    /// Helper method for requesting trades.
3150    ///
3151    /// # Errors
3152    ///
3153    /// Returns an error if input parameters are invalid.
3154    #[allow(clippy::too_many_arguments)]
3155    pub fn request_trades(
3156        &self,
3157        instrument_id: InstrumentId,
3158        start: Option<DateTime<Utc>>,
3159        end: Option<DateTime<Utc>>,
3160        limit: Option<NonZeroUsize>,
3161        client_id: Option<ClientId>,
3162        params: Option<IndexMap<String, String>>,
3163        handler: ShareableMessageHandler,
3164    ) -> anyhow::Result<UUID4> {
3165        self.check_registered();
3166
3167        let now = self.clock_ref().utc_now();
3168        check_timestamps(now, start, end)?;
3169
3170        let request_id = UUID4::new();
3171        let command = RequestCommand::Trades(RequestTrades {
3172            instrument_id,
3173            start,
3174            end,
3175            limit,
3176            client_id,
3177            request_id,
3178            ts_init: now.into(),
3179            params,
3180        });
3181
3182        get_message_bus()
3183            .borrow_mut()
3184            .register_response_handler(command.request_id(), handler)?;
3185
3186        self.send_data_cmd(DataCommand::Request(command));
3187
3188        Ok(request_id)
3189    }
3190
3191    /// Helper method for requesting bars.
3192    ///
3193    /// # Errors
3194    ///
3195    /// Returns an error if input parameters are invalid.
3196    #[allow(clippy::too_many_arguments)]
3197    pub fn request_bars(
3198        &self,
3199        bar_type: BarType,
3200        start: Option<DateTime<Utc>>,
3201        end: Option<DateTime<Utc>>,
3202        limit: Option<NonZeroUsize>,
3203        client_id: Option<ClientId>,
3204        params: Option<IndexMap<String, String>>,
3205        handler: ShareableMessageHandler,
3206    ) -> anyhow::Result<UUID4> {
3207        self.check_registered();
3208
3209        let now = self.clock_ref().utc_now();
3210        check_timestamps(now, start, end)?;
3211
3212        let request_id = UUID4::new();
3213        let command = RequestCommand::Bars(RequestBars {
3214            bar_type,
3215            start,
3216            end,
3217            limit,
3218            client_id,
3219            request_id,
3220            ts_init: now.into(),
3221            params,
3222        });
3223
3224        get_message_bus()
3225            .borrow_mut()
3226            .register_response_handler(command.request_id(), handler)?;
3227
3228        self.send_data_cmd(DataCommand::Request(command));
3229
3230        Ok(request_id)
3231    }
3232}
3233
3234fn check_timestamps(
3235    now: DateTime<Utc>,
3236    start: Option<DateTime<Utc>>,
3237    end: Option<DateTime<Utc>>,
3238) -> anyhow::Result<()> {
3239    if let Some(start) = start {
3240        check_predicate_true(start <= now, "start was > now")?
3241    }
3242    if let Some(end) = end {
3243        check_predicate_true(end <= now, "end was > now")?
3244    }
3245
3246    if let (Some(start), Some(end)) = (start, end) {
3247        check_predicate_true(start < end, "start was >= end")?
3248    }
3249
3250    Ok(())
3251}
3252
3253fn log_error(e: &anyhow::Error) {
3254    log::error!("{e}");
3255}
3256
3257fn log_not_running<T>(msg: &T)
3258where
3259    T: Debug,
3260{
3261    // TODO: Potentially temporary for development? drop level at some stage
3262    log::warn!("Received message when not running - skipping {msg:?}");
3263}
3264
3265fn log_received<T>(msg: &T)
3266where
3267    T: Debug,
3268{
3269    log::debug!("{RECV} {msg:?}");
3270}