nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
//! The data engine's primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34
35use std::{
36    any::Any,
37    cell::{Ref, RefCell},
38    collections::hash_map::Entry,
39    fmt::Display,
40    num::NonZeroUsize,
41    rc::Rc,
42};
43
44use ahash::{AHashMap, AHashSet};
45#[cfg(feature = "defi")]
46use alloy_primitives::Address;
47use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
48use config::DataEngineConfig;
49use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
50use indexmap::IndexMap;
51#[cfg(feature = "defi")]
52use nautilus_common::messages::defi::{DefiSubscribeCommand, DefiUnsubscribeCommand};
53use nautilus_common::{
54    cache::Cache,
55    clock::Clock,
56    logging::{RECV, RES},
57    messages::data::{
58        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61        UnsubscribeCommand,
62    },
63    msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
64    timer::TimeEventCallback,
65};
66use nautilus_core::{
67    correctness::{
68        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
69    },
70    datetime::millis_to_nanos,
71};
72#[cfg(feature = "defi")]
73use nautilus_model::defi::Blockchain;
74#[cfg(feature = "defi")]
75use nautilus_model::defi::DefiData;
76use nautilus_model::{
77    data::{
78        Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
79        TradeTick,
80        close::InstrumentClose,
81        prices::{IndexPriceUpdate, MarkPriceUpdate},
82    },
83    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
84    identifiers::{ClientId, InstrumentId, Venue},
85    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
86    orderbook::OrderBook,
87};
88use nautilus_persistence::backend::catalog::ParquetDataCatalog;
89use ustr::Ustr;
90
91use crate::{
92    aggregation::{
93        BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
94        VolumeBarAggregator,
95    },
96    client::DataClientAdapter,
97};
98
/// Provides a high-performance `DataEngine` for all environments.
///
/// The engine owns the registered data clients, routes commands to them and
/// publishes incoming market data on the message bus.
#[derive(Debug)]
pub struct DataEngine {
    /// Shared clock; its timers are cancelled when the engine is disposed.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache that receives instruments, quotes and trades as they are handled.
    cache: Rc<RefCell<Cache>>,
    /// Registered clients keyed by client ID (the default client is held separately).
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Client used when neither an explicit client ID nor a venue route resolves.
    default_client: Option<DataClientAdapter>,
    /// Client IDs taken from the config; commands addressed to them are not forwarded.
    external_clients: AHashSet<ClientId>,
    /// Registered data catalogs keyed by name.
    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
    /// Venue to client ID routes, populated when a client is registered with a routing.
    routing_map: IndexMap<Venue, ClientId>,
    // NOTE(review): the book and bar aggregation maps below are not exercised by the
    // methods visible alongside this definition; descriptions follow their types only.
    /// Instrument IDs grouped by an interval value.
    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
    /// Book updater per instrument.
    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
    /// Book snapshotter per instrument.
    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
    /// Bar aggregator per bar type.
    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
    /// Message bus topic/handler pairs associated with each aggregated bar type.
    bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
    /// Synthetic instruments per quote feed; initialised empty (presumably reserved
    /// for synthetic handling, which is still marked TODO in the tick handlers).
    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Synthetic instruments per trade feed; initialised empty (see note above).
    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Deltas held back per instrument until a delta flagged `F_LAST` arrives
    /// (only used when `config.buffer_deltas` is set).
    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
    /// Message bus priority for the engine; initialised to 10.
    msgbus_priority: u8,
    /// Engine configuration.
    config: DataEngineConfig,
}
120
121impl DataEngine {
122    /// Creates a new [`DataEngine`] instance.
123    #[must_use]
124    pub fn new(
125        clock: Rc<RefCell<dyn Clock>>,
126        cache: Rc<RefCell<Cache>>,
127        config: Option<DataEngineConfig>,
128    ) -> Self {
129        let config = config.unwrap_or_default();
130
131        let external_clients: AHashSet<ClientId> = config
132            .external_clients
133            .clone()
134            .unwrap_or_default()
135            .into_iter()
136            .collect();
137
138        Self {
139            clock,
140            cache,
141            clients: IndexMap::new(),
142            default_client: None,
143            external_clients,
144            catalogs: AHashMap::new(),
145            routing_map: IndexMap::new(),
146            book_intervals: AHashMap::new(),
147            book_updaters: AHashMap::new(),
148            book_snapshotters: AHashMap::new(),
149            bar_aggregators: AHashMap::new(),
150            bar_aggregator_handlers: AHashMap::new(),
151            _synthetic_quote_feeds: AHashMap::new(),
152            _synthetic_trade_feeds: AHashMap::new(),
153            buffered_deltas_map: AHashMap::new(),
154            msgbus_priority: 10, // High-priority for built-in component
155            config,
156        }
157    }
158
159    /// Returns a read-only reference to the engines clock.
160    #[must_use]
161    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
162        self.clock.borrow()
163    }
164
165    /// Returns a read-only reference to the engines cache.
166    #[must_use]
167    pub fn get_cache(&self) -> Ref<'_, Cache> {
168        self.cache.borrow()
169    }
170
171    /// Registers the `catalog` with the engine with an optional specific `name`.
172    ///
173    /// # Panics
174    ///
175    /// Panics if a catalog with the same `name` has already been registered.
176    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
177        let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
178
179        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
180
181        self.catalogs.insert(name, catalog);
182        log::info!("Registered catalog <{name}>");
183    }
184
185    /// Registers the `client` with the engine with an optional venue `routing`.
186    ///
187    ///
188    /// # Panics
189    ///
190    /// Panics if a client with the same client ID has already been registered.
191    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
192        let client_id = client.client_id();
193
194        if let Some(default_client) = &self.default_client {
195            check_predicate_false(
196                default_client.client_id() == client.client_id(),
197                "client_id already registered as default client",
198            )
199            .expect(FAILED);
200        }
201
202        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
203
204        if let Some(routing) = routing {
205            self.routing_map.insert(routing, client_id);
206            log::info!("Set client {client_id} routing for {routing}");
207        }
208
209        if client.venue.is_none() && self.default_client.is_none() {
210            self.default_client = Some(client);
211            log::info!("Registered client {client_id} for default routing");
212        } else {
213            self.clients.insert(client_id, client);
214            log::info!("Registered client {client_id}");
215        }
216    }
217
218    /// Deregisters the client for the `client_id`.
219    ///
220    /// # Panics
221    ///
222    /// Panics if the client ID has not been registered.
223    pub fn deregister_client(&mut self, client_id: &ClientId) {
224        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
225
226        self.clients.shift_remove(client_id);
227        log::info!("Deregistered client {client_id}");
228    }
229
230    /// Registers the data `client` with the engine as the default routing client.
231    ///
232    /// When a specific venue routing cannot be found, this client will receive messages.
233    ///
234    /// # Warnings
235    ///
236    /// Any existing default routing client will be overwritten.
237    ///
238    /// # Panics
239    ///
240    /// Panics if a default client has already been registered.
241    pub fn register_default_client(&mut self, client: DataClientAdapter) {
242        check_predicate_true(
243            self.default_client.is_none(),
244            "default client already registered",
245        )
246        .expect(FAILED);
247
248        let client_id = client.client_id();
249
250        self.default_client = Some(client);
251        log::info!("Registered default client {client_id}");
252    }
253
254    /// Starts all registered data clients.
255    pub fn start(&mut self) {
256        for client in self.get_clients_mut() {
257            if let Err(e) = client.start() {
258                log::error!("{e}");
259            }
260        }
261    }
262
263    /// Stops all registered data clients.
264    pub fn stop(&mut self) {
265        for client in self.get_clients_mut() {
266            if let Err(e) = client.stop() {
267                log::error!("{e}");
268            }
269        }
270    }
271
272    /// Resets all registered data clients to their initial state.
273    pub fn reset(&mut self) {
274        for client in self.get_clients_mut() {
275            if let Err(e) = client.reset() {
276                log::error!("{e}");
277            }
278        }
279    }
280
281    /// Disposes the engine, stopping all clients and cancelling any timers.
282    pub fn dispose(&mut self) {
283        for client in self.get_clients_mut() {
284            if let Err(e) = client.dispose() {
285                log::error!("{e}");
286            }
287        }
288
289        self.clock.borrow_mut().cancel_timers();
290    }
291
292    /// Returns `true` if all registered data clients are currently connected.
293    #[must_use]
294    pub fn check_connected(&self) -> bool {
295        self.get_clients()
296            .iter()
297            .all(|client| client.is_connected())
298    }
299
300    /// Returns `true` if all registered data clients are currently disconnected.
301    #[must_use]
302    pub fn check_disconnected(&self) -> bool {
303        self.get_clients()
304            .iter()
305            .all(|client| !client.is_connected())
306    }
307
308    /// Returns a list of all registered client IDs, including the default client if set.
309    #[must_use]
310    pub fn registered_clients(&self) -> Vec<ClientId> {
311        self.get_clients()
312            .into_iter()
313            .map(|client| client.client_id())
314            .collect()
315    }
316
317    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
318
319    fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
320    where
321        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
322        T: Clone,
323    {
324        self.get_clients()
325            .into_iter()
326            .flat_map(get_subs)
327            .cloned()
328            .collect()
329    }
330
331    #[must_use]
332    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
333        let (default_opt, clients_map) = (&self.default_client, &self.clients);
334        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
335
336        if let Some(default) = default_opt {
337            clients.push(default);
338        }
339
340        clients
341    }
342
343    #[must_use]
344    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
345        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
346        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
347
348        if let Some(default) = default_opt {
349            clients.push(default);
350        }
351
352        clients
353    }
354
355    pub fn get_client(
356        &mut self,
357        client_id: Option<&ClientId>,
358        venue: Option<&Venue>,
359    ) -> Option<&mut DataClientAdapter> {
360        if let Some(client_id) = client_id {
361            // Explicit ID: first look in registered clients
362            if let Some(client) = self.clients.get_mut(client_id) {
363                return Some(client);
364            }
365
366            // Then check if it matches the default client
367            if let Some(default) = self.default_client.as_mut()
368                && default.client_id() == *client_id
369            {
370                return Some(default);
371            }
372
373            // Unknown explicit client
374            return None;
375        }
376
377        if let Some(v) = venue {
378            // Route by venue if mapped client still registered
379            if let Some(client_id) = self.routing_map.get(v) {
380                return self.clients.get_mut(client_id);
381            }
382        }
383
384        // Fallback to default client
385        self.get_default_client()
386    }
387
    /// Returns a mutable reference to the default client, or `None` when no
    /// default client is set.
    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
        self.default_client.as_mut()
    }
