nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
//! The data engine's primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39    any::Any,
40    cell::{Ref, RefCell},
41    collections::hash_map::Entry,
42    fmt::Display,
43    num::NonZeroUsize,
44    rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
51use indexmap::IndexMap;
52use nautilus_common::{
53    cache::Cache,
54    clock::Clock,
55    logging::{RECV, RES},
56    messages::data::{
57        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
58        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
59        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
60        UnsubscribeCommand,
61    },
62    msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
63    timer::{TimeEvent, TimeEventCallback},
64};
65use nautilus_core::{
66    correctness::{
67        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
68    },
69    datetime::millis_to_nanos,
70};
71#[cfg(feature = "defi")]
72use nautilus_model::defi::DefiData;
73use nautilus_model::{
74    data::{
75        Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
76        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
77    },
78    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
79    identifiers::{ClientId, InstrumentId, Venue},
80    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
81    orderbook::OrderBook,
82};
83use nautilus_persistence::backend::catalog::ParquetDataCatalog;
84use ustr::Ustr;
85
86#[cfg(feature = "defi")]
87#[allow(unused_imports)] // Brings DeFi impl blocks into scope
88use crate::defi::engine as _;
89#[cfg(feature = "defi")]
90use crate::engine::pool::PoolUpdater;
91use crate::{
92    aggregation::{
93        BarAggregator, RenkoBarAggregator, TickBarAggregator, TimeBarAggregator,
94        ValueBarAggregator, VolumeBarAggregator,
95    },
96    client::DataClientAdapter,
97};
98
/// Provides a high-performance `DataEngine` for all environments.
///
/// Holds the registered data clients and their venue routing, together with the
/// shared clock and cache handles used while processing data.
#[derive(Debug)]
pub struct DataEngine {
    /// Shared handle to the clock.
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    /// Shared handle to the cache.
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// Client IDs treated as external: commands naming one of these are not forwarded to a client.
    pub(crate) external_clients: AHashSet<ClientId>,
    /// Registered clients keyed by client ID (the default client is held separately).
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Fallback client used when no explicit client ID or venue route applies.
    default_client: Option<DataClientAdapter>,
    /// Registered data catalogs keyed by name.
    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
    /// Venue to client ID routing.
    routing_map: IndexMap<Venue, ClientId>,
    // NOTE(review): presumably instruments grouped by book snapshot interval - confirm against
    // the book snapshot subscription handling.
    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
    // NOTE(review): presumably the per-instrument book updater handlers - confirm.
    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
    // NOTE(review): presumably the per-instrument book snapshotter handlers - confirm.
    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
    // NOTE(review): presumably the active aggregator per bar type - confirm.
    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
    // NOTE(review): presumably the (topic, handler) pairs registered for each bar aggregator - confirm.
    bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
    // Synthetic feeds are not yet consumed by the quote/trade handlers (see their TODOs).
    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Order book deltas accumulated per instrument until a delta flagged `F_LAST` arrives
    /// (only used when `config.buffer_deltas` is set).
    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
    /// Message bus priority; initialized to 10 by `new`.
    pub(crate) msgbus_priority: u8,
    /// Engine configuration.
    pub(crate) config: DataEngineConfig,
    // NOTE(review): the fields below exist only with the `defi` feature; their handling lives
    // outside this part of the file - semantics to be confirmed there.
    #[cfg(feature = "defi")]
    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
    #[cfg(feature = "defi")]
    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
    #[cfg(feature = "defi")]
    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
    #[cfg(feature = "defi")]
    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
}
128
129impl DataEngine {
130    /// Creates a new [`DataEngine`] instance.
131    #[must_use]
132    pub fn new(
133        clock: Rc<RefCell<dyn Clock>>,
134        cache: Rc<RefCell<Cache>>,
135        config: Option<DataEngineConfig>,
136    ) -> Self {
137        let config = config.unwrap_or_default();
138
139        let external_clients: AHashSet<ClientId> = config
140            .external_clients
141            .clone()
142            .unwrap_or_default()
143            .into_iter()
144            .collect();
145
146        Self {
147            clock,
148            cache,
149            external_clients,
150            clients: IndexMap::new(),
151            default_client: None,
152            catalogs: AHashMap::new(),
153            routing_map: IndexMap::new(),
154            book_intervals: AHashMap::new(),
155            book_updaters: AHashMap::new(),
156            book_snapshotters: AHashMap::new(),
157            bar_aggregators: AHashMap::new(),
158            bar_aggregator_handlers: AHashMap::new(),
159            _synthetic_quote_feeds: AHashMap::new(),
160            _synthetic_trade_feeds: AHashMap::new(),
161            buffered_deltas_map: AHashMap::new(),
162            msgbus_priority: 10, // High-priority for built-in component
163            config,
164            #[cfg(feature = "defi")]
165            pool_updaters: AHashMap::new(),
166            #[cfg(feature = "defi")]
167            pool_updaters_pending: AHashSet::new(),
168            #[cfg(feature = "defi")]
169            pool_snapshot_pending: AHashSet::new(),
170            #[cfg(feature = "defi")]
171            pool_event_buffers: AHashMap::new(),
172        }
173    }
174
175    /// Returns a read-only reference to the engines clock.
176    #[must_use]
177    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
178        self.clock.borrow()
179    }
180
181    /// Returns a read-only reference to the engines cache.
182    #[must_use]
183    pub fn get_cache(&self) -> Ref<'_, Cache> {
184        self.cache.borrow()
185    }
186
187    /// Returns the `Rc<RefCell<Cache>>` used by this engine.
188    #[must_use]
189    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
190        Rc::clone(&self.cache)
191    }
192
193    /// Registers the `catalog` with the engine with an optional specific `name`.
194    ///
195    /// # Panics
196    ///
197    /// Panics if a catalog with the same `name` has already been registered.
198    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
199        let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
200
201        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
202
203        self.catalogs.insert(name, catalog);
204        log::info!("Registered catalog <{name}>");
205    }
206
207    /// Registers the `client` with the engine with an optional venue `routing`.
208    ///
209    ///
210    /// # Panics
211    ///
212    /// Panics if a client with the same client ID has already been registered.
213    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
214        let client_id = client.client_id();
215
216        if let Some(default_client) = &self.default_client {
217            check_predicate_false(
218                default_client.client_id() == client.client_id(),
219                "client_id already registered as default client",
220            )
221            .expect(FAILED);
222        }
223
224        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
225
226        if let Some(routing) = routing {
227            self.routing_map.insert(routing, client_id);
228            log::info!("Set client {client_id} routing for {routing}");
229        }
230
231        if client.venue.is_none() && self.default_client.is_none() {
232            self.default_client = Some(client);
233            log::info!("Registered client {client_id} for default routing");
234        } else {
235            self.clients.insert(client_id, client);
236            log::info!("Registered client {client_id}");
237        }
238    }
239
240    /// Deregisters the client for the `client_id`.
241    ///
242    /// # Panics
243    ///
244    /// Panics if the client ID has not been registered.
245    pub fn deregister_client(&mut self, client_id: &ClientId) {
246        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
247
248        self.clients.shift_remove(client_id);
249        log::info!("Deregistered client {client_id}");
250    }
251
252    /// Registers the data `client` with the engine as the default routing client.
253    ///
254    /// When a specific venue routing cannot be found, this client will receive messages.
255    ///
256    /// # Warnings
257    ///
258    /// Any existing default routing client will be overwritten.
259    ///
260    /// # Panics
261    ///
262    /// Panics if a default client has already been registered.
263    pub fn register_default_client(&mut self, client: DataClientAdapter) {
264        check_predicate_true(
265            self.default_client.is_none(),
266            "default client already registered",
267        )
268        .expect(FAILED);
269
270        let client_id = client.client_id();
271
272        self.default_client = Some(client);
273        log::info!("Registered default client {client_id}");
274    }
275
276    /// Starts all registered data clients.
277    pub fn start(&mut self) {
278        for client in self.get_clients_mut() {
279            if let Err(e) = client.start() {
280                log::error!("{e}");
281            }
282        }
283    }
284
285    /// Stops all registered data clients.
286    pub fn stop(&mut self) {
287        for client in self.get_clients_mut() {
288            if let Err(e) = client.stop() {
289                log::error!("{e}");
290            }
291        }
292    }
293
294    /// Resets all registered data clients to their initial state.
295    pub fn reset(&mut self) {
296        for client in self.get_clients_mut() {
297            if let Err(e) = client.reset() {
298                log::error!("{e}");
299            }
300        }
301    }
302
303    /// Disposes the engine, stopping all clients and canceling any timers.
304    pub fn dispose(&mut self) {
305        for client in self.get_clients_mut() {
306            if let Err(e) = client.dispose() {
307                log::error!("{e}");
308            }
309        }
310
311        self.clock.borrow_mut().cancel_timers();
312    }
313
314    /// Returns `true` if all registered data clients are currently connected.
315    #[must_use]
316    pub fn check_connected(&self) -> bool {
317        self.get_clients()
318            .iter()
319            .all(|client| client.is_connected())
320    }
321
322    /// Returns `true` if all registered data clients are currently disconnected.
323    #[must_use]
324    pub fn check_disconnected(&self) -> bool {
325        self.get_clients()
326            .iter()
327            .all(|client| !client.is_connected())
328    }
329
330    /// Returns a list of all registered client IDs, including the default client if set.
331    #[must_use]
332    pub fn registered_clients(&self) -> Vec<ClientId> {
333        self.get_clients()
334            .into_iter()
335            .map(|client| client.client_id())
336            .collect()
337    }
338
339    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
340
341    pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
342    where
343        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
344        T: Clone,
345    {
346        self.get_clients()
347            .into_iter()
348            .flat_map(get_subs)
349            .cloned()
350            .collect()
351    }
352
353    #[must_use]
354    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
355        let (default_opt, clients_map) = (&self.default_client, &self.clients);
356        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
357
358        if let Some(default) = default_opt {
359            clients.push(default);
360        }
361
362        clients
363    }
364
365    #[must_use]
366    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
367        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
368        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
369
370        if let Some(default) = default_opt {
371            clients.push(default);
372        }
373
374        clients
375    }
376
377    pub fn get_client(
378        &mut self,
379        client_id: Option<&ClientId>,
380        venue: Option<&Venue>,
381    ) -> Option<&mut DataClientAdapter> {
382        if let Some(client_id) = client_id {
383            // Explicit ID: first look in registered clients
384            if let Some(client) = self.clients.get_mut(client_id) {
385                return Some(client);
386            }
387
388            // Then check if it matches the default client
389            if let Some(default) = self.default_client.as_mut()
390                && default.client_id() == *client_id
391            {
392                return Some(default);
393            }
394
395            // Unknown explicit client
396            return None;
397        }
398
399        if let Some(v) = venue {
400            // Route by venue if mapped client still registered
401            if let Some(client_id) = self.routing_map.get(v) {
402                return self.clients.get_mut(client_id);
403            }
404        }
405
406        // Fallback to default client
407        self.get_default_client()
408    }
409
410    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
411        self.default_client.as_mut()
412    }
413
414    /// Returns all custom data types currently subscribed across all clients.
415    #[must_use]
416    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
417        self.collect_subscriptions(|client| &client.subscriptions_custom)
418    }
419
420    /// Returns all instrument IDs currently subscribed across all clients.
421    #[must_use]
422    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
423        self.collect_subscriptions(|client| &client.subscriptions_instrument)
424    }
425
426    /// Returns all instrument IDs for which book delta subscriptions exist.
427    #[must_use]
428    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
429        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
430    }
431
432    /// Returns all instrument IDs for which book snapshot subscriptions exist.
433    #[must_use]
434    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
435        self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
436    }
437
438    /// Returns all instrument IDs for which quote subscriptions exist.
439    #[must_use]
440    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
441        self.collect_subscriptions(|client| &client.subscriptions_quotes)
442    }
443
444    /// Returns all instrument IDs for which trade subscriptions exist.
445    #[must_use]
446    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
447        self.collect_subscriptions(|client| &client.subscriptions_trades)
448    }
449
450    /// Returns all bar types currently subscribed across all clients.
451    #[must_use]
452    pub fn subscribed_bars(&self) -> Vec<BarType> {
453        self.collect_subscriptions(|client| &client.subscriptions_bars)
454    }
455
456    /// Returns all instrument IDs for which mark price subscriptions exist.
457    #[must_use]
458    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
459        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
460    }
461
462    /// Returns all instrument IDs for which index price subscriptions exist.
463    #[must_use]
464    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
465        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
466    }
467
468    /// Returns all instrument IDs for which funding rate subscriptions exist.
469    #[must_use]
470    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
471        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
472    }
473
474    /// Returns all instrument IDs for which status subscriptions exist.
475    #[must_use]
476    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
477        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
478    }
479
480    /// Returns all instrument IDs for which instrument close subscriptions exist.
481    #[must_use]
482    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
483        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
484    }
485
486    // -- COMMANDS --------------------------------------------------------------------------------
487
488    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
489    ///
490    /// Errors during execution are logged.
491    pub fn execute(&mut self, cmd: &DataCommand) {
492        if let Err(e) = match cmd {
493            DataCommand::Subscribe(c) => self.execute_subscribe(c),
494            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
495            DataCommand::Request(c) => self.execute_request(c),
496            #[cfg(feature = "defi")]
497            DataCommand::DefiRequest(c) => self.execute_defi_request(c),
498            #[cfg(feature = "defi")]
499            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
500            #[cfg(feature = "defi")]
501            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
502            _ => {
503                log::warn!("Unhandled DataCommand variant: {cmd:?}");
504                Ok(())
505            }
506        } {
507            log::error!("{e}");
508        }
509    }
510
511    /// Handles a subscribe command, updating internal state and forwarding to the client.
512    ///
513    /// # Errors
514    ///
515    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
516    /// or if the underlying client operation fails.
517    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
518        // Update internal engine state
519        match &cmd {
520            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
521            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
522            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
523            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
524            _ => {} // Do nothing else
525        }
526
527        if let Some(client_id) = cmd.client_id()
528            && self.external_clients.contains(client_id)
529        {
530            if self.config.debug {
531                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
532            }
533            return Ok(());
534        }
535
536        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
537            client.execute_subscribe(cmd);
538        } else {
539            log::error!(
540                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
541                cmd.client_id(),
542                cmd.venue(),
543            );
544        }
545
546        Ok(())
547    }
548
549    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
550    ///
551    /// # Errors
552    ///
553    /// Returns an error if the underlying client operation fails.
554    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
555        match &cmd {
556            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
557            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
558            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
559            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
560            _ => {} // Do nothing else
561        }
562
563        if let Some(client_id) = cmd.client_id()
564            && self.external_clients.contains(client_id)
565        {
566            if self.config.debug {
567                log::debug!(
568                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
569                );
570            }
571            return Ok(());
572        }
573
574        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
575            client.execute_unsubscribe(cmd);
576        } else {
577            log::error!(
578                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
579                cmd.client_id(),
580                cmd.venue(),
581            );
582        }
583
584        Ok(())
585    }
586
587    /// Sends a [`RequestCommand`] to a suitable data client implementation.
588    ///
589    /// # Errors
590    ///
591    /// Returns an error if no client is found for the given client ID or venue,
592    /// or if the client fails to process the request.
593    pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
594        // Skip requests for external clients
595        if let Some(cid) = req.client_id()
596            && self.external_clients.contains(cid)
597        {
598            if self.config.debug {
599                log::debug!("Skipping data request for external client {cid}: {req:?}");
600            }
601            return Ok(());
602        }
603        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
604            match req {
605                RequestCommand::Data(req) => client.request_data(req),
606                RequestCommand::Instrument(req) => client.request_instrument(req),
607                RequestCommand::Instruments(req) => client.request_instruments(req),
608                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
609                RequestCommand::BookDepth(req) => client.request_book_depth(req),
610                RequestCommand::Quotes(req) => client.request_quotes(req),
611                RequestCommand::Trades(req) => client.request_trades(req),
612                RequestCommand::Bars(req) => client.request_bars(req),
613            }
614        } else {
615            anyhow::bail!(
616                "Cannot handle request: no client found for {:?} {:?}",
617                req.client_id(),
618                req.venue()
619            );
620        }
621    }
622
623    /// Processes a dynamically-typed data message.
624    ///
625    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
626    pub fn process(&mut self, data: &dyn Any) {
627        // TODO: Eventually these could be added to the `Data` enum? process here for now
628        if let Some(data) = data.downcast_ref::<Data>() {
629            self.process_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
630            return;
631        }
632
633        #[cfg(feature = "defi")]
634        if let Some(data) = data.downcast_ref::<DefiData>() {
635            self.process_defi_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
636            return;
637        }
638
639        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
640            self.handle_instrument(instrument.clone());
641        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
642            self.handle_funding_rate(*funding_rate);
643        } else {
644            log::error!("Cannot process data {data:?}, type is unrecognized");
645        }
646    }
647
648    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
649    pub fn process_data(&mut self, data: Data) {
650        match data {
651            Data::Delta(delta) => self.handle_delta(delta),
652            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
653            Data::Depth10(depth) => self.handle_depth10(*depth),
654            Data::Quote(quote) => self.handle_quote(quote),
655            Data::Trade(trade) => self.handle_trade(trade),
656            Data::Bar(bar) => self.handle_bar(bar),
657            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
658            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
659            Data::InstrumentClose(close) => self.handle_instrument_close(close),
660        }
661    }
662
663    /// Processes a `DataResponse`, handling and publishing the response message.
664    pub fn response(&self, resp: DataResponse) {
665        log::debug!("{RECV}{RES} {resp:?}");
666
667        match &resp {
668            DataResponse::Instrument(resp) => {
669                self.handle_instrument_response(resp.data.clone());
670            }
671            DataResponse::Instruments(resp) => {
672                self.handle_instruments(&resp.data);
673            }
674            DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
675            DataResponse::Trades(resp) => self.handle_trades(&resp.data),
676            DataResponse::Bars(resp) => self.handle_bars(&resp.data),
677            _ => todo!(),
678        }
679
680        msgbus::send_response(resp.correlation_id(), &resp);
681    }
682
683    // -- DATA HANDLERS ---------------------------------------------------------------------------
684
685    fn handle_instrument(&mut self, instrument: InstrumentAny) {
686        if let Err(e) = self
687            .cache
688            .as_ref()
689            .borrow_mut()
690            .add_instrument(instrument.clone())
691        {
692            log_error_on_cache_insert(&e);
693        }
694
695        let topic = switchboard::get_instrument_topic(instrument.id());
696        msgbus::publish(topic, &instrument as &dyn Any);
697    }
698
699    fn handle_delta(&mut self, delta: OrderBookDelta) {
700        let deltas = if self.config.buffer_deltas {
701            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
702                buffered_deltas.deltas.push(delta);
703                buffered_deltas.flags = delta.flags;
704                buffered_deltas.sequence = delta.sequence;
705                buffered_deltas.ts_event = delta.ts_event;
706                buffered_deltas.ts_init = delta.ts_init;
707            } else {
708                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
709                self.buffered_deltas_map
710                    .insert(delta.instrument_id, buffered_deltas);
711            }
712
713            if !RecordFlag::F_LAST.matches(delta.flags) {
714                return; // Not the last delta for event
715            }
716
717            // SAFETY: We know the deltas exists already
718            self.buffered_deltas_map
719                .remove(&delta.instrument_id)
720                .unwrap()
721        } else {
722            OrderBookDeltas::new(delta.instrument_id, vec![delta])
723        };
724
725        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
726        msgbus::publish(topic, &deltas as &dyn Any);
727    }
728
729    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
730        let deltas = if self.config.buffer_deltas {
731            let mut is_last_delta = false;
732            for delta in &deltas.deltas {
733                if RecordFlag::F_LAST.matches(delta.flags) {
734                    is_last_delta = true;
735                    break;
736                }
737            }
738
739            let instrument_id = deltas.instrument_id;
740
741            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
742                buffered_deltas.deltas.extend(deltas.deltas);
743
744                if let Some(last_delta) = buffered_deltas.deltas.last() {
745                    buffered_deltas.flags = last_delta.flags;
746                    buffered_deltas.sequence = last_delta.sequence;
747                    buffered_deltas.ts_event = last_delta.ts_event;
748                    buffered_deltas.ts_init = last_delta.ts_init;
749                }
750            } else {
751                self.buffered_deltas_map.insert(instrument_id, deltas);
752            }
753
754            if !is_last_delta {
755                return;
756            }
757
758            // SAFETY: We know the deltas exists already
759            self.buffered_deltas_map.remove(&instrument_id).unwrap()
760        } else {
761            deltas
762        };
763
764        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
765        msgbus::publish(topic, &deltas as &dyn Any);
766    }
767
768    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
769        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
770        msgbus::publish(topic, &depth as &dyn Any);
771    }
772
773    fn handle_quote(&mut self, quote: QuoteTick) {
774        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
775            log_error_on_cache_insert(&e);
776        }
777
778        // TODO: Handle synthetics
779
780        let topic = switchboard::get_quotes_topic(quote.instrument_id);
781        msgbus::publish(topic, &quote as &dyn Any);
782    }
783
784    fn handle_trade(&mut self, trade: TradeTick) {
785        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
786            log_error_on_cache_insert(&e);
787        }
788
789        // TODO: Handle synthetics
790
791        let topic = switchboard::get_trades_topic(trade.instrument_id);
792        msgbus::publish(topic, &trade as &dyn Any);
793    }
794
795    fn handle_bar(&mut self, bar: Bar) {
796        // TODO: Handle additional bar logic
797        if self.config.validate_data_sequence
798            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
799        {
800            if bar.ts_event < last_bar.ts_event {
801                log::warn!(
802                    "Bar {bar} was prior to last bar `ts_event` {}",
803                    last_bar.ts_event
804                );
805                return; // Bar is out of sequence
806            }
807            if bar.ts_init < last_bar.ts_init {
808                log::warn!(
809                    "Bar {bar} was prior to last bar `ts_init` {}",
810                    last_bar.ts_init
811                );
812                return; // Bar is out of sequence
813            }
814            // TODO: Implement `bar.is_revision` logic
815        }
816
817        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
818            log_error_on_cache_insert(&e);
819        }
820
821        let topic = switchboard::get_bars_topic(bar.bar_type);
822        msgbus::publish(topic, &bar as &dyn Any);
823    }
824
825    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
826        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
827            log_error_on_cache_insert(&e);
828        }
829
830        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
831        msgbus::publish(topic, &mark_price as &dyn Any);
832    }
833
834    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
835        if let Err(e) = self
836            .cache
837            .as_ref()
838            .borrow_mut()
839            .add_index_price(index_price)
840        {
841            log_error_on_cache_insert(&e);
842        }
843
844        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
845        msgbus::publish(topic, &index_price as &dyn Any);
846    }
847
848    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
849    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
850        if let Err(e) = self
851            .cache
852            .as_ref()
853            .borrow_mut()
854            .add_funding_rate(funding_rate)
855        {
856            log_error_on_cache_insert(&e);
857        }
858
859        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
860        msgbus::publish(topic, &funding_rate as &dyn Any);
861    }
862
863    fn handle_instrument_close(&mut self, close: InstrumentClose) {
864        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
865        msgbus::publish(topic, &close as &dyn Any);
866    }
867
868    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
869
870    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
871        if cmd.instrument_id.is_synthetic() {
872            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
873        }
874
875        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
876
877        Ok(())
878    }
879
880    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
881        if cmd.instrument_id.is_synthetic() {
882            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
883        }
884
885        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
886
887        Ok(())
888    }
889
890    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
891        if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
892            return Ok(());
893        }
894
895        if cmd.instrument_id.is_synthetic() {
896            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
897        }
898
899        // Track snapshot intervals per instrument, and set up timer on first subscription
900        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
901            Entry::Vacant(e) => {
902                let mut set = AHashSet::new();
903                set.insert(cmd.instrument_id);
904                e.insert(set);
905                true
906            }
907            Entry::Occupied(mut e) => {
908                e.get_mut().insert(cmd.instrument_id);
909                false
910            }
911        };
912
913        if first_for_interval {
914            // Initialize snapshotter and schedule its timer
915            let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
916            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
917
918            let snap_info = BookSnapshotInfo {
919                instrument_id: cmd.instrument_id,
920                venue: cmd.instrument_id.venue,
921                is_composite: cmd.instrument_id.symbol.is_composite(),
922                root: Ustr::from(cmd.instrument_id.symbol.root()),
923                topic,
924                interval_ms: cmd.interval_ms,
925            };
926
927            // Schedule the first snapshot at the next interval boundary
928            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
929            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
930
931            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
932            self.book_snapshotters
933                .insert(cmd.instrument_id, snapshotter.clone());
934            let timer_name = snapshotter.timer_name;
935
936            let callback_fn: Rc<dyn Fn(TimeEvent)> =
937                Rc::new(move |event| snapshotter.snapshot(event));
938            let callback = TimeEventCallback::from(callback_fn);
939
940            self.clock
941                .borrow_mut()
942                .set_timer_ns(
943                    &timer_name,
944                    interval_ns,
945                    Some(start_time_ns.into()),
946                    None,
947                    Some(callback),
948                    None,
949                    None,
950                )
951                .expect(FAILED);
952        }
953
954        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
955
956        Ok(())
957    }
958
959    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
960        match cmd.bar_type.aggregation_source() {
961            AggregationSource::Internal => {
962                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
963                    self.start_bar_aggregator(cmd.bar_type)?;
964                }
965            }
966            AggregationSource::External => {
967                if cmd.bar_type.instrument_id().is_synthetic() {
968                    anyhow::bail!(
969                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
970                    );
971                }
972            }
973        }
974
975        Ok(())
976    }
977
978    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
979        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
980            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
981            return Ok(());
982        }
983
984        let topics = vec![
985            switchboard::get_book_deltas_topic(cmd.instrument_id),
986            switchboard::get_book_depth10_topic(cmd.instrument_id),
987            // TODO: Unsubscribe from snapshots?
988        ];
989
990        self.maintain_book_updater(&cmd.instrument_id, &topics);
991        self.maintain_book_snapshotter(&cmd.instrument_id);
992
993        Ok(())
994    }
995
996    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
997        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
998            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
999            return Ok(());
1000        }
1001
1002        let topics = vec![
1003            switchboard::get_book_deltas_topic(cmd.instrument_id),
1004            switchboard::get_book_depth10_topic(cmd.instrument_id),
1005            // TODO: Unsubscribe from snapshots?
1006        ];
1007
1008        self.maintain_book_updater(&cmd.instrument_id, &topics);
1009        self.maintain_book_snapshotter(&cmd.instrument_id);
1010
1011        Ok(())
1012    }
1013
1014    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1015        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1016            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1017            return Ok(());
1018        }
1019
1020        // Remove instrument from interval tracking, and drop empty intervals
1021        let mut to_remove = Vec::new();
1022        for (interval, set) in &mut self.book_intervals {
1023            if set.remove(&cmd.instrument_id) && set.is_empty() {
1024                to_remove.push(*interval);
1025            }
1026        }
1027
1028        for interval in to_remove {
1029            self.book_intervals.remove(&interval);
1030        }
1031
1032        let topics = vec![
1033            switchboard::get_book_deltas_topic(cmd.instrument_id),
1034            switchboard::get_book_depth10_topic(cmd.instrument_id),
1035            // TODO: Unsubscribe from snapshots (add interval_ms to message?)
1036        ];
1037
1038        self.maintain_book_updater(&cmd.instrument_id, &topics);
1039        self.maintain_book_snapshotter(&cmd.instrument_id);
1040
1041        Ok(())
1042    }
1043
1044    /// Unsubscribe internal bar aggregator for the given bar type.
1045    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1046        // If we have an internal aggregator for this bar type, stop and remove it
1047        let bar_type = cmd.bar_type;
1048        if self.bar_aggregators.contains_key(&bar_type.standard()) {
1049            if let Err(err) = self.stop_bar_aggregator(bar_type) {
1050                log::error!("Error stopping bar aggregator for {bar_type}: {err}");
1051            }
1052            self.bar_aggregators.remove(&bar_type.standard());
1053            log::debug!("Removed bar aggregator for {bar_type}");
1054        }
1055        Ok(())
1056    }
1057
1058    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1059        if let Some(updater) = self.book_updaters.get(instrument_id) {
1060            let handler = ShareableMessageHandler(updater.clone());
1061
1062            // Unsubscribe handler if it is the last subscriber
1063            for topic in topics {
1064                if msgbus::subscriptions_count(topic.as_str()) == 1
1065                    && msgbus::is_subscribed(topic.as_str(), handler.clone())
1066                {
1067                    log::debug!("Unsubscribing BookUpdater from {topic}");
1068                    msgbus::unsubscribe_topic(*topic, handler.clone());
1069                }
1070            }
1071
1072            // Check remaining subscriptions, if none then remove updater
1073            let still_subscribed = topics
1074                .iter()
1075                .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1076            if !still_subscribed {
1077                self.book_updaters.remove(instrument_id);
1078                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1079            }
1080        }
1081    }
1082
1083    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1084        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1085            let topic = switchboard::get_book_snapshots_topic(
1086                *instrument_id,
1087                snapshotter.snap_info.interval_ms,
1088            );
1089
1090            // Check remaining snapshot subscriptions, if none then remove snapshotter
1091            if msgbus::subscriptions_count(topic.as_str()) == 0 {
1092                let timer_name = snapshotter.timer_name;
1093                self.book_snapshotters.remove(instrument_id);
1094                let mut clock = self.clock.borrow_mut();
1095                if clock.timer_exists(&timer_name) {
1096                    clock.cancel_timer(&timer_name);
1097                }
1098                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1099            }
1100        }
1101    }
1102
1103    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1104
1105    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1106        let mut cache = self.cache.as_ref().borrow_mut();
1107        if let Err(e) = cache.add_instrument(instrument) {
1108            log_error_on_cache_insert(&e);
1109        }
1110    }
1111
1112    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1113        // TODO: Improve by adding bulk update methods to cache and database
1114        let mut cache = self.cache.as_ref().borrow_mut();
1115        for instrument in instruments {
1116            if let Err(e) = cache.add_instrument(instrument.clone()) {
1117                log_error_on_cache_insert(&e);
1118            }
1119        }
1120    }
1121
1122    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1123        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1124            log_error_on_cache_insert(&e);
1125        }
1126    }
1127
1128    fn handle_trades(&self, trades: &[TradeTick]) {
1129        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1130            log_error_on_cache_insert(&e);
1131        }
1132    }
1133
1134    fn handle_bars(&self, bars: &[Bar]) {
1135        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1136            log_error_on_cache_insert(&e);
1137        }
1138    }
1139
1140    // -- INTERNAL --------------------------------------------------------------------------------
1141
1142    #[allow(clippy::too_many_arguments)]
1143    fn setup_book_updater(
1144        &mut self,
1145        instrument_id: &InstrumentId,
1146        book_type: BookType,
1147        only_deltas: bool,
1148        managed: bool,
1149    ) -> anyhow::Result<()> {
1150        let mut cache = self.cache.borrow_mut();
1151        if managed && !cache.has_order_book(instrument_id) {
1152            let book = OrderBook::new(*instrument_id, book_type);
1153            log::debug!("Created {book}");
1154            cache.add_order_book(book)?;
1155        }
1156
1157        // Set up subscriptions
1158        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1159        self.book_updaters.insert(*instrument_id, updater.clone());
1160
1161        let handler = ShareableMessageHandler(updater);
1162
1163        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1164        if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1165            msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1166        }
1167
1168        let topic = switchboard::get_book_depth10_topic(*instrument_id);
1169        if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1170            msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1171        }
1172
1173        Ok(())
1174    }
1175
1176    fn create_bar_aggregator(
1177        &mut self,
1178        instrument: &InstrumentAny,
1179        bar_type: BarType,
1180    ) -> Box<dyn BarAggregator> {
1181        let cache = self.cache.clone();
1182
1183        let handler = move |bar: Bar| {
1184            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1185                log_error_on_cache_insert(&e);
1186            }
1187
1188            let topic = switchboard::get_bars_topic(bar.bar_type);
1189            msgbus::publish(topic, &bar as &dyn Any);
1190        };
1191
1192        let clock = self.clock.clone();
1193        let config = self.config.clone();
1194
1195        let price_precision = instrument.price_precision();
1196        let size_precision = instrument.size_precision();
1197
1198        if bar_type.spec().is_time_aggregated() {
1199            // Get time_bars_origin_offset from config
1200            let time_bars_origin_offset = config
1201                .time_bars_origins
1202                .get(&bar_type.spec().aggregation)
1203                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1204
1205            Box::new(TimeBarAggregator::new(
1206                bar_type,
1207                price_precision,
1208                size_precision,
1209                clock,
1210                handler,
1211                config.time_bars_build_with_no_updates,
1212                config.time_bars_timestamp_on_close,
1213                config.time_bars_interval_type,
1214                time_bars_origin_offset,
1215                20,    // TODO: TBD, composite bar build delay
1216                false, // TODO: skip_first_non_full_bar, make it config dependent
1217            ))
1218        } else {
1219            match bar_type.spec().aggregation {
1220                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1221                    bar_type,
1222                    price_precision,
1223                    size_precision,
1224                    handler,
1225                )) as Box<dyn BarAggregator>,
1226                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1227                    bar_type,
1228                    price_precision,
1229                    size_precision,
1230                    handler,
1231                )) as Box<dyn BarAggregator>,
1232                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1233                    bar_type,
1234                    price_precision,
1235                    size_precision,
1236                    handler,
1237                )) as Box<dyn BarAggregator>,
1238                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1239                    bar_type,
1240                    price_precision,
1241                    size_precision,
1242                    instrument.price_increment(),
1243                    handler,
1244                )) as Box<dyn BarAggregator>,
1245                _ => panic!(
1246                    "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, VOLUME, VALUE, RENKO",
1247                    bar_type.spec().aggregation
1248                ),
1249            }
1250        }
1251    }
1252
1253    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1254        // Get the instrument for this bar type
1255        let instrument = {
1256            let cache = self.cache.borrow();
1257            cache
1258                .instrument(&bar_type.instrument_id())
1259                .ok_or_else(|| {
1260                    anyhow::anyhow!(
1261                        "Cannot start bar aggregation: no instrument found for {}",
1262                        bar_type.instrument_id(),
1263                    )
1264                })?
1265                .clone()
1266        };
1267
1268        // Use standard form of bar type as key
1269        let bar_key = bar_type.standard();
1270
1271        // Create or retrieve aggregator in Rc<RefCell>
1272        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1273            rc.clone()
1274        } else {
1275            let agg = self.create_bar_aggregator(&instrument, bar_type);
1276            let rc = Rc::new(RefCell::new(agg));
1277            self.bar_aggregators.insert(bar_key, rc.clone());
1278            rc
1279        };
1280
1281        // Subscribe to underlying data topics
1282        let mut handlers = Vec::new();
1283
1284        if bar_type.is_composite() {
1285            let topic = switchboard::get_bars_topic(bar_type.composite());
1286            let handler =
1287                ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1288
1289            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1290                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1291            }
1292
1293            handlers.push((topic, handler));
1294        } else if bar_type.spec().price_type == PriceType::Last {
1295            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1296            let handler =
1297                ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1298
1299            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1300                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1301            }
1302
1303            handlers.push((topic, handler));
1304        } else {
1305            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1306            let handler =
1307                ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1308
1309            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1310                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1311            }
1312
1313            handlers.push((topic, handler));
1314        }
1315
1316        self.bar_aggregator_handlers.insert(bar_key, handlers);
1317        aggregator.borrow_mut().set_is_running(true);
1318
1319        Ok(())
1320    }
1321
1322    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1323        let aggregator = self
1324            .bar_aggregators
1325            .remove(&bar_type.standard())
1326            .ok_or_else(|| {
1327                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1328            })?;
1329
1330        aggregator.borrow_mut().stop();
1331
1332        // Unsubscribe any registered message handlers
1333        let bar_key = bar_type.standard();
1334        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1335            for (topic, handler) in subs {
1336                if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1337                    msgbus::unsubscribe_topic(topic, handler);
1338                }
1339            }
1340        }
1341
1342        Ok(())
1343    }
1344}
1345
1346#[inline(always)]
1347fn log_error_on_cache_insert<T: Display>(e: &T) {
1348    log::error!("Error on cache insert: {e}");
1349}