391
392    /// Returns all custom data types currently subscribed across all clients.
393    #[must_use]
394    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
395        self.collect_subscriptions(|client| &client.subscriptions_custom)
396    }
397
398    /// Returns all instrument IDs currently subscribed across all clients.
399    #[must_use]
400    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
401        self.collect_subscriptions(|client| &client.subscriptions_instrument)
402    }
403
404    /// Returns all instrument IDs for which book delta subscriptions exist.
405    #[must_use]
406    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
407        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
408    }
409
410    /// Returns all instrument IDs for which book snapshot subscriptions exist.
411    #[must_use]
412    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
413        self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
414    }
415
416    /// Returns all instrument IDs for which quote subscriptions exist.
417    #[must_use]
418    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
419        self.collect_subscriptions(|client| &client.subscriptions_quotes)
420    }
421
422    /// Returns all instrument IDs for which trade subscriptions exist.
423    #[must_use]
424    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
425        self.collect_subscriptions(|client| &client.subscriptions_trades)
426    }
427
428    /// Returns all bar types currently subscribed across all clients.
429    #[must_use]
430    pub fn subscribed_bars(&self) -> Vec<BarType> {
431        self.collect_subscriptions(|client| &client.subscriptions_bars)
432    }
433
434    /// Returns all instrument IDs for which mark price subscriptions exist.
435    #[must_use]
436    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
437        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
438    }
439
440    /// Returns all instrument IDs for which index price subscriptions exist.
441    #[must_use]
442    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
443        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
444    }
445
446    /// Returns all instrument IDs for which status subscriptions exist.
447    #[must_use]
448    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
449        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
450    }
451
452    /// Returns all instrument IDs for which instrument close subscriptions exist.
453    #[must_use]
454    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
455        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
456    }
457
458    #[cfg(feature = "defi")]
459    /// Returns all blockchains for which blocks subscriptions exist.
460    #[must_use]
461    pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
462        self.collect_subscriptions(|client| &client.subscriptions_blocks)
463    }
464
465    #[cfg(feature = "defi")]
466    /// Returns all pool addresses for which pool subscriptions exist.
467    #[must_use]
468    pub fn subscribed_pools(&self) -> Vec<Address> {
469        self.collect_subscriptions(|client| &client.subscriptions_pools)
470    }
471
472    #[cfg(feature = "defi")]
473    /// Returns all pool addresses for which swap subscriptions exist.
474    #[must_use]
475    pub fn subscribed_pool_swaps(&self) -> Vec<Address> {
476        self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
477    }
478
479    #[cfg(feature = "defi")]
480    /// Returns all pool addresses for which liquidity update subscriptions exist.
481    #[must_use]
482    pub fn subscribed_pool_liquidity_updates(&self) -> Vec<Address> {
483        self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
484    }
485
486    // -- COMMANDS --------------------------------------------------------------------------------
487
488    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
489    ///
490    /// Errors during execution are logged.
491    pub fn execute(&mut self, cmd: &DataCommand) {
492        if let Err(e) = match cmd {
493            DataCommand::Subscribe(c) => self.execute_subscribe(c),
494            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
495            DataCommand::Request(c) => self.execute_request(c),
496            #[cfg(feature = "defi")]
497            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
498            #[cfg(feature = "defi")]
499            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
500            _ => {
501                log::warn!("Unhandled DataCommand variant: {cmd:?}");
502                Ok(())
503            }
504        } {
505            log::error!("{e}");
506        }
507    }
508
509    /// Handles a subscribe command, updating internal state and forwarding to the client.
510    ///
511    /// # Errors
512    ///
513    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
514    /// or if the underlying client operation fails.
515    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
516        // Update internal engine state
517        match &cmd {
518            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
519            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
520            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
521            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
522            _ => {} // Do nothing else
523        }
524
525        // Check if client declared as external
526        if let Some(client_id) = cmd.client_id()
527            && self.external_clients.contains(client_id)
528        {
529            return Ok(());
530        }
531
532        // Forward command to client
533        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
534            client.execute_subscribe(cmd);
535        } else {
536            log::error!(
537                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
538                cmd.client_id(),
539                cmd.venue(),
540            );
541        }
542
543        Ok(())
544    }
545
546    #[cfg(feature = "defi")]
547    /// Handles a subscribe command, updating internal state and forwarding to the client.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
552    /// or if the underlying client operation fails.
553    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
554        // Check if client declared as external
555        if let Some(client_id) = cmd.client_id()
556            && self.external_clients.contains(client_id)
557        {
558            return Ok(());
559        }
560
561        // Forward command to client
562        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
563            client.execute_defi_subscribe(cmd);
564        } else {
565            log::error!(
566                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
567                cmd.client_id(),
568                cmd.venue(),
569            );
570        }
571
572        Ok(())
573    }
574
575    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
576    ///
577    /// # Errors
578    ///
579    /// Returns an error if the underlying client operation fails.
580    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
581        match &cmd {
582            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
583            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
584            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
585            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
586            _ => {} // Do nothing else
587        }
588
589        // Check if client declared as external
590        if let Some(client_id) = cmd.client_id()
591            && self.external_clients.contains(client_id)
592        {
593            return Ok(());
594        }
595
596        // Forward command to the client
597        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
598            client.execute_unsubscribe(cmd);
599        } else {
600            log::error!(
601                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
602                cmd.client_id(),
603                cmd.venue(),
604            );
605        }
606
607        Ok(())
608    }
609
610    #[cfg(feature = "defi")]
611    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
612    ///
613    /// # Errors
614    ///
615    /// Returns an error if the underlying client operation fails.
616    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
617        // Check if client declared as external
618        if let Some(client_id) = cmd.client_id()
619            && self.external_clients.contains(client_id)
620        {
621            return Ok(());
622        }
623
624        // Forward command to the client
625        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
626            client.execute_defi_unsubscribe(cmd);
627        } else {
628            log::error!(
629                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
630                cmd.client_id(),
631                cmd.venue(),
632            );
633        }
634
635        Ok(())
636    }
637
638    /// Sends a [`RequestCommand`] to a suitable data client implementation.
639    ///
640    /// # Errors
641    ///
642    /// Returns an error if no client is found for the given client ID or venue,
643    /// or if the client fails to process the request.
644    pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
645        // Skip requests for external clients
646        if let Some(cid) = req.client_id()
647            && self.external_clients.contains(cid)
648        {
649            return Ok(());
650        }
651        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
652            match req {
653                RequestCommand::Data(req) => client.request_data(req),
654                RequestCommand::Instrument(req) => client.request_instrument(req),
655                RequestCommand::Instruments(req) => client.request_instruments(req),
656                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
657                RequestCommand::Quotes(req) => client.request_quotes(req),
658                RequestCommand::Trades(req) => client.request_trades(req),
659                RequestCommand::Bars(req) => client.request_bars(req),
660            }
661        } else {
662            anyhow::bail!(
663                "Cannot handle request: no client found for {:?} {:?}",
664                req.client_id(),
665                req.venue()
666            );
667        }
668    }
669
670    /// Processes a dynamically-typed data message.
671    ///
672    /// Currently supports `InstrumentAny`; unrecognized types are logged as errors.
673    pub fn process(&mut self, data: &dyn Any) {
674        // TODO: Eventually these could be added to the `Data` enum? process here for now
675        if let Some(data) = data.downcast_ref::<Data>() {
676            self.process_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
677            return;
678        }
679
680        #[cfg(feature = "defi")]
681        if let Some(data) = data.downcast_ref::<DefiData>() {
682            self.process_defi_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
683            return;
684        }
685
686        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
687            self.handle_instrument(instrument.clone());
688        } else {
689            log::error!("Cannot process data {data:?}, type is unrecognized");
690        }
691    }
692
693    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
694    pub fn process_data(&mut self, data: Data) {
695        match data {
696            Data::Delta(delta) => self.handle_delta(delta),
697            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
698            Data::Depth10(depth) => self.handle_depth10(*depth),
699            Data::Quote(quote) => self.handle_quote(quote),
700            Data::Trade(trade) => self.handle_trade(trade),
701            Data::Bar(bar) => self.handle_bar(bar),
702            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
703            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
704            Data::InstrumentClose(close) => self.handle_instrument_close(close),
705        }
706    }
707
708    /// Processes DeFi-specific data events.
709    #[cfg(feature = "defi")]
710    pub fn process_defi_data(&mut self, data: DefiData) {
711        match data {
712            DefiData::Block(block) => {
713                let topic = switchboard::get_defi_blocks_topic(block.chain());
714                msgbus::publish(topic, &block as &dyn Any);
715            }
716            DefiData::Pool(pool) => {
717                let topic = switchboard::get_defi_pool_topic(pool.address);
718                msgbus::publish(topic, &pool as &dyn Any);
719            }
720            DefiData::PoolSwap(swap) => {
721                let topic = switchboard::get_defi_pool_swaps_topic(swap.pool.address);
722                msgbus::publish(topic, &swap as &dyn Any);
723            }
724            DefiData::PoolLiquidityUpdate(update) => {
725                let topic = switchboard::get_defi_liquidity_topic(update.pool.address);
726                msgbus::publish(topic, &update as &dyn Any);
727            }
728        }
729    }
730
731    /// Processes a `DataResponse`, handling and publishing the response message.
732    pub fn response(&self, resp: DataResponse) {
733        log::debug!("{RECV}{RES} {resp:?}");
734
735        match &resp {
736            DataResponse::Instrument(resp) => {
737                self.handle_instrument_response(resp.data.clone());
738            }
739            DataResponse::Instruments(resp) => {
740                self.handle_instruments(&resp.data);
741            }
742            DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
743            DataResponse::Trades(resp) => self.handle_trades(&resp.data),
744            DataResponse::Bars(resp) => self.handle_bars(&resp.data),
745            _ => todo!(),
746        }
747
748        msgbus::send_response(resp.correlation_id(), &resp);
749    }
750
751    // -- DATA HANDLERS ---------------------------------------------------------------------------
752
753    fn handle_instrument(&mut self, instrument: InstrumentAny) {
754        if let Err(e) = self
755            .cache
756            .as_ref()
757            .borrow_mut()
758            .add_instrument(instrument.clone())
759        {
760            log_error_on_cache_insert(&e);
761        }
762
763        let topic = switchboard::get_instrument_topic(instrument.id());
764        msgbus::publish(topic, &instrument as &dyn Any);
765    }
766
767    fn handle_delta(&mut self, delta: OrderBookDelta) {
768        let deltas = if self.config.buffer_deltas {
769            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
770                buffered_deltas.deltas.push(delta);
771            } else {
772                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
773                self.buffered_deltas_map
774                    .insert(delta.instrument_id, buffered_deltas);
775            }
776
777            if !RecordFlag::F_LAST.matches(delta.flags) {
778                return; // Not the last delta for event
779            }
780
781            // SAFETY: We know the deltas exists already
782            self.buffered_deltas_map
783                .remove(&delta.instrument_id)
784                .unwrap()
785        } else {
786            OrderBookDeltas::new(delta.instrument_id, vec![delta])
787        };
788
789        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
790        msgbus::publish(topic, &deltas as &dyn Any);
791    }
792
793    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
794        let deltas = if self.config.buffer_deltas {
795            let mut is_last_delta = false;
796            for delta in &deltas.deltas {
797                if RecordFlag::F_LAST.matches(delta.flags) {
798                    is_last_delta = true;
799                    break;
800                }
801            }
802
803            let instrument_id = deltas.instrument_id;
804
805            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
806                buffered_deltas.deltas.extend(deltas.deltas);
807            } else {
808                self.buffered_deltas_map.insert(instrument_id, deltas);
809            }
810
811            if !is_last_delta {
812                return;
813            }
814
815            // SAFETY: We know the deltas exists already
816            self.buffered_deltas_map.remove(&instrument_id).unwrap()
817        } else {
818            deltas
819        };
820
821        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
822        msgbus::publish(topic, &deltas as &dyn Any);
823    }
824
825    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
826        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
827        msgbus::publish(topic, &depth as &dyn Any);
828    }
829
830    fn handle_quote(&mut self, quote: QuoteTick) {
831        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
832            log_error_on_cache_insert(&e);
833        }
834
835        // TODO: Handle synthetics
836
837        let topic = switchboard::get_quotes_topic(quote.instrument_id);
838        msgbus::publish(topic, &quote as &dyn Any);
839    }
840
841    fn handle_trade(&mut self, trade: TradeTick) {
842        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
843            log_error_on_cache_insert(&e);
844        }
845
846        // TODO: Handle synthetics
847
848        let topic = switchboard::get_trades_topic(trade.instrument_id);
849        msgbus::publish(topic, &trade as &dyn Any);
850    }
851
852    fn handle_bar(&mut self, bar: Bar) {
853        // TODO: Handle additional bar logic
854        if self.config.validate_data_sequence
855            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
856        {
857            if bar.ts_event < last_bar.ts_event {
858                log::warn!(
859                    "Bar {bar} was prior to last bar `ts_event` {}",
860                    last_bar.ts_event
861                );
862                return; // Bar is out of sequence
863            }
864            if bar.ts_init < last_bar.ts_init {
865                log::warn!(
866                    "Bar {bar} was prior to last bar `ts_init` {}",
867                    last_bar.ts_init
868                );
869                return; // Bar is out of sequence
870            }
871            // TODO: Implement `bar.is_revision` logic
872        }
873
874        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
875            log_error_on_cache_insert(&e);
876        }
877
878        let topic = switchboard::get_bars_topic(bar.bar_type);
879        msgbus::publish(topic, &bar as &dyn Any);
880    }
881
882    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
883        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
884            log_error_on_cache_insert(&e);
885        }
886
887        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
888        msgbus::publish(topic, &mark_price as &dyn Any);
889    }
890
891    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
892        if let Err(e) = self
893            .cache
894            .as_ref()
895            .borrow_mut()
896            .add_index_price(index_price)
897        {
898            log_error_on_cache_insert(&e);
899        }
900
901        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
902        msgbus::publish(topic, &index_price as &dyn Any);
903    }
904
905    fn handle_instrument_close(&mut self, close: InstrumentClose) {
906        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
907        msgbus::publish(topic, &close as &dyn Any);
908    }
909
910    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
911
912    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
913        if cmd.instrument_id.is_synthetic() {
914            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
915        }
916
917        self.setup_order_book(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
918
919        Ok(())
920    }
921
922    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
923        if cmd.instrument_id.is_synthetic() {
924            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
925        }
926
927        self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
928
929        Ok(())
930    }
931
932    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
933        if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
934            return Ok(());
935        }
936
937        if cmd.instrument_id.is_synthetic() {
938            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
939        }
940
941        // Track snapshot intervals per instrument, and set up timer on first subscription
942        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
943            Entry::Vacant(e) => {
944                let mut set = AHashSet::new();
945                set.insert(cmd.instrument_id);
946                e.insert(set);
947                true
948            }
949            Entry::Occupied(mut e) => {
950                e.get_mut().insert(cmd.instrument_id);
951                false
952            }
953        };
954
955        if first_for_interval {
956            // Initialize snapshotter and schedule its timer
957            let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
958            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id);
959
960            let snap_info = BookSnapshotInfo {
961                instrument_id: cmd.instrument_id,
962                venue: cmd.instrument_id.venue,
963                is_composite: cmd.instrument_id.symbol.is_composite(),
964                root: Ustr::from(cmd.instrument_id.symbol.root()),
965                topic,
966                interval_ms: cmd.interval_ms,
967            };
968
969            // Schedule the first snapshot at the next interval boundary
970            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
971            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
972
973            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
974            self.book_snapshotters
975                .insert(cmd.instrument_id, snapshotter.clone());
976            let timer_name = snapshotter.timer_name;
977
978            let callback =
979                TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
980
981            self.clock
982                .borrow_mut()
983                .set_timer_ns(
984                    &timer_name,
985                    interval_ns,
986                    Some(start_time_ns.into()),
987                    None,
988                    Some(callback),
989                    None,
990                    None,
991                )
992                .expect(FAILED);
993        }
994
995        self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, true)?;
996
997        Ok(())
998    }
999
1000    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1001        match cmd.bar_type.aggregation_source() {
1002            AggregationSource::Internal => {
1003                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1004                    self.start_bar_aggregator(cmd.bar_type)?;
1005                }
1006            }
1007            AggregationSource::External => {
1008                if cmd.bar_type.instrument_id().is_synthetic() {
1009                    anyhow::bail!(
1010                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1011                    );
1012                }
1013            }
1014        }
1015
1016        Ok(())
1017    }
1018
1019    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1020        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1021            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1022            return Ok(());
1023        }
1024
1025        let topics = vec![
1026            switchboard::get_book_deltas_topic(cmd.instrument_id),
1027            switchboard::get_book_depth10_topic(cmd.instrument_id),
1028            switchboard::get_book_snapshots_topic(cmd.instrument_id),
1029        ];
1030
1031        self.maintain_book_updater(&cmd.instrument_id, &topics);
1032        self.maintain_book_snapshotter(&cmd.instrument_id);
1033
1034        Ok(())
1035    }
1036
1037    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1038        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1039            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1040            return Ok(());
1041        }
1042
1043        let topics = vec![
1044            switchboard::get_book_deltas_topic(cmd.instrument_id),
1045            switchboard::get_book_depth10_topic(cmd.instrument_id),
1046            switchboard::get_book_snapshots_topic(cmd.instrument_id),
1047        ];
1048
1049        self.maintain_book_updater(&cmd.instrument_id, &topics);
1050        self.maintain_book_snapshotter(&cmd.instrument_id);
1051
1052        Ok(())
1053    }
1054
1055    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1056        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1057            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1058            return Ok(());
1059        }
1060
1061        // Remove instrument from interval tracking, and drop empty intervals
1062        let mut to_remove = Vec::new();
1063        for (interval, set) in &mut self.book_intervals {
1064            if set.remove(&cmd.instrument_id) && set.is_empty() {
1065                to_remove.push(*interval);
1066            }
1067        }
1068
1069        for interval in to_remove {
1070            self.book_intervals.remove(&interval);
1071        }
1072
1073        let topics = vec![
1074            switchboard::get_book_deltas_topic(cmd.instrument_id),
1075            switchboard::get_book_depth10_topic(cmd.instrument_id),
1076            switchboard::get_book_snapshots_topic(cmd.instrument_id),
1077        ];
1078
1079        self.maintain_book_updater(&cmd.instrument_id, &topics);
1080        self.maintain_book_snapshotter(&cmd.instrument_id);
1081
1082        Ok(())
1083    }
1084
1085    /// Unsubscribe internal bar aggregator for the given bar type.
1086    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1087        // If we have an internal aggregator for this bar type, stop and remove it
1088        let bar_type = cmd.bar_type;
1089        if self.bar_aggregators.contains_key(&bar_type.standard()) {
1090            if let Err(err) = self.stop_bar_aggregator(bar_type) {
1091                log::error!("Error stopping bar aggregator for {bar_type}: {err}");
1092            }
1093            self.bar_aggregators.remove(&bar_type.standard());
1094            log::debug!("Removed bar aggregator for {bar_type}");
1095        }
1096        Ok(())
1097    }
1098
1099    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1100        if let Some(updater) = self.book_updaters.get(instrument_id) {
1101            let handler = ShareableMessageHandler(updater.clone());
1102
1103            // Unsubscribe handler if it is the last subscriber
1104            for topic in topics {
1105                if msgbus::subscriptions_count(topic.as_str()) == 1
1106                    && msgbus::is_subscribed(topic.as_str(), handler.clone())
1107                {
1108                    log::debug!("Unsubscribing BookUpdater from {topic}");
1109                    msgbus::unsubscribe_topic(*topic, handler.clone());
1110                }
1111            }
1112
1113            // Check remaining subscriptions, if none then remove updater
1114            let still_subscribed = topics
1115                .iter()
1116                .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1117            if !still_subscribed {
1118                self.book_updaters.remove(instrument_id);
1119                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1120            }
1121        }
1122    }
1123
1124    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1125        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1126            let topic = switchboard::get_book_snapshots_topic(*instrument_id);
1127
1128            // Check remaining snapshot subscriptions, if none then remove snapshotter
1129            if msgbus::subscriptions_count(topic.as_str()) == 0 {
1130                let timer_name = snapshotter.timer_name;
1131                self.book_snapshotters.remove(instrument_id);
1132                let mut clock = self.clock.borrow_mut();
1133                if clock.timer_names().contains(&timer_name.as_str()) {
1134                    clock.cancel_timer(&timer_name);
1135                }
1136                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1137            }
1138        }
1139    }
1140
1141    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1142
1143    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1144        let mut cache = self.cache.as_ref().borrow_mut();
1145        if let Err(e) = cache.add_instrument(instrument) {
1146            log_error_on_cache_insert(&e);
1147        }
1148    }
1149
1150    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1151        // TODO: Improve by adding bulk update methods to cache and database
1152        let mut cache = self.cache.as_ref().borrow_mut();
1153        for instrument in instruments {
1154            if let Err(e) = cache.add_instrument(instrument.clone()) {
1155                log_error_on_cache_insert(&e);
1156            }
1157        }
1158    }
1159
1160    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1161        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1162            log_error_on_cache_insert(&e);
1163        }
1164    }
1165
1166    fn handle_trades(&self, trades: &[TradeTick]) {
1167        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1168            log_error_on_cache_insert(&e);
1169        }
1170    }
1171
1172    fn handle_bars(&self, bars: &[Bar]) {
1173        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1174            log_error_on_cache_insert(&e);
1175        }
1176    }
1177
1178    // -- INTERNAL --------------------------------------------------------------------------------
1179
1180    #[allow(clippy::too_many_arguments)]
1181    fn setup_order_book(
1182        &mut self,
1183        instrument_id: &InstrumentId,
1184        book_type: BookType,
1185        only_deltas: bool,
1186        managed: bool,
1187    ) -> anyhow::Result<()> {
1188        let mut cache = self.cache.borrow_mut();
1189        if managed && !cache.has_order_book(instrument_id) {
1190            let book = OrderBook::new(*instrument_id, book_type);
1191            log::debug!("Created {book}");
1192            cache.add_order_book(book)?;
1193        }
1194
1195        // Set up subscriptions
1196        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1197        self.book_updaters.insert(*instrument_id, updater.clone());
1198
1199        let handler = ShareableMessageHandler(updater);
1200
1201        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1202        if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1203            msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1204        }
1205
1206        let topic = switchboard::get_book_depth10_topic(*instrument_id);
1207        if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1208            msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1209        }
1210
1211        Ok(())
1212    }
1213
1214    fn create_bar_aggregator(
1215        &mut self,
1216        instrument: &InstrumentAny,
1217        bar_type: BarType,
1218    ) -> Box<dyn BarAggregator> {
1219        let cache = self.cache.clone();
1220
1221        let handler = move |bar: Bar| {
1222            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1223                log_error_on_cache_insert(&e);
1224            }
1225
1226            let topic = switchboard::get_bars_topic(bar.bar_type);
1227            msgbus::publish(topic, &bar as &dyn Any);
1228        };
1229
1230        let clock = self.clock.clone();
1231        let config = self.config.clone();
1232
1233        let price_precision = instrument.price_precision();
1234        let size_precision = instrument.size_precision();
1235
1236        if bar_type.spec().is_time_aggregated() {
1237            Box::new(TimeBarAggregator::new(
1238                bar_type,
1239                price_precision,
1240                size_precision,
1241                clock,
1242                handler,
1243                false, // await_partial
1244                config.time_bars_build_with_no_updates,
1245                config.time_bars_timestamp_on_close,
1246                config.time_bars_interval_type,
1247                None,  // TODO: Implement
1248                20,    // TODO: TBD, composite bar build delay
1249                false, // TODO: skip_first_non_full_bar, make it config dependent
1250            ))
1251        } else {
1252            match bar_type.spec().aggregation {
1253                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1254                    bar_type,
1255                    price_precision,
1256                    size_precision,
1257                    handler,
1258                    false,
1259                )) as Box<dyn BarAggregator>,
1260                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1261                    bar_type,
1262                    price_precision,
1263                    size_precision,
1264                    handler,
1265                    false,
1266                )) as Box<dyn BarAggregator>,
1267                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1268                    bar_type,
1269                    price_precision,
1270                    size_precision,
1271                    handler,
1272                    false,
1273                )) as Box<dyn BarAggregator>,
1274                _ => panic!(
1275                    "Cannot create aggregator: {} aggregation not currently supported",
1276                    bar_type.spec().aggregation
1277                ),
1278            }
1279        }
1280    }
1281
1282    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1283        // Get the instrument for this bar type
1284        let instrument = {
1285            let cache = self.cache.borrow();
1286            cache
1287                .instrument(&bar_type.instrument_id())
1288                .ok_or_else(|| {
1289                    anyhow::anyhow!(
1290                        "Cannot start bar aggregation: no instrument found for {}",
1291                        bar_type.instrument_id(),
1292                    )
1293                })?
1294                .clone()
1295        };
1296
1297        // Use standard form of bar type as key
1298        let bar_key = bar_type.standard();
1299
1300        // Create or retrieve aggregator in Rc<RefCell>
1301        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1302            rc.clone()
1303        } else {
1304            let agg = self.create_bar_aggregator(&instrument, bar_type);
1305            let rc = Rc::new(RefCell::new(agg));
1306            self.bar_aggregators.insert(bar_key, rc.clone());
1307            rc
1308        };
1309
1310        // Subscribe to underlying data topics
1311        let mut handlers = Vec::new();
1312
1313        if bar_type.is_composite() {
1314            let topic = switchboard::get_bars_topic(bar_type.composite());
1315            let handler =
1316                ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1317
1318            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1319                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1320            }
1321
1322            handlers.push((topic, handler));
1323        } else if bar_type.spec().price_type == PriceType::Last {
1324            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1325            let handler =
1326                ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1327
1328            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1329                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1330            }
1331
1332            handlers.push((topic, handler));
1333        } else {
1334            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1335            let handler =
1336                ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1337
1338            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1339                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1340            }
1341
1342            handlers.push((topic, handler));
1343        }
1344
1345        self.bar_aggregator_handlers.insert(bar_key, handlers);
1346        aggregator.borrow_mut().set_is_running(true);
1347
1348        Ok(())
1349    }
1350
1351    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1352        let aggregator = self
1353            .bar_aggregators
1354            .remove(&bar_type.standard())
1355            .ok_or_else(|| {
1356                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1357            })?;
1358
1359        aggregator.borrow_mut().stop();
1360
1361        // Unsubscribe any registered message handlers
1362        let bar_key = bar_type.standard();
1363        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1364            for (topic, handler) in subs {
1365                if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1366                    msgbus::unsubscribe_topic(topic, handler);
1367                }
1368            }
1369        }
1370
1371        Ok(())
1372    }
1373}
1374
1375#[inline(always)]
1376fn log_error_on_cache_insert<T: Display>(e: &T) {
1377    log::error!("Error on cache insert: {e}");
1378